Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac merged PR #14845: URL: https://github.com/apache/kafka/pull/14845 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on PR #14845:
URL: https://github.com/apache/kafka/pull/14845#issuecomment-1845119315

The last build is pretty awful. I went through the list of all failed tests and I could not link any of them to the changes made in this PR. They are all unrelated. Therefore, I will merge this PR.
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on PR #14845:
URL: https://github.com/apache/kafka/pull/14845#issuecomment-1843565757

It looks like the new failing tests are coming from https://github.com/apache/kafka/pull/14626.
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1417658955

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ##
@@ -653,12 +653,15 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
      * @param memberId          The member id.
      * @param groupInstanceId   The group instance id.
      * @param memberEpoch       The member epoch.
+     * @param isTransactional   Whether the offset commit is transactional or not. It has no
+     *                          impact when a consumer group is used.
      */
     @Override
     public void validateOffsetCommit(
         String memberId,
         String groupInstanceId,
-        int memberEpoch
+        int memberEpoch,
+        boolean isTransactional

Review Comment:
I thought that we don't need it for the "new consumer" groups. My reasoning is that they should not use old versions of the TxnOffsetCommit API if they use the new protocol.
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1416220626

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ##
@@ -653,12 +653,15 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
      * @param memberId          The member id.
      * @param groupInstanceId   The group instance id.
      * @param memberEpoch       The member epoch.
+     * @param isTransactional   Whether the offset commit is transactional or not. It has no
+     *                          impact when a consumer group is used.
      */
     @Override
     public void validateOffsetCommit(
         String memberId,
         String groupInstanceId,
-        int memberEpoch
+        int memberEpoch,
+        boolean isTransactional

Review Comment:
Will we add the checks for transactional commits in the next PR? Or do you think we don't need them for the new ConsumerGroup?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1416142507

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -187,8 +190,6 @@ public CoordinatorResult commitOffset(
             short version,
             OffsetCommitRequestData request
         ) {
-            snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);

Review Comment:
Ok -- so just a bug we didn't see before.
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on PR #14845:
URL: https://github.com/apache/kafka/pull/14845#issuecomment-1840467329

@jolshan Thanks for your comments. I have addressed them.
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415313002

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -187,8 +190,6 @@ public CoordinatorResult commitOffset(
             short version,
             OffsetCommitRequestData request
         ) {
-            snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);

Review Comment:
The snapshot is already created in `replay` so we don't need it here. Moreover, creating one with `lastCommittedOffset` is incorrect here. It should be created based on the `lastWrittenOffset`.
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415310830

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ##
@@ -675,25 +675,25 @@ public void testValidateOffsetCommit() {
         // Simulate a call from the admin client without member id and member epoch.
         // This should pass only if the group is empty.
-        group.validateOffsetCommit("", "", -1);
+        group.validateOffsetCommit("", "", -1, false);

Review Comment:
`isTransactional` actually has no impact on the validation, but I agree that we should have a test to ensure this. I have parameterized the test to check both values.
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415305714

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ##
@@ -293,7 +326,11 @@ public void testReplayOffsetCommit() {
             new ApiMessageAndVersion(value, (short) 0)
         ));

-        verify(offsetMetadataManager, times(2)).replay(key, value);
+        verify(offsetMetadataManager, times(2)).replay(
+            RecordBatch.NO_PRODUCER_ID,

Review Comment:
Sounds good. Added testReplayTransactionalOffsetCommit.
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415299461

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ##
@@ -1615,4 +1618,127 @@ public void testDeleteGroupsWhenNotStarted() throws ExecutionException, Interrup
             future.get()
         );
     }
+
+    @Test
+    public void testCommitTransactionalOffsetsWhenNotStarted() throws ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId("foo")
+            .setTransactionalId("transactional-id")
+            .setMemberId("member-id")
+            .setGenerationId(10)
+            .setTopics(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                    .setPartitionIndex(0)
+                    .setCommittedOffset(100)))));
+
+        CompletableFuture<TxnOffsetCommitResponseData> future = service.commitTransactionalOffsets(
+            requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertEquals(
+            new TxnOffsetCommitResponseData()
+                .setTopics(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                        .setPartitionIndex(0)
+                        .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))))),
+            future.get()
+        );
+    }
+
+    @ParameterizedTest
+    @NullSource

Review Comment:
That's right.
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415298403

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ##
@@ -833,13 +835,15 @@ public void validateOffsetCommit(
             if (generationId != this.generationId) {
                 throw Errors.ILLEGAL_GENERATION.exception();
             }
-        } else if (!isInState(EMPTY)) {
+        } else if (!isTransactional && !isInState(EMPTY)) {
             // If the request does not contain the member id and the generation id (version 0),
             // offset commits are only accepted when the group is empty.
+            // This does not apply to transactional offset commits, since the older versions
+            // of this protocol do not require member id and generation id.
             throw Errors.UNKNOWN_MEMBER_ID.exception();
         }

-        if (isInState(COMPLETING_REBALANCE)) {
+        if (!isTransactional && isInState(COMPLETING_REBALANCE)) {

Review Comment:
No. It is not applied but not for the same reason. For transactional offset commits, the group state is not checked but I have to admit that I don't know why. Let me update the comment anyway.
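
For readers following the hunk above, here is a minimal, self-contained sketch of the validation order it describes. It is not the `GenericGroup` code: the class name, the `State` enum, the string error names, and the `hasMemberInfo` condition are all assumptions made for illustration. The error returned during `COMPLETING_REBALANCE` is also assumed, because the hunk is cut off before that `throw`, and the member-existence checks of the real method are left out.

```java
// Hypothetical sketch of the checks discussed in this thread; not Kafka's GenericGroup.
final class OffsetCommitValidationSketch {
    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    // Returns the name of the error that would be raised, or "NONE" if the commit is accepted.
    static String validate(
        State state,
        int groupGenerationId,
        String memberId,
        int generationId,
        boolean isTransactional
    ) {
        // Assumption: a request "contains member information" when either field is set.
        boolean hasMemberInfo = !memberId.isEmpty() || generationId >= 0;

        if (hasMemberInfo) {
            if (generationId != groupGenerationId) {
                return "ILLEGAL_GENERATION";
            }
        } else if (!isTransactional && state != State.EMPTY) {
            // Regular commits without member id and generation id are only accepted
            // when the group is empty. Transactional commits skip this check.
            return "UNKNOWN_MEMBER_ID";
        }

        // The group state is not checked for transactional commits.
        if (!isTransactional && state == State.COMPLETING_REBALANCE) {
            return "REBALANCE_IN_PROGRESS"; // assumed error; not visible in the hunk
        }
        return "NONE";
    }
}
```

With this sketch, a commit without member information against a `STABLE` group yields `UNKNOWN_MEMBER_ID` when `isTransactional` is false and `NONE` when it is true, which is the asymmetry the review comment asks about.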
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415287012

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
     /**
      * The offsets keyed by group id, topic name and partition id.
      */
-    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+    private final Offsets offsets;
+
+    /**
+     * The offsets keyed by producer id, group id, topic name and partition id. This

Review Comment:
Let me rework those comments.

> On abort we remove the producer id's entries, and on commit we add them to offsetsByGroup.

Correct.

> Sorry for so many comments here -- but is it correct we don't have the code to add/remove them on commit/abort yet?

Correct. This will come in the next PR, stay tuned ;).
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414703024

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
     /**
      * The offsets keyed by group id, topic name and partition id.
      */
-    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+    private final Offsets offsets;
+
+    /**
+     * The offsets keyed by producer id, group id, topic name and partition id. This

Review Comment:
Sorry for so many comments here -- but is it correct we don't have the code to add/remove them on commit/abort yet?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414697270

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -187,8 +190,6 @@ public CoordinatorResult commitOffset(
             short version,
             OffsetCommitRequestData request
         ) {
-            snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);

Review Comment:
What's the context behind removing this?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414694924

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java: ##
@@ -675,25 +675,25 @@ public void testValidateOffsetCommit() {
         // Simulate a call from the admin client without member id and member epoch.
         // This should pass only if the group is empty.
-        group.validateOffsetCommit("", "", -1);
+        group.validateOffsetCommit("", "", -1, false);

Review Comment:
Do we want to have a test where this argument is true?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414694054

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ##
@@ -293,7 +326,11 @@ public void testReplayOffsetCommit() {
             new ApiMessageAndVersion(value, (short) 0)
         ));

-        verify(offsetMetadataManager, times(2)).replay(key, value);
+        verify(offsetMetadataManager, times(2)).replay(
+            RecordBatch.NO_PRODUCER_ID,

Review Comment:
Should we verify something with a producer ID here in this file?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414690648

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java: ##
@@ -1615,4 +1618,127 @@ public void testDeleteGroupsWhenNotStarted() throws ExecutionException, Interrup
             future.get()
         );
     }
+
+    @Test
+    public void testCommitTransactionalOffsetsWhenNotStarted() throws ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+
+        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+            .setGroupId("foo")
+            .setTransactionalId("transactional-id")
+            .setMemberId("member-id")
+            .setGenerationId(10)
+            .setTopics(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+                .setName("topic")
+                .setPartitions(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+                    .setPartitionIndex(0)
+                    .setCommittedOffset(100)))));
+
+        CompletableFuture<TxnOffsetCommitResponseData> future = service.commitTransactionalOffsets(
+            requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+            request,
+            BufferSupplier.NO_CACHING
+        );
+
+        assertEquals(
+            new TxnOffsetCommitResponseData()
+                .setTopics(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+                    .setName("topic")
+                    .setPartitions(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+                        .setPartitionIndex(0)
+                        .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))))),
+            future.get()
+        );
+    }
+
+    @ParameterizedTest
+    @NullSource

Review Comment:
Does this mean we test null string and empty string?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414690195

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ##
@@ -833,13 +835,15 @@ public void validateOffsetCommit(
             if (generationId != this.generationId) {
                 throw Errors.ILLEGAL_GENERATION.exception();
             }
-        } else if (!isInState(EMPTY)) {
+        } else if (!isTransactional && !isInState(EMPTY)) {
             // If the request does not contain the member id and the generation id (version 0),
             // offset commits are only accepted when the group is empty.
+            // This does not apply to transactional offset commits, since the older versions
+            // of this protocol do not require member id and generation id.
             throw Errors.UNKNOWN_MEMBER_ID.exception();
         }

-        if (isInState(COMPLETING_REBALANCE)) {
+        if (!isTransactional && isInState(COMPLETING_REBALANCE)) {

Review Comment:
Does the comment above also apply to this case with respect to transactional offset commits?
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414627867

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
     /**
      * The offsets keyed by group id, topic name and partition id.
      */
-    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+    private final Offsets offsets;
+
+    /**
+     * The offsets keyed by producer id, group id, topic name and partition id. This

Review Comment:
This comment also confused me because it seems we use the same three nested data structures to store the committed offsets.

So we have:
offsetsByGroup -> all committed offsets, where we don't care about the producer ID
pendingTransactionalOffsets -> a mapping of producer id -> offsetsByGroup (for only that producer) for all uncommitted txn offsets

On abort we remove the producer id's entries, and on commit we add them to offsetsByGroup. Is this correct?
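
The commit/abort behaviour described in this comment can be sketched as below. This is only an illustration of the idea, not code from the PR: the PR itself states that transaction completion is not implemented yet. The class and method names are hypothetical, plain `HashMap` stands in for `TimelineHashMap`, and a `Long` offset stands in for `OffsetAndMetadata`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; HashMap replaces TimelineHashMap and Long replaces OffsetAndMetadata.
final class PendingOffsetsSketch {
    // groupId -> topic -> partition -> committed offset
    private final Map<String, Map<String, Map<Integer, Long>>> offsetsByGroup = new HashMap<>();

    // producerId -> (groupId -> topic -> partition -> offset) for ongoing transactions
    private final Map<Long, Map<String, Map<String, Map<Integer, Long>>>> pendingTransactionalOffsets =
        new HashMap<>();

    // A transactional offset commit is only staged under its producer id.
    void stageTransactionalOffset(long producerId, String groupId, String topic, int partition, long offset) {
        pendingTransactionalOffsets
            .computeIfAbsent(producerId, k -> new HashMap<>())
            .computeIfAbsent(groupId, k -> new HashMap<>())
            .computeIfAbsent(topic, k -> new HashMap<>())
            .put(partition, offset);
    }

    // On commit the staged offsets are copied into offsetsByGroup; on abort they are dropped.
    void completeTransaction(long producerId, boolean committed) {
        Map<String, Map<String, Map<Integer, Long>>> staged = pendingTransactionalOffsets.remove(producerId);
        if (staged == null || !committed) {
            return;
        }
        staged.forEach((groupId, topics) ->
            topics.forEach((topic, partitions) ->
                partitions.forEach((partition, offset) ->
                    offsetsByGroup
                        .computeIfAbsent(groupId, k -> new HashMap<>())
                        .computeIfAbsent(topic, k -> new HashMap<>())
                        .put(partition, offset))));
    }

    // Only committed offsets are visible to readers.
    Long committedOffset(String groupId, String topic, int partition) {
        Map<String, Map<Integer, Long>> topics = offsetsByGroup.get(groupId);
        if (topics == null) {
            return null;
        }
        Map<Integer, Long> partitions = topics.get(topic);
        return partitions == null ? null : partitions.get(partition);
    }
}
```

Keying the pending structure by producer id first means that completing a transaction is a single `remove` on the outer map, whichever groups and partitions that producer touched.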
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414621182

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
     /**
      * The offsets keyed by group id, topic name and partition id.
      */
-    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+    private final Offsets offsets;
+
+    /**
+     * The offsets keyed by producer id, group id, topic name and partition id. This
+     * structure holds all the transactional offsets that are part of ongoing transactions.
+     * When the transaction is committed, they are transferred to the offsetsByGroup; when
+     * the transaction is aborted, they are removed.
+     */
+    private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
+
+    private class Offsets {
+        private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+
+        private Offsets() {
+            this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+        }
+
+        private OffsetAndMetadata get(
+            String groupId,
+            String topic,
+            int partition
+        ) {
+            TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
+            if (topicOffsets == null) {
+                return null;
+            } else {
+                TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets.get(topic);
+                if (partitionOffsets == null) {
+                    return null;
+                } else {
+                    return partitionOffsets.get(partition);
+                }
+            }
+        }
+
+        private OffsetAndMetadata put(
+            String groupId,
+            String topic,
+            int partition,
+            OffsetAndMetadata offsetAndMetadata
+        ) {
+            TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup
+                .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 0));
+            TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets
+                .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 0));
+            return partitionOffsets.put(partition, offsetAndMetadata);
+        }
+
+        private OffsetAndMetadata remove(
+            String groupId,
+            String topic,
+            int partition
+        ) {
+            TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
+            if (topicOffsets == null)
+                return null;
+
+            TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets.get(topic);
+            if (partitionOffsets == null)
+                return null;
+
+            OffsetAndMetadata removedValue = partitionOffsets.remove(partition);
+
+            if (partitionOffsets.isEmpty())

Review Comment:
Was about to ask if we remove the nested maps, but now I see we do so here.
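
The hunk above is cut off at the point where empty nested maps are cleaned up. As a rough illustration of that pruning pattern, here is a self-contained sketch; the lines after `isEmpty()` are an assumption about how such cleanup is commonly written, not a quote of the PR. The class name and `hasGroup` helper are hypothetical, `HashMap` stands in for `TimelineHashMap`, and `Long` stands in for `OffsetAndMetadata`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of pruning empty inner maps on removal.
final class NestedOffsetsSketch {
    // groupId -> topic -> partition -> offset
    private final Map<String, Map<String, Map<Integer, Long>>> offsetsByGroup = new HashMap<>();

    Long put(String groupId, String topic, int partition, long offset) {
        return offsetsByGroup
            .computeIfAbsent(groupId, k -> new HashMap<>())
            .computeIfAbsent(topic, k -> new HashMap<>())
            .put(partition, offset);
    }

    Long remove(String groupId, String topic, int partition) {
        Map<String, Map<Integer, Long>> topicOffsets = offsetsByGroup.get(groupId);
        if (topicOffsets == null) {
            return null;
        }
        Map<Integer, Long> partitionOffsets = topicOffsets.get(topic);
        if (partitionOffsets == null) {
            return null;
        }
        Long removedValue = partitionOffsets.remove(partition);

        // Assumed cleanup: drop inner maps once they are empty so that
        // removed groups and topics do not leave empty shells behind.
        if (partitionOffsets.isEmpty()) {
            topicOffsets.remove(topic);
        }
        if (topicOffsets.isEmpty()) {
            offsetsByGroup.remove(groupId);
        }
        return removedValue;
    }

    boolean hasGroup(String groupId) {
        return offsetsByGroup.containsKey(groupId);
    }
}
```

Removing the last partition of the last topic of a group leaves no entry for that group, so `hasGroup` returns false afterwards.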
Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414610034

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
     /**
      * The offsets keyed by group id, topic name and partition id.
      */
-    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+    private final Offsets offsets;
+
+    /**
+     * The offsets keyed by producer id, group id, topic name and partition id. This

Review Comment:
This comment about the keys is a little confusing. Looking at the code, we have 4 maps that are nested within each other. Maybe the comment could mention that each of these is a separate map? (Or we could include a non-Javadoc comment in offsets to make it clearer.)
[PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]
dajac opened a new pull request, #14845:
URL: https://github.com/apache/kafka/pull/14845

This PR is built on top of https://github.com/apache/kafka/pull/14844. e81db1e94bb51cf33cee353944722d5b5516729f could already be reviewed.

This patch implements the TxnOffsetCommit API. When a transactional offset commit is received, it is stored in the pending transactional offsets structure and waits there until the transaction is committed or aborted. Note that the handling of the transaction completion is not implemented in this patch.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)