Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac merged PR #15155: URL: https://github.com/apache/kafka/pull/15155 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449270020

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
That’s fair. Let me add it tomorrow.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449268767

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
I would think it just codifies the current state of the protocol and will flag it for us if it changes.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449266480

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
I could add it, but it does not bring much value in my opinion.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1449238660

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
Oh, interesting. Is it worth testing that we don't return it for now, and updating the test if we plan to change the behavior?
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on PR #15155:
URL: https://github.com/apache/kafka/pull/15155#issuecomment-188752

@jolshan Thanks for your comments. I have addressed all of them.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448472421

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
Very good question. I wondered the same. When all offsets are requested, the list of partitions is built from the main offset storage `offsets`. So if a partition has a pending transaction but no previously committed offset, we don't return it at all. I am not sure whether this was done on purpose, but it is how the current Scala implementation works. I also considered returning the pending partitions, but I eventually decided to follow the current implementation for now. We could revisit this later if we want to.
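
To make the behavior described in this comment concrete, here is a minimal sketch. It is not the Kafka implementation: the class name, the `"topic-partition"` string keys, and the string-valued response are all assumptions made for illustration. It only shows the point made in the comment, that the partition list comes from the main `offsets` storage, so a partition that exists only as a pending transactional offset is never returned.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified model (not the real OffsetMetadataManager):
// partitions are "topic-partition" strings, `offsets` is the main (committed)
// offset storage, and `pending` holds the partitions touched by open transactions.
class FetchAllOffsetsSketch {
    static final String UNSTABLE = "UNSTABLE_OFFSET_COMMIT";

    static Map<String, String> fetchAllOffsets(
        Map<String, Long> offsets,
        Set<String> pending,
        boolean requireStable
    ) {
        Map<String, String> response = new TreeMap<>();
        // Only partitions present in the main storage are iterated, so a
        // partition that exists solely in `pending` never appears in the response.
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            boolean unstable = requireStable && pending.contains(entry.getKey());
            response.put(entry.getKey(), unstable ? UNSTABLE : String.valueOf(entry.getValue()));
        }
        return response;
    }
}
```

With committed offsets for foo-0, foo-1 and bar-0 and pending transactional offsets for foo-1, bar-0 and bar-1, a "require stable" fetch in this sketch reports foo-0 normally, marks foo-1 and bar-0 as unstable, and omits bar-1 entirely.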
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448140837

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());
+
+        // Fetching offsets with "require stable" (Long.MAX_VALUE) should return the UNSTABLE_OFFSET_COMMIT
+        // errors for foo-1 and bar-0.

Review Comment:
Ditto here: should the comments also state the expectation for foo-0?
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448140646

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -1963,6 +2080,75 @@ public void testFetchAllOffsetsAtDifferentCommittedOffset() {
         ), context.fetchAllOffsets("group", Long.MAX_VALUE));
     }
+    @Test
+    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());

Review Comment:
Should we have bar-1 here again to test that case too?
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448139696

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ##
@@ -1856,6 +1888,91 @@ public void testFetchOffsetsAtDifferentCommittedOffset() {
         ), context.fetchOffsets("group", request, Long.MAX_VALUE));
     }
+    @Test
+    public void testFetchOffsetsWithPendingTransactionalOffsets() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+
+        context.commitOffset("group", "foo", 0, 100L, 1);
+        context.commitOffset("group", "foo", 1, 110L, 1);
+        context.commitOffset("group", "bar", 0, 200L, 1);
+
+        context.commit();
+
+        assertEquals(3, context.lastWrittenOffset);
+        assertEquals(3, context.lastCommittedOffset);
+
+        context.commitOffset(10L, "group", "foo", 1, 111L, 1, context.time.milliseconds());
+        context.commitOffset(10L, "group", "bar", 0, 201L, 1, context.time.milliseconds());
+        // Note that bar-1 does not exist in the initial commits. UNSTABLE_OFFSET_COMMIT errors
+        // must be returned in this case too.
+        context.commitOffset(10L, "group", "bar", 1, 211L, 1, context.time.milliseconds());
+
+        // Always use the same request.
+        List<OffsetFetchRequestData.OffsetFetchRequestTopics> request = Arrays.asList(
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("foo")
+                .setPartitionIndexes(Arrays.asList(0, 1)),
+            new OffsetFetchRequestData.OffsetFetchRequestTopics()
+                .setName("bar")
+                .setPartitionIndexes(Arrays.asList(0, 1))
+        );
+
+        // Fetching offsets with "require stable" (Long.MAX_VALUE) should return the UNSTABLE_OFFSET_COMMIT
+        // errors for foo-1, bar-0 and bar-1.

Review Comment:
Should we mention in this comment, and in the comment below, that foo-0 will be returned?
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1448132968

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap pendingTransactionalOffsets;
+    /**
+     * The open transactions (producer ids) keyed by group.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

Review Comment:
Ah, it's the snapshot registry and the `revertLastWrittenOffset` method that do this. I had forgotten, but it makes sense now.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1447913870

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap pendingTransactionalOffsets;
+    /**
+     * The open transactions (producer ids) keyed by group.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

Review Comment:
Yeah, it is implicit. The write operation (the transactional offset commit) could fail, for instance. The state is rolled back in this case.
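
The implicit rollback mentioned in this comment can be illustrated with a small sketch. It is an assumption-heavy stand-in, not Kafka's timeline classes: `RollbackSketch` and its methods are hypothetical names, and it keeps full copies of the state per log offset, which is much simpler (and much less efficient) than a real snapshot-based structure. The idea it shows is only that in-memory state is captured at a known offset and restored if the corresponding write fails.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for a timeline data structure: full copies of the
// state are stored per log offset so that the state can be reverted later.
class RollbackSketch {
    private Map<String, Set<Long>> openTransactionsByGroup = new HashMap<>();
    private final TreeMap<Long, Map<String, Set<Long>>> snapshots = new TreeMap<>();

    private static Map<String, Set<Long>> deepCopy(Map<String, Set<Long>> source) {
        Map<String, Set<Long>> copy = new HashMap<>();
        source.forEach((group, producerIds) -> copy.put(group, new HashSet<>(producerIds)));
        return copy;
    }

    // Remember the state as of the given log offset.
    void snapshot(long offset) {
        snapshots.put(offset, deepCopy(openTransactionsByGroup));
    }

    // Applied eagerly, before the write is known to have succeeded.
    void addOpenTransaction(String group, long producerId) {
        openTransactionsByGroup.computeIfAbsent(group, k -> new HashSet<>()).add(producerId);
    }

    // Called when the write fails: restore the state captured at `offset`
    // and drop any snapshots taken after it.
    void revertTo(long offset) {
        Map<String, Set<Long>> saved = snapshots.get(offset);
        if (saved == null) {
            throw new IllegalArgumentException("No snapshot at offset " + offset);
        }
        openTransactionsByGroup = deepCopy(saved);
        snapshots.tailMap(offset, false).clear();
    }

    Set<Long> openTransactions(String group) {
        return openTransactionsByGroup.getOrDefault(group, Collections.emptySet());
    }
}
```

In this sketch, taking a snapshot at offset 3, registering producer id 10 for "group", and then calling `revertTo(3)` leaves "group" with no open transactions, which mirrors a failed transactional offset commit being undone.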
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1447907424

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap pendingTransactionalOffsets;
+    /**
+     * The open transactions (producer ids) keyed by group.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

Review Comment:
Sorry, I left a lot of words here. Is there a case where we take advantage of the timeline data structure, i.e. using the epoch or rolling back? I didn't see any in this PR.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on PR #15155:
URL: https://github.com/apache/kafka/pull/15155#issuecomment-1884642430

@jolshan Thanks for your comments. I have addressed all of them.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1447230142

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -668,6 +697,8 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
         OffsetFetchRequestData.OffsetFetchRequestGroup request,
         long lastCommittedOffset
     ) throws ApiException {
+        final boolean requireStable = lastCommittedOffset == Long.MAX_VALUE;

Review Comment:
Correct. When the require stable flag is set in the offset fetch request, the read operation reads with `Long.MAX_VALUE` in order to read the latest state. If the flag is not set, the read operation reads with the last committed offset.
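
The convention explained in this comment can be sketched as follows. The helper names (`readOffset`, `errorFor`) and the string-valued error are hypothetical and exist only for illustration; the parts taken from the thread are the `lastCommittedOffset == Long.MAX_VALUE` check and the expectation, stated in the test comments, that a "require stable" fetch returns `UNSTABLE_OFFSET_COMMIT` for partitions with pending transactional offsets.

```java
// Hypothetical helpers; only the Long.MAX_VALUE convention comes from the thread.
class RequireStableSketch {
    // Offset at which the read operation looks at the state: the latest state
    // when "require stable" is requested, the last committed offset otherwise.
    static long readOffset(boolean requireStableFlag, long lastCommittedOffset) {
        return requireStableFlag ? Long.MAX_VALUE : lastCommittedOffset;
    }

    // Same check as the line added in the quoted hunk.
    static boolean requireStable(long lastCommittedOffset) {
        return lastCommittedOffset == Long.MAX_VALUE;
    }

    // A partition is reported as unstable only when the fetch requires stable
    // offsets and the partition has a pending transactional offset.
    static String errorFor(long lastCommittedOffset, boolean hasPendingTransactionalOffsets) {
        return requireStable(lastCommittedOffset) && hasPendingTransactionalOffsets
            ? "UNSTABLE_OFFSET_COMMIT"
            : "NONE";
    }
}
```

One consequence of encoding the flag in the offset, as raised in the review question, is that a "require stable" read cannot also target a specific committed offset in this scheme.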
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1447226199

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap pendingTransactionalOffsets;
+    /**
+     * The open transactions (producer ids) keyed by group.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

Review Comment:
Your understanding is correct.
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446778536

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap pendingTransactionalOffsets;
+    /**
+     * The open transactions (producer ids) keyed by group.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

Review Comment:
Also, these are timeline data structures, but are there cases where we roll them back?
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446774595

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -668,6 +697,8 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets(
         OffsetFetchRequestData.OffsetFetchRequestGroup request,
         long lastCommittedOffset
     ) throws ApiException {
+        final boolean requireStable = lastCommittedOffset == Long.MAX_VALUE;

Review Comment:
Is it the case that when we query stable offsets, `lastCommittedOffset` is always `Long.MAX_VALUE`? In other words, can we not query at a specific offset when we require stable?
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446773736

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap pendingTransactionalOffsets;
+    /**
+     * The open transactions (producer ids) keyed by group.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

Review Comment:
Just for my understanding, though: for every producer ID we return for a group, we will have an offset in `pendingTransactionalOffsets`, and every producer ID in `pendingTransactionalOffsets` will have its group in `openTransactionsByGroup`. I can't imagine a case where something is in one map but not the other. (I can imagine that the group is not there at all, or that the topic partition is not there, though.)
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446770811

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -656,6 +663,28 @@ public int deleteAllOffsets(
         return numDeletedOffsets.get();
     }
+    /**
+     * @return true iif there is at least one pending transactional offsets for the given

Review Comment:
nit: iff, offset (unless "iif" is a different acronym and does not mean "if and only if")
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
jolshan commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1446770811

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -656,6 +663,28 @@ public int deleteAllOffsets(
         return numDeletedOffsets.get();
     }
+    /**
+     * @return true iif there is at least one pending transactional offsets for the given

Review Comment:
nit: offset
Re: [PR] KAFKA-14505; [5/N] Add `UNSTABLE_OFFSET_COMMIT` error support [kafka]
dajac commented on code in PR #15155:
URL: https://github.com/apache/kafka/pull/15155#discussion_r1445797517

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##
@@ -195,6 +196,11 @@ public OffsetMetadataManager build() {
      */
     private final TimelineHashMap pendingTransactionalOffsets;
+    /**
+     * The open transactions (producer ids) keyed by group.
+     */
+    private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

Review Comment:
I follow an approach similar to the one used in the Scala implementation. We basically store the open producer ids per group and use them to look up the corresponding partitions in `pendingTransactionalOffsets`. An alternative would be to store the open transactions directly by group, topic and partition, but this would use more space. Classic runtime vs. space tradeoff.
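
The lookup described in this comment can be sketched roughly as below. This is a simplified model under stated assumptions, not the code from the PR: plain `HashMap`/`HashSet` stand in for the timeline structures, the nested map layout of `pendingTransactionalOffsets` (producer id, then group, topic, partition) is assumed, and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model: plain collections replace the timeline
// data structures, and pending offsets are nested maps
// producerId -> group -> topic -> partition -> offset.
class PendingOffsetsSketch {
    private final Map<Long, Map<String, Map<String, Map<Integer, Long>>>> pendingTransactionalOffsets =
        new HashMap<>();
    private final Map<String, Set<Long>> openTransactionsByGroup = new HashMap<>();

    // Records a transactional offset commit and registers the producer id as
    // an open transaction for the group, keeping both maps in sync.
    void commitTransactionalOffset(long producerId, String group, String topic, int partition, long offset) {
        pendingTransactionalOffsets
            .computeIfAbsent(producerId, k -> new HashMap<>())
            .computeIfAbsent(group, k -> new HashMap<>())
            .computeIfAbsent(topic, k -> new HashMap<>())
            .put(partition, offset);
        openTransactionsByGroup
            .computeIfAbsent(group, k -> new HashSet<>())
            .add(producerId);
    }

    // Walks the open producer ids of the group and checks whether any of them
    // has a pending offset for the given topic partition.
    boolean hasPendingTransactionalOffsets(String group, String topic, int partition) {
        Set<Long> producerIds = openTransactionsByGroup.get(group);
        if (producerIds == null) return false;
        for (long producerId : producerIds) {
            Map<String, Map<String, Map<Integer, Long>>> byGroup = pendingTransactionalOffsets.get(producerId);
            if (byGroup == null) continue;
            Map<String, Map<Integer, Long>> byTopic = byGroup.get(group);
            if (byTopic == null) continue;
            Map<Integer, Long> byPartition = byTopic.get(topic);
            if (byPartition != null && byPartition.containsKey(partition)) return true;
        }
        return false;
    }
}
```

The cost of a lookup here grows with the number of open producer ids for the group, which is the runtime side of the tradeoff; indexing directly by group, topic and partition would make the check cheaper at the price of a second, larger index.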