[jira] [Resolved] (KAFKA-16035) add integration test for ExpiresPerSec and RemoteLogSizeComputationTime metrics
[ https://issues.apache.org/jira/browse/KAFKA-16035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16035. --- Fix Version/s: 3.7.0 Resolution: Fixed > add integration test for ExpiresPerSec and RemoteLogSizeComputationTime > metrics > --- > > Key: KAFKA-16035 > URL: https://issues.apache.org/jira/browse/KAFKA-16035 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Fix For: 3.7.0 > > > add integration test for ExpiresPerSec and RemoteLogSizeComputationTime > metrics > https://github.com/apache/kafka/pull/15015/commits/517a7c19d5a19bc94f0f79c02a239fd1ff7f6991 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16035: add tests for remoteLogSizeComputationTime/remoteFetchExpiresPerSec metrics [kafka]
showuon merged PR #15056: URL: https://github.com/apache/kafka/pull/15056 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16014: Implement RemoteLogSizeBytes [kafka]
showuon merged PR #15050: URL: https://github.com/apache/kafka/pull/15050
Re: [PR] KAFKA-16014: Implement RemoteLogSizeBytes [kafka]
showuon commented on PR #15050: URL: https://github.com/apache/kafka/pull/15050#issuecomment-1867309312 Ran CI twice; the failed tests are unrelated.
[jira] [Updated] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Almog Gavra updated KAFKA-16046: Labels: streams (was: ) > Stream Stream Joins fail after restoration with deserialization exceptions > -- > > Key: KAFKA-16046 > URL: https://issues.apache.org/jira/browse/KAFKA-16046 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Almog Gavra >Assignee: Almog Gavra >Priority: Blocker > Labels: streams > > Before KIP-954, the `KStreamImplJoin` class would always create > non-timestamped persistent windowed stores. After that KIP, the default was > changed to create timestamped stores. This wasn't compatible because, during > restoration, timestamped stores have their changelog values transformed to > prepend the timestamp to the value. This caused serialization errors when > trying to read from the store because the deserializers did not expect the > timestamp to be prepended.
[jira] [Updated] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Almog Gavra updated KAFKA-16046: Component/s: streams > Stream Stream Joins fail after restoration with deserialization exceptions > -- > > Key: KAFKA-16046 > URL: https://issues.apache.org/jira/browse/KAFKA-16046 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Almog Gavra >Assignee: Almog Gavra >Priority: Blocker > Labels: streams > > Before KIP-954, the `KStreamImplJoin` class would always create > non-timestamped persistent windowed stores. After that KIP, the default was > changed to create timestamped stores. This wasn't compatible because, during > restoration, timestamped stores have their changelog values transformed to > prepend the timestamp to the value. This caused serialization errors when > trying to read from the store because the deserializers did not expect the > timestamp to be prepended.
[jira] [Created] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions
Almog Gavra created KAFKA-16046: --- Summary: Stream Stream Joins fail after restoration with deserialization exceptions Key: KAFKA-16046 URL: https://issues.apache.org/jira/browse/KAFKA-16046 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Almog Gavra Assignee: Almog Gavra Before KIP-954, the `KStreamImplJoin` class would always create non-timestamped persistent windowed stores. After that KIP, the default was changed to create timestamped stores. This wasn't compatible because, during restoration, timestamped stores have their changelog values transformed to prepend the timestamp to the value. This caused serialization errors when trying to read from the store because the deserializers did not expect the timestamp to be prepended.
[jira] [Commented] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions
[ https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799623#comment-17799623 ] Almog Gavra commented on KAFKA-16046: - https://github.com/apache/kafka/pull/15061 > Stream Stream Joins fail after restoration with deserialization exceptions > -- > > Key: KAFKA-16046 > URL: https://issues.apache.org/jira/browse/KAFKA-16046 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Almog Gavra >Assignee: Almog Gavra >Priority: Blocker > > Before KIP-954, the `KStreamImplJoin` class would always create > non-timestamped persistent windowed stores. After that KIP, the default was > changed to create timestamped stores. This wasn't compatible because, during > restoration, timestamped stores have their changelog values transformed to > prepend the timestamp to the value. This caused serialization errors when > trying to read from the store because the deserializers did not expect the > timestamp to be prepended.
[PR] fix stream-stream-join store types [kafka]
agavra opened a new pull request, #15061: URL: https://github.com/apache/kafka/pull/15061 Before #14648, the `KStreamImplJoin` class would always create non-timestamped persistent windowed stores. After that PR, the default was changed to create timestamped stores. This wasn't compatible because, during restoration, timestamped stores have their values transformed to prepend the timestamp to the value. This caused serialization errors when trying to read from the store because the deserializers did not expect the timestamp to be prepended. To fix this, we allow creating non-timestamped stores using the `DslWindowParams`. Testing was done both manually as well as by adding a unit test to ensure that the stores created are not timestamped. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
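The incompatibility the PR describes is ultimately a byte-layout mismatch. The sketch below is illustrative only (the helper is invented for the example, not Kafka's actual store code): a timestamped store's restored changelog value carries an 8-byte timestamp before the payload, so a deserializer written for the plain layout misreads the bytes unless the header is skipped.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: mimics a changelog value as a timestamped store would
// restore it, with an 8-byte big-endian timestamp prepended to the payload.
public class TimestampedValueSketch {
    static byte[] prependTimestamp(long timestamp, byte[] payload) {
        return ByteBuffer.allocate(Long.BYTES + payload.length)
                .putLong(timestamp)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        byte[] plain = "v1".getBytes(StandardCharsets.UTF_8);
        byte[] stamped = prependTimestamp(1_700_000_000_000L, plain);

        // A deserializer expecting the plain layout sees 8 extra bytes:
        String naive = new String(stamped, StandardCharsets.UTF_8);
        System.out.println(naive.equals("v1"));
        // Skipping the 8-byte header recovers the original value:
        String skipped = new String(stamped, Long.BYTES,
                stamped.length - Long.BYTES, StandardCharsets.UTF_8);
        System.out.println(skipped.equals("v1"));
    }
}
```

This is why the fix reverts the join stores to non-timestamped: the bytes already sitting in the changelog match the plain layout.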
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
philipnee commented on code in PR #15059: URL: https://github.com/apache/kafka/pull/15059#discussion_r1434559814 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java: ## @@ -96,13 +96,13 @@ import static org.apache.kafka.common.utils.Utils.swallow; /** - * A client that consumes records from a Kafka cluster using the {@link GroupProtocol#GENERIC generic group protocol}. + * A client that consumes records from a Kafka cluster using the {@link GroupProtocol#CLASSIC classic group protocol}. * In this implementation, all network I/O happens in the thread of the application making the call. * * * * Note: per its name, this implementation is left for backward compatibility purposes. The updated consumer - * group protocol (from KIP-848) introduces allows users continue using the legacy "generic" group protocol. + * group protocol (from KIP-848) introduces allows users continue using the legacy "classic" group protocol. Review Comment: there's a bit of grammatical confusion here, perhaps: `group protocol ... allows user continue...`
Re: [PR] KAFKA-16026: Send Poll event to the background thread [kafka]
philipnee commented on PR #15035: URL: https://github.com/apache/kafka/pull/15035#issuecomment-1867003094 @dajac - thanks for merging the changes.
[jira] [Resolved] (KAFKA-16040) Rename `Generic` to `Classic`
[ https://issues.apache.org/jira/browse/KAFKA-16040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16040. - Resolution: Fixed > Rename `Generic` to `Classic` > - > > Key: KAFKA-16040 > URL: https://issues.apache.org/jira/browse/KAFKA-16040 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Blocker > Fix For: 3.7.0 > > > People have raised concerns about using {{Generic}} as a name to designate > the old rebalance protocol. We considered using {{Legacy}} but discarded it > because there are still applications, such as Connect, using the old > protocol. We settled on using {{Classic}} for the {{{}Classic Rebalance > Protocol{}}}.
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
dajac merged PR #15059: URL: https://github.com/apache/kafka/pull/15059
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
dajac commented on PR #15059: URL: https://github.com/apache/kafka/pull/15059#issuecomment-1866963946 As it is really hard to get a clean build due to OOM errors (also in trunk), I have verified the build locally. I am going to merge it to trunk and 3.7.
[jira] [Updated] (KAFKA-16045) ZkMigrationIntegrationTest.testMigrateTopicDeletion flaky
[ https://issues.apache.org/jira/browse/KAFKA-16045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-16045: --- Labels: flaky-test (was: ) > ZkMigrationIntegrationTest.testMigrateTopicDeletion flaky > - > > Key: KAFKA-16045 > URL: https://issues.apache.org/jira/browse/KAFKA-16045 > Project: Kafka > Issue Type: Test >Reporter: Justine Olshan >Priority: Major > Labels: flaky-test > > I'm seeing ZkMigrationIntegrationTest.testMigrateTopicDeletion fail for many > builds. I believe it is also causing a thread leak because on most runs where > it fails I also see ReplicaManager tests also fail with extra threads. > The test always fails > `org.opentest4j.AssertionFailedError: Timed out waiting for topics to be > deleted` > gradle enterprise link: > [https://ge.apache.org/scans/tests?search.names=Git%20branch[…]lues=trunk=kafka.zk.ZkMigrationIntegrationTest|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.zk.ZkMigrationIntegrationTest] > recent pr: > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15023/18/tests/] > trunk builds: > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2502/tests], > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2501/tests] > (edited)
[jira] [Created] (KAFKA-16045) ZkMigrationIntegrationTest.testMigrateTopicDeletion flaky
Justine Olshan created KAFKA-16045: -- Summary: ZkMigrationIntegrationTest.testMigrateTopicDeletion flaky Key: KAFKA-16045 URL: https://issues.apache.org/jira/browse/KAFKA-16045 Project: Kafka Issue Type: Test Reporter: Justine Olshan I'm seeing ZkMigrationIntegrationTest.testMigrateTopicDeletion fail for many builds. I believe it is also causing a thread leak because on most runs where it fails I also see ReplicaManager tests also fail with extra threads. The test always fails `org.opentest4j.AssertionFailedError: Timed out waiting for topics to be deleted` gradle enterprise link: [https://ge.apache.org/scans/tests?search.names=Git%20branch[…]lues=trunk=kafka.zk.ZkMigrationIntegrationTest|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.zk.ZkMigrationIntegrationTest] recent pr: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15023/18/tests/] trunk builds: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2502/tests], [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2501/tests] (edited)
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
jolshan commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866936613 jira here: [KAFKA-16045](https://issues.apache.org/jira/browse/KAFKA-16045)
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
jolshan commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866933588 Yeah. I was looking at builds with @dajac for his PR too. The java 8 one here OOM'd. I think the ZkMigrationTests are causing the thread leak for the replica manager since I see replica manager tests failing with this issue on the same builds as the ZKMigrationTest failures and not a lot of commonalities otherwise. I will file a JIRA for that. I will start one more time to get a super clear signal and will come back afterwards.
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
philipnee commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866886304 jenkins doesn't seem to be in a great state. There are a bunch of zkMigrationIntegrationTest and ReplicaManagerTest failures complaining about timeouts and `Found unexpected 1 NonDaemon threads` (there are others as well).
[jira] [Updated] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership
[ https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Aghili updated KAFKA-16027: - External issue URL: https://github.com/apache/kafka/pull/15055 > Refactor MetadataTest#testUpdatePartitionLeadership > --- > > Key: KAFKA-16027 > URL: https://issues.apache.org/jira/browse/KAFKA-16027 > Project: Kafka > Issue Type: Improvement >Reporter: Philip Nee >Assignee: Alexander Aghili >Priority: Minor > Labels: newbie > > MetadataTest#testUpdatePartitionLeadership is extremely long. I think it is > pretty close to the 160 line method limit - I tried to modify it, but it > would hit the limit when I tried to break things into separate lines. > The test also covers two scenarios, so it is best to split it into two separate > tests. > We should also move this to ConsumerMetadata.java
[jira] [Updated] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership
[ https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Aghili updated KAFKA-16027: - External issue URL: (was: https://github.com/apache/kafka/pull/15055) > Refactor MetadataTest#testUpdatePartitionLeadership > --- > > Key: KAFKA-16027 > URL: https://issues.apache.org/jira/browse/KAFKA-16027 > Project: Kafka > Issue Type: Improvement >Reporter: Philip Nee >Assignee: Alexander Aghili >Priority: Minor > Labels: newbie > > MetadataTest#testUpdatePartitionLeadership is extremely long. I think it is > pretty close to the 160 line method limit - I tried to modify it, but it > would hit the limit when I tried to break things into separate lines. > The test also covers two scenarios, so it is best to split it into two separate > tests. > We should also move this to ConsumerMetadata.java
[jira] [Assigned] (KAFKA-16027) Refactor MetadataTest#testUpdatePartitionLeadership
[ https://issues.apache.org/jira/browse/KAFKA-16027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Aghili reassigned KAFKA-16027: Assignee: Alexander Aghili > Refactor MetadataTest#testUpdatePartitionLeadership > --- > > Key: KAFKA-16027 > URL: https://issues.apache.org/jira/browse/KAFKA-16027 > Project: Kafka > Issue Type: Improvement >Reporter: Philip Nee >Assignee: Alexander Aghili >Priority: Minor > Labels: newbie > > MetadataTest#testUpdatePartitionLeadership is extremely long. I think it is > pretty close to the 160 line method limit - I tried to modify it, but it > would hit the limit when I tried to break things into separate lines. > The test also covers two scenarios, so it is best to split it into two separate > tests. > We should also move this to ConsumerMetadata.java
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
philipnee commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866810047 Thank you!
Re: [PR] KAFKA-16021: Eagerly look up StringSerializer encoding during configure [kafka]
srdo commented on PR #15024: URL: https://github.com/apache/kafka/pull/15024#issuecomment-1866797498 Thanks. Added the suggested test.
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac merged PR #14985: URL: https://github.com/apache/kafka/pull/14985
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on PR #14985: URL: https://github.com/apache/kafka/pull/14985#issuecomment-1866794392 We've got a decent build [here](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14985/9/tests/). All the failed tests are unrelated. I am going to merge it to trunk and 3.7.
Re: [PR] KAFKA-15303: Avoid unnecessary re-serialization in FK-join [kafka]
CharlyRien commented on PR #14157: URL: https://github.com/apache/kafka/pull/14157#issuecomment-1866758963 Hello. Any news on this issue? @mjsax
[jira] [Commented] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799532#comment-17799532 ] Arpit Goyal commented on KAFKA-15388: - [~divijvaidya] [~satish.duggana] [~showuon] PR for review https://github.com/apache/kafka/pull/15060 > Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Major > Fix For: 3.7.0 > > Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, > tieredtopicloglist.png > > > Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517] > > There are 3 paths I looked at: > * When data is moved to remote storage (1) > * When data is read from remote storage (2) > * When data is deleted from remote storage (3) > (1) Does not have a problem with compacted topics. Compacted segments are > uploaded and their metadata claims they contain offset from the baseOffset of > the segment until the next segment's baseOffset. There are no gaps in offsets. > (2) Does not have a problem if a customer is querying offsets which do not > exist within a segment, but there are offset after the queried offset within > the same segment. *However, it does have a problem when the next available > offset is in a subsequent segment.* > (3) For data deleted via DeleteRecords there is no problem. For data deleted > via retention there is no problem. > > *I believe the proper solution to (2) is to make tiered storage continue > looking for the next greater offset in subsequent segments.* > Steps to reproduce the issue: > {code:java} > // TODO (christo) > {code}
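The proposed solution to (2) above — keep looking for the next greater offset in subsequent segments — can be sketched as a first-match scan. This is an illustrative model only; the segment representation and all names below are invented for the example, not the actual remote-storage code.

```java
import java.util.List;
import java.util.OptionalLong;

// Sketch of the KAFKA-15388 fix idea: when a fetch offset falls into a gap
// left by compaction, keep scanning subsequent segments for the next
// offset >= the requested one instead of giving up within one segment.
public class NextOffsetLookup {
    // Each segment holds the sorted offsets that survived compaction.
    record Segment(long baseOffset, long[] offsets) {}

    static OptionalLong nextAvailableOffset(List<Segment> segments, long fetchOffset) {
        for (Segment s : segments) {          // segments sorted by baseOffset
            for (long o : s.offsets()) {
                if (o >= fetchOffset) return OptionalLong.of(o);
            }
        }
        return OptionalLong.empty();          // past the log end
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(
                new Segment(0, new long[]{0, 3}),   // offsets 1, 2, 4 compacted away
                new Segment(5, new long[]{5, 7}));
        // Fetch at 4: the next available offset lives in the NEXT segment.
        System.out.println(nextAvailableOffset(segments, 4));
        System.out.println(nextAvailableOffset(segments, 8));
    }
}
```

With offsets 1, 2 and 4 compacted away, a fetch at offset 4 must fall through to the following segment rather than report nothing.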
Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]
jolshan commented on PR #14599: URL: https://github.com/apache/kafka/pull/14599#issuecomment-1866730662 Sorry for the delay @MikeEdgar. That is a known issue for the test. I will also look at this PR and run the build again.
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
jolshan commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866728811 Thanks for checking @philipnee. I will wait for this run to complete and we should be good to go.
[PR] KAFKA-15388: Handling remote segment read in case of log compaction [kafka]
iit2009060 opened a new pull request, #15060: URL: https://github.com/apache/kafka/pull/15060 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: Use --no-daemon when building with Jenkins [kafka]
ijuma commented on code in PR #15057: URL: https://github.com/apache/kafka/pull/15057#discussion_r1434339183 ## Jenkinsfile: ## @@ -21,7 +21,7 @@ def doValidation() { // Run all the tasks associated with `check` except for `test` - the latter is executed via `doTest` sh """ ./retry_zinc ./gradlew -PscalaVersion=$SCALA_VERSION clean check -x test \ ---profile --continue -PxmlSpotBugsReport=true -PkeepAliveMode="session" +--no-daemon --profile --continue -PxmlSpotBugsReport=true -PkeepAliveMode="session" Review Comment: We can probably remove `keepAliveMode` if we use `no-daemon`: > keepAliveMode: configures the keep alive mode for the Gradle compilation daemon - reuse improves start-up time. The values should be one of daemon or session (the default is daemon). daemon keeps the daemon alive until it's explicitly stopped while session keeps it alive until the end of the build session. This currently only affects the Scala compiler, see https://github.com/gradle/gradle/pull/21034 for a PR that attempts to do the same for the Java compiler.
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
philipnee commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866678547 Consumer integration tests also seem fine. ![image](https://github.com/apache/kafka/assets/1930388/df5307c2-0735-4789-9b87-e6d5eb9f1e7d)
[jira] [Commented] (KAFKA-10897) kafka quota optimization
[ https://issues.apache.org/jira/browse/KAFKA-10897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799504#comment-17799504 ] Afshin Moazami commented on KAFKA-10897: For topic-partition quota configuration, I proposed this KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1010%3A+Topic+Partition+Quota] > kafka quota optimization > > > Key: KAFKA-10897 > URL: https://issues.apache.org/jira/browse/KAFKA-10897 > Project: Kafka > Issue Type: Improvement > Components: admin, clients, config, consumer, core >Affects Versions: 2.7.0 >Reporter: yangyijun >Assignee: Kahn Cheny >Priority: Blocker > > *1. The current quota dimensions are as follows:* > {code:java} > /config/users/<user>/clients/<client-id> > /config/users/<user>/clients/<default> > /config/users/<user> > /config/users/<default>/clients/<client-id> > /config/users/<default>/clients/<default> > /config/users/<default> > /config/clients/<client-id> > /config/clients/<default>{code} > *2. Existing problems:* > {code:java} > 2.1. The quota dimensions are not fine-grained enough. > 2.2. When multiple users on the same broker produce and consume a large amount > of data at the same time, if you want the broker to run normally, you must > make the sum of all user quota bytes not exceed the upper throughput limit of > the broker. > 2.3. Even if no user rate reaches the upper limit of the broker, when the > traffic is concentrated on a few disks and exceeds the read-write load of the > disk, all produce and consume requests will be blocked. > 2.4. Sometimes the rate of just one topic under a user increases sharply, so we > only need to limit the sharply increasing topics. > {code} > *3. Suggestions for improvement* > {code:java} > 3.1. Add an upper limit on quota bytes for a single broker. > 3.2. Add an upper limit on quota bytes for a single disk on the broker. > 3.3. Add topic quota dimensions.{code}
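Among the quota dimensions quoted above, the most specific matching entry wins. The first-match resolver below is only a sketch of that precedence: the path strings mirror Kafka's documented quota hierarchy, but the resolver method and the sample values are illustrative, not broker code.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch of quota-override resolution: walk the candidate paths from most
// specific to least specific and return the first configured value.
public class QuotaResolutionSketch {
    static Optional<Long> resolve(Map<String, Long> cfg, String user, String client) {
        List<String> candidates = List.of(
                "/config/users/" + user + "/clients/" + client,
                "/config/users/" + user + "/clients/<default>",
                "/config/users/" + user,
                "/config/users/<default>/clients/" + client,
                "/config/users/<default>/clients/<default>",
                "/config/users/<default>",
                "/config/clients/" + client,
                "/config/clients/<default>");
        for (String path : candidates) {
            Long v = cfg.get(path);
            if (v != null) return Optional.of(v);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Long> cfg = Map.of(
                "/config/users/alice", 1_000_000L,
                "/config/clients/<default>", 500_000L);
        // alice's user-level quota beats the client default:
        System.out.println(resolve(cfg, "alice", "app-1").get());
        // bob has no user entry, so the client default applies:
        System.out.println(resolve(cfg, "bob", "app-1").get());
    }
}
```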
[jira] [Commented] (KAFKA-16041) Replace Afterburn module with Blackbird
[ https://issues.apache.org/jira/browse/KAFKA-16041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799498#comment-17799498 ] Mickael Maison commented on KAFKA-16041: This is follow up work from https://issues.apache.org/jira/browse/KAFKA-15996 > Replace Afterburn module with Blackbird > --- > > Key: KAFKA-16041 > URL: https://issues.apache.org/jira/browse/KAFKA-16041 > Project: Kafka > Issue Type: Task > Components: connect >Reporter: Mario Fiore Vitale >Priority: Major > Fix For: 4.0.0 > > > [Blackbird|https://github.com/FasterXML/jackson-modules-base/blob/master/blackbird/README.md] > is the Afterburn replacement for Java 11+
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
philipnee commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866627736 ![image](https://github.com/apache/kafka/assets/1930388/6d79e6b4-76dc-49cc-8cbb-9b237a5dc7ef) Unit tests seem fine; there is 1 known flaky test under AbstractCoordinator and 1 related to AsyncKafkaConsumer. Both are unrelated to the metadata change here.
[jira] [Created] (KAFKA-16044) Throttling using Topic Partition Quota
Afshin Moazami created KAFKA-16044: -- Summary: Throttling using Topic Partition Quota Key: KAFKA-16044 URL: https://issues.apache.org/jira/browse/KAFKA-16044 Project: Kafka Issue Type: New Feature Reporter: Afshin Moazami With KAFKA-16042 introducing the topic-partition byte-rate metrics, and KAFKA-16043 introducing the quota limit configuration at the topic level, we can enforce quotas at the topic-partition level for configured topics. More details are in [KIP-1010|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1010%3A+Topic+Partition+Quota] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16043) Add Quota configuration for topics
Afshin Moazami created KAFKA-16043: -- Summary: Add Quota configuration for topics Key: KAFKA-16043 URL: https://issues.apache.org/jira/browse/KAFKA-16043 Project: Kafka Issue Type: New Feature Reporter: Afshin Moazami To be able to have topic-partition quotas, we need to introduce two topic configurations, producer-byte-rate and consumer-byte-rate. The assumption is that all partitions of the same topic get the same quota, so we define one config per topic. These configurations should work with both ZooKeeper and KRaft setups. Also, we should define a default quota value (to be discussed) and potentially use the same format as the user/client default configuration, using `<default>` as the value. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16042) Quota Metrics based on per topic-partition produce/fetch byte rate
Afshin Moazami created KAFKA-16042: -- Summary: Quota Metrics based on per topic-partition produce/fetch byte rate Key: KAFKA-16042 URL: https://issues.apache.org/jira/browse/KAFKA-16042 Project: Kafka Issue Type: New Feature Components: core Reporter: Afshin Moazami Assignee: Afshin Moazami Currently, Kafka emits the producer-byte-rate and fetch-bytes-rate for quota calculations. By adding a new signature of the `[quotaMetricTags|https://github.com/afshing/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java#L40]` method that takes the individual topic-partition sizes as a parameter, we can define metrics based on the topic name and partition id. To do that, both `ProduceRequest` and `FetchResponse` need to expose a public `partitionSizes` method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
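The proposal above — passing per-topic-partition byte counts into the quota callback so metric tags can carry a topic name and partition id — can be illustrated with a simplified, stdlib-only sketch. The class and method below are hypothetical stand-ins, not Kafka's actual `ClientQuotaCallback` API:

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionQuotaTags {
    // Hypothetical overload: besides the principal and client id, the
    // callback receives the per-topic-partition byte counts of the
    // request, so tags can identify the topic-partition being charged.
    static Map<String, String> quotaMetricTags(String user, String clientId,
                                               Map<String, Long> partitionSizes) {
        Map<String, String> tags = new HashMap<>();
        tags.put("user", user);
        tags.put("client-id", clientId);
        // One possible policy: tag with the heaviest topic-partition
        // of this request.
        partitionSizes.entrySet().stream()
            .max(Map.Entry.comparingByValue())
            .ifPresent(e -> tags.put("topic-partition", e.getKey()));
        return tags;
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new HashMap<>();
        sizes.put("orders-0", 4096L);
        sizes.put("orders-1", 1024L);
        // orders-0 carries the most bytes, so it becomes the tag value.
        System.out.println(quotaMetricTags("alice", "app-1", sizes));
    }
}
```

The real change would also require `ProduceRequest` and `FetchResponse` to expose the partition sizes publicly, as the issue notes; the map-of-sizes shape here is only an assumption for illustration.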
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
philipnee commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866607465 Hey @jolshan - Thanks. The OOM also happens on another PR of mine, see #15035. I thought it was introduced by AsyncKafkaConsumerTest, but apparently there's more to it... Let me verify the tests locally and I'll post the results there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16012: Ensure new leader information merged correctly with the current metadata [kafka]
jolshan commented on PR #15023: URL: https://github.com/apache/kafka/pull/15023#issuecomment-1866595042 I'm seeing some OOMs and such again. I will run one more time. @philipnee, if you don't mind, just double-check the tests locally on your end. I'm not seeing any failing consistently, but I just want to be extra careful near the code freeze. I will check back in a few hours or so. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16016: Add docker wrapper in core and remove docker utility script [kafka]
mimaison commented on code in PR #15048: URL: https://github.com/apache/kafka/pull/15048#discussion_r1434227907 ## docker/jvm/launch: ## @@ -28,25 +27,25 @@ fi # the default is to pick the first IP (or network). export KAFKA_JMX_HOSTNAME=${KAFKA_JMX_HOSTNAME:-$(hostname -i | cut -d" " -f1)} -if [ "$KAFKA_JMX_PORT" ]; then +if [ "${KAFKA_JMX_PORT-}" ]; then # This ensures that the "if" section for JMX_PORT in kafka launch script does not trigger. export JMX_PORT=$KAFKA_JMX_PORT -export KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Djava.rmi.server.hostname=$KAFKA_JMX_HOSTNAME -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT -Dcom.sun.management.jmxremote.port=$JMX_PORT" +export KAFKA_JMX_OPTS="${KAFKA_JMX_OPTS-} -Djava.rmi.server.hostname=$KAFKA_JMX_HOSTNAME -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT -Dcom.sun.management.jmxremote.port=$JMX_PORT" fi # Make a temp env variable to store user provided performance otps -if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then +if [ -z "${KAFKA_JVM_PERFORMANCE_OPTS-}" ]; then export TEMP_KAFKA_JVM_PERFORMANCE_OPTS="" else export TEMP_KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS" fi # We will first use CDS for storage to format storage -export KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS -XX:SharedArchiveFile=/opt/kafka/storage.jsa" +export KAFKA_JVM_PERFORMANCE_OPTS="${KAFKA_JVM_PERFORMANCE_OPTS-} -XX:SharedArchiveFile=/opt/kafka/storage.jsa" echo "===> Using provided cluster id $CLUSTER_ID ..." # A bit of a hack to not error out if the storage is already formatted. Need storage-tool to support this Review Comment: Could we handle that in the wrapper? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
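The `${KAFKA_JMX_PORT-}` changes in the diff above use the `${var-}` form of shell parameter expansion, which substitutes an empty string when the variable is unset. Under `set -u` (nounset), referencing a plain unset `$KAFKA_JMX_PORT` would abort the script, while the hyphen form stays safe. A minimal illustration (the `-XX:+AlwaysPreTouch` flag is just a placeholder option):

```shell
set -u  # like the launch script: abort when an unset variable is referenced

# Referencing plain "$KAFKA_JMX_PORT" here would kill the script if the
# variable were unset; "${KAFKA_JMX_PORT-}" expands to "" instead.
if [ "${KAFKA_JMX_PORT-}" ]; then
  echo "JMX port configured: ${KAFKA_JMX_PORT}"
else
  echo "JMX disabled"
fi

# The same form lets us safely append to a possibly-unset options variable:
export KAFKA_JVM_PERFORMANCE_OPTS="${KAFKA_JVM_PERFORMANCE_OPTS-} -XX:+AlwaysPreTouch"
echo "$KAFKA_JVM_PERFORMANCE_OPTS"
```

Note that `${var-}` only guards against *unset*; `${var:-}` would additionally treat an empty-but-set variable as missing.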
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1434244004 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() { ); } +@ParameterizedTest +@EnumSource(value = TransactionResult.class) +public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + +// Transactional write #1. +CompletableFuture write1 = runtime.scheduleTransactionalWriteOperation( +"write#1", +TP, +"transactional-id", +100L, +(short) 5, +DEFAULT_WRITE_TIMEOUT, +state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1") +); + +// Verify that the write is not committed yet. +assertFalse(write1.isDone()); + +// The last written offset is updated. +assertEquals(2L, ctx.coordinator.lastWrittenOffset()); +// The last committed offset does not change. 
+assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +// A new snapshot is created. +assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); +// Records have been replayed to the coordinator. They are stored in +// the pending set for now. +assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords( +100L +)); +// Records have been written to the log. +assertEquals(Arrays.asList( +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"), +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2") +), writer.entries(TP)); + +// Complete transaction #1. +CompletableFuture complete1 = runtime.scheduleTransactionCompletion( +"complete#1", +TP, +100L, +(short) 5, +10, +result, +DEFAULT_WRITE_TIMEOUT +); + +// Verify that the completion is not committed yet. +assertFalse(complete1.isDone()); + +// The last written offset is updated. +assertEquals(3L, ctx.coordinator.lastWrittenOffset()); +// The last committed offset does not change. +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +// A new snapshot is created. +assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); +// Records have been replayed to the coordinator. +if (result == TransactionResult.COMMIT) { +// They are now in the records set if committed. +assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); +} else { +// Or they are gone if aborted. +assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); +} + +// Records have been written to the log. +assertEquals(Arrays.asList( +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"), +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"), +InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, result) +), writer.entries(TP)); + +// Commit write #1. +writer.commit(TP, 2); + +// The write is completed. 
+assertTrue(write1.isDone()); +assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); + +// Commit completion #1. +
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1434242991 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java: ## @@ -102,6 +241,31 @@ public long append( } } +@Override +public long appendEndTransactionMarker( +TopicPartition tp, +long producerId, +short producerEpoch, +int coordinatorEpoch, +TransactionResult result +) throws KafkaException { +PartitionState state = partitionState(tp); +state.lock.lock(); +try { +state.entries.add(new LogControl( +producerId, +producerEpoch, +coordinatorEpoch, +result +)); +state.endOffset += 1; +if (autoCommit) commit(tp, state.endOffset); Review Comment: synced offline -- this is like acks=all for one replica and helpful in tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
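The auto-commit semantics discussed in this thread — the high watermark advancing immediately on append, as with a single-replica partition — can be sketched with a stdlib-only toy writer. `ToyPartitionWriter` is a hypothetical illustration, not the actual `InMemoryPartitionWriter`:

```java
import java.util.ArrayList;
import java.util.List;

public class ToyPartitionWriter {
    private final boolean autoCommit;
    private final List<String> entries = new ArrayList<>();
    private long endOffset = 0;        // log end offset
    private long committedOffset = 0;  // high watermark (HWM)

    ToyPartitionWriter(boolean autoCommit) { this.autoCommit = autoCommit; }

    long append(String entry) {
        entries.add(entry);
        endOffset += 1;
        // With autoCommit, the HWM advances as soon as the entry is
        // appended, as if the partition had a single replica. Otherwise
        // a test calls commit() explicitly to simulate replication
        // completing, which lets it assert the "not committed yet" state.
        if (autoCommit) commit(endOffset);
        return endOffset;
    }

    void commit(long offset) { committedOffset = Math.min(offset, endOffset); }

    long committedOffset() { return committedOffset; }

    public static void main(String[] args) {
        ToyPartitionWriter auto = new ToyPartitionWriter(true);
        auto.append("record1");
        System.out.println(auto.committedOffset()); // 1

        ToyPartitionWriter manual = new ToyPartitionWriter(false);
        manual.append("record1");
        System.out.println(manual.committedOffset()); // 0, until commit(1) is called
    }
}
```

This mirrors why `CoordinatorRuntimeTest` above disables auto-commit: the test first asserts that `write1` is pending, then calls `writer.commit(TP, 2)` itself to complete the write.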
Re: [PR] KAFKA-14588 ConfigType moved to server-common [kafka]
nizhikov commented on code in PR #14867: URL: https://github.com/apache/kafka/pull/14867#discussion_r1434240386 ## core/src/main/scala/kafka/zk/ZkData.scala: ## @@ -1103,11 +1105,11 @@ object ZkData { IsrChangeNotificationZNode.path, ProducerIdBlockZNode.path, LogDirEventNotificationZNode.path - ) ++ ConfigType.all.map(ConfigEntityTypeZNode.path) + ) ++ JavaConverters.asScalaIteratorConverter(ConfigType.ALL.iterator()).asScala.map(ConfigEntityTypeZNode.path) Review Comment: Great catch. Thanks. Rewrote like you suggested. ## core/src/main/scala/kafka/admin/ConfigCommand.scala: ## @@ -77,8 +78,9 @@ object ConfigCommand extends Logging { val BrokerDefaultEntityName = "" val BrokerLoggerConfigType = "broker-loggers" - val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType :+ ConfigType.ClientMetrics - val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker) + @nowarn("cat=deprecation") + val BrokerSupportedConfigTypes = JavaConverters.asScalaIteratorConverter(ConfigType.ALL.iterator()).asScala.toSeq :+ BrokerLoggerConfigType :+ ConfigType.CLIENT_METRICS Review Comment: Great catch. Thanks. Rewrote like you suggested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15996: Improve JsonConverter performance [kafka]
mfvitale commented on PR #14992: URL: https://github.com/apache/kafka/pull/14992#issuecomment-1866538380 > Thank you for this PR. I have left some minor comments. > > Also, please add a JIRA targeting 4.0 (where we will deprecate JDK 8) to replace afterburn with blackbird. @divijvaidya https://issues.apache.org/jira/browse/KAFKA-16041 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16041) Replace Afterburn module with Blackbird
Mario Fiore Vitale created KAFKA-16041: -- Summary: Replace Afterburn module with Blackbird Key: KAFKA-16041 URL: https://issues.apache.org/jira/browse/KAFKA-16041 Project: Kafka Issue Type: Task Components: connect Reporter: Mario Fiore Vitale Fix For: 4.0.0 [Blackbird|https://github.com/FasterXML/jackson-modules-base/blob/master/blackbird/README.md] is the Afterburner replacement for Java 11+ -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15996: Improve JsonConverter performance [kafka]
mfvitale commented on code in PR #14992: URL: https://github.com/apache/kafka/pull/14992#discussion_r1434225385 ## build.gradle: ## @@ -940,6 +940,7 @@ project(':core') { implementation libs.jacksonModuleScala implementation libs.jacksonDataformatCsv implementation libs.jacksonJDK8Datatypes +implementation libs.jacksonAfterburner Review Comment: I have been misled by other Jackson dependencies. Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1434220316 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java: ## @@ -102,6 +241,31 @@ public long append( } } +@Override +public long appendEndTransactionMarker( +TopicPartition tp, +long producerId, +short producerEpoch, +int coordinatorEpoch, +TransactionResult result +) throws KafkaException { +PartitionState state = partitionState(tp); +state.lock.lock(); +try { +state.entries.add(new LogControl( +producerId, +producerEpoch, +coordinatorEpoch, +result +)); +state.endOffset += 1; +if (autoCommit) commit(tp, state.endOffset); Review Comment: No. It is more like having a partition with a single replica so the HWM advances directly. Keep in mind that this is designed for being used in tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1434218393 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() { ); } +@ParameterizedTest +@EnumSource(value = TransactionResult.class) +public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + +// Transactional write #1. +CompletableFuture write1 = runtime.scheduleTransactionalWriteOperation( +"write#1", +TP, +"transactional-id", +100L, +(short) 5, +DEFAULT_WRITE_TIMEOUT, +state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1") +); + +// Verify that the write is not committed yet. +assertFalse(write1.isDone()); + +// The last written offset is updated. +assertEquals(2L, ctx.coordinator.lastWrittenOffset()); +// The last committed offset does not change. 
+assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +// A new snapshot is created. +assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); +// Records have been replayed to the coordinator. They are stored in +// the pending set for now. +assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords( +100L +)); +// Records have been written to the log. +assertEquals(Arrays.asList( +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"), +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2") +), writer.entries(TP)); + +// Complete transaction #1. +CompletableFuture complete1 = runtime.scheduleTransactionCompletion( +"complete#1", +TP, +100L, +(short) 5, +10, +result, +DEFAULT_WRITE_TIMEOUT +); + +// Verify that the completion is not committed yet. +assertFalse(complete1.isDone()); + +// The last written offset is updated. +assertEquals(3L, ctx.coordinator.lastWrittenOffset()); +// The last committed offset does not change. +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +// A new snapshot is created. +assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); +// Records have been replayed to the coordinator. +if (result == TransactionResult.COMMIT) { +// They are now in the records set if committed. +assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); +} else { +// Or they are gone if aborted. +assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); +} + +// Records have been written to the log. +assertEquals(Arrays.asList( +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"), +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"), +InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, result) +), writer.entries(TP)); + +// Commit write #1. +writer.commit(TP, 2); + +// The write is completed. 
+assertTrue(write1.isDone()); +assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); + +// Commit completion #1. +
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1434215098 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java: ## @@ -102,6 +241,31 @@ public long append( } } +@Override +public long appendEndTransactionMarker( +TopicPartition tp, +long producerId, +short producerEpoch, +int coordinatorEpoch, +TransactionResult result +) throws KafkaException { +PartitionState state = partitionState(tp); +state.lock.lock(); +try { +state.entries.add(new LogControl( +producerId, +producerEpoch, +coordinatorEpoch, +result +)); +state.endOffset += 1; +if (autoCommit) commit(tp, state.endOffset); Review Comment: Is this like acks = 0? I wasn't aware of when this was used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1434214195 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() { ); } +@ParameterizedTest +@EnumSource(value = TransactionResult.class) +public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = new MockPartitionWriter(); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) +.withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) +.build(); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 10); + +// Verify the initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0L, ctx.coordinator.lastWrittenOffset()); +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + +// Transactional write #1. +CompletableFuture write1 = runtime.scheduleTransactionalWriteOperation( +"write#1", +TP, +"transactional-id", +100L, +(short) 5, +DEFAULT_WRITE_TIMEOUT, +state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1") +); + +// Verify that the write is not committed yet. +assertFalse(write1.isDone()); + +// The last written offset is updated. +assertEquals(2L, ctx.coordinator.lastWrittenOffset()); +// The last committed offset does not change. 
+assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +// A new snapshot is created. +assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList()); +// Records have been replayed to the coordinator. They are stored in +// the pending set for now. +assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords( +100L +)); +// Records have been written to the log. +assertEquals(Arrays.asList( +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"), +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2") +), writer.entries(TP)); + +// Complete transaction #1. +CompletableFuture complete1 = runtime.scheduleTransactionCompletion( +"complete#1", +TP, +100L, +(short) 5, +10, +result, +DEFAULT_WRITE_TIMEOUT +); + +// Verify that the completion is not committed yet. +assertFalse(complete1.isDone()); + +// The last written offset is updated. +assertEquals(3L, ctx.coordinator.lastWrittenOffset()); +// The last committed offset does not change. +assertEquals(0L, ctx.coordinator.lastCommittedOffset()); +// A new snapshot is created. +assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList()); +// Records have been replayed to the coordinator. +if (result == TransactionResult.COMMIT) { +// They are now in the records set if committed. +assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records()); +} else { +// Or they are gone if aborted. +assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records()); +} + +// Records have been written to the log. +assertEquals(Arrays.asList( +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"), +InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"), +InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, result) +), writer.entries(TP)); + +// Commit write #1. +writer.commit(TP, 2); + +// The write is completed. 
+assertTrue(write1.isDone()); +assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); + +// Commit completion #1. +
Re: [PR] KAFKA-14588 ConfigType moved to server-common [kafka]
mimaison commented on code in PR #14867: URL: https://github.com/apache/kafka/pull/14867#discussion_r1434193675 ## core/src/main/scala/kafka/zk/ZkData.scala: ## @@ -1103,11 +1105,11 @@ object ZkData { IsrChangeNotificationZNode.path, ProducerIdBlockZNode.path, LogDirEventNotificationZNode.path - ) ++ ConfigType.all.map(ConfigEntityTypeZNode.path) + ) ++ JavaConverters.asScalaIteratorConverter(ConfigType.ALL.iterator()).asScala.map(ConfigEntityTypeZNode.path) Review Comment: Same as above, can we build this collection without using `JavaConverters`? ## core/src/main/scala/kafka/admin/ConfigCommand.scala: ## @@ -77,8 +78,9 @@ object ConfigCommand extends Logging { val BrokerDefaultEntityName = "" val BrokerLoggerConfigType = "broker-loggers" - val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType :+ ConfigType.ClientMetrics - val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker) + @nowarn("cat=deprecation") + val BrokerSupportedConfigTypes = JavaConverters.asScalaIteratorConverter(ConfigType.ALL.iterator()).asScala.toSeq :+ BrokerLoggerConfigType :+ ConfigType.CLIENT_METRICS Review Comment: Why are we using an iterator here? Can we use `val BrokerSupportedConfigTypes: Seq[String] = ConfigType.ALL.asScala :+ BrokerLoggerConfigType :+ ConfigType.CLIENT_METRICS`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [KAFKA-16015] Fix custom timeouts overwritten by defaults [kafka]
pprovenzano commented on PR #15030: URL: https://github.com/apache/kafka/pull/15030#issuecomment-1866442934 > @pprovenzano , I dont have permissions to merge the PR, so I guess somebody else eventually will do it, is that correct? thanks! I don't have them either. I've asked another to merge and also cherry-pick to 3.7 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15456) Add support for OffsetFetch version 9 in consumer
[ https://issues.apache.org/jira/browse/KAFKA-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-15456. - Resolution: Fixed > Add support for OffsetFetch version 9 in consumer > - > > Key: KAFKA-15456 > URL: https://issues.apache.org/jira/browse/KAFKA-15456 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: David Jacot >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15456: Commit/Fetch error handling improvements and V9 support [kafka]
dajac merged PR #14557: URL: https://github.com/apache/kafka/pull/14557 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15456: Commit/Fetch error handling improvements and V9 support [kafka]
dajac commented on PR #14557: URL: https://github.com/apache/kafka/pull/14557#issuecomment-1866401989 We've got a reasonably good build here: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14557/34/tests/. Based on this one, it is safe to merge this PR. I will merge it to trunk and 3.7. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
dajac commented on PR #15059: URL: https://github.com/apache/kafka/pull/15059#issuecomment-1866369216 @lucasbru Thanks. I just fixed them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
lucasbru commented on PR #15059: URL: https://github.com/apache/kafka/pull/15059#issuecomment-1866354730 Please check * `GroupCoordinatorConfig.GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS` * Some comments in `GroupCoordinator` * Various names in `GroupCoordinatorMetrics` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16030) new group coordinator should check if partition goes offline during load
[ https://issues.apache.org/jira/browse/KAFKA-16030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16030. - Fix Version/s: 3.7.0 Resolution: Fixed > new group coordinator should check if partition goes offline during load > > > Key: KAFKA-16030 > URL: https://issues.apache.org/jira/browse/KAFKA-16030 > Project: Kafka > Issue Type: Sub-task >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Major > Fix For: 3.7.0 > > > The new coordinator stops loading if the partition goes offline during load. > However, the partition is still considered active. Instead, we should return > NOT_LEADER_OR_FOLLOWER exception during load. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16030: new group coordinator should check if partition goes offline during load [kafka]
dajac commented on PR #15043: URL: https://github.com/apache/kafka/pull/15043#issuecomment-1866342682 It is hard to get a clean build here too, but the failures are unrelated. I also verified it locally. I will merge it to trunk and 3.7.
Re: [PR] KAFKA-16030: new group coordinator should check if partition goes offline during load [kafka]
dajac merged PR #15043: URL: https://github.com/apache/kafka/pull/15043
Re: [PR] KAFKA-15996: Improve JsonConverter performance [kafka]
divijvaidya commented on code in PR #14992: URL: https://github.com/apache/kafka/pull/14992#discussion_r1434120692 ## build.gradle: ## @@ -940,6 +940,7 @@ project(':core') { implementation libs.jacksonModuleScala implementation libs.jacksonDataformatCsv implementation libs.jacksonJDK8Datatypes +implementation libs.jacksonAfterburner Review Comment: Please help me understand why did we add this dependency in clients? ## connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java: ## @@ -235,18 +235,24 @@ public Object toConnect(final Schema schema, final JsonNode value) { private final JsonDeserializer deserializer; public JsonConverter() { +this(true); +} + +public JsonConverter(boolean enableModules) { serializer = new JsonSerializer( -mkSet(), -JSON_NODE_FACTORY +mkSet(), Review Comment: nit please maintain original indentation ## connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java: ## @@ -235,18 +235,24 @@ public Object toConnect(final Schema schema, final JsonNode value) { private final JsonDeserializer deserializer; public JsonConverter() { +this(true); +} + +public JsonConverter(boolean enableModules) { Review Comment: please add a javadoc comment specifying that this ctor is visible only for benchmarking. we add similar comments when we expose a new ctor/method for testing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
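The constructor change discussed in the review above follows a common delegation pattern: the public no-arg constructor keeps its default behavior, while an overload lets JMH benchmarks opt out of optional Jackson module registration (such as the Afterburner module added in `build.gradle`). A minimal illustrative sketch follows; `ConverterSketch` and its members are hypothetical stand-ins, not the actual `JsonConverter` internals:

```java
// Illustrative sketch only: `ConverterSketch` is a stand-in, not the real
// org.apache.kafka.connect.json.JsonConverter.
class ConverterSketch {
    private final boolean modulesEnabled;

    public ConverterSketch() {
        this(true); // production path: optional serializer modules stay enabled
    }

    /**
     * Visible only for benchmarking, per the review request: lets a JMH
     * benchmark measure conversion cost without optional module registration.
     */
    ConverterSketch(boolean enableModules) {
        this.modulesEnabled = enableModules;
    }

    boolean modulesEnabled() {
        return modulesEnabled;
    }

    public static void main(String[] args) {
        System.out.println(new ConverterSketch().modulesEnabled());      // true
        System.out.println(new ConverterSketch(false).modulesEnabled()); // false
    }
}
```

The value of the pattern is that existing callers of the no-arg constructor are untouched, while the benchmark-only overload is documented as such in its javadoc, as the reviewer asked.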
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
dajac commented on code in PR #15059: URL: https://github.com/apache/kafka/pull/15059#discussion_r1434128032 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ## @@ -70,11 +70,11 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits"; public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired"; public static final String OFFSET_DELETIONS_SENSOR_NAME = "OffsetDeletions"; -public static final String GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances"; -public static final String GENERIC_GROUP_REBALANCES_SENSOR_NAME = "GenericGroupRebalances"; +public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances"; +public static final String CLASSIC_GROUP_REBALANCES_SENSOR_NAME = "GenericGroupRebalances"; Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
dajac commented on code in PR #15059: URL: https://github.com/apache/kafka/pull/15059#discussion_r1434112503 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ## @@ -70,11 +70,11 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits"; public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired"; public static final String OFFSET_DELETIONS_SENSOR_NAME = "OffsetDeletions"; -public static final String GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances"; -public static final String GENERIC_GROUP_REBALANCES_SENSOR_NAME = "GenericGroupRebalances"; +public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances"; +public static final String CLASSIC_GROUP_REBALANCES_SENSOR_NAME = "GenericGroupRebalances"; Review Comment: Indeed, we should rename this one. Missed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
divijvaidya commented on code in PR #15059: URL: https://github.com/apache/kafka/pull/15059#discussion_r1434097877 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ## @@ -70,11 +70,11 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC public static final String OFFSET_COMMITS_SENSOR_NAME = "OffsetCommits"; public static final String OFFSET_EXPIRED_SENSOR_NAME = "OffsetExpired"; public static final String OFFSET_DELETIONS_SENSOR_NAME = "OffsetDeletions"; -public static final String GENERIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances"; -public static final String GENERIC_GROUP_REBALANCES_SENSOR_NAME = "GenericGroupRebalances"; +public static final String CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME = "CompletedRebalances"; +public static final String CLASSIC_GROUP_REBALANCES_SENSOR_NAME = "GenericGroupRebalances"; Review Comment: I should have added my previous comment over here. Let's continue conversation at this place. I understand that we have only changed the variable name of the sensor here. My question is, should we change "GenericGroupRebalances" to "ClassicGroupRebalances" here if this has just been introduced in 3.7? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
dajac commented on code in PR #15059: URL: https://github.com/apache/kafka/pull/15059#discussion_r1434068651 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ## @@ -319,40 +319,40 @@ public Long value() { registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.PREPARING_REBALANCE); +return numGenericGroups(ClassicGroupState.PREPARING_REBALANCE); } }); registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.COMPLETING_REBALANCE); +return numGenericGroups(ClassicGroupState.COMPLETING_REBALANCE); } }); registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.STABLE); +return numGenericGroups(ClassicGroupState.STABLE); } }); registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.DEAD); +return numGenericGroups(ClassicGroupState.DEAD); } }); registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new com.yammer.metrics.core.Gauge() { Review Comment: Regarding `GENERIC_GROUP_REBALANCES_SENSOR_NAME`, the patch only changes the name of the sensor. The metrics are not touched. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
dajac commented on code in PR #15059: URL: https://github.com/apache/kafka/pull/15059#discussion_r1434063518 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ## @@ -319,40 +319,40 @@ public Long value() { registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.PREPARING_REBALANCE); +return numGenericGroups(ClassicGroupState.PREPARING_REBALANCE); } }); registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.COMPLETING_REBALANCE); +return numGenericGroups(ClassicGroupState.COMPLETING_REBALANCE); } }); registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.STABLE); +return numGenericGroups(ClassicGroupState.STABLE); } }); registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.DEAD); +return numGenericGroups(ClassicGroupState.DEAD); } }); registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new com.yammer.metrics.core.Gauge() { Review Comment: We don't have any existing metrics using "generic". They are all new in 3.7. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
divijvaidya commented on code in PR #15059: URL: https://github.com/apache/kafka/pull/15059#discussion_r1434047673 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ## @@ -319,40 +319,40 @@ public Long value() { registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.PREPARING_REBALANCE); +return numGenericGroups(ClassicGroupState.PREPARING_REBALANCE); } }); registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.COMPLETING_REBALANCE); +return numGenericGroups(ClassicGroupState.COMPLETING_REBALANCE); } }); registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.STABLE); +return numGenericGroups(ClassicGroupState.STABLE); } }); registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new com.yammer.metrics.core.Gauge() { @Override public Long value() { -return numGenericGroups(GenericGroupState.DEAD); +return numGenericGroups(ClassicGroupState.DEAD); } }); registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new com.yammer.metrics.core.Gauge() { Review Comment: Are we keeping the metrics name with "generic" keyword though? I think we should if these metrics have already been released in previous versions, if not, might want to change the metric names to "classic" as well. e.g. GENERIC_GROUP_REBALANCES_SENSOR_NAME -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799416#comment-17799416 ] Divij Vaidya edited comment on KAFKA-15388 at 12/21/23 12:56 PM: - Hey [~enether] I will not consider this as a blocker for 3.7.0 since Tiered Storage is not production ready in 3.7.0. We will continue to carry this bug (as we did in 3.6) until it is fixed. If we are able to fix this in timeline for 3.7, well and good but if not, we will ship this in 3.8. This is a blocker for TS production ready status but that will be captured somewhere else. was (Author: divijvaidya): Hey [~enether] I will not consider this as a blocker for 3.7.0 since Tiered Storage is not GA in 3.7.0. We will continue to carry this bug (as we did in 3.6) until it is fixed. If we are able to fix this in timeline for 3.7, well and good but if not, we will ship this in 3.8. > Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Major > Fix For: 3.7.0 > > Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, > tieredtopicloglist.png > > > Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517] > > There are 3 paths I looked at: > * When data is moved to remote storage (1) > * When data is read from remote storage (2) > * When data is deleted from remote storage (3) > (1) Does not have a problem with compacted topics. Compacted segments are > uploaded and their metadata claims they contain offset from the baseOffset of > the segment until the next segment's baseOffset. There are no gaps in offsets. > (2) Does not have a problem if a customer is querying offsets which do not > exist within a segment, but there are offset after the queried offset within > the same segment. 
*However, it does have a problem when the next available > offset is in a subsequent segment.* > (3) For data deleted via DeleteRecords there is no problem. For data deleted > via retention there is no problem. > > *I believe the proper solution to (2) is to make tiered storage continue > looking for the next greater offset in subsequent segments.* > Steps to reproduce the issue: > {code:java} > // TODO (christo) > {code}
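The proposed fix for path (2) — keep scanning subsequent segments for the next greater offset instead of failing when the queried offset falls in a gap left by historical compaction — can be sketched in plain Java. The names `NextOffsetLookup` and `Segment` are illustrative stand-ins, not Kafka's actual `RemoteLogSegmentMetadata` API:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the fix proposed in KAFKA-15388, not Kafka's
// actual implementation: given segment metadata that may contain offset
// gaps (left by compaction before the topic switched to delete retention),
// find the segment holding the next available offset >= the queried one.
class NextOffsetLookup {
    // Minimal stand-in for segment metadata: [baseOffset, endOffset] inclusive.
    record Segment(long baseOffset, long endOffset) {}

    // Segments are assumed sorted by baseOffset. Returns the first segment
    // whose endOffset >= target, i.e. falls through to later segments rather
    // than giving up when target lands in a compaction gap.
    static Optional<Segment> findSegmentFor(List<Segment> segments, long target) {
        for (Segment s : segments) {
            if (s.endOffset() >= target) {
                return Optional.of(s);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Gap between offsets 20 and 50 caused by earlier compaction.
        List<Segment> segments = List.of(new Segment(0, 20), new Segment(50, 99));
        System.out.println(findSegmentFor(segments, 30).orElseThrow().baseOffset()); // 50
        System.out.println(findSegmentFor(segments, 120).isPresent());               // false
    }
}
```

Kafka's real lookup goes through the remote log metadata manager; the point of the sketch is only the fall-through to subsequent segments instead of returning no data for an offset inside a gap.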
[jira] [Updated] (KAFKA-16040) Rename `Generic` to `Classic`
[ https://issues.apache.org/jira/browse/KAFKA-16040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-16040: Description: People have raised concerns about using {{Generic}} as a name to designate the old rebalance protocol. We considered using {{Legacy}} but discarded it because there are still applications, such as Connect, using the old protocol. We settled on using {{Classic}} for the {{{}Classic Rebalance Protocol{}}}. > Rename `Generic` to `Classic` > - > > Key: KAFKA-16040 > URL: https://issues.apache.org/jira/browse/KAFKA-16040 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Blocker > Fix For: 3.7.0 > > > People have raised concerns about using {{Generic}} as a name to designate > the old rebalance protocol. We considered using {{Legacy}} but discarded it > because there are still applications, such as Connect, using the old > protocol. We settled on using {{Classic}} for the {{{}Classic Rebalance > Protocol{}}}.
[jira] [Updated] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15388: - Priority: Major (was: Blocker) > Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Major > Fix For: 3.7.0 > > Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, > tieredtopicloglist.png > > > Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517] > > There are 3 paths I looked at: > * When data is moved to remote storage (1) > * When data is read from remote storage (2) > * When data is deleted from remote storage (3) > (1) Does not have a problem with compacted topics. Compacted segments are > uploaded and their metadata claims they contain offset from the baseOffset of > the segment until the next segment's baseOffset. There are no gaps in offsets. > (2) Does not have a problem if a customer is querying offsets which do not > exist within a segment, but there are offset after the queried offset within > the same segment. *However, it does have a problem when the next available > offset is in a subsequent segment.* > (3) For data deleted via DeleteRecords there is no problem. For data deleted > via retention there is no problem. > > *I believe the proper solution to (2) is to make tiered storage continue > looking for the next greater offset in subsequent segments.* > Steps to reproduce the issue: > {code:java} > // TODO (christo) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799416#comment-17799416 ] Divij Vaidya commented on KAFKA-15388: -- Hey [~enether] I will not consider this as a blocker for 3.7.0 since Tiered Storage is not GA in 3.7.0. We will continue to carry this bug (as we did in 3.6) until it is fixed. If we are able to fix this in timeline for 3.7, well and good but if not, we will ship this in 3.8. > Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Blocker > Fix For: 3.7.0 > > Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, > tieredtopicloglist.png > > > Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517] > > There are 3 paths I looked at: > * When data is moved to remote storage (1) > * When data is read from remote storage (2) > * When data is deleted from remote storage (3) > (1) Does not have a problem with compacted topics. Compacted segments are > uploaded and their metadata claims they contain offset from the baseOffset of > the segment until the next segment's baseOffset. There are no gaps in offsets. > (2) Does not have a problem if a customer is querying offsets which do not > exist within a segment, but there are offset after the queried offset within > the same segment. *However, it does have a problem when the next available > offset is in a subsequent segment.* > (3) For data deleted via DeleteRecords there is no problem. For data deleted > via retention there is no problem. 
> > *I believe the proper solution to (2) is to make tiered storage continue > looking for the next greater offset in subsequent segments.* > Steps to reproduce the issue: > {code:java} > // TODO (christo) > {code}
Re: [PR] KAFKA-14888: Added remote log segments retention functionality based on time and size. [kafka]
divijvaidya commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1434040959 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -761,11 +784,385 @@ public void run() { } } +public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) { +if (isLeader()) { +logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset); +updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset); +} +} + +class RemoteLogRetentionHandler { + +private final Optional retentionSizeData; +private final Optional retentionTimeData; + +private long remainingBreachedSize; + +private OptionalLong logStartOffset = OptionalLong.empty(); + +public RemoteLogRetentionHandler(Optional retentionSizeData, Optional retentionTimeData) { +this.retentionSizeData = retentionSizeData; +this.retentionTimeData = retentionTimeData; +remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L); +} + +private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException { +if (!retentionSizeData.isPresent()) { +return false; +} + +boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> { +// Assumption that segments contain size >= 0 +if (remainingBreachedSize > 0) { +long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes(); +if (remainingBytes >= 0) { +remainingBreachedSize = remainingBytes; +return true; +} +} + +return false; +}); +if (isSegmentDeleted) { +logStartOffset = OptionalLong.of(metadata.endOffset() + 1); Review Comment: Yes that is correct. Copying functionality is not impacted as discussed in https://issues.apache.org/jira/browse/KAFKA-15388. It's only the read-from-remote that is impacted for the historically compacted topic. -- This is an automated message from the Apache Git Service. 
[PR] KAFKA-16040; Rename `Generic` to `Classic` [kafka]
dajac opened a new pull request, #15059: URL: https://github.com/apache/kafka/pull/15059 People have raised concerns about using `Generic` as a name to designate the old rebalance protocol. We considered using `Legacy` but discarded it because there are still applications, such as Connect, using the old protocol. We settled on using `Classic` for the `Classic Rebalance Protocol`. The changes in this patch are almost entirely mechanical: they replace occurrences of `generic` with `classic`. ### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799399#comment-17799399 ] Arpit Goyal edited comment on KAFKA-15388 at 12/21/23 12:50 PM: [~enether] [~satish.duggana] [~divijvaidya] [~enether] I am currently working on the fix. Hopefully will create a PR by EOD. was (Author: JIRAUSER301926): [~enether] [~satish.duggana] [~divijvaidya] [~enether] I am currently working on the fix. Hopefully will create a PR by Saturday. > Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Blocker > Fix For: 3.7.0 > > Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, > tieredtopicloglist.png > > > Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517] > > There are 3 paths I looked at: > * When data is moved to remote storage (1) > * When data is read from remote storage (2) > * When data is deleted from remote storage (3) > (1) Does not have a problem with compacted topics. Compacted segments are > uploaded and their metadata claims they contain offset from the baseOffset of > the segment until the next segment's baseOffset. There are no gaps in offsets. > (2) Does not have a problem if a customer is querying offsets which do not > exist within a segment, but there are offset after the queried offset within > the same segment. *However, it does have a problem when the next available > offset is in a subsequent segment.* > (3) For data deleted via DeleteRecords there is no problem. For data deleted > via retention there is no problem. 
> > *I believe the proper solution to (2) is to make tiered storage continue > looking for the next greater offset in subsequent segments.* > Steps to reproduce the issue: > {code:java} > // TODO (christo) > {code}
[jira] [Updated] (KAFKA-13950) Resource leak at multiple places in the code
[ https://issues.apache.org/jira/browse/KAFKA-13950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-13950: - Fix Version/s: 3.8.0 (was: 3.7.0) > Resource leak at multiple places in the code > > > Key: KAFKA-13950 > URL: https://issues.apache.org/jira/browse/KAFKA-13950 > Project: Kafka > Issue Type: Bug > Components: clients, kraft, streams, unit tests >Reporter: Divij Vaidya >Assignee: Divij Vaidya >Priority: Major > Fix For: 3.8.0 > > > I ran Amazon CodeGuru Reviewer on Apache Kafka's code base, and the tool > detected various places where Closeable resources are not being closed > properly, leading to leaks. > This task will fix the resource leaks detected at multiple places.
[jira] [Created] (KAFKA-16040) Rename `Generic` to `Classic`
David Jacot created KAFKA-16040: --- Summary: Rename `Generic` to `Classic` Key: KAFKA-16040 URL: https://issues.apache.org/jira/browse/KAFKA-16040 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot Fix For: 3.7.0
Re: [PR] KAFKA-13950: Fix resource leak in error scenarios [kafka]
divijvaidya merged PR #12228: URL: https://github.com/apache/kafka/pull/12228
[jira] [Resolved] (KAFKA-16036) Add `group.coordinator.rebalance.protocols` and publish all new configs
[ https://issues.apache.org/jira/browse/KAFKA-16036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16036. - Resolution: Fixed > Add `group.coordinator.rebalance.protocols` and publish all new configs > --- > > Key: KAFKA-16036 > URL: https://issues.apache.org/jira/browse/KAFKA-16036 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Blocker > Fix For: 3.7.0 > >
Re: [PR] KAFKA-16035: add tests for remoteLogSizeComputationTime/remoteFetchExpiresPerSec metrics [kafka]
showuon commented on PR #15056: URL: https://github.com/apache/kafka/pull/15056#issuecomment-1866185768 @satishd @kamalcph @clolov, call for review. Thanks.
Re: [PR] KAFKA-16036; Add `group.coordinator.rebalance.protocols` and publish all new configs [kafka]
dajac merged PR #15053: URL: https://github.com/apache/kafka/pull/15053
Re: [PR] KAFKA-16036; Add `group.coordinator.rebalance.protocols` and publish all new configs [kafka]
dajac commented on PR #15053: URL: https://github.com/apache/kafka/pull/15053#issuecomment-1866183110 It is impossible to get a clean build at the moment. There are OOM errors all over the place, including in trunk. Looking at all the builds, it is clear that this PR does not create persistent failures. I also ran all the tests locally to confirm. Therefore, I will merge it to trunk and 3.7.
Re: [PR] KAFKA-13950: Fix resource leak in error scenarios [kafka]
divijvaidya commented on PR #12228: URL: https://github.com/apache/kafka/pull/12228#issuecomment-1866183063 Unrelated test failures since they are known to be flaky as per https://ge.apache.org/scans/tests?search.rootProjectNames=kafka=trunk=Europe%2FBerlin ```
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_17_and_Scala_2_13___testReplicateFromLatest__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_17_and_Scala_2_13___testReplicateFromLatest__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testOffsetTranslationBehindReplicationFlow__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationSSLTest/Build___JDK_17_and_Scala_2_13___testReplicateFromLatest__/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest__/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest___2/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateFromLatest()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest/Build___JDK_11_and_Scala_2_13___testReplicateFromLatest__/)
[Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextConsumerTest.testShrinkingTopicSubscriptions(String, String).quorum=kraft+kip848.groupProtocol=consumer](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/kafka.api/PlaintextConsumerTest/Build___JDK_11_and_Scala_2_13___testShrinkingTopicSubscriptions_String__String__quorum_kraft_kip848_groupProtocol_consumer/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_11_and_Scala_2_13___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__kraft/)
[Build / JDK 8 and Scala 2.12 / kafka.api.SaslSslConsumerTest.testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/kafka.api/SaslSslConsumerTest/Build___JDK_8_and_Scala_2_12___testCoordinatorFailover_String__String__quorum_zk_groupProtocol_generic/)
[Build / JDK 8 and Scala 2.12 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/kafka.network/ConnectionQuotasTest/Build___JDK_8_and_Scala_2_12___testListenerConnectionRateLimitWhenActualRateAboveLimit__/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12228/12/testReport/junit/org.apache.kafka.streams.integration/ConsistencyVectorIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldHaveSamePositionBoundActiveAndStandBy/)
[Build / JDK 8 and Scala 2.12 /
[jira] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388 ] Arpit Goyal deleted comment on KAFKA-15388: - was (Author: JIRAUSER301926): [~showuon] [~satish.duggana] [~christo_lolov] Any suggestion on how to fetch the higher segment for a particular offset from remote storage, the same way we do in local storage? > Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Blocker > Fix For: 3.7.0 > > Attachments: Screenshot 2023-11-15 at 3.47.54 PM.png, > tieredtopicloglist.png > > > Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517] > > There are 3 paths I looked at: > * When data is moved to remote storage (1) > * When data is read from remote storage (2) > * When data is deleted from remote storage (3) > (1) Does not have a problem with compacted topics. Compacted segments are > uploaded and their metadata claims they contain offsets from the baseOffset of > the segment until the next segment's baseOffset. There are no gaps in offsets. > (2) Does not have a problem if a customer is querying offsets which do not > exist within a segment, but there are offsets after the queried offset within > the same segment. *However, it does have a problem when the next available > offset is in a subsequent segment.* > (3) For data deleted via DeleteRecords there is no problem. For data deleted > via retention there is no problem. > > *I believe the proper solution to (2) is to make tiered storage continue > looking for the next greater offset in subsequent segments.* > Steps to reproduce the issue: > {code:java} > // TODO (christo) > {code}
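The fix proposed for path (2) above — continuing the search for the next greater offset in subsequent segments — can be sketched as follows. This is a minimal illustration, not the actual RemoteLogManager API; the `Segment` shape and the `NextOffsetLookup` class are assumptions made for the example:

```java
import java.util.Arrays;
import java.util.List;

public class NextOffsetLookup {
    // Each segment covers [baseOffset, endOffset]; compaction can leave
    // queried offsets without a matching record inside a segment.
    public static final class Segment {
        public final long baseOffset;
        public final long endOffset;

        public Segment(long baseOffset, long endOffset) {
            this.baseOffset = baseOffset;
            this.endOffset = endOffset;
        }
    }

    // Returns the first offset >= target that falls inside some segment
    // (segments sorted by baseOffset), or -1 if target is past the last one.
    // Instead of giving up when the target misses the first candidate
    // segment, the scan continues into subsequent segments.
    public static long nextAvailableOffset(List<Segment> segments, long target) {
        for (Segment s : segments) {
            if (target <= s.endOffset) {
                // Either the target itself, or the segment's baseOffset
                // when the target fell into a gap before this segment.
                return Math.max(target, s.baseOffset);
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(new Segment(0, 9), new Segment(20, 29));
        // Offset 15 does not exist; the next available offset is in the
        // subsequent segment.
        System.out.println(nextAvailableOffset(segments, 15)); // prints 20
    }
}
```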
[jira] [Updated] (KAFKA-15241) Compute tiered offset by keeping the respective epochs in scope.
[ https://issues.apache.org/jira/browse/KAFKA-15241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15241: --- Fix Version/s: 3.7.0 > Compute tiered offset by keeping the respective epochs in scope. > > > Key: KAFKA-15241 > URL: https://issues.apache.org/jira/browse/KAFKA-15241 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 3.6.0 >Reporter: Satish Duggana >Assignee: Kamal Chandraprakash >Priority: Major > Fix For: 3.7.0 > > > This is a followup on the discussion > [thread|https://github.com/apache/kafka/pull/14004#discussion_r1268911909]
[jira] [Commented] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799399#comment-17799399 ] Arpit Goyal commented on KAFKA-15388: - [~enether] [~satish.duggana] [~divijvaidya] I am currently working on the fix. Hopefully I will create a PR by Saturday. > Handle topics that were having compaction as retention earlier are changed to > delete only retention policy and onboarded to tiered storage. > > > Key: KAFKA-15388 > URL: https://issues.apache.org/jira/browse/KAFKA-15388 > Project: Kafka > Issue Type: Bug >Reporter: Satish Duggana >Assignee: Arpit Goyal >Priority: Blocker > Fix For: 3.7.0
[PR] test [kafka]
dajac opened a new pull request, #15058: URL: https://github.com/apache/kafka/pull/15058
[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring
[ https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-16017: -- Fix Version/s: 3.7.0 > Checkpointed offset is incorrect when task is revived and restoring > > > Key: KAFKA-16017 > URL: https://issues.apache.org/jira/browse/KAFKA-16017 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.1 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 3.7.0 > > > Streams checkpoints the wrong offset when a task is revived after a > {{TaskCorruptedException}} and the task is then migrated to another stream > thread during restoration. > This might happen in a situation like the following if the Streams > application runs under EOS: > 1. Streams encounters a network error which triggers a > {{TaskCorruptedException}}. > 2. The task that encountered the exception is closed dirty and revived. The > state store directory is wiped out and a rebalance is triggered. > 3. Until the sync of the rebalance is received the revived task is restoring. > 4. When the sync is received the revived task is revoked and a new rebalance > is triggered. During the revocation the task is closed cleanly and a > checkpoint file is written. > 5. With the next rebalance the task moves back to the stream thread from which it > was revoked, reads the checkpoint, and starts restoring. (It might be enough if > the task moves to a stream thread on the same Streams client that shares the > same state directory). > 6. The state of the task misses some records. > To mitigate the issue one can restart the stream thread and delete the > state on disk. After that the state restores completely from the changelog > topic and the state does not miss any records anymore. > The root cause is that the checkpoint that is written in step 4 contains the > offset that the record collector stored when it sent the records to the > changelog topic.
However, since in step 2 the state directory is wiped out, > the state does not contain those records anymore. It only contains the > records that it restored in step 3 which might be less. The root cause of > this is that the offsets in the record collector are not cleaned up when the > record collector is closed. > I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017. > The repro can be started with > {code} > ./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x > spotbugsTest --tests RestoreIntegrationTest.test --info > test.log > {code} > The repro writes records into a state store and tries to retrieve them again > (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582). > It will throw an {{IllegalStateException}} if it cannot find a record in the > state > (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594). > Once the offsets in the record collector are cleared on close > (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332 > and > https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349), > the {{IllegalStateException}} does not occur anymore. > In the logs you can check for > - {{Restore batch end offset is}} which are the restored offsets in the state. > - {{task [0_1] Writing checkpoint:}} which are the written checkpoints. 
> - {{task [0_1] Checkpointable offsets}} which show the offsets coming from > sending the records to the changelog topic > {{RestoreIntegrationTesttest-stateStore-changelog-1}} > Always check the last instances of these before the {{IllegalStateException}} is > thrown. > You will see that the restored offsets are less than the offsets that are > written to the checkpoint. The offsets written to the checkpoint come from > the offsets stored when sending the records to the changelog topic.
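The fix described above — clearing the record collector's offsets when it is closed — can be sketched as follows. This is a minimal stand-in, not the real `RecordCollectorImpl`; the class and method names here are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the fix: the offsets recorded while sending to the changelog
// topic must be cleared on close, otherwise a revived task whose state
// directory was wiped can still checkpoint offsets for records that its
// state store no longer holds.
public class RecordCollectorSketch {
    private final Map<String, Long> offsets = new HashMap<>();

    // Called after each successful send to a changelog partition.
    public void recordSend(String changelogPartition, long offset) {
        offsets.put(changelogPartition, offset);
    }

    // The offsets a task would write into its checkpoint file.
    public Map<String, Long> checkpointableOffsets() {
        return new HashMap<>(offsets);
    }

    // The bug was closing without clearing; the fix clears on both clean
    // and dirty close so no stale offsets survive a task revival.
    public void close() {
        offsets.clear();
    }

    public static void main(String[] args) {
        RecordCollectorSketch collector = new RecordCollectorSketch();
        collector.recordSend("changelog-1", 42L);
        collector.close();
        // After close, nothing stale is left to checkpoint.
        System.out.println(collector.checkpointableOffsets().isEmpty()); // prints true
    }
}
```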
[jira] [Updated] (KAFKA-15158) Add metrics for RemoteDeleteRequestsPerSec, RemoteDeleteErrorsPerSec, BuildRemoteLogAuxStateRequestsPerSec, BuildRemoteLogAuxStateErrorsPerSec
[ https://issues.apache.org/jira/browse/KAFKA-15158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-15158: --- Labels: tiered-storage (was: ) > Add metrics for RemoteDeleteRequestsPerSec, RemoteDeleteErrorsPerSec, > BuildRemoteLogAuxStateRequestsPerSec, BuildRemoteLogAuxStateErrorsPerSec > -- > > Key: KAFKA-15158 > URL: https://issues.apache.org/jira/browse/KAFKA-15158 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Gantigmaa Selenge >Priority: Major > Labels: tiered-storage > Fix For: 3.7.0 > > > Add the following metrics for better observability into the RemoteLog-related > activities inside the broker. > 1. RemoteWriteRequestsPerSec > 2. RemoteDeleteRequestsPerSec > 3. BuildRemoteLogAuxStateRequestsPerSec > > These metrics will be calculated at the topic level (we can add them at > brokerTopicStats) > -*RemoteWriteRequestsPerSec* will be marked on every call to > RemoteLogManager#- > -copyLogSegmentsToRemote()- already covered by KAFKA-14953 > > *RemoteDeleteRequestsPerSec* will be marked on every call to > RemoteLogManager#cleanupExpiredRemoteLogSegments(). This method is introduced > in [https://github.com/apache/kafka/pull/13561] > *BuildRemoteLogAuxStateRequestsPerSec* will be marked on every call to > ReplicaFetcherTierStateMachine#buildRemoteLogAuxState() > > (Note: For all the above, add Error metrics as well such as > RemoteDeleteErrorPerSec) > (Note: This requires a change in KIP-405 and hence, must be approved by KIP > author [~satishd] ) >
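The mark-on-every-call pattern described above can be sketched as follows. This is a hedged illustration, not the real broker code: a plain `AtomicLong` stands in for the Yammer `Meter` used by `brokerTopicStats`, and the method signature is invented for the example:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of per-topic request/error metrics: the request counter is marked
// once per call to the instrumented method, and the matching error counter
// is marked only when that call fails.
public class RemoteDeleteMetricsSketch {
    private final AtomicLong remoteDeleteRequestsPerSec = new AtomicLong();
    private final AtomicLong remoteDeleteErrorsPerSec = new AtomicLong();

    // Stand-in for cleanupExpiredRemoteLogSegments(); the boolean flag
    // simulates a remote-storage failure for the example.
    public void cleanupExpiredRemoteLogSegments(boolean simulateFailure) {
        remoteDeleteRequestsPerSec.incrementAndGet(); // marked on every call
        if (simulateFailure) {
            remoteDeleteErrorsPerSec.incrementAndGet(); // marked on errors only
        }
    }

    public long requests() {
        return remoteDeleteRequestsPerSec.get();
    }

    public long errors() {
        return remoteDeleteErrorsPerSec.get();
    }

    public static void main(String[] args) {
        RemoteDeleteMetricsSketch metrics = new RemoteDeleteMetricsSketch();
        metrics.cleanupExpiredRemoteLogSegments(false);
        metrics.cleanupExpiredRemoteLogSegments(true);
        System.out.println(metrics.requests() + " " + metrics.errors()); // prints 2 1
    }
}
```

In the real broker these counters would be rate meters rather than raw counts, but the call-site placement is the point of the sketch.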
Re: [PR] KAFKA-14888: Added remote log segments retention functionality based on time and size. [kafka]
iit2009060 commented on code in PR #13561: URL: https://github.com/apache/kafka/pull/13561#discussion_r1433944692

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
## @@ -761,11 +784,385 @@ public void run() { } }

+public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+    if (isLeader()) {
+        logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+    }
+}
+
+class RemoteLogRetentionHandler {
+
+    private final Optional<RetentionSizeData> retentionSizeData;
+    private final Optional<RetentionTimeData> retentionTimeData;
+
+    private long remainingBreachedSize;
+
+    private OptionalLong logStartOffset = OptionalLong.empty();
+
+    public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+        this.retentionSizeData = retentionSizeData;
+        this.retentionTimeData = retentionTimeData;
+        remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+    }
+
+    private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+        if (!retentionSizeData.isPresent()) {
+            return false;
+        }
+
+        boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+            // Assumption that segments contain size >= 0
+            if (remainingBreachedSize > 0) {
+                long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                if (remainingBytes >= 0) {
+                    remainingBreachedSize = remainingBytes;
+                    return true;
+                }
+            }
+
+            return false;
+        });
+        if (isSegmentDeleted) {
+            logStartOffset = OptionalLong.of(metadata.endOffset() + 1);

Review Comment: @divijvaidya @satishd @showuon I went through the specific code and realised this actually does not impact the logic:
1. While copying the remote segments, RemoteLogSegmentMetadata stores the end offset using the value from the next segment's base offset. https://github.com/apache/kafka/blob/5785796f985aa294c12e670da221d086a7fa887c/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L693
2. In my understanding it will be safe to use the same logic for historically compacted topics. Let me know if my analysis is correct.
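The invariant the comment above relies on can be sketched numerically. This is an assumption-laden illustration with invented helper names, not the actual RemoteLogManager code: if a copied segment's metadata end offset is derived from the next segment's base offset, then advancing the log start offset to `endOffset + 1` after deleting that segment lands exactly on the next segment's base offset, leaving no unreachable gap even for historically compacted topics:

```java
public class RetentionOffsetSketch {
    // At copy time, a segment's recorded end offset is taken from the next
    // segment's base offset (exclusive), so the covered range is contiguous.
    public static long endOffsetForCopy(long nextSegmentBaseOffset) {
        return nextSegmentBaseOffset - 1;
    }

    // At retention time, deleting a segment advances the log start offset
    // to one past the deleted segment's end offset.
    public static long logStartOffsetAfterDelete(long deletedSegmentEndOffset) {
        return deletedSegmentEndOffset + 1;
    }

    public static void main(String[] args) {
        long endOffset = endOffsetForCopy(100L);
        // The new log start offset equals the next segment's base offset.
        System.out.println(logStartOffsetAfterDelete(endOffset)); // prints 100
    }
}
```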
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1433900022 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java: ## @@ -266,6 +267,28 @@ CompletableFuture deleteOffsets( BufferSupplier bufferSupplier ); +/** + * Complete a transaction. This is called when the WriteTxnMarkers API is called Review Comment: That makes sense. I also added a check in `GroupCoordinatorService.completeTransaction` to fail if another partition is received as it is not expected.
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1433892655 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2432,7 +2477,13 @@ class KafkaApis(val requestChannel: RequestChannel, origin = AppendOrigin.COORDINATOR, entriesPerPartition = controlRecords, requestLocal = requestLocal, - responseCallback = maybeSendResponseCallback(producerId, marker.transactionResult)) + responseCallback = errors => { Review Comment: Yes. There are existing unit tests and integration tests testing this path. Note that the tests that I've added also exercise this part.
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1433891089 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -3018,6 +3020,223 @@ class KafkaApisTest { any()) } + @Test Review Comment: Yes.
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1433890364

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
## @@ -102,6 +241,31 @@ public long append( } }

+@Override
+public long appendEndTransactionMarker(
+    TopicPartition tp,
+    long producerId,
+    short producerEpoch,
+    int coordinatorEpoch,
+    TransactionResult result
+) throws KafkaException {
+    PartitionState state = partitionState(tp);
+    state.lock.lock();
+    try {
+        state.entries.add(new LogControl(
+            producerId,
+            producerEpoch,
+            coordinatorEpoch,
+            result
+        ));
+        state.endOffset += 1;
+        if (autoCommit) commit(tp, state.endOffset);

Review Comment: It means that we advance the HWM immediately and trigger the `onHighWatermarkUpdated` callback if enabled.
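The autoCommit behaviour described in the review comment above — the high watermark advancing immediately and listeners firing — can be sketched as follows. This is an illustrative stand-in, not the real `InMemoryPartitionWriter`; the class and listener names are assumptions for the example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Sketch: with autoCommit, every append advances the high watermark (HWM)
// right away and notifies registered listeners; without it, the HWM only
// moves on an explicit commit().
public class AutoCommitSketch {
    private long endOffset = 0L;
    private long highWatermark = 0L;
    private final boolean autoCommit;
    private final List<LongConsumer> hwmListeners = new ArrayList<>();

    public AutoCommitSketch(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    public void onHighWatermarkUpdated(LongConsumer listener) {
        hwmListeners.add(listener);
    }

    // Appending always advances the end offset; the HWM follows immediately
    // only when autoCommit is enabled.
    public void append() {
        endOffset += 1;
        if (autoCommit) {
            commit(endOffset);
        }
    }

    public void commit(long offset) {
        highWatermark = offset;
        hwmListeners.forEach(listener -> listener.accept(offset));
    }

    public long highWatermark() {
        return highWatermark;
    }

    public static void main(String[] args) {
        AutoCommitSketch writer = new AutoCommitSketch(true);
        writer.onHighWatermarkUpdated(hwm -> System.out.println("HWM -> " + hwm));
        writer.append(); // prints HWM -> 1
        writer.append(); // prints HWM -> 2
    }
}
```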
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1433889262

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java:
## @@ -33,10 +37,141 @@ */ public class InMemoryPartitionWriter implements PartitionWriter {

+public static class LogEntry {
+    public static LogEntry value(T value) {
+        return new LogValue<>(value);
+    }
+
+    public static LogEntry value(
+        long producerId,
+        short producerEpoch,
+        T value
+    ) {
+        return new LogValue<>(
+            producerId,
+            producerEpoch,
+            value
+        );
+    }
+
+    public static LogEntry control(
+        long producerId,
+        short producerEpoch,
+        int coordinatorEpoch,
+        TransactionResult result
+    ) {
+        return new LogControl(
+            producerId,
+            producerEpoch,
+            coordinatorEpoch,
+            result
+        );
+    }
+}
+
+public static class LogValue extends LogEntry {
+    public final long producerId;
+    public final short producerEpoch;
+    public final T value;
+
+    private LogValue(
+        long producerId,
+        short producerEpoch,
+        T value
+    ) {
+        this.producerId = producerId;
+        this.producerEpoch = producerEpoch;
+        this.value = value;
+    }
+
+    private LogValue(T value) {
+        this(
+            RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH,
+            value
+        );
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;

Review Comment: Yes.
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1433888169

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
## @@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() { ); }

+@ParameterizedTest
+@EnumSource(value = TransactionResult.class)
+public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException {
+    MockTimer timer = new MockTimer();
+    MockPartitionWriter writer = new MockPartitionWriter();
+
+    CoordinatorRuntime runtime =
+        new CoordinatorRuntime.Builder()
+            .withTime(timer.time())
+            .withTimer(timer)
+            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+            .withLoader(new MockCoordinatorLoader())
+            .withEventProcessor(new DirectEventProcessor())
+            .withPartitionWriter(writer)
+            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+            .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+            .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+            .build();
+
+    // Schedule the loading.
+    runtime.scheduleLoadOperation(TP, 10);
+
+    // Verify the initial state.
+    CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+    assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+    assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+    assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+    // Transactional write #1.
+    CompletableFuture write1 = runtime.scheduleTransactionalWriteOperation(
+        "write#1",
+        TP,
+        "transactional-id",
+        100L,
+        (short) 5,
+        DEFAULT_WRITE_TIMEOUT,
+        state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")
+    );
+
+    // Verify that the write is not committed yet.
+    assertFalse(write1.isDone());
+
+    // The last written offset is updated.
+    assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+    // The last committed offset does not change.
+    assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+    // A new snapshot is created.
+    assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+    // Records have been replayed to the coordinator. They are stored in
+    // the pending set for now.
+    assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(
+        100L
+    ));
+    // Records have been written to the log.
+    assertEquals(Arrays.asList(
+        InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+        InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+    ), writer.entries(TP));
+
+    // Complete transaction #1.
+    CompletableFuture complete1 = runtime.scheduleTransactionCompletion(
+        "complete#1",
+        TP,
+        100L,
+        (short) 5,
+        10,
+        result,
+        DEFAULT_WRITE_TIMEOUT
+    );
+
+    // Verify that the completion is not committed yet.
+    assertFalse(complete1.isDone());
+
+    // The last written offset is updated.
+    assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+    // The last committed offset does not change.
+    assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+    // A new snapshot is created.
+    assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+    // Records have been replayed to the coordinator.
+    if (result == TransactionResult.COMMIT) {
+        // They are now in the records set if committed.
+        assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
+    } else {
+        // Or they are gone if aborted.
+        assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+    }
+
+    // Records have been written to the log.
+    assertEquals(Arrays.asList(
+        InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+        InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+        InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, result)
+    ), writer.entries(TP));
+
+    // Commit write #1.
+    writer.commit(TP, 2);
+
+    // The write is completed.
+    assertTrue(write1.isDone());
+    assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+
+    // Commit completion #1.
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1433886514

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
## @@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() {
         );
     }

+    @ParameterizedTest
+    @EnumSource(value = TransactionResult.class)
+    public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime runtime =
+            new CoordinatorRuntime.Builder()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Transactional write #1.
+        CompletableFuture write1 = runtime.scheduleTransactionalWriteOperation(
+            "write#1",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 5,
+            DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")
+        );
+
+        // Verify that the write is not committed yet.
+        assertFalse(write1.isDone());
+
+        // The last written offset is updated.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        // The last committed offset does not change.
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        // A new snapshot is created.
+        assertEquals(Arrays.asList(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+        // Records have been replayed to the coordinator. They are stored in
+        // the pending set for now.
+        assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(
+            100L
+        ));
+        // Records have been written to the log.
+        assertEquals(Arrays.asList(
+            InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+            InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2")
+        ), writer.entries(TP));
+
+        // Complete transaction #1.
+        CompletableFuture complete1 = runtime.scheduleTransactionCompletion(
+            "complete#1",
+            TP,
+            100L,
+            (short) 5,
+            10,
+            result,
+            DEFAULT_WRITE_TIMEOUT
+        );
+
+        // Verify that the completion is not committed yet.
+        assertFalse(complete1.isDone());
+
+        // The last written offset is updated.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        // The last committed offset does not change.
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        // A new snapshot is created.
+        assertEquals(Arrays.asList(0L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        // Records have been replayed to the coordinator.
+        if (result == TransactionResult.COMMIT) {
+            // They are now in the records set if committed.
+            assertEquals(mkSet("record1", "record2"), ctx.coordinator.coordinator().records());
+        } else {
+            // Or they are gone if aborted.
+            assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+        }
+
+        // Records have been written to the log.
+        assertEquals(Arrays.asList(
+            InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record1"),
+            InMemoryPartitionWriter.LogEntry.value(100L, (short) 5, "record2"),
+            InMemoryPartitionWriter.LogEntry.control(100L, (short) 5, 10, result)
+        ), writer.entries(TP));
+
+        // Commit write #1.
+        writer.commit(TP, 2);
+
+        // The write is completed.
+        assertTrue(write1.isDone());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
+
+        // Commit completion #1.
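The invariant the quoted test exercises is that transactional records replayed to a coordinator sit in a per-producer pending set until the transaction is completed: a COMMIT moves them into the committed records set, an ABORT drops them. A minimal, self-contained sketch of that semantics (hypothetical class and method names chosen for illustration, not Kafka's actual implementation):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of the pending-record lifecycle verified above:
// records are buffered per producer id until the transaction ends.
class PendingRecordsSketch {
    private final Map<Long, Set<String>> pending = new HashMap<>();
    private final Set<String> committed = new HashSet<>();

    // Replay a transactional record; it is only pending at this point.
    void replay(long producerId, String record) {
        pending.computeIfAbsent(producerId, id -> new HashSet<>()).add(record);
    }

    // Complete the transaction; `commit` mirrors TransactionResult.COMMIT
    // vs ABORT in the test's @EnumSource parameter.
    void complete(long producerId, boolean commit) {
        Set<String> records = pending.remove(producerId);
        if (commit && records != null) {
            committed.addAll(records);
        }
    }

    Set<String> pendingRecords(long producerId) {
        return pending.getOrDefault(producerId, Collections.emptySet());
    }

    Set<String> records() {
        return committed;
    }
}
```

Under this model, replaying "record1" and "record2" for producer 100L leaves `records()` empty until `complete(100L, true)` is called, matching the test's assertions on `pendingRecords(100L)` before the control record is written and on `records()` after a COMMIT.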
Re: [PR] KAFKA-14505; [3/N] Wire WriteTxnMarkers API [kafka]
dajac commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1433877427

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
## @@ -1073,6 +1151,314 @@ public CoordinatorShardBuilder get() {

[... quotes the same testScheduleTransactionCompletion hunk as the previous comment ...]

+        // The write is completed.
+        assertTrue(write1.isDone());
+        assertEquals("response1", write1.get(5, TimeUnit.SECONDS));

Review Comment: Yes, that's correct. The move from the