Re: [PR] Fix incorrect Java equals comparison of Uuid by reference [kafka]
alok123t commented on code in PR #15707:
URL: https://github.com/apache/kafka/pull/15707#discussion_r1563676834

## core/src/main/scala/kafka/server/AlterPartitionManager.scala:
@@ -270,7 +270,7 @@ class DefaultAlterPartitionManager(
     inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
       val topicId = items.head.topicIdPartition.topicId
-      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      canUseTopicIds &= !topicId.equals(Uuid.ZERO_UUID)

Review Comment:
   Right, we don't need to do this for Scala files; reverted the change in https://github.com/apache/kafka/pull/15707/commits/e206a2cfdfa955b3ab004b5644e28e6fa23999b9
[jira] [Updated] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash updated KAFKA-16511:
-

Description:
I have some topics that were not written to for a few days (with 12h retention) where some data remains on tiered storage (in our case S3) and is never deleted.

Looking at the log history, it appears that we never even tried to delete these segments.

When looking at one of the non-leaking segments, I get the following interesting messages:

{code:java}
"2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), segment-end-offset: 2976819 and segment-epochs: [5]"
"2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data for completed successfully RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, id=fqGng3UURCG3-v4lETeLKQ}, startOffset=2968418, endOffset=2976819, brokerId=10029, maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
"2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, id=fqGng3UURCG3-v4lETeLKQ}, startOffset=2968418, endOffset=2976819, brokerId=10029, maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
"2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 02968418.log to remote storage with segment-id: RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, id=fqGng3UURCG3-v4lETeLKQ}"
"2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data completed successfully, metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, id=fqGng3UURCG3-v4lETeLKQ}, startOffset=2968418, endOffset=2976819, brokerId=10029, maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
"2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, id=fqGng3UURCG3-v4lETeLKQ}, startOffset=2968418, endOffset=2976819, brokerId=10029, maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
{code}

Which looks right, because we can see logs from both the plugin and the remote log manager indicating that the remote log segment was removed.
Now if I look at one of the leaked segments, here is what I see:

{code:java}
"2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 02971163.log to remote storage with segment-id: RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, id=8dP13VDYSaiFlubl9SNBTQ}"
"2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data completed successfully, metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, id=8dP13VDYSaiFlubl9SNBTQ}, startOffset=2971163, endOffset=2978396, brokerId=10001, maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
"2024-04-02T00:43:20.003Z","""kafka""","""10001""","Copying log segment data, metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, id=8dP13VDYSaiFlubl9SNBTQ}, startOffset=2971163, endOffset=2978396, brokerId=10001, maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
{code}

I have no errors whatsoever indicating that the remote log deletion was actually triggered and failed. I tried rolling restarting my cluster to see if refreshing
[jira] [Commented] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836762#comment-17836762 ]

Kamal Chandraprakash commented on KAFKA-16511:
--

[~fvisconte] The issue might be due to overlapping remote log segments after a new leader gets elected during a rolling restart. Could you please upload the remote-log-segment metadata events for the past 10 segments of the 5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765 partition? Thanks!

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
> Issue Type: Bug
> Components: Tiered-Storage
> Affects Versions: 3.7.0
> Reporter: Francois Visconte
> Priority: Major
> Labels: tiered-storage
Re: [PR] debug for #15489 [kafka]
chia7712 closed pull request #15654: debug for #15489
URL: https://github.com/apache/kafka/pull/15654
Re: [PR] debug for #15489 [kafka]
chia7712 commented on PR #15654:
URL: https://github.com/apache/kafka/pull/15654#issuecomment-2053016969

Closing, as we now see the root cause.
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1562683822

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
@@ -391,4 +428,30 @@ private String[] addBootstrapServer(String... args) {
         return newArgs.toArray(new String[0]);
     }
+
+    private void retryUntilEqual(List<String> expected, Supplier<List<String>> outputSupplier) {
+        try {
+            TestUtils.waitForCondition(
+                () -> expected.equals(outputSupplier.get()),
+                "TopicOffsets did not match. Expected: " + expectedTestTopicOffsets() + ", but was: "
+                    + outputSupplier.get() + ". Final offsets: " + getFinalOffsets()
+            );
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Set getFinalOffsets() throws ExecutionException, InterruptedException {

Review Comment:
   we can remove this function now.

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
@@ -94,15 +109,38 @@ private void setUp() {
         }
     }
+
+    private void createConsumerAndPoll() {
+        Properties props = new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+            List<String> topics = new ArrayList<>();
+            for (int i = 0; i < topicCount + 1; i++) {
+                topics.add(getTopicName(i));
+            }
+            consumer.subscribe(topics);
+            consumer.poll(consumerTimeout);
+        }
+    }
+
     static class Row {
         private String name;
         private int partition;
-        private Long timestamp;
+        private Long offset;

Review Comment:
   Could you add `final` to those fields?

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
@@ -391,4 +428,30 @@ private String[] addBootstrapServer(String... args) {
         return newArgs.toArray(new String[0]);
     }
+
+    private void retryUntilEqual(List<String> expected, Supplier<List<String>> outputSupplier) {
+        try {
+            TestUtils.waitForCondition(

Review Comment:
   we don't need this retry now

## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
@@ -48,13 +62,14 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;

 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ZK)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @Tag("integration")
 public class GetOffsetShellTest {
     private final int topicCount = 4;
     private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
     private final String topicName = "topic";
+    private final Duration consumerTimeout = Duration.ofMillis(1000);

Review Comment:
   this can be a local variable
Re: [PR] Fix incorrect equals comparison of Uuid by reference [kafka]
chia7712 commented on code in PR #15707:
URL: https://github.com/apache/kafka/pull/15707#discussion_r1563600417

## core/src/main/scala/kafka/server/AlterPartitionManager.scala:
@@ -270,7 +270,7 @@ class DefaultAlterPartitionManager(
     inflightAlterPartitionItems.groupBy(_.topicIdPartition.topic).foreach { case (topicName, items) =>
       val topicId = items.head.topicIdPartition.topicId
-      canUseTopicIds &= topicId != Uuid.ZERO_UUID
+      canUseTopicIds &= !topicId.equals(Uuid.ZERO_UUID)

Review Comment:
   IIRC, in Scala `!=` calls the `equals` method on the first object, so the original comparison was already value-based.
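For readers skimming the thread, the semantic difference at issue is easy to demonstrate. Below is a minimal, self-contained sketch (illustrative only; it assumes kafka-clients on the classpath and uses `Uuid`'s public two-argument constructor):

```java
import org.apache.kafka.common.Uuid;

public class UuidComparisonDemo {
    public static void main(String[] args) {
        // Two distinct objects holding the same 128-bit value.
        Uuid a = new Uuid(0L, 0L);
        Uuid b = new Uuid(0L, 0L);

        // In Java, != compares object identity, not value:
        System.out.println(a != b);        // true  (different references)
        System.out.println(!a.equals(b));  // false (equal values)

        // Uuid.ZERO_UUID is a shared constant, so a Java reference check against
        // it only works when the exact same instance is passed around. Scala's
        // `topicId != Uuid.ZERO_UUID` desugars to !topicId.equals(Uuid.ZERO_UUID),
        // which is why the Scala change above was reverted.
        System.out.println(a.equals(Uuid.ZERO_UUID)); // true (value comparison)
    }
}
```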
[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836750#comment-17836750 ]

Sal Sorrentino edited comment on KAFKA-16514 at 4/12/24 10:06 PM:
--

The KIP mentions nothing about static membership. I would also add that the current behavior seems to solve the wrong use case. A static member with persistent state is more likely to want to keep membership alive, while a member with transient/non-persistent state would want to relinquish membership on shutdown. I would consider this a bug as well. As evidence, my latest workaround involves using a random UUID as the "group.instance.id", hardly seems like static membership.

I can certainly pick this up, but no promises on the expediency ;)

was (Author: JIRAUSER305028):
The KIP mentions nothing about static membership. I would also add that the current behavior seems to solve the wrong use case. A static member with persistent state is more likely to want to keep membership alive, while a member with transient/non-persistent state would want to relinquish membership on shutdown. I would consider this a bug as well. As evidence, my latest workaround involves using a random UUID as the "group.instance.id", hardly seems static.

I can certainly pick this up, but no promises on the expediency ;)

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Sal Sorrentino
> Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated
> consumer group, the supplied `leaveGroup` option seems to have no effect.
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave
> the group, immediately triggering a consumer group rebalance. In practice,
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any
> associated StateStores would be persisted to disk and that in the case of a
> rolling restart/deployment, the rebalance delay may be preferable. However,
> in our application we are using in-memory state stores and standby replicas.
> There is no benefit in delaying the rebalance in this setup and we are in
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
>     StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>     "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
>     StreamsConfig.EXACTLY_ONCE_V2);{code}
Re: [PR] Fix incorrect equals comparison of Uuid by reference [kafka]
alok123t commented on PR #15707:
URL: https://github.com/apache/kafka/pull/15707#issuecomment-2052621773

I did something similar to https://stackoverflow.com/a/73535824 to find all usages in the codebase.
[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836750#comment-17836750 ]

Sal Sorrentino edited comment on KAFKA-16514 at 4/12/24 10:03 PM:
--

The KIP mentions nothing about static membership. I would also add that the current behavior seems to solve the wrong use case. A static member with persistent state is more likely to want to keep membership alive, while a member with transient/non-persistent state would want to relinquish membership on shutdown. I would consider this a bug as well. As evidence, my latest workaround involves using a random UUID as the "group.instance.id", hardly seems static.

I can certainly pick this up, but no promises on the expediency ;)

was (Author: JIRAUSER305028):
The KIP mentions nothing about static membership. I would also add that the current behavior seems to solve the wrong use case. A static member with persistent state is more likely to want to keep membership alive, while a member with transient/non-persistent state would want to relinquish membership on shutdown. I would consider this a bug as well.

I can certainly pick this up, but no promises on the expediency ;)

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Sal Sorrentino
> Priority: Minor
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836750#comment-17836750 ]

Sal Sorrentino commented on KAFKA-16514:
--

The KIP mentions nothing about static membership. I would also add that the current behavior seems to solve the wrong use case. A static member with persistent state is more likely to want to keep membership alive, while a member with transient/non-persistent state would want to relinquish membership on shutdown. I would consider this a bug as well.

I can certainly pick this up, but no promises on the expediency ;)

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Sal Sorrentino
> Priority: Minor
Re: [PR] MINOR: Fix incorrect equals comparison of topicId by reference in PartitionMetadataFile [kafka]
alok123t commented on PR #15707:
URL: https://github.com/apache/kafka/pull/15707#issuecomment-2052600875

@jolshan @chia7712 Randomly looking at the code base, I think this is prevalent in multiple places. Shall I update them all in this PR and also create a ticket?
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1562991673

## core/src/main/scala/kafka/log/UnifiedLog.scala:
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Yes, if replication-offset-checkpoint is corrupted, HWM could temporarily be set to below local-log-start-offset. I am still trying to understand the impact of that. In the common case, the restarted broker can't become the leader or serve reads until it's caught up. At that time, the HWM will be up to date. In the rare case, the restarted broker is elected as the leader before it has caught up, through unclean election. Is this the case that you want to address?

   The jira also says:
   > If the high watermark is less than the local-log-start-offset, then the [UnifiedLog#fetchHighWatermarkMetadata](https://sourcegraph.com/github.com/apache/kafka@d4caa1c10ec81b9c87eaaf52b73c83d5579b68d3/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L358) method will throw the OFFSET_OUT_OF_RANGE error when it converts the offset to metadata. Once this error happens, the followers will receive out-of-range exceptions and the producers won't be able to produce messages since the leader cannot move the high watermark.

   However, the follower read is bounded by logEndOffset, not HWM? Where does the follower read need to convert HWM to metadata?

## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
@@ -318,6 +318,80 @@ class UnifiedLogTest {
     assertHighWatermark(4L)
   }

+  @Test
+  def testHighWatermarkMaintenanceForRemoteTopic(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    val leaderEpoch = 0
+
+    def assertHighWatermark(offset: Long): Unit = {
+      assertEquals(offset, log.highWatermark)
+      assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+    }
+
+    // High watermark initialized to 0
+    assertHighWatermark(0L)
+
+    var offset = 0L
+    for (_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+
+    // High watermark not changed by append
+    assertHighWatermark(0L)
+
+    // Update high watermark as leader
+    log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L))
+    assertHighWatermark(50L)
+    assertEquals(50L, log.logEndOffset)
+
+    // Cannot update high watermark past the log end offset
+    log.updateHighWatermark(60L)
+    assertHighWatermark(50L)
+
+    // simulate calls to upload 3 segments to remote storage and remove them from local-log.
+    log.updateHighestOffsetInRemoteStorage(30)
+    log.maybeIncrementLocalLogStartOffset(31L, LogStartOffsetIncrementReason.SegmentDeletion)
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size)
+    assertEquals(31L, log.localLogStartOffset())
+    assertHighWatermark(50L)
+
+    // simulate one remote-log segment deletion
+    val logStartOffset = 11L
+    log.maybeIncrementLogStartOffset(logStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
+    assertEquals(11, log.logStartOffset)
+
+    // Updating the HW below the log-start-offset / local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
+    log.updateHighWatermark(new LogOffsetMetadata(5L))
+    assertHighWatermark(31L)
+    // Updating the HW between log-start-offset and local-log-start-offset is not allowed. HW should reset to local-log-start-offset.

Review Comment:
   This is moving HW below local-log-start-offset, not log-start-offset.
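The bounding rule being discussed in this review can be stated in a few lines. The following is a hedged sketch (not UnifiedLog's actual code) using the offsets from the test above:

```java
/** Minimal sketch of the HW bounding rule under discussion; not Kafka's UnifiedLog. */
class HighWatermarkBounds {
    static long boundHighWatermark(long proposed, long localLogStartOffset, long logEndOffset) {
        // Lower bound: offsets below localLogStartOffset no longer exist locally,
        // so converting such a HW to offset metadata would fail (OFFSET_OUT_OF_RANGE).
        // Upper bound: the HW can never pass the log end offset.
        return Math.max(localLogStartOffset, Math.min(proposed, logEndOffset));
    }

    public static void main(String[] args) {
        long lso = 31L, leo = 50L; // values from the test above
        System.out.println(boundHighWatermark(5L, lso, leo));  // 31 -- clamped up
        System.out.println(boundHighWatermark(60L, lso, leo)); // 50 -- clamped down
        System.out.println(boundHighWatermark(40L, lso, leo)); // 40 -- in range
    }
}
```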
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1563199330

## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
     void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
         replicationControl.handleBrokerUncleanShutdown(brokerId, records);
     }
+
+    void maybeTriggerMinIsrConfigUpdate(Optional<String> topicName) throws InterruptedException, ExecutionException {
+        appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(),
+            () -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+    }

Review Comment:
   calling `.get()` on an appendWriteEvent doesn't look right to me. If I understand correctly, the `appendWriteEvents` are handled in the quorum controller event loop thread. We would expect `replay()` to also be called in the event loop thread, so if we trigger an `appendWriteEvent` and block waiting for the result, it would always time out, since we are blocking the processing thread.
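To illustrate the concern, here is a minimal, self-contained sketch (not the actual QuorumController code) of why blocking on a future from inside a single-threaded event loop always times out:

```java
import java.util.concurrent.*;

public class EventLoopDeadlockDemo {
    public static void main(String[] args) throws Exception {
        // A single-threaded executor stands in for the controller event loop.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();

        Future<?> outer = eventLoop.submit(() -> {
            // From inside the loop thread, enqueue another event...
            Future<String> inner = eventLoop.submit(() -> "replayed");
            try {
                // ...and block on it. The inner task can never start, because
                // the only thread that could run it is the one blocked here.
                return inner.get(2, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                return "timed out: the event loop thread was blocked on itself";
            }
        });

        System.out.println(outer.get());
        eventLoop.shutdown();
    }
}
```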
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1563195256

## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
             log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
                 configResource, record.name(), record.value());
         }
+        if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {
+            try {
+                if (type == Type.TOPIC) {
+                    minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName()));
+                } else {
+                    minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty());
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                throw new Throwable("Fail to append partition updates for the min isr update: " + e.getMessage());
+            }

Review Comment:
   throwing an exception as a throwable seems odd.
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
splett2 commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1563193794

## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
             log.info("Replayed ConfigRecord for {} which set configuration {} to {}",
                 configResource, record.name(), record.value());
         }
+        if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {

Review Comment:
   we shouldn't use `==` to compare strings.
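For completeness, a minimal demonstration of the pitfall (illustrative only): Java's `==` on strings compares references, so it can report two equal values as different.

```java
public class StringEqualityDemo {
    public static void main(String[] args) {
        String a = "min.insync.replicas";
        // Build an equal value at runtime so it is a distinct object
        // (avoids the compile-time constant pool, which would intern it).
        String b = new StringBuilder("min.insync.").append("replicas").toString();

        System.out.println(a == b);       // false -- different objects
        System.out.println(a.equals(b));  // true  -- same character sequence
    }
}
```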
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836736#comment-17836736 ]

A. Sophie Blee-Goldman commented on KAFKA-16514:
--

I haven't gone back and re-read the KIP, but IIRC the reason for adding these CloseOptions was specific to solving an issue with static membership, hence why it only takes effect there. That said, I completely agree that there's no reason why this should only work with static membership, and the decision to not leave the group for non-static membership is one of those biases that Kafka Streams has in assuming persistent state stores. I would fully support changing the behavior to work with non-static members rather than just updating the javadocs to explain this.

[~mjsax] would this need a KIP? Or can we just consider this a "bug" (especially since the javadocs make no mention that it's intended to only work on static members) and, since we don't need any API changes, simply make the change without a KIP?

Either way, [~sal.sorrentino], would you be interested in picking this up?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Sal Sorrentino
> Priority: Minor
[jira] [Created] (KAFKA-16545) Auto adjust the replica factor according to number of broker when using ClusterTestExtensions
Chia-Ping Tsai created KAFKA-16545:
--

Summary: Auto adjust the replica factor according to number of broker when using ClusterTestExtensions
Key: KAFKA-16545
URL: https://issues.apache.org/jira/browse/KAFKA-16545
Project: Kafka
Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai

In most test cases, we start a single broker so as to save resources. However, this can cause errors when creating internal topics, since they require 3 replicas by default. In order to reduce duplicated configs across all tests, we can add a bit of sugar to auto-adjust the replication factor (if it is not defined by the test) when the number of brokers started by the test is less than the default value.
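A hedged sketch of the proposed sugar; the helper name and signature below are hypothetical, not the actual ClusterTestExtensions API:

```java
/**
 * Hypothetical helper: pick an effective replication factor for internal
 * topics, clamping the default to the number of live brokers so that
 * single-broker test clusters don't fail topic creation.
 */
public final class ReplicaFactorSugar {
    private ReplicaFactorSugar() { }

    public static short effectiveReplicationFactor(Short configured, int liveBrokers) {
        short requested = (configured != null) ? configured.shortValue() : (short) 3;
        // Only auto-adjust when the test did not set the config explicitly.
        if (configured == null && liveBrokers < requested) {
            return (short) liveBrokers;
        }
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(effectiveReplicationFactor(null, 1));      // 1 (auto-adjusted)
        System.out.println(effectiveReplicationFactor((short) 3, 1)); // 3 (explicit, kept)
    }
}
```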
[jira] [Assigned] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE
[ https://issues.apache.org/jira/browse/KAFKA-16544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-16544:
--

Assignee: Chia-Ping Tsai

> DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE
> ---
>
> Key: KAFKA-16544
> URL: https://issues.apache.org/jira/browse/KAFKA-16544
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Major
[jira] [Created] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE
Chia-Ping Tsai created KAFKA-16544:
--

Summary: DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE
Key: KAFKA-16544
URL: https://issues.apache.org/jira/browse/KAFKA-16544
Project: Kafka
Issue Type: Improvement
Reporter: Chia-Ping Tsai

{code:java}
     * @return A future map from topic names to descriptions which can be used to check
     *         the status of individual description if the describe topic request used
     *         topic names, otherwise return null, this request succeeds only if all the
     *         topic descriptions succeed
{code}

According to the docs, it should return null if we try to get a result that doesn't match the request, for example calling `allTopicNames` when the request was made with a `TopicIdCollection`. However, the current implementation throws an NPE directly.
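A hedged sketch of the documented contract; the class below is a simplified stand-in, not the real DescribeTopicsResult or its futures:

```java
import java.util.Collections;
import java.util.Map;

/** Simplified stand-in for DescribeTopicsResult (not the real Admin client class). */
class DescribeTopicsResultSketch {
    private final Map<String, String> topicNameFutures; // null when the request used topic ids
    private final Map<String, String> topicIdFutures;   // null when the request used topic names

    DescribeTopicsResultSketch(Map<String, String> names, Map<String, String> ids) {
        this.topicNameFutures = names;
        this.topicIdFutures = ids;
    }

    /** Per the javadoc: return null when the request was keyed by topic id. */
    Map<String, String> allTopicNames() {
        return topicNameFutures == null ? null : Collections.unmodifiableMap(topicNameFutures);
    }

    /** Per the javadoc: return null when the request was keyed by topic name. */
    Map<String, String> allTopicIds() {
        return topicIdFutures == null ? null : Collections.unmodifiableMap(topicIdFutures);
    }

    public static void main(String[] args) {
        DescribeTopicsResultSketch byId =
            new DescribeTopicsResultSketch(null, Collections.singletonMap("Uuid-1", "desc"));
        System.out.println(byId.allTopicNames()); // null -- documented, instead of an NPE
        System.out.println(byId.allTopicIds());   // {Uuid-1=desc}
    }
}
```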
Re: [PR] MINOR: Various cleanups in clients [kafka]
chia7712 commented on code in PR #15705:
URL: https://github.com/apache/kafka/pull/15705#discussion_r1562938471

## clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java:
@@ -37,13 +37,13 @@ public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements Auto
     //serves 2 purposes - 1st it maintains the ref objects reachable (which is a requirement for them
     //to ever be enqueued), 2nd keeps some (small) metadata for every buffer allocated
     private final Map buffersInFlight = new ConcurrentHashMap<>();
-    private final GarbageCollectionListener gcListener = new GarbageCollectionListener();
     private final Thread gcListenerThread;
-    private volatile boolean alive = true;
+    private volatile boolean alive;

     public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) {
         super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor);
         this.alive = true;

Review Comment:
   it seems to me removing `this.alive = true;` is more suitable.

## clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java:
@@ -79,12 +79,8 @@ public ByteBuffer get(int size) {
         @Override
         public void release(ByteBuffer buffer) {
             buffer.clear();
-            Deque<ByteBuffer> bufferQueue = bufferMap.get(buffer.capacity());
-            if (bufferQueue == null) {
-                // We currently keep a single buffer in flight, so optimise for that case
-                bufferQueue = new ArrayDeque<>(1);
-                bufferMap.put(buffer.capacity(), bufferQueue);
-            }
+            Deque<ByteBuffer> bufferQueue = bufferMap.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>(1));
+            // We currently keep a single buffer in flight, so optimise for that case

Review Comment:
   Can we move this comment up?

## clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java:
@@ -60,7 +58,6 @@ public SslChannelBuilder(Mode mode,
         this.mode = mode;
         this.listenerName = listenerName;
         this.isInterBrokerListener = isInterBrokerListener;
-        this.log = logContext.logger(getClass());

Review Comment:
   maybe we should pass it to `Utils.closeQuietly` instead of deleting it.

## clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java:
@@ -136,8 +136,8 @@ private CoordinatorKey requireSingletonAndType(Set<CoordinatorKey> keys) {
     }

     private void ensureSameType(Set<CoordinatorKey> keys) {
-        if (keys.size() < 1) {
-            throw new IllegalArgumentException("Unexpected size of key set: expected >= 1, but got " + keys.size());
+        if (keys.isEmpty()) {
+            throw new IllegalArgumentException("Unexpected size of key set: expected >= 1, but got 0");

Review Comment:
   maybe we can say "the keys can't be empty"
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
junrao commented on PR #15631:
URL: https://github.com/apache/kafka/pull/15631#issuecomment-2052208460

@kamalcph: Thanks for the update. JDK 11 and Scala 2.13 didn't complete in the last test run. Could you trigger another run of the tests?
[jira] [Comment Edited] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash
[ https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836717#comment-17836717 ]

Jun Rao edited comment on KAFKA-16541 at 4/12/24 5:43 PM:
--

Thanks for filing the jira, [~ocadaruma]! Since this is a regression, it would be useful to have this fixed in 3.8.0 and 3.7.1. One way to fix it is to (1) change LeaderEpochFileCache.truncateFromEnd and LeaderEpochFileCache.truncateFromStart to only write to memory without writing to the checkpoint file, and (2) change the implementation of [renameDir|https://github.com/apache/kafka/blob/3.6.0/core/src/main/scala/kafka/log/UnifiedLog.scala#L681] so that it doesn't reinitialize from the file and just changes the Path of the backing CheckpointFile.

> Potential leader epoch checkpoint file corruption on OS crash
> ---
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Haruki Okada
> Assignee: Haruki Okada
> Priority: Minor
>
> Pointed out by [~junrao] on [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] got rid of the fsync of the leader-epoch checkpoint file in some paths for performance reasons.
> However, since the checkpoint file is now flushed to the device asynchronously by the OS, its content could become corrupted if the OS suddenly crashes (e.g. by power failure or kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.
[jira] [Commented] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash
[ https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836717#comment-17836717 ]

Jun Rao commented on KAFKA-16541:
--

Thanks for filing the jira, [~ocadaruma]! Since this is a regression, it would be useful to have this fixed in 3.8.0 and 3.7.1. One way to fix it is to (1) change LeaderEpochFileCache.truncateFromEnd and LeaderEpochFileCache.truncateFromStart to only write to memory without writing to the checkpoint file, and (2) change the implementation of [renameDir|https://github.com/apache/kafka/blob/3.6.0/core/src/main/scala/kafka/log/UnifiedLog.scala#L681] so that it doesn't reinitialize from the file and just changes the Path of the backing CheckpointFile.

> Potential leader epoch checkpoint file corruption on OS crash
> ---
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Haruki Okada
> Assignee: Haruki Okada
> Priority: Minor
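For illustration, a hedged sketch of fix direction (1): truncation mutates only an in-memory map and defers persistence, so no fsync happens on the truncate path. Class and method names are simplified stand-ins, not the actual LeaderEpochFileCache API:

```java
import java.util.Map;
import java.util.TreeMap;

/** Simplified stand-in for an epoch cache; not Kafka's real implementation. */
class EpochCacheSketch {
    // epoch -> start offset, as in the EpochEntry records seen in the logs above
    private final TreeMap<Integer, Long> epochs = new TreeMap<>();

    synchronized void assign(int epoch, long startOffset) {
        epochs.put(epoch, startOffset);
    }

    /**
     * Fix direction (1): drop entries whose start offset is at or beyond
     * endOffset, in memory only. No checkpoint write (and no fsync) happens
     * here, so the hot path holding the log lock stays cheap; the file is
     * flushed later, e.g. on clean shutdown or a periodic flush.
     */
    synchronized void truncateFromEnd(long endOffset) {
        epochs.entrySet().removeIf(e -> e.getValue() >= endOffset);
    }

    synchronized Map<Integer, Long> snapshot() {
        return new TreeMap<>(epochs);
    }

    public static void main(String[] args) {
        EpochCacheSketch cache = new EpochCacheSketch();
        cache.assign(5, 2968418L);
        cache.assign(8, 2980106L);
        cache.truncateFromEnd(2980106L); // drops epoch 8, in memory only; no fsync
        System.out.println(cache.snapshot()); // {5=2968418}
    }
}
```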
[jira] [Updated] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash
[ https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-16541:
--

Affects Version/s: 3.7.0

> Potential leader epoch checkpoint file corruption on OS crash
> ---
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 3.7.0
> Reporter: Haruki Okada
> Assignee: Haruki Okada
> Priority: Minor
Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]
junrao commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1562928115

## server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java:
@@ -72,18 +72,20 @@ public CheckpointFile(File file,
         tempPath = Paths.get(absolutePath + ".tmp");
     }

-    public void write(Collection<T> entries) throws IOException {
+    public void write(Collection<T> entries, boolean sync) throws IOException {
         synchronized (lock) {
             // write to temp file and then swap with the existing file
             try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
                  BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
                 CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);
                 checkpointWriteBuffer.write(entries);
                 writer.flush();
-                fileOutputStream.getFD().sync();
+                if (sync) {
+                    fileOutputStream.getFD().sync();
+                }

Review Comment:
   Thanks, @ocadaruma!
Re: [PR] MINOR: Fix incorrect equals comparison of topicId by reference in PartitionMetadataFile [kafka]
chia7712 commented on PR #15707:
URL: https://github.com/apache/kafka/pull/15707#issuecomment-2052183089

The following links have a similar issue. Should we fix them in this PR, or should I file a follow-up to fix them?

https://github.com/apache/kafka/blob/61baa7ac6bb871797197d9289a848a0d4f587ced/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L1704

https://github.com/apache/kafka/blob/e02ffd852fae7c1d2681621be7eb888e3805e027/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java#L299
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
cmccabe merged PR #15648:
URL: https://github.com/apache/kafka/pull/15648
Re: [PR] MINOR: Various cleanups in server and server-common [kafka]
chia7712 commented on code in PR #15710:
URL: https://github.com/apache/kafka/pull/15710#discussion_r1562828732

## server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java:
@@ -41,9 +42,7 @@ public void testEmptyRegistry() {

     private static void assertIteratorContains(Iterator<Snapshot> iter, Snapshot... snapshots) {
         List<Snapshot> expected = new ArrayList<>();

Review Comment:
   How about `List<Snapshot> expected = Arrays.asList(snapshots);`?

## server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java:
@@ -116,8 +116,7 @@ public synchronized boolean maybeUpdatePushRequestTimestamp(long currentTime) {
      */
     boolean canAccept = lastGetRequestTimestamp > lastPushRequestTimestamp;

Review Comment:
   how about `boolean canAccept = lastGetRequestTimestamp > lastPushRequestTimestamp || currentTime - lastPushRequestTimestamp >= pushIntervalMs;`?

## server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java:
@@ -310,10 +309,8 @@ private static void assertIteratorYields(Iterator iter,
         }
     }
     if (!extraObjects.isEmpty() || !remaining.isEmpty()) {
-        throw new RuntimeException("Found extra object(s): [" + String.join(", ",
-            extraObjects.stream().map(e -> e.toString()).collect(Collectors.toList())) +
-            "] and didn't find object(s): [" + String.join(", ",
-            remaining.keySet().stream().map(e -> e.toString()).collect(Collectors.toList())) + "]");
+        throw new RuntimeException("Found extra object(s): [" + extraObjects.stream().map(e -> e.toString()).collect(Collectors.joining(", ")) +

Review Comment:
   `e -> e.toString()` -> `Object::toString`

## server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
@@ -321,7 +321,7 @@ public void run() throws Exception {
     AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse) response.responseBody()).data();
     Set<AssignmentEvent> failed = filterFailures(data, inflight);
-    Set<AssignmentEvent> completed = Utils.diff(HashSet::new, inflight.values().stream().collect(Collectors.toSet()), failed);
+    Set<AssignmentEvent> completed = Utils.diff(HashSet::new, new HashSet<>(inflight.values()), failed);

Review Comment:
   Maybe we don't need to create the collection many times, by skipping the failed elements:
   ```java
   Set<AssignmentEvent> failed = filterFailures(data, inflight);
   for (AssignmentEvent assignmentEvent : inflight.values()) {
       if (failed.contains(assignmentEvent)) continue;
       if (log.isDebugEnabled()) {
           log.debug("Successfully propagated assignment {}", assignmentEvent);
       }
       assignmentEvent.onComplete();
   }
   ```
Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]
kirktrue commented on PR #15408: URL: https://github.com/apache/kafka/pull/15408#issuecomment-2052100630 @lucasbru—I think it's worth adding the tests, even if we already know there are more to be added in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: fix duplicated return and other streams docs typos [kafka]
AyoubOm opened a new pull request, #15713: URL: https://github.com/apache/kafka/pull/15713 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 merged PR #15670: URL: https://github.com/apache/kafka/pull/15670 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836671#comment-17836671 ] Matthias J. Sax commented on KAFKA-16514: - CloseOption was introduced via [https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group] The reasoning about the design should be on the KIP and corresponding DISCUSS thread. I agree that the JavaDocs are missing a lot of information. Would you be interested to do a PR to improve the JavaDocs? > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: Various cleanups in shell [kafka]
mimaison opened a new pull request, #15712: URL: https://github.com/apache/kafka/pull/15712 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on PR #15702: URL: https://github.com/apache/kafka/pull/15702#issuecomment-2052041676 @mumrah Can you help take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Various cleanups in storage [kafka]
mimaison opened a new pull request, #15711: URL: https://github.com/apache/kafka/pull/15711 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Various cleanups in server and server-common [kafka]
mimaison opened a new pull request, #15710: URL: https://github.com/apache/kafka/pull/15710 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Various cleanups in tools [kafka]
mimaison opened a new pull request, #15709: URL: https://github.com/apache/kafka/pull/15709 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Various cleanups in trogdor [kafka]
mimaison opened a new pull request, #15708: URL: https://github.com/apache/kafka/pull/15708 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836644#comment-17836644 ] Sal Sorrentino commented on KAFKA-16514: Digging further into this, it seems the leaveGroup option is only supported if the "group.instance.id" is supplied via the StreamsConfig; however, there is no documentation around this. The "group.instance.id" has no representation in the StreamsConfig class, even though it is intended to be a user-supplied configuration providing static membership. It is unclear to me why this flag is only supported for static membership since a consumer group member can "leave" a group at any point in time without static membership. > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
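To illustrate the observation above, a hedged sketch of supplying "group.instance.id" to the Streams main consumer; the use of `StreamsConfig.consumerPrefix` and the instance-id value are illustrative assumptions, not taken from the Streams documentation:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StaticMembershipConfigSketch {
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Static membership: pass group.instance.id through to the consumer.
        // Per the comment above, close(CloseOptions) only honors
        // leaveGroup(true) when this is set.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG),
                  "app-instance-1");
        return props;
    }
}
{code}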
[jira] [Assigned] (KAFKA-15359) Support log.dir.failure.timeout.ms configuration for JBOD
[ https://issues.apache.org/jira/browse/KAFKA-15359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reassigned KAFKA-15359: --- Assignee: (was: Igor Soarez) > Support log.dir.failure.timeout.ms configuration for JBOD > - > > Key: KAFKA-15359 > URL: https://issues.apache.org/jira/browse/KAFKA-15359 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Priority: Major > > If the broker repeatedly fails to communicate a log directory failure after a configurable amount of time — > {{log.dir.failure.timeout.ms}} — and it is the leader for any replicas in the > failed log directory, the broker will shut down, as that is the only other way > to guarantee that the controller will elect a new leader for those partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]
soarez commented on code in PR #15697: URL: https://github.com/apache/kafka/pull/15697#discussion_r1562612304 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -507,6 +522,7 @@ class BrokerLifecycleManager( if (errorCode == Errors.NONE) { val responseData = message.data() failedAttempts = 0 + offlineDirs = offlineDirs.map(kv => kv._1 -> true) Review Comment: I think this is incorrect. If a new failed directory is added to `offlineDirs` in between a heartbeat request-response, then we'll clear it here before knowing if it will be propagated to the controller. One idea is to hand down the offline dirs set in the request in `sendBrokerHeartBeat()` to `BrokerHeartbeatResponseEvent` through `BrokerHeartbeatResponseHandler` as a new constructor argument. ## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ## @@ -94,6 +94,7 @@ public class Defaults { public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 6; public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1; public static final boolean AUTO_CREATE_TOPICS_ENABLE = true; +public static final long LOG_DIR_FAILURE_TIMEOUT_MS = 3L; Review Comment: This default seems reasonable to me. ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -211,7 +211,8 @@ class BrokerServer( time, s"broker-${config.nodeId}-", isZkBroker = false, -logDirs = logManager.directoryIdsSet) +logDirs = logManager.directoryIdsSet, +() => kafkaScheduler.schedule("shutdown", () => shutdown(), 0, -1)) Review Comment: There's a `scheduleOnce` alternative which sets `periodMs` to `-1`. ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -528,6 +529,10 @@ object KafkaConfig { val LogDirFailureTimeoutMsDoc = "If the broker is unable to successfully communicate to the controller that some log " + +"directory has failed for longer than this time, and there's at least one partition with leadership on that directory, " + Review Comment: > and there's at least one partition with leadership We aren't checking for this condition. We can either a) implement it; or b) keep it simple and drop this from the configuration description. ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -327,16 +333,25 @@ class BrokerLifecycleManager( private class OfflineDirEvent(val dir: Uuid) extends EventQueue.Event { override def run(): Unit = { if (offlineDirs.isEmpty) { -offlineDirs = Set(dir) +offlineDirs = Map(dir -> false) } else { -offlineDirs = offlineDirs + dir +offlineDirs += (dir -> false) } if (registered) { scheduleNextCommunicationImmediately() } } } + private class OfflineDirBrokerFailureEvent(offlineDir: Uuid) extends EventQueue.Event { +override def run(): Unit = { + if (!offlineDirs.getOrElse(offlineDir, false)) { +error(s"Shutting down because couldn't communicate offline log dirs with controllers") Review Comment: We should include the directory in the error. It might also be helpful to resolve the directory ID to its path. Perhaps something like `dirIdToPath` in `AssignmentsManager` should be made available here as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
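To make the first suggestion concrete, a rough sketch with invented names (not the actual `BrokerLifecycleManager` code): snapshot the dirs included in each heartbeat request and mark only that snapshot as propagated when the response arrives, so dirs that fail in between are not cleared prematurely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Invented names; a sketch of the reviewer's idea, not Kafka code.
public class OfflineDirAckSketch {
    // dir id -> whether the controller has acknowledged it
    private final Map<String, Boolean> offlineDirs = new HashMap<>();

    void onDirOffline(String dirId) {
        offlineDirs.putIfAbsent(dirId, false);
    }

    // Captured when the heartbeat request is built, then handed to the
    // response handler (e.g. as a constructor argument).
    Set<String> snapshotForHeartbeat() {
        return Set.copyOf(offlineDirs.keySet());
    }

    void onHeartbeatResponse(Set<String> dirsSentInRequest) {
        // Only dirs that were actually in the request are marked acknowledged;
        // dirs that failed after the request was sent stay unacknowledged.
        dirsSentInRequest.forEach(d -> offlineDirs.replace(d, true));
    }
}
```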
Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]
dajac commented on PR #15536: URL: https://github.com/apache/kafka/pull/15536#issuecomment-2051891235 Sorry for the delay on this one. I will review it next week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Fix incorrect equals comparison of topicId by reference in PartitionMetadataFile [kafka]
alok123t opened a new pull request, #15707: URL: https://github.com/apache/kafka/pull/15707 Previously, we were comparing Uuid by reference, which could be incorrect. This PR changes the comparison to be by value instead ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16543:There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0 [kafka]
dajac commented on PR #15706: URL: https://github.com/apache/kafka/pull/15706#issuecomment-2051869550 Hi @hudeqi. Thanks for the patch. I would like to better understand it. My first question is: how would Flink commit offsets with a generationId equal to -1? The generation of the group is only managed by the group coordinator. It is not possible to alter it from an external system. The -1 passed in the offset commit request is only used for validation purposes. The reason why we don't write a tombstone in this case is because the group was never materialized in the log if it stayed at generation 0. I am not sure it is a worthwhile optimization though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16543:There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0 [kafka]
hudeqi opened a new pull request, #15706: URL: https://github.com/apache/kafka/pull/15706 In the `cleanupGroupMetadata` method, tombstone messages are written to delete the group's MetadataKey only when the group is in the Dead state and the generation is greater than 0. The comment indicates: 'We avoid writing the tombstone when the generationId is 0, since this group is only using Kafka for offset storage.' This means that groups that only use Kafka for offset storage should not be deleted. However, there is a situation where, for example, Flink commits offsets with a generationId equal to -1. If ApiKeys.DELETE_GROUPS is called to delete this group, Flink's group metadata will never be deleted. Yet, the logic above has already cleaned up commitKey by writing tombstone messages with `removedOffsets`. Therefore, the actual manifestation is: the group no longer exists (since the offsets have been cleaned up, there is no possibility of adding the group back to the `groupMetadataCache` unless offsets are committed again with the same group name), but the corresponding group metadata information still exists in __consumer_offsets. This leads to the problem that deleting the group does not completely clean up its related information. The group's state is set to Dead only in the following three situations: 1. The group information is unloaded 2. The group is deleted by ApiKeys.DELETE_GROUPS 3. All offsets of the group have expired or been removed. Therefore, since the group is already in the Dead state and has been removed from the `groupMetadataCache`, why not directly clean up all the information of the group? Even if it is only used for storing offsets. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
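As a reading aid for the guard described above, an illustrative model with invented names; this is a sketch of the asymmetry the PR targets, not the actual group coordinator code:

```java
import java.util.ArrayList;
import java.util.List;

// Invented names; models the tombstone guard, not GroupMetadataManager itself.
public class TombstoneGuardSketch {

    record Group(String groupId, boolean dead, int generationId) { }

    static List<String> cleanupRecords(Group group, List<String> removedOffsetKeys) {
        List<String> tombstones = new ArrayList<>();
        // Offset tombstones are written for all removed offsets.
        for (String key : removedOffsetKeys) {
            tombstones.add("tombstone:offset:" + key);
        }
        // The metadata tombstone is only written for Dead groups with
        // generation > 0, i.e. groups that were materialized in the log.
        // A group that only ever committed with generation -1/0 (offset-only
        // usage) keeps its metadata key behind, which is the reported problem.
        if (group.dead() && group.generationId() > 0) {
            tombstones.add("tombstone:group-metadata:" + group.groupId());
        }
        return tombstones;
    }
}
```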
[jira] [Updated] (KAFKA-16543) There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0
[ https://issues.apache.org/jira/browse/KAFKA-16543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-16543: --- Description: In the `cleanupGroupMetadata` method, tombstone messages is written to delete the group's MetadataKey only when the group is in the Dead state and the generation is greater than 0. The comment indicates: 'We avoid writing the tombstone when the generationId is 0, since this group is only using Kafka for offset storage.' This means that groups that only use Kafka for offset storage should not be deleted. However, there is a situation where, for example, Flink commit offsets with a generationId equal to -1. If the ApiKeys.DELETE_GROUPS is called to delete this group, Flink's group metadata will never be deleted. Yet, the logic above has already cleaned up commitKey by writing tombstone messages with `removedOffsets`. Therefore, the actual manifestation is: the group no longer exists (since the offsets have been cleaned up, there is no possibility of adding the group back to the `groupMetadataCache` unless offsets are committed again with the same group name), but the corresponding group metadata information still exists in __consumer_offsets. This leads to the problem that deleting the group does not completely clean up its related information. The group's state is set to Dead only in the following three situations: 1. The group information is unloaded 2. The group is deleted by ApiKeys.DELETE_GROUPS 3. All offsets of the group have expired or removed. Therefore, since the group is already in the Dead state and has been removed from the `groupMetadataCache`, why not directly clean up all the information of the group? Even if it is only used for storing offsets. was: In the `cleanupGroupMetadata` method, tombstone messages is written to delete the group's MetadataKey only when the group is in the Dead state and the generation is greater than 0. The comment indicates: 'We avoid writing the tombstone when the generationId is 0, since this group is only using Kafka for offset storage.' This means that groups that only use Kafka for offset storage should not be deleted. However, there is a situation where, for example, Flink commit offsets with a generationId equal to -1. If the ApiKeys.DELETE_GROUPS is called to delete this group, Flink's group metadata will never be deleted. Yet, the logic above has already cleaned up commitKey by writing tombstone messages with removedOffsets. Therefore, the actual manifestation is: the group no longer exists (since the offsets have been cleaned up, there is no possibility of adding the group back to the `groupMetadataCache` unless offsets are committed again with the same group name), but the corresponding group metadata information still exists in __consumer_offsets. This leads to the problem that deleting the group does not completely clean up its related information. The group's state is set to Dead only in the following three situations: 1. The group information is unloaded 2. The group is deleted by ApiKeys.DELETE_GROUPS 3. All offsets of the group have expired or removed. Therefore, since the group is already in the Dead state and has been removed from the `groupMetadataCache`, why not directly clean up all the information of the group? Even if it is only used for storing offsets. 
> There may be ambiguous deletions in the `cleanupGroupMetadata` when the > generation of the group is less than or equal to 0 > -- > > Key: KAFKA-16543 > URL: https://issues.apache.org/jira/browse/KAFKA-16543 > Project: Kafka > Issue Type: Bug > Components: group-coordinator >Affects Versions: 3.6.2 >Reporter: hudeqi >Assignee: hudeqi >Priority: Major > > In the `cleanupGroupMetadata` method, tombstone messages is written to delete > the group's MetadataKey only when the group is in the Dead state and the > generation is greater than 0. The comment indicates: 'We avoid writing the > tombstone when the generationId is 0, since this group is only using Kafka > for offset storage.' This means that groups that only use Kafka for offset > storage should not be deleted. However, there is a situation where, for > example, Flink commit offsets with a generationId equal to -1. If the > ApiKeys.DELETE_GROUPS is called to delete this group, Flink's group metadata > will never be deleted. Yet, the logic above has already cleaned up commitKey > by writing tombstone messages with `removedOffsets`. Therefore, the actual > manifestation is: the group no longer exists (since the offsets have been > cleaned up, there is no possibility of adding the group back to the > `groupMetadataCache` unless offsets are committed again with the same gr
[jira] [Created] (KAFKA-16543) There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0
hudeqi created KAFKA-16543: -- Summary: There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0 Key: KAFKA-16543 URL: https://issues.apache.org/jira/browse/KAFKA-16543 Project: Kafka Issue Type: Bug Components: group-coordinator Affects Versions: 3.6.2 Reporter: hudeqi Assignee: hudeqi In the `cleanupGroupMetadata` method, tombstone messages is written to delete the group's MetadataKey only when the group is in the Dead state and the generation is greater than 0. The comment indicates: 'We avoid writing the tombstone when the generationId is 0, since this group is only using Kafka for offset storage.' This means that groups that only use Kafka for offset storage should not be deleted. However, there is a situation where, for example, Flink commit offsets with a generationId equal to -1. If the ApiKeys.DELETE_GROUPS is called to delete this group, Flink's group metadata will never be deleted. Yet, the logic above has already cleaned up commitKey by writing tombstone messages with removedOffsets. Therefore, the actual manifestation is: the group no longer exists (since the offsets have been cleaned up, there is no possibility of adding the group back to the `groupMetadataCache` unless offsets are committed again with the same group name), but the corresponding group metadata information still exists in __consumer_offsets. This leads to the problem that deleting the group does not completely clean up its related information. The group's state is set to Dead only in the following three situations: 1. The group information is unloaded 2. The group is deleted by ApiKeys.DELETE_GROUPS 3. All offsets of the group have expired or removed. Therefore, since the group is already in the Dead state and has been removed from the `groupMetadataCache`, why not directly clean up all the information of the group? Even if it is only used for storing offsets. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]
chiacyu commented on code in PR #15678: URL: https://github.com/apache/kafka/pull/15678#discussion_r1562574967 ## server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java: ## @@ -172,10 +172,11 @@ public void testAssignmentAggregation() throws InterruptedException { manager.onAssignment(new TopicIdPartition(TOPIC_1, 3), DIR_3, () -> { }); manager.onAssignment(new TopicIdPartition(TOPIC_1, 4), DIR_1, () -> { }); manager.onAssignment(new TopicIdPartition(TOPIC_2, 5), DIR_2, () -> { }); -while (!readyToAssert.await(1, TimeUnit.MILLISECONDS)) { +TestUtils.waitForCondition(() -> { time.sleep(100); manager.wakeup(); -} +return readyToAssert.await(1, TimeUnit.MILLISECONDS); +}, 1000, "Wait for ready to assert."); Review Comment: Great comments, just applied. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
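For context, the adopted pattern boils down to this generic shape (a standalone sketch, not Kafka's actual `TestUtils.waitForCondition` implementation):

```java
import java.util.function.BooleanSupplier;

// Generic wait-for-condition helper: poll until the condition holds or a
// deadline passes, instead of an open-coded while loop in each test.
public final class WaitFor {
    public static void waitForCondition(BooleanSupplier condition,
                                        long maxWaitMs,
                                        String details) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Timed out waiting for: " + details);
            }
            Thread.sleep(10); // poll interval; real helpers make this tunable
        }
    }
}
```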
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
cadonna commented on code in PR #15698: URL: https://github.com/apache/kafka/pull/15698#discussion_r1562500382 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long curr heartbeatRequestState.onSendAttempt(currentTimeMs); membershipManager.onHeartbeatRequestSent(); metricsManager.recordHeartbeatSentMs(currentTimeMs); +// Reset timer when sending the request, to make sure that, if waiting for the interval, +// we don't include the response time (which may introduce delay) Review Comment: Do we really need this comment? Additionally, I could not find a verification of this call in unit tests. Since you added a comment it seems to be important enough for a verification. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -231,6 +231,34 @@ public void testTimerNotDue() { assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); } +@Test +public void testHeartbeatNotSentIfAnotherOnInFlight() { +mockStableMember(); +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + +// Heartbeat sent (no response received) +NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); +NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); + +result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " + +"previous on in-flight"); + +time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); +result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent when the " + +"interval expires if there is a previous HB request in-flight"); + +// Receive response for the inflight. The next HB should be sent on the next poll after +// the interval expires. +inflightReq.handler().onComplete(createHeartbeatResponse(inflightReq, Errors.NONE)); +time.sleep(DEFAULT_RETRY_BACKOFF_MS); +result = heartbeatRequestManager.poll(time.milliseconds()); +assertEquals(1, result.unsentRequests.size()); + Review Comment: ```suggestion ``` ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -231,6 +231,34 @@ public void testTimerNotDue() { assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); } +@Test +public void testHeartbeatNotSentIfAnotherOnInFlight() { Review Comment: typo ```suggestion public void testHeartbeatNotSentIfAnotherOneInFlight() { ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) { return heartbeatTimer.remainingMs(); } +public void onFailedAttempt(final long currentTimeMs) { +// Expire timer to allow sending HB after a failure. After a failure, a next HB may be +// needed with backoff (ex. errors that lead to retries, like coordinator load error), +// or immediately (ex. errors that lead to rejoining, like fencing errors). +heartbeatTimer.update(heartbeatTimer.currentTimeMs() + heartbeatTimer.remainingMs()); Review Comment: What about adding a method to `Timer` that expires the timer without updating it with a time point in the future. Alternatively, I think you could reset the `Timer` to 0 with `heartbeatTimer.reset(0)`. Do we need a verification in the unit tests for this? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
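On the `heartbeatTimer.reset(0)` alternative mentioned above, a small sketch using only the public `Timer` utility (system time assumed for brevity; not the `HeartbeatRequestState` code itself):

```java
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

public class TimerExpirySketch {
    public static void main(String[] args) {
        Timer heartbeatTimer = Time.SYSTEM.timer(3000); // heartbeat interval
        System.out.println(heartbeatTimer.isExpired()); // false

        // reset(0) moves the deadline to "now", expiring the timer without
        // pretending that time has advanced into the future.
        heartbeatTimer.reset(0);
        System.out.println(heartbeatTimer.isExpired()); // true
    }
}
```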
[PR] MINOR: Various cleanups in clients [kafka]
mimaison opened a new pull request, #15705: URL: https://github.com/apache/kafka/pull/15705 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15776: Use the FETCH request timeout as the delay timeout for DelayedRemoteFetch [kafka]
showuon commented on PR #14778: URL: https://github.com/apache/kafka/pull/14778#issuecomment-2051465918 Correction: For this: > some users might feel surprised when their fetch doesn't respond in fetch.max.wait.ms time. This is wrong. Even if the remote reading is not completed yet, the fetch request will still return within `fetch.max.wait.ms`. It's just an empty response. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
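For illustration, the bound being described is the consumer's `fetch.max.wait.ms`; a hedged sketch with arbitrary example values:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FetchWaitConfigSketch {
    public static Properties props() {
        Properties props = new Properties();
        // Upper bound on how long a fetch blocks server-side; per the comment
        // above, the response comes back within this bound even if a remote
        // (tiered storage) read is still in flight -- it is just empty.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        return props;
    }
}
```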
[jira] [Commented] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
[ https://issues.apache.org/jira/browse/KAFKA-16510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836529#comment-17836529 ] Luke Chen commented on KAFKA-16510: --- OK, I see. If it's an issue of an incorrect SASL_SSL setting, then I think it's a known issue. Thanks for the reply. > java.lang.OutOfMemoryError in kafka-metadata-quorum.sh > -- > > Key: KAFKA-16510 > URL: https://issues.apache.org/jira/browse/KAFKA-16510 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.7.0 >Reporter: Hiro >Priority: Major > > kafka-metadata-quorum is not available in SASL_PLAIN. > I got this error, I only use SASL_PLAIN. not use SSL. > I found a person with a similar situation, but he is using mTLS. > https://issues.apache.org/jira/browse/KAFKA-16006 > {code:java} > sh-4.2$ /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server ip>:9093 --command-config controller-admin.properties describe --replication > [2024-04-11 04:12:54,128] ERROR Uncaught exception in thread > ‘kafka-admin-client-thread | adminclient-1': > (org.apache.kafka.common.utils.KafkaThread) > java.lang.OutOfMemoryError: Java heap space > at java.base/java.nio.HeapByteBuffer.(HeapByteBuffer.java:64) > at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363) > at > org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:476) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:573) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:251) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543) > at org.apache.kafka.common.network.Selector.poll(Selector.java:481) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435) > at java.base/java.lang.Thread.run(Thread.java:840) > org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has > exited. Call: describeMetadataQuorum > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has > exited. Call: describeMetadataQuorum > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeReplication(MetadataQuorumCommand.java:158) > at > org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106) > at > org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:62) > at > org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:57) > Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient > thread has exited. 
Call: describeMetadataQuorum {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
showuon commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1562307197 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { +val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) +abandonedFutureLogs.foreach { log => + val tp = log.topicPartition + + log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true) + log.removeLogMetrics() + futureLogs.remove(tp) + + currentLogs.put(tp, log) + log.newMetrics() + + info(s"Successfully renamed abandoned future log for $tp") +} + } + + private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = { +futureLogs.values.flatMap { log => + val topicId = log.topicId.getOrElse { +throw new RuntimeException(s"The log dir $log does not have a topic ID, " + + "which is not allowed when running in KRaft mode.") + } + val partitionId = log.topicPartition.partition() + Option(newTopicsImage.getPartition(topicId, partitionId)) +.filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId))) +.map(_ => log) Review Comment: I agree option (b) is better. But for this: > We can still run into trouble if the directory with the main replica is offline. At some point that will cause a crash if the directory ever comes back online. But there's nothing we can do about that here. Maybe future work could improve how the broker handles loading conflicting logs. I believe this PR: https://github.com/apache/kafka/pull/15335 should already fix this issue. @OmniaGM , could you help confirm it? If not, I think we should not promote the future log when the main replica is offline, as that could cause issues later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]
VedarthConfluent commented on code in PR #15704: URL: https://github.com/apache/kafka/pull/15704#discussion_r1562289692 ## docker/generate_kafka_pr_template.sh: ## @@ -0,0 +1,61 @@ +#!/usr/bin/env bash Review Comment: Add license string at the beginning of all new files -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
showuon commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1562284319 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { +val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) +abandonedFutureLogs.foreach { case (futureLog, currentLog) => + val tp = futureLog.topicPartition + + futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true) + futureLog.removeLogMetrics() + futureLogs.remove(tp) + + currentLog.foreach { log => +log.removeLogMetrics() +log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false) +addLogToBeDeleted(log) +info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion") + } + + currentLogs.put(tp, futureLog) + futureLog.newMetrics() + + info(s"Successfully renamed abandoned future log for $tp") Review Comment: I can see we basically do similar things to what `replaceCurrentWithFutureLog` does, so why duplicate the code here again? Could we instead call `replaceCurrentWithFutureLog` with additional parameters to indicate we want to skip some actions? As it stands, we don't close the `currentLog`, which could cause a resource leak. Using `replaceCurrentWithFutureLog` would avoid that issue. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
[ https://issues.apache.org/jira/browse/KAFKA-16510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836517#comment-17836517 ] Hiro commented on KAFKA-16510: -- [~showuon] No, the problem occurred with all scripts. When I tried it yesterday, I thought the problem was resolved by changing the heap size, but now the same problem occurs even if I set KAFKA_HEAP_OPTS. Currently, this problem can be reproduced by an incorrect SASL_SSL setting, or by not setting the SASL_SSL configs in --command-config at all. Thanks. {code} sh-4.2$ /opt/kafka/bin/kafka-topics.sh --bootstrap-server :9093 --list Error while executing topic command : The AdminClient thread has exited. Call: listTopics [2024-04-12 09:00:55,990] ERROR org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listTopics (org.apache.kafka.tools.TopicCommand) [2024-04-12 09:00:55,997] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread) java.lang.OutOfMemoryError: Java heap space at java.base/java.nio.HeapByteBuffer.(HeapByteBuffer.java:64) at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363) at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) at org.apache.kafka.common.network.Selector.poll(Selector.java:481) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435) at java.base/java.lang.Thread.run(Thread.java:840) {code} > java.lang.OutOfMemoryError in kafka-metadata-quorum.sh > -- > > Key: KAFKA-16510 > URL: https://issues.apache.org/jira/browse/KAFKA-16510 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.7.0 >Reporter: Hiro >Priority: Major > > kafka-metadata-quorum is not available in SASL_PLAIN. > I got this error, I only use SASL_PLAIN. not use SSL. > I found a person with a similar situation, but he is using mTLS. 
> https://issues.apache.org/jira/browse/KAFKA-16006 > {code:java} > sh-4.2$ /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server ip>:9093 --command-config controller-admin.properties describe --replication > [2024-04-11 04:12:54,128] ERROR Uncaught exception in thread > ‘kafka-admin-client-thread | adminclient-1': > (org.apache.kafka.common.utils.KafkaThread) > java.lang.OutOfMemoryError: Java heap space > at java.base/java.nio.HeapByteBuffer.(HeapByteBuffer.java:64) > at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363) > at > org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:476) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:573) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:251) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543) > at org.apache.kafka.common.network.Selector.poll(Selector.java:481) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435) > at java.base/java.lang.Thread.run(Thread.java:840) > org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has > exited. Call: describeMetadataQuorum > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has > exited. Call: describeMetadataQuorum > at > java.base/java.util.concurrent.CompletableFut
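For reference, a hedged sketch of an admin client --command-config matching a SASL/PLAIN listener without TLS; all values are illustrative, not taken from the reporter's setup. The OOM signature above is typical of a protocol mismatch: a client speaking the wrong protocol for the port can misread the first bytes of the peer's response as a huge frame length and attempt to allocate it.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;

public class AdminSaslConfigSketch {
    public static Properties commandConfig() {
        Properties props = new Properties();
        // Matches a SASL/PLAIN listener without TLS; credentials are examples only.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"admin\" password=\"admin-secret\";");
        return props;
    }
}
{code}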
[jira] [Updated] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
[ https://issues.apache.org/jira/browse/KAFKA-16510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hiro updated KAFKA-16510: - Affects Version/s: 3.7.0 (was: 3.4.1) > java.lang.OutOfMemoryError in kafka-metadata-quorum.sh > -- > > Key: KAFKA-16510 > URL: https://issues.apache.org/jira/browse/KAFKA-16510 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.7.0 >Reporter: Hiro >Priority: Major > > kafka-metadata-quorum is not available in SASL_PLAIN. > I got this error, I only use SASL_PLAIN. not use SSL. > I found a person with a similar situation, but he is using mTLS. > https://issues.apache.org/jira/browse/KAFKA-16006 > {code:java} > sh-4.2$ /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server ip>:9093 --command-config controller-admin.properties describe --replication > [2024-04-11 04:12:54,128] ERROR Uncaught exception in thread > ‘kafka-admin-client-thread | adminclient-1': > (org.apache.kafka.common.utils.KafkaThread) > java.lang.OutOfMemoryError: Java heap space > at java.base/java.nio.HeapByteBuffer.(HeapByteBuffer.java:64) > at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363) > at > org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:476) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:573) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:251) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543) > at org.apache.kafka.common.network.Selector.poll(Selector.java:481) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435) > at java.base/java.lang.Thread.run(Thread.java:840) > org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has > exited. Call: describeMetadataQuorum > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has > exited. Call: describeMetadataQuorum > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeReplication(MetadataQuorumCommand.java:158) > at > org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106) > at > org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:62) > at > org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:57) > Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient > thread has exited. Call: describeMetadataQuorum {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays
[ https://issues.apache.org/jira/browse/KAFKA-2499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836506#comment-17836506 ] Walter Hernandez edited comment on KAFKA-2499 at 4/12/24 8:45 AM: -- This is indeed duplicated by the referenced earlier ticket, where there is a patch available; however, the patch needs a rebase (goes back to v0.9.0.1). In addition, the changes broke some system tests that were not resolved. With the referenced ticket not having any activity since 2018 and already has an associated PR, I would like to take on this effort for the latest version. was (Author: JIRAUSER305029): This is indeed duplicated by the referenced earlier ticket, where there is a patch available; however, the patch needs a rebase (goes back to v 1.0.0 ). In addition, the changes broke some system tests that were not resolved. With the referenced ticket not having any activity since 2018 and already has an associated PR, I would like to take on this effort for the latest version. > kafka-producer-perf-test should use something more realistic than empty byte > arrays > --- > > Key: KAFKA-2499 > URL: https://issues.apache.org/jira/browse/KAFKA-2499 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Stopford >Assignee: Walter Hernandez >Priority: Major > Labels: newbie > > ProducerPerformance.scala (There are two of these, one used by the shell > script and one used by the system tests. Both exhibit this problem) > creates messags from empty byte arrays. > This is likely to provide unrealistically fast compression and hence > unrealistically fast results. > Suggest randomised bytes or more realistic sample messages are used. > Thanks to Prabhjot Bharaj for reporting this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays
[ https://issues.apache.org/jira/browse/KAFKA-2499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836506#comment-17836506 ] Walter Hernandez commented on KAFKA-2499: - This is indeed duplicated by the referenced earlier ticket, where there is a patch available; however, the patch needs a rebase (goes back to v 1.0.0 ). In addition, the changes broke some system tests that were not resolved. With the referenced ticket not having any activity since 2018 and already has an associated PR, I would like to take on this effort for the latest version. > kafka-producer-perf-test should use something more realistic than empty byte > arrays > --- > > Key: KAFKA-2499 > URL: https://issues.apache.org/jira/browse/KAFKA-2499 > Project: Kafka > Issue Type: Bug >Reporter: Ben Stopford >Priority: Major > Labels: newbie > > ProducerPerformance.scala (There are two of these, one used by the shell > script and one used by the system tests. Both exhibit this problem) > creates messags from empty byte arrays. > This is likely to provide unrealistically fast compression and hence > unrealistically fast results. > Suggest randomised bytes or more realistic sample messages are used. > Thanks to Prabhjot Bharaj for reporting this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays
[ https://issues.apache.org/jira/browse/KAFKA-2499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walter Hernandez updated KAFKA-2499: Issue Type: Improvement (was: Bug) > kafka-producer-perf-test should use something more realistic than empty byte > arrays > --- > > Key: KAFKA-2499 > URL: https://issues.apache.org/jira/browse/KAFKA-2499 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Stopford >Priority: Major > Labels: newbie > > ProducerPerformance.scala (There are two of these, one used by the shell > script and one used by the system tests. Both exhibit this problem) > creates messags from empty byte arrays. > This is likely to provide unrealistically fast compression and hence > unrealistically fast results. > Suggest randomised bytes or more realistic sample messages are used. > Thanks to Prabhjot Bharaj for reporting this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays
[ https://issues.apache.org/jira/browse/KAFKA-2499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walter Hernandez reassigned KAFKA-2499: --- Assignee: Walter Hernandez > kafka-producer-perf-test should use something more realistic than empty byte > arrays > --- > > Key: KAFKA-2499 > URL: https://issues.apache.org/jira/browse/KAFKA-2499 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Stopford >Assignee: Walter Hernandez >Priority: Major > Labels: newbie > > ProducerPerformance.scala (There are two of these, one used by the shell > script and one used by the system tests. Both exhibit this problem) > creates messags from empty byte arrays. > This is likely to provide unrealistically fast compression and hence > unrealistically fast results. > Suggest randomised bytes or more realistic sample messages are used. > Thanks to Prabhjot Bharaj for reporting this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays
[ https://issues.apache.org/jira/browse/KAFKA-2499 ] Walter Hernandez deleted comment on KAFKA-2499: - was (Author: JIRAUSER305029): This indeed duplicates the referenced earlier ticket, where there is a patch available. However, this patch needs a rebase (goes back to v2.1.0). > kafka-producer-perf-test should use something more realistic than empty byte > arrays > --- > > Key: KAFKA-2499 > URL: https://issues.apache.org/jira/browse/KAFKA-2499 > Project: Kafka > Issue Type: Bug >Reporter: Ben Stopford >Priority: Major > Labels: newbie > > ProducerPerformance.scala (There are two of these, one used by the shell > script and one used by the system tests. Both exhibit this problem) > creates messags from empty byte arrays. > This is likely to provide unrealistically fast compression and hence > unrealistically fast results. > Suggest randomised bytes or more realistic sample messages are used. > Thanks to Prabhjot Bharaj for reporting this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-2499) kafka-producer-perf-test should use something more realistic than empty byte arrays
[ https://issues.apache.org/jira/browse/KAFKA-2499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836503#comment-17836503 ] Walter Hernandez commented on KAFKA-2499: - This indeed duplicates the earlier referenced ticket, which has a patch available. However, the patch needs a rebase (it dates back to v2.1.0). > kafka-producer-perf-test should use something more realistic than empty byte > arrays > --- > > Key: KAFKA-2499 > URL: https://issues.apache.org/jira/browse/KAFKA-2499 > Project: Kafka > Issue Type: Bug >Reporter: Ben Stopford >Priority: Major > Labels: newbie > > ProducerPerformance.scala (There are two of these, one used by the shell > script and one used by the system tests. Both exhibit this problem) > creates messages from empty byte arrays. > This is likely to provide unrealistically fast compression and hence > unrealistically fast results. > Suggest randomised bytes or more realistic sample messages are used. > Thanks to Prabhjot Bharaj for reporting this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16510) java.lang.OutOfMemoryError in kafka-metadata-quorum.sh
[ https://issues.apache.org/jira/browse/KAFKA-16510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836501#comment-17836501 ] Luke Chen commented on KAFKA-16510: --- [~hiroa] , I'd like to confirm that only the `kafka-metadata-quorum.sh` script causes the OOM issue, and not other scripts like kafka-topics.sh or kafka-configs.sh. Is that right? > java.lang.OutOfMemoryError in kafka-metadata-quorum.sh > -- > > Key: KAFKA-16510 > URL: https://issues.apache.org/jira/browse/KAFKA-16510 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.4.1 >Reporter: Hiro >Priority: Major > > kafka-metadata-quorum does not work with SASL_PLAIN. > I got this error; I only use SASL_PLAIN, not SSL. > I found a person with a similar situation, but he is using mTLS. > https://issues.apache.org/jira/browse/KAFKA-16006 > {code:java} > sh-4.2$ /opt/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server <ip>:9093 --command-config controller-admin.properties describe --replication > [2024-04-11 04:12:54,128] ERROR Uncaught exception in thread > 'kafka-admin-client-thread | adminclient-1': > (org.apache.kafka.common.utils.KafkaThread) > java.lang.OutOfMemoryError: Java heap space > at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64) > at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363) > at > org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:476) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:573) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:251) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543) > at org.apache.kafka.common.network.Selector.poll(Selector.java:481) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:585) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1504) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1435) > at java.base/java.lang.Thread.run(Thread.java:840) > org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has > exited. Call: describeMetadataQuorum > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has > exited. Call: describeMetadataQuorum > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > org.apache.kafka.tools.MetadataQuorumCommand.handleDescribeReplication(MetadataQuorumCommand.java:158) > at > org.apache.kafka.tools.MetadataQuorumCommand.execute(MetadataQuorumCommand.java:106) > at > org.apache.kafka.tools.MetadataQuorumCommand.mainNoExit(MetadataQuorumCommand.java:62) > at > org.apache.kafka.tools.MetadataQuorumCommand.main(MetadataQuorumCommand.java:57) > Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient > thread has exited. 
Call: describeMetadataQuorum {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
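For context on why an authentication problem can surface as an OutOfMemoryError here rather than an authentication error: the client reads a four-byte size prefix off the wire in NetworkReceive.readFrom before allocating a response buffer. If the peer speaks a different protocol than the client expects (a failure mode reported in the similar mTLS ticket linked above), those first bytes can decode to an enormous "length". The Java sketch below only illustrates that mechanism under that assumption; it is not a claim about what happened in this reporter's setup.
{code:java}
import java.nio.ByteBuffer;

public class SizePrefixSketch {
    public static void main(String[] args) {
        // Hypothetical first four bytes from a peer speaking another protocol
        // (0x15 0x03 0x01 0x00 is how a TLS alert record happens to start).
        byte[] firstFour = {0x15, 0x03, 0x01, 0x00};
        int claimed = ByteBuffer.wrap(firstFour).getInt();
        // Prints 352518400: treating this as a payload size would mean
        // ByteBuffer.allocate(~336 MB), enough to exhaust a small tool heap.
        System.out.println("claimed payload size: " + claimed + " bytes");
    }
}
{code}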
[jira] [Comment Edited] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836472#comment-17836472 ] Francois Visconte edited comment on KAFKA-16511 at 4/12/24 8:24 AM: I ran kafka-delete-records.sh and it did the trick {code:java} [RemoteLogManager=10015 partition=5G8Ai8kBSwmQ3Ln4QRY5rA:raw_spans_datadog_3543-765] Deleted remote log segment RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=8dP13VDYSaiFlubl9SNBTQ} due to leader-epoch-cache truncation. Current earliest-epoch-entry: EpochEntry(epoch=21, startOffset=2978397), segment-end-offset: 2978396 and segment-epochs: [7] {code} Do you happen to know what triggers the issue? was (Author: JIRAUSER288982): I ran kafka-delete-records.sh and it did the trick {code:java} [RemoteLogManager=10015 partition=5G8Ai8kBSwmQ3Ln4QRY5rA:raw_spans_datadog_3543-765] Deleted remote log segment RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=8dP13VDYSaiFlubl9SNBTQ} due to leader-epoch-cache truncation. Current earliest-epoch-entry: EpochEntry(epoch=21, startOffset=2978397), segment-end-offset: 2978396 and segment-epochs: [7] {code} > Leaking tiered segments > --- > > Key: KAFKA-16511 > URL: https://issues.apache.org/jira/browse/KAFKA-16511 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.7.0 >Reporter: Francois Visconte >Priority: Major > Labels: tiered-storage > > I have some topics that have not been written to for a few days (with 12h > retention) where some data remains on tiered storage (in our case S3) and > is never deleted. > > Looking at the log history, it appears that we never even tried to delete > these segments: > When looking at one of the non-leaking segments, I get the following > interesting messages: > > {code:java} > "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. 
Current > earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), > segment-end-offset: 2976819 and segment-epochs: [5]" > "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data > for completed successfully > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}" > "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data > for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}" > "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied > 02968418.log to remote storage with segment-id: > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ}" > "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data > completed successfully, metadata: > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}" > "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, > metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}" > {code} > > Which looks right because we can see logs from both the plugin and remote log > manager indicating that the remo
[jira] [Commented] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836472#comment-17836472 ] Francois Visconte commented on KAFKA-16511: --- I ran kafka-delete-records.sh and it did the trick {code:java} [RemoteLogManager=10015 partition=5G8Ai8kBSwmQ3Ln4QRY5rA:raw_spans_datadog_3543-765] Deleted remote log segment RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1-765, id=8dP13VDYSaiFlubl9SNBTQ} due to leader-epoch-cache truncation. Current earliest-epoch-entry: EpochEntry(epoch=21, startOffset=2978397), segment-end-offset: 2978396 and segment-epochs: [7] {code} > Leaking tiered segments > --- > > Key: KAFKA-16511 > URL: https://issues.apache.org/jira/browse/KAFKA-16511 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.7.0 >Reporter: Francois Visconte >Priority: Major > Labels: tiered-storage > > I have some topics that have not been written to for a few days (with 12h > retention) where some data remains on tiered storage (in our case S3) and > is never deleted. > > Looking at the log history, it appears that we never even tried to delete > these segments: > When looking at one of the non-leaking segments, I get the following > interesting messages: > > {code:java} > "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current > earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), > segment-end-offset: 2976819 and segment-epochs: [5]" > "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data > for completed successfully > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}" > "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data > for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}" > "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied > 02968418.log to remote storage with segment-id: > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ}" > "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data > completed successfully, metadata: > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, 
state=COPY_SEGMENT_STARTED}" > "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, > metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}" > {code} > > Which looks right because we can see logs from both the plugin and remote log > manager indicating that the remote log segment was removed. > Now if I look on one of the leaked segment, here is what I see > > {code:java} > "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied > 02971163.log to remote storage with segment-id: > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, > id=8dP13VDYSaiFlubl9SNBTQ}" > "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data > completed successfully, metadata: > RemoteLogSegmentMetadata{remoteLogSegme
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1562184559 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction<Boolean, IOException> delete, Stri try { if (delete.execute()) LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath()); -else if (logIfMissing) -LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +else { +if (logIfMissing) { +LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +} + +// During alter log dir, the log segment may be moved to a new directory, so async delete may fail. +// Fallback to delete the file in the new directory to avoid orphan file. +Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)"); +Matcher dirMatcher = dirPattern.matcher(file.getParent()); +if (dirMatcher.matches()) { +String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2); +File fallbackFile = new File(topicPartitionAbsolutePath, file.getName()); +if (fallbackFile.exists() && fallbackFile.delete()) { Review Comment: Does the file name always end with `.delete`? Should we check it before deletion? ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction<Boolean, IOException> delete, Stri try { if (delete.execute()) LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath()); -else if (logIfMissing) -LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +else { +if (logIfMissing) { +LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +} + +// During alter log dir, the log segment may be moved to a new directory, so async delete may fail. +// Fallback to delete the file in the new directory to avoid orphan file. +Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)"); Review Comment: 1. Why does it contain `delete` at the end? 2. Unfortunately, the topic name could contain `-` or `.`, so it's unsafe to do regex like this. I'm thinking we can pass `topicPartition` as a parameter into `deleteTypeIfExists` so that we don't have to do further regex like this. And just verify if fileName.endsWith("future"), because the normal folder name should always end with a number (the partition number) instead of "future". WDYT? ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction<Boolean, IOException> delete, Stri try { if (delete.execute()) LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath()); -else if (logIfMissing) -LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +else { +if (logIfMissing) { +LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); +} + +// During alter log dir, the log segment may be moved to a new directory, so async delete may fail. +// Fallback to delete the file in the new directory to avoid orphan file. 
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)"); +Matcher dirMatcher = dirPattern.matcher(file.getParent()); +if (dirMatcher.matches()) { +String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2); +File fallbackFile = new File(topicPartitionAbsolutePath, file.getName()); +if (fallbackFile.exists() && fallbackFile.delete()) { +LOGGER.warn("Fallback to delete {} {}.", fileType, fallbackFile.getAbsolutePath()); Review Comment: Why did we use `warn` here? I think we can use `info` since it's expected behavior. WDYT? ## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ## @@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2))) } + @Test + def testAlterReplicaLogDirsRe
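To make the reviewer's suggestion concrete, here is a minimal, hypothetical sketch of the alternative: derive the fallback path from a TopicPartition passed in by the caller plus a simple suffix check, instead of regex-parsing the parent directory name. The class, method, and paths below are illustrative only and are not the PR's actual code.
{code:java}
import java.io.File;

import org.apache.kafka.common.TopicPartition;

public class FallbackPathSketch {

    // Hypothetical helper: a "-delete"/"-future" directory is named
    // "<topic>-<partition>.<uniqueId>-<suffix>", so with the TopicPartition in
    // hand the original "<topic>-<partition>" directory can be rebuilt directly.
    static File fallbackFor(File file, TopicPartition tp) {
        File parent = file.getParentFile();
        if (parent == null)
            return null;
        String dirName = parent.getName();
        // Regular partition directories end with the partition number, never
        // with "-delete" or "-future", so the suffix check is unambiguous.
        if (!dirName.endsWith("-delete") && !dirName.endsWith("-future"))
            return null;
        File fallbackDir = new File(parent.getParentFile(), tp.topic() + "-" + tp.partition());
        return new File(fallbackDir, file.getName());
    }

    public static void main(String[] args) {
        // No string parsing of the topic name is needed, even when it
        // contains '-' or '.', which is the reviewer's point.
        File segment = new File("/data/logs/my-topic.v2-0.0123456789abcdef-future/00000000000002968418.log");
        System.out.println(fallbackFor(segment, new TopicPartition("my-topic.v2", 0)));
        // -> /data/logs/my-topic.v2-0/00000000000002968418.log
    }
}
{code}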
[jira] [Assigned] (KAFKA-16522) Admin client changes for adding and removing voters
[ https://issues.apache.org/jira/browse/KAFKA-16522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Quoc Phong Dang reassigned KAFKA-16522: --- Assignee: Quoc Phong Dang > Admin client changes for adding and removing voters > --- > > Key: KAFKA-16522 > URL: https://issues.apache.org/jira/browse/KAFKA-16522 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Assignee: Quoc Phong Dang >Priority: Major > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes#KIP853:KRaftControllerMembershipChanges-Admin -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13636) Committed offsets could be deleted during a rebalance if a group did not commit for a while
[ https://issues.apache.org/jira/browse/KAFKA-13636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836455#comment-17836455 ] zhangzhisheng commented on KAFKA-13636: --- Kafka broker version is 2.7.2, client version is 2.5.2, and auto.offset.reset is earliest. There have been instances where committed offsets were deleted on the consumer side. {code:java} // code placeholder org.apache.kafka.clients.consumer.internals.SubscriptionState SubscriptionState.java:397 [trace=,span=] - [Consumer clientId=app1_a2acc55fc0bbac81, groupId=gid-app1] Resetting offset for partition topic-4 to offset 3577938321. {code} > Committed offsets could be deleted during a rebalance if a group did not > commit for a while > --- > > Key: KAFKA-13636 > URL: https://issues.apache.org/jira/browse/KAFKA-13636 > Project: Kafka > Issue Type: Bug > Components: core, offset manager >Affects Versions: 2.4.0, 2.5.1, 2.6.2, 2.7.2, 2.8.1, 3.0.0 >Reporter: Damien Gasparina >Assignee: Prince Mahajan >Priority: Major > Fix For: 3.0.1, 2.8.2, 3.2.0, 3.1.1 > > > The group coordinator might delete still-valid offsets during a group rebalance. > During a rebalance, the coordinator relies on the last commit timestamp > ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state > modification timestamp ({_}currentStateTimestamp{_}) to detect expired > offsets. > > This is relatively easy to reproduce by playing with > group.initial.rebalance.delay.ms, offset.retention.minutes and > offset.check.retention.interval; I uploaded an example at: > [https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention]. > This script does: > * Start a broker with: offset.retention.minutes=2, > offset.check.retention.interval.ms=1000, > group.initial.rebalance.delay=2 > * Produce 10 messages > * Create a consumer group to consume 10 messages, and disable auto.commit to > only commit a few times > * Wait 3 minutes, then the consumer gets a {{kill -9}} > * Restart the consumer after a few seconds > * The consumer restarts from {{auto.offset.reset}}, as the offsets got removed > > The cause is in GroupMetadata.scala: > * When the group gets emptied, {{subscribedTopics}} is set to > {{Set.empty}} > ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521]) > * When a new member joins, we add the new member to the group right away; > BUT {{subscribedTopics}} is only updated once the migration is over (in > initNewGeneration), which could take a while due to > {{group.initial.rebalance.delay}} > * When the log cleaner gets executed, {{subscribedTopics.isDefined}} returns > true as {{Set.empty != None}} (the underlying condition) > * Thus we enter > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785] > with an empty {{subscribedTopics}} list and rely on the > {{commitTimestamp}} regardless of the {{currentStateTimestamp}} > > This seems to be a regression introduced by KIP-496 > https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges > (KAFKA-8338, KAFKA-8370) -- This message was sent by Atlassian Jira (v8.20.10#820010)
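The {{Set.empty != None}} subtlety is the crux of the bug report: an Option holding an empty set is still "defined". A Java analogue of the condition (illustrative only; the real logic is the Scala code in GroupMetadata.scala linked above):
{code:java}
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

public class SubscribedTopicsPitfall {
    public static void main(String[] args) {
        // After the group empties, the field holds an empty set, not "absent".
        Optional<Set<String>> subscribedTopics = Optional.of(Collections.<String>emptySet());
        // Prints true: a presence check alone cannot distinguish "group just
        // emptied" from "subscriptions known", so the expiry path runs with an
        // empty subscription list and falls back to the commit timestamp only.
        System.out.println(subscribedTopics.isPresent());
    }
}
{code}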
[jira] [Commented] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836444#comment-17836444 ] Kamal Chandraprakash commented on KAFKA-16511: -- The segment deletion might be stuck due to the [RemoteLogManager#isRemoteSegmentWithinLeaderEpochs|https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1241] check. The {{log-start-offset}} for this partition 765 might have been moved using the {{kafka-delete-records.sh}} script, so the check fails to mark it as a valid segment. > Leaking tiered segments > --- > > Key: KAFKA-16511 > URL: https://issues.apache.org/jira/browse/KAFKA-16511 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.7.0 >Reporter: Francois Visconte >Priority: Major > Labels: tiered-storage > > I have some topics that have not been written to for a few days (with 12h > retention) where some data remains on tiered storage (in our case S3) and > is never deleted. > > Looking at the log history, it appears that we never even tried to delete > these segments: > When looking at one of the non-leaking segments, I get the following > interesting messages: > > {code:java} > "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current > earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), > segment-end-offset: 2976819 and segment-epochs: [5]" > "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data > for completed successfully > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}" > "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data > for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}" > "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied > 02968418.log to remote storage with segment-id: > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ}" > "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data > completed successfully, metadata: > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, 
"2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, > metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}" > {code} > > Which looks right because we can see logs from both the plugin and remote log > manager indicating that the remote log segment was removed. > Now if I look on one of the leaked segment, here is what I see > > {code:java} > "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied > 02971163.log to remote storage with segment-id: > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, > id=8dP13VDYSaiFlubl9SNBTQ}" > "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data > completed successfully, metadata: > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3
[PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]
KrishVora01 opened a new pull request, #15704: URL: https://github.com/apache/kafka/pull/15704 This PR adds code to support Apache Kafka Docker Official Images. The files added include:
1. .github/workflows/prepare_docker_official_image_source.yml: a GitHub Actions workflow that creates an artifact containing the hardcoded static source files for Docker Official Images.
2. docker/prepare_docker_official_image_source.py: a Python script that creates the sub-directory containing the hardcoded static source files for Docker Official Images (later pushed as an artifact).
3. .github/workflows/docker_official_image_build_and_test.yml: a GitHub Actions workflow that builds and tests the Docker Official Image.
4. docker/docker_official_image_build_test.py: a Python script that builds and tests the Docker Official Image.
5. docker/common.sh and docker/generate_kafka_pr_template.sh: scripts that create the PR template to be raised in the Docker Official Images repo.
6. docker/docker_official_images/.gitignore: the new directory to keep Docker Official Image assets.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16373: Adding Apache Kafka Docker Official Images Directory [kafka]
KrishVora01 closed pull request #15703: KAFKA-16373: Adding Apache Kafka Docker Official Images Directory URL: https://github.com/apache/kafka/pull/15703 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org