Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-07 Thread via GitHub


dajac merged PR #14845:
URL: https://github.com/apache/kafka/pull/14845


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-07 Thread via GitHub


dajac commented on PR #14845:
URL: https://github.com/apache/kafka/pull/14845#issuecomment-1845119315

   The last build is pretty awful. I went through the list of all failed tests 
and I could not link any of them to the changes made in this PR. They are all 
unrelated. Therefore, I will merge this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-06 Thread via GitHub


dajac commented on PR #14845:
URL: https://github.com/apache/kafka/pull/14845#issuecomment-1843565757

   It looks like the new failing tests are coming from 
https://github.com/apache/kafka/pull/14626.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-06 Thread via GitHub


dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1417658955


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -653,12 +653,15 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
  * @param memberId  The member id.
  * @param groupInstanceId   The group instance id.
  * @param memberEpoch   The member epoch.
+ * @param isTransactional   Whether the offset commit is transactional or 
not. It has no
+ *  impact when a consumer group is used.
  */
 @Override
 public void validateOffsetCommit(
 String memberId,
 String groupInstanceId,
-int memberEpoch
+int memberEpoch,
+boolean isTransactional

Review Comment:
   I thought that we don't need it for the "new consumer" groups. My reasoning 
is that they should not use old versions of the TxnOffsetCommit API if they use 
the new protocol.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-05 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1416220626


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -653,12 +653,15 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
  * @param memberId  The member id.
  * @param groupInstanceId   The group instance id.
  * @param memberEpoch   The member epoch.
+ * @param isTransactional   Whether the offset commit is transactional or 
not. It has no
+ *  impact when a consumer group is used.
  */
 @Override
 public void validateOffsetCommit(
 String memberId,
 String groupInstanceId,
-int memberEpoch
+int memberEpoch,
+boolean isTransactional

Review Comment:
   Will we add the checks for transactional commits in the next PR? Or do you 
think we don't need them for new ConsumerGroup?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-05 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1416142507


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -187,8 +190,6 @@ public CoordinatorResult 
commitOffset(
 short version,
 OffsetCommitRequestData request
 ) {
-snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);

Review Comment:
   Ok -- so just a bug we didn't see before. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-05 Thread via GitHub


dajac commented on PR #14845:
URL: https://github.com/apache/kafka/pull/14845#issuecomment-1840467329

   @jolshan Thanks for your comments. I have addressed them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-05 Thread via GitHub


dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415313002


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -187,8 +190,6 @@ public CoordinatorResult 
commitOffset(
 short version,
 OffsetCommitRequestData request
 ) {
-snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);

Review Comment:
   The snapshot is already created in `replay` so we don't need it here. 
Moreover, creating one with `lastCommittedOffset` is incorrect here. It should 
be created based on the `lastWrittenOffset`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-05 Thread via GitHub


dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415313002


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -187,8 +190,6 @@ public CoordinatorResult 
commitOffset(
 short version,
 OffsetCommitRequestData request
 ) {
-snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);

Review Comment:
   The snapshot is already created in `replay` so we don't need it here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-05 Thread via GitHub


dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415310830


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -675,25 +675,25 @@ public void testValidateOffsetCommit() {
 
 // Simulate a call from the admin client without member id and member 
epoch.
 // This should pass only if the group is empty.
-group.validateOffsetCommit("", "", -1);
+group.validateOffsetCommit("", "", -1, false);

Review Comment:
   `isTransactional` has actually no impact on the validation but I agree that 
we should check to ensure this. I have parameterized it to check both.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-05 Thread via GitHub


dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415305714


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##
@@ -293,7 +326,11 @@ public void testReplayOffsetCommit() {
 new ApiMessageAndVersion(value, (short) 0)
 ));
 
-verify(offsetMetadataManager, times(2)).replay(key, value);
+verify(offsetMetadataManager, times(2)).replay(
+RecordBatch.NO_PRODUCER_ID,

Review Comment:
   Sounds good. Added testReplayTransactionalOffsetCommit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-05 Thread via GitHub


dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415299461


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -1615,4 +1618,127 @@ public void testDeleteGroupsWhenNotStarted() throws 
ExecutionException, Interrup
 future.get()
 );
 }
+
+@Test
+public void testCommitTransactionalOffsetsWhenNotStarted() throws 
ExecutionException, InterruptedException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime,
+new GroupCoordinatorMetrics()
+);
+
+TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+.setGroupId("foo")
+.setTransactionalId("transactional-id")
+.setMemberId("member-id")
+.setGenerationId(10)
+.setTopics(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+.setName("topic")
+.setPartitions(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+.setPartitionIndex(0)
+.setCommittedOffset(100);
+
+CompletableFuture future = 
service.commitTransactionalOffsets(
+requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+request,
+BufferSupplier.NO_CACHING
+);
+
+assertEquals(
+new TxnOffsetCommitResponseData()
+.setTopics(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+.setName("topic")
+.setPartitions(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+.setPartitionIndex(0)
+
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+future.get()
+);
+}
+
+@ParameterizedTest
+@NullSource

Review Comment:
   That's right.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-05 Thread via GitHub


dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415298403


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -833,13 +835,15 @@ public void validateOffsetCommit(
 if (generationId != this.generationId) {
 throw Errors.ILLEGAL_GENERATION.exception();
 }
-} else if (!isInState(EMPTY)) {
+} else if (!isTransactional && !isInState(EMPTY)) {
 // If the request does not contain the member id and the 
generation id (version 0),
 // offset commits are only accepted when the group is empty.
+// This does not apply to transactional offset commits, since the 
older versions
+// of this protocol do not require member id and generation id.
 throw Errors.UNKNOWN_MEMBER_ID.exception();
 }
 
-if (isInState(COMPLETING_REBALANCE)) {
+if (!isTransactional && isInState(COMPLETING_REBALANCE)) {

Review Comment:
   No. It is not applied but not for the same reason. For transactional offset 
commits, the group state is not checked but I have to admit that I don't know 
why. Let me update the comment anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-05 Thread via GitHub


dajac commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1415287012


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
 /**
  * The offsets keyed by group id, topic name and partition id.
  */
-private final TimelineHashMap>> offsetsByGroup;
+private final Offsets offsets;
+
+/**
+ * The offsets keyed by producer id, group id, topic name and partition 
id. This

Review Comment:
   Let me rework those comments.
   
   > On abort we remove the producer id's entries, and on commit we add them to 
offsetsByGroup.
   
   Correct.
   
   > Sorry for so many comments here -- but is it correct we don't have the 
code to add/remove them on commit/abort yet?
   
   Correct. This will come in the next PR, stay tuned ;).
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414703024


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
 /**
  * The offsets keyed by group id, topic name and partition id.
  */
-private final TimelineHashMap>> offsetsByGroup;
+private final Offsets offsets;
+
+/**
+ * The offsets keyed by producer id, group id, topic name and partition 
id. This

Review Comment:
   Sorry for so many comments here -- but is it correct we don't have the code 
to add/remove them on commit/abort yet?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414697270


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -187,8 +190,6 @@ public CoordinatorResult 
commitOffset(
 short version,
 OffsetCommitRequestData request
 ) {
-snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);

Review Comment:
   What's the context behind removing this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414694924


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -675,25 +675,25 @@ public void testValidateOffsetCommit() {
 
 // Simulate a call from the admin client without member id and member 
epoch.
 // This should pass only if the group is empty.
-group.validateOffsetCommit("", "", -1);
+group.validateOffsetCommit("", "", -1, false);

Review Comment:
   Do we want to have a test where this argument is true?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414694054


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##
@@ -293,7 +326,11 @@ public void testReplayOffsetCommit() {
 new ApiMessageAndVersion(value, (short) 0)
 ));
 
-verify(offsetMetadataManager, times(2)).replay(key, value);
+verify(offsetMetadataManager, times(2)).replay(
+RecordBatch.NO_PRODUCER_ID,

Review Comment:
   should we verify something with a producer ID here?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##
@@ -293,7 +326,11 @@ public void testReplayOffsetCommit() {
 new ApiMessageAndVersion(value, (short) 0)
 ));
 
-verify(offsetMetadataManager, times(2)).replay(key, value);
+verify(offsetMetadataManager, times(2)).replay(
+RecordBatch.NO_PRODUCER_ID,

Review Comment:
   should we verify something with a producer ID here in this file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414690648


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -1615,4 +1618,127 @@ public void testDeleteGroupsWhenNotStarted() throws 
ExecutionException, Interrup
 future.get()
 );
 }
+
+@Test
+public void testCommitTransactionalOffsetsWhenNotStarted() throws 
ExecutionException, InterruptedException {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime,
+new GroupCoordinatorMetrics()
+);
+
+TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+.setGroupId("foo")
+.setTransactionalId("transactional-id")
+.setMemberId("member-id")
+.setGenerationId(10)
+.setTopics(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+.setName("topic")
+.setPartitions(Collections.singletonList(new 
TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+.setPartitionIndex(0)
+.setCommittedOffset(100);
+
+CompletableFuture future = 
service.commitTransactionalOffsets(
+requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+request,
+BufferSupplier.NO_CACHING
+);
+
+assertEquals(
+new TxnOffsetCommitResponseData()
+.setTopics(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+.setName("topic")
+.setPartitions(Collections.singletonList(new 
TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+.setPartitionIndex(0)
+
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
+future.get()
+);
+}
+
+@ParameterizedTest
+@NullSource

Review Comment:
   Does this mean we test null string and empty string?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414690195


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##
@@ -833,13 +835,15 @@ public void validateOffsetCommit(
 if (generationId != this.generationId) {
 throw Errors.ILLEGAL_GENERATION.exception();
 }
-} else if (!isInState(EMPTY)) {
+} else if (!isTransactional && !isInState(EMPTY)) {
 // If the request does not contain the member id and the 
generation id (version 0),
 // offset commits are only accepted when the group is empty.
+// This does not apply to transactional offset commits, since the 
older versions
+// of this protocol do not require member id and generation id.
 throw Errors.UNKNOWN_MEMBER_ID.exception();
 }
 
-if (isInState(COMPLETING_REBALANCE)) {
+if (!isTransactional && isInState(COMPLETING_REBALANCE)) {

Review Comment:
   Does the comment above also apply to this case with respect to transactional 
offset commits?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414627867


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
 /**
  * The offsets keyed by group id, topic name and partition id.
  */
-private final TimelineHashMap>> offsetsByGroup;
+private final Offsets offsets;
+
+/**
+ * The offsets keyed by producer id, group id, topic name and partition 
id. This

Review Comment:
   This comment also confused me because we also use the same nested 3 data 
structures to store the committed offsets it seems. So we have 
   
   offsetsByGroup -> all committed offsets where we don't care about producer ID
   
   pendingTransactionalOffsets -> a mapping of producer id -> offsetsByGroup 
(for only that producer) for all uncommitted txn offsets
   
   On abort we remove the producer id's entries, and on commit we add them to 
offsetsByGroup. 
   
   Is this correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414621182


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
 /**
  * The offsets keyed by group id, topic name and partition id.
  */
-private final TimelineHashMap>> offsetsByGroup;
+private final Offsets offsets;
+
+/**
+ * The offsets keyed by producer id, group id, topic name and partition 
id. This
+ * structure holds all the transactional offsets that are part of ongoing 
transactions.
+ * When the transaction is committed, they are transferred to the 
offsetsByGroup; when
+ * the transaction is aborted, they are removed.
+ */
+private final TimelineHashMap pendingTransactionalOffsets;
+
+private class Offsets {
+private final TimelineHashMap>> offsetsByGroup;
+
+private Offsets() {
+this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+}
+
+private OffsetAndMetadata get(
+String groupId,
+String topic,
+int partition
+) {
+TimelineHashMap> topicOffsets = offsetsByGroup.get(groupId);
+if (topicOffsets == null) {
+return null;
+} else {
+TimelineHashMap partitionOffsets = 
topicOffsets.get(topic);
+if (partitionOffsets == null) {
+return null;
+} else {
+return partitionOffsets.get(partition);
+}
+}
+}
+
+private OffsetAndMetadata put(
+String groupId,
+String topic,
+int partition,
+OffsetAndMetadata offsetAndMetadata
+) {
+TimelineHashMap> topicOffsets = offsetsByGroup
+.computeIfAbsent(groupId, __ -> new 
TimelineHashMap<>(snapshotRegistry, 0));
+TimelineHashMap partitionOffsets = 
topicOffsets
+.computeIfAbsent(topic, __ -> new 
TimelineHashMap<>(snapshotRegistry, 0));
+return partitionOffsets.put(partition, offsetAndMetadata);
+}
+
+private OffsetAndMetadata remove(
+String groupId,
+String topic,
+int partition
+) {
+TimelineHashMap> topicOffsets = offsetsByGroup.get(groupId);
+if (topicOffsets == null)
+return null;
+
+TimelineHashMap partitionOffsets = 
topicOffsets.get(topic);
+if (partitionOffsets == null)
+return null;
+
+OffsetAndMetadata removedValue = 
partitionOffsets.remove(partition);
+
+if (partitionOffsets.isEmpty())

Review Comment:
   Was about to ask if we remove the nested maps, but now I see we do so here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-12-04 Thread via GitHub


jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414610034


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
 /**
  * The offsets keyed by group id, topic name and partition id.
  */
-private final TimelineHashMap>> offsetsByGroup;
+private final Offsets offsets;
+
+/**
+ * The offsets keyed by producer id, group id, topic name and partition 
id. This

Review Comment:
   This comment about the keys is a little confusing.
   
   Looking at the code we have 4 maps that are nested within each other.  
Maybe the comment could mention that each of these is a separate map? (Or we 
could include a non java doc comment in offsets to make it clearer.
   
   



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
 /**
  * The offsets keyed by group id, topic name and partition id.
  */
-private final TimelineHashMap>> offsetsByGroup;
+private final Offsets offsets;
+
+/**
+ * The offsets keyed by producer id, group id, topic name and partition 
id. This

Review Comment:
   This comment about the keys is a little confusing.
   
   Looking at the code we have 4 maps that are nested within each other.  
Maybe the comment could mention that each of these is a separate map? (Or we 
could include a non java doc comment in offsets to make it clearer.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14505; [2/N] Implement TxnOffsetCommit API [kafka]

2023-11-27 Thread via GitHub


dajac opened a new pull request, #14845:
URL: https://github.com/apache/kafka/pull/14845

   This PR is built on top of https://github.com/apache/kafka/pull/14844. 
e81db1e94bb51cf33cee353944722d5b5516729f could already be reviewed.
   
   This patch implements the TxnOffsetCommit API. When a transactional offset 
commit is received, it is stored in the pending transactional offsets structure 
and waits there until the transaction is committed or aborted. Note that the 
handling of the transaction completion is not implemented in this patch.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org