Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-12 Thread via GitHub


dajac merged PR #15155:
URL: https://github.com/apache/kafka/pull/15155


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-11 Thread via GitHub


dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449270020


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
 
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
   That’s fair. Let me add it tomorrow.






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-11 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449268767


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
 
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
   I think the test would simply codify the current state of the protocol and
   flag it for us if that behavior changes.






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-11 Thread via GitHub


dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449266480


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
 
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
   I could add it, but in my opinion it does not bring much value.






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-11 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449238660


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
 
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
   Oh, interesting. Is it worth testing that we don't return it for now, and
   updating the test if we decide to change the behavior?






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-11 Thread via GitHub


dajac commented on PR #15155:
URL: https://github.com/apache/kafka/pull/15155#issuecomment-188752

   @jolshan Thanks for your comments. I have addressed all of them.





Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-11 Thread via GitHub


dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448472421


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
 
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
   Very good question; I wondered the same. When all offsets are requested, the
   list of partitions is built from the main offset storage, `offsets`. So if a
   partition has a pending transaction but no previous offset, we don't return
   it at all. I am not sure whether this was done on purpose, but it is how the
   current Scala implementation works. I also considered returning the pending
   partitions, but I eventually decided to follow the current implementation
   for now. We could revisit this later if we want to.
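
   A minimal sketch of the behavior described above, using plain JDK
   collections rather than the actual `OffsetMetadataManager` code. The class
   name, the `pending` set, and the string-based response are assumptions made
   for illustration only. It shows that a partition with a pending
   transactional offset but no committed offset (bar-1) never appears in the
   "fetch all" response, because the response is built by iterating the
   committed offsets.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified model; not the real OffsetMetadataManager.
final class FetchAllOffsetsSketch {
    // Committed offsets per "topic-partition"; stands in for the main `offsets` storage.
    final Map<String, Long> offsets = new TreeMap<>();
    // Partitions that have an offset pending in an open transaction (assumed shape).
    final Set<String> pending = new HashSet<>();

    // The response is built from the committed storage only, so a partition that
    // is present in `pending` but absent from `offsets` is never visited.
    List<String> fetchAllOffsets(boolean requireStable) {
        List<String> response = new ArrayList<>();
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            if (requireStable && pending.contains(entry.getKey())) {
                response.add(entry.getKey() + "=UNSTABLE_OFFSET_COMMIT");
            } else {
                response.add(entry.getKey() + "=" + entry.getValue());
            }
        }
        return response;
    }

    public static void main(String[] args) {
        FetchAllOffsetsSketch sketch = new FetchAllOffsetsSketch();
        sketch.offsets.put("foo-0", 100L);
        sketch.offsets.put("foo-1", 110L);
        sketch.offsets.put("bar-0", 200L);
        sketch.pending.add("foo-1");
        sketch.pending.add("bar-0");
        sketch.pending.add("bar-1"); // pending only, no committed offset
        // bar-1 is missing from both responses.
        System.out.println(sketch.fetchAllOffsets(true));
        System.out.println(sketch.fetchAllOffsets(false));
    }
}
```

   Returning the pending-only partitions as well would require merging the
   keys of both structures before building the response.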






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448140837


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
 
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());
+
+        // Fetching offsets with "require stable" (Long.MAX_VALUE) should return the UNSTABLE_OFFSET_COMMIT
+        // errors for foo-1 and bar-0.

Review Comment:
   Ditto here: the comments should also state the expectation for foo-0.






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448140646


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
 
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
   Should we add bar-1 here as well, to test that case too?






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448139696


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -1856,6 +1888,91 @@ public void testFetchOffsetsAtDifferentCommittedOffset() {
         ), context.fetchOffsets("group", request, Long.MAX_VALUE));
     }
 
+    @Test
+    public void testFetchOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());
+        // Note that bar-1 does not exist in the initial commits. UNSTABLE_OFFSET_COMMIT errors
+        // must be returned in this case too.
+        context.commitOffset(10L, "group", "bar", 1, 211L, 1, context.time.milliseconds());
+
+        // Always use the same request.
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setPartitionIndexes(Arrays.asList(0, 1)),
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("bar")
+                .setPartitionIndexes(Arrays.asList(0, 1))
+        );
+
+        // Fetching offsets with "require stable" (Long.MAX_VALUE) should return the UNSTABLE_OFFSET_COMMIT
+        // errors for foo-1, bar-0 and bar-1.

Review Comment:
   Should we mention that foo-0 will be returned, both in this comment and in
   the comment below?






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448132968


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   Ah, it's the snapshot registry and the `revertLastWrittenOffset` method that
   do this. I had forgotten, but it makes sense now.






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1447913870


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   Yeah, it is implicit. The write operation (the transactional offset commit)
   could fail, for instance. The state is rolled back in that case.
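
   A rough sketch of the rollback idea, assuming a snapshot is taken at a
   given log offset before the write is applied. It uses plain copies of a
   `HashSet` instead of Kafka's timeline data structures, and the names
   `snapshot`, `revertTo` and `isOpen` are hypothetical, chosen for
   illustration only.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of "state that can be reverted to an earlier offset".
final class RollbackSketch {
    private Set<Long> openProducerIds = new HashSet<>();
    // Snapshots of the state, keyed by the log offset at which they were taken.
    private final TreeMap<Long, Set<Long>> snapshots = new TreeMap<>();

    void snapshot(long offset) {
        snapshots.put(offset, new HashSet<>(openProducerIds));
    }

    void addOpenTransaction(long producerId) {
        openProducerIds.add(producerId);
    }

    // Restores the state captured at `offset` and drops any later snapshots.
    void revertTo(long offset) {
        Set<Long> saved = snapshots.get(offset);
        if (saved == null) {
            throw new IllegalStateException("No snapshot at offset " + offset);
        }
        openProducerIds = new HashSet<>(saved);
        snapshots.tailMap(offset, false).clear();
    }

    boolean isOpen(long producerId) {
        return openProducerIds.contains(producerId);
    }

    public static void main(String[] args) {
        RollbackSketch state = new RollbackSketch();
        state.snapshot(3L);            // state as of the last written offset
        state.addOpenTransaction(10L); // applied optimistically with the write
        state.revertTo(3L);            // the write failed, so undo the change
        System.out.println(state.isOpen(10L));
    }
}
```

   Because the revert is driven by the offset rather than by the individual
   map, no explicit cleanup code is needed at the call site, which is why the
   rollback is implicit.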






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1447907424


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   Sorry, I left a lot of words here. Is there a point where we take advantage
   of the timeline data structure, i.e. using the epoch or rolling back? I
   didn't see any in this PR.






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


dajac commented on PR #15155:
URL: https://github.com/apache/kafka/pull/15155#issuecomment-1884642430

   @jolshan Thanks for your comments. I have addressed all of them.





Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1447230142


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -668,6 +697,8 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
         OffsetFetchRequestData.OffsetFetchRequestGroup request,
         long lastCommittedOffset
     ) throws ApiException {
+        final boolean requireStable = lastCommittedOffset == Long.MAX_VALUE;

Review Comment:
   Correct. When the require stable flag is set in the offset fetch request,
   the read operation reads with `Long.MAX_VALUE` in order to read the latest
   state. If the flag is not set, the read operation reads with the last
   committed offset.
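
   A small sketch of this convention, under the assumption that the caller
   picks the read offset from the request flag and the fetch path recovers the
   flag by comparing against `Long.MAX_VALUE`. Everything except the
   `requireStable` comparison quoted in the diff is hypothetical: the class
   name, `readOffsetFor`, the `pending` set and the string results are
   illustrative only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; not the real OffsetMetadataManager.
final class RequireStableSketch {
    final Map<String, Long> offsets = new HashMap<>(); // committed offsets
    final Set<String> pending = new HashSet<>();       // partitions with pending transactional offsets

    // Caller side: read the latest state when stable offsets are required,
    // otherwise read at the last committed offset.
    static long readOffsetFor(boolean requireStableFlag, long lastCommittedOffset) {
        return requireStableFlag ? Long.MAX_VALUE : lastCommittedOffset;
    }

    // Fetch side: the flag is recovered from the read offset, as in the diff.
    String fetchOffset(String partition, long lastCommittedOffset) {
        final boolean requireStable = lastCommittedOffset == Long.MAX_VALUE;
        if (requireStable && pending.contains(partition)) {
            return "UNSTABLE_OFFSET_COMMIT";
        }
        Long committed = offsets.get(partition);
        // "NO_OFFSET" is a placeholder for "nothing committed yet".
        return committed == null ? "NO_OFFSET" : Long.toString(committed);
    }

    public static void main(String[] args) {
        RequireStableSketch sketch = new RequireStableSketch();
        sketch.offsets.put("foo-1", 110L);
        sketch.pending.add("foo-1");
        sketch.pending.add("bar-1"); // pending only, no committed offset
        System.out.println(sketch.fetchOffset("foo-1", readOffsetFor(true, 3L)));
        System.out.println(sketch.fetchOffset("bar-1", readOffsetFor(true, 3L)));
        System.out.println(sketch.fetchOffset("foo-1", readOffsetFor(false, 3L)));
    }
}
```

   One consequence of encoding the flag this way is that a stable read cannot
   target a specific earlier offset; it always reads the latest state.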






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-10 Thread via GitHub


dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1447226199


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   Your understanding is correct.






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446778536


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   Also, these are timeline data structures, but are there cases where we roll
   them back?






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446774595


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -668,6 +697,8 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
         OffsetFetchRequestData.OffsetFetchRequestGroup request,
         long lastCommittedOffset
     ) throws ApiException {
+        final boolean requireStable = lastCommittedOffset == Long.MAX_VALUE;

Review Comment:
   Is it the case that when we query stable offsets, `lastCommittedOffset` is
   always `Long.MAX_VALUE`? In other words, can we not query a specific offset
   when we require stable?






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446773736


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   Just for my understanding, though: for every producer ID we return for a
   group, there will be an offset in `pendingTransactionalOffsets`, and every
   producer ID in `pendingTransactionalOffsets` will have its group in
   `openTransactionsByGroup`.

   I can't imagine a case where something is in one map but not the other. (I
   can imagine the group not being there at all, or the topic partition not
   being there, though.)






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446770811


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -656,6 +663,28 @@ public int deleteAllOffsets(
         return numDeletedOffsets.get();
     }
 
+    /**
+     * @return true iif there is at least one pending transactional offsets for the given

Review Comment:
   nit: iff, offset
   (unless "iif" is a different abbreviation that does not mean "if and only if")






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446770811


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -656,6 +663,28 @@ public int deleteAllOffsets(
         return numDeletedOffsets.get();
     }
 
+    /**
+     * @return true iif there is at least one pending transactional offsets for the given

Review Comment:
   nit: offset






Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]

2024-01-09 Thread via GitHub


dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1445797517


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
  */
 private final TimelineHashMap pendingTransactionalOffsets;
 
+/**
+ * The open transactions (producer ids) keyed by group.
+ */
+private final TimelineHashMap> 
openTransactionsByGroup;

Review Comment:
   I followed an approach similar to the one used in the Scala implementation.
   We basically store the open producer ids per group and use them to look up
   the corresponding partitions in `pendingTransactionalOffsets`. An
   alternative would be to store the open transactions directly by group,
   topic and partition, but this would use more space. It is the classic
   runtime vs. space tradeoff.
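
   A minimal sketch of the two-level lookup described above, with plain
   `HashMap`/`HashSet` standing in for the timeline collections. The nested
   layout assumed for `pendingTransactionalOffsets` (producer id, then group,
   then "topic-partition") and the method names are illustrative assumptions,
   not the actual implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; not the real OffsetMetadataManager.
final class OpenTransactionsSketch {
    // producerId -> groupId -> "topic-partition" -> pending offset (assumed layout)
    final Map<Long, Map<String, Map<String, Long>>> pendingTransactionalOffsets = new HashMap<>();
    // groupId -> producer ids with an open transaction for that group
    final Map<String, Set<Long>> openTransactionsByGroup = new HashMap<>();

    void commitTransactionalOffset(long producerId, String group, String partition, long offset) {
        pendingTransactionalOffsets
            .computeIfAbsent(producerId, id -> new HashMap<>())
            .computeIfAbsent(group, g -> new HashMap<>())
            .put(partition, offset);
        openTransactionsByGroup
            .computeIfAbsent(group, g -> new HashSet<>())
            .add(producerId);
    }

    // Walks the open producer ids of the group and searches each one's pending
    // offsets: more work per lookup, but only producer ids are stored per group.
    boolean hasPendingTransactionalOffsets(String group, String partition) {
        Set<Long> producerIds = openTransactionsByGroup.getOrDefault(group, Collections.emptySet());
        for (long producerId : producerIds) {
            Map<String, Map<String, Long>> byGroup = pendingTransactionalOffsets.get(producerId);
            if (byGroup == null) continue;
            Map<String, Long> byPartition = byGroup.get(group);
            if (byPartition != null && byPartition.containsKey(partition)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        OpenTransactionsSketch sketch = new OpenTransactionsSketch();
        sketch.commitTransactionalOffset(10L, "group", "foo-1", 111L);
        System.out.println(sketch.hasPendingTransactionalOffsets("group", "foo-1"));
        System.out.println(sketch.hasPendingTransactionalOffsets("group", "foo-0"));
    }
}
```

   The alternative mentioned above would replace the loop with a direct lookup
   in a map keyed by group, topic and partition, at the cost of storing one
   entry per pending partition.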



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org