Re: [PR] KAFKA-15816: Fix leaked sockets in runtime tests [kafka]
divijvaidya commented on PR #14764: URL: https://github.com/apache/kafka/pull/14764#issuecomment-1873267580 Unrelated test failures; all tests modified in this PR are successful.
```
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_8_and_Scala_2_12___testResetSinkConnectorOffsetsZombieSinkTasks/)
[Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsBounceTest.testWithGroupMetadata()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_8_and_Scala_2_12___testWithGroupMetadata__/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testConfigurationOperations__/)
[Build / JDK 21 and Scala 2.13 / org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_21_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/)
[Build / JDK 21 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testConfigResourceExistenceChecker()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_21_and_Scala_2_13___testConfigResourceExistenceChecker__/)
[Build / JDK 21 and Scala 2.13 / org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_21_and_Scala_2_13___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__kraft/)
[Build / JDK 21 and Scala 2.13 / org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_21_and_Scala_2_13___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__kraft_2/)
[Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextConsumerTest.testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/PlaintextConsumerTest/Build___JDK_11_and_Scala_2_13___testCoordinatorFailover_String__String__quorum_zk_groupProtocol_generic/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
[Build / JDK 17 and Scala 2.13 / kafka.api.SaslPlaintextConsumerTest.testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/SaslPlaintextConsumerTest/Build___JDK_17_and_Scala_2_13___testCoordinatorFailover_String__String__quorum_zk_groupProtocol_generic/)
[Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_17_and_Scala_2_13___testWithGroupId__/)
[Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_17_and_Scala_2_13___testWithGroupId___2/)
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15816: Fix leaked sockets in runtime tests [kafka]
divijvaidya merged PR #14764: URL: https://github.com/apache/kafka/pull/14764 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16073) Tiered Storage Bug: Incorrect Handling of Offset Ranges During Segment Deletion
hzh0425 created KAFKA-16073: --- Summary: Tiered Storage Bug: Incorrect Handling of Offset Ranges During Segment Deletion Key: KAFKA-16073 URL: https://issues.apache.org/jira/browse/KAFKA-16073 Project: Kafka Issue Type: Bug Components: core, Tiered-Storage Affects Versions: 3.6.1 Reporter: hzh0425 Assignee: hzh0425 Fix For: 3.6.1

This bug pertains to Apache Kafka's tiered storage functionality. Specifically, it involves a timing issue in the {{UnifiedLog.deleteSegments}} method. The method first deletes segments from memory but delays updating the {{localLogStartOffset}}. Meanwhile, in {{ReplicaManager.handleOffsetOutOfRangeError}}, if the fetch offset is less than {{localLogStartOffset}}, it triggers the remote-read path. However, if it is greater, an {{OffsetOutOfRangeException}} is sent to the client.

Consider a scenario with concurrent operations, where {{offset1 < offset2 < offset3}}. A client requests {{offset2}} while a background thread is deleting segments. The segments are deleted in memory, but {{localLogStartOffset}} is still at {{offset1}} and not yet updated to {{offset3}}. In this state, since {{offset2}} is greater than {{offset1}}, {{ReplicaManager.handleOffsetOutOfRangeError}} erroneously returns an {{OffsetOutOfRangeException}} to the client. This happens because the system has not yet recognized the new starting offset ({{offset3}}), leading to incorrect handling of fetch requests.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
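The race window described in the report can be sketched with a minimal simulation. The `classify` method below is an illustrative simplification of the decision made in `ReplicaManager.handleOffsetOutOfRangeError`; the class and method names are not the real Kafka signatures.

```java
// Hypothetical simplification of the offset-range decision described above.
public class OffsetRangeRace {
    enum Outcome { READ_REMOTE, OFFSET_OUT_OF_RANGE }

    // Fetches below the local log start offset are served from tiered storage;
    // anything else is treated as out of range by this code path.
    static Outcome classify(long fetchOffset, long localLogStartOffset) {
        return fetchOffset < localLogStartOffset
                ? Outcome.READ_REMOTE
                : Outcome.OFFSET_OUT_OF_RANGE;
    }

    public static void main(String[] args) {
        long offset1 = 100L, offset2 = 200L, offset3 = 300L;

        // Deletion in progress: segments are already gone from memory, but
        // localLogStartOffset has not yet been advanced from offset1 to offset3.
        Outcome duringWindow = classify(offset2, offset1);

        // After localLogStartOffset is advanced to offset3, the same fetch
        // is correctly routed to remote storage.
        Outcome afterUpdate = classify(offset2, offset3);

        System.out.println("during window: " + duringWindow); // OFFSET_OUT_OF_RANGE (the bug)
        System.out.println("after update:  " + afterUpdate);  // READ_REMOTE
    }
}
```

The fetch at `offset2` is misclassified only while the start offset is stale, which is why the issue surfaces as an intermittent consumer error under concurrent segment deletion.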
[jira] [Updated] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hzh0425 updated KAFKA-16073: Summary: Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion (was: Tiered Storage Bug: Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion) > Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed > localLogStartOffset Update During Segment Deletion > > > Key: KAFKA-16073 > URL: https://issues.apache.org/jira/browse/KAFKA-16073 > Project: Kafka > Issue Type: Bug > Components: core, Tiered-Storage >Affects Versions: 3.6.1 >Reporter: hzh0425 >Assignee: hzh0425 >Priority: Major > Labels: KIP-405, kip-405, tiered-storage > Fix For: 3.6.1 > > > This bug pertains to Apache Kafka's tiered storage functionality. > Specifically, it involves a timing issue in the {{UnifiedLog.deleteSegments}} > method. The method first deletes segments from memory but delays updating the > {{{}localLogStartOffset{}}}. Meanwhile, in > {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, if the fetch offset is > less than {{{}localLogStartOffset{}}}, it triggers the read remote process. > However, if it's greater, an {{OffsetOutOfRangeException}} is sent to the > client. > Consider a scenario with concurrent operations, where {{{}offset1 < offset2 < > offset3{}}}. A client requests {{offset2}} while a background thread is > deleting segments. The segments are deleted in memory, but > {{LocalLogStartOffset}} is still at {{offset1}} and not yet updated to > {{{}offset3{}}}. In this state, since {{offset2}} is greater than > {{{}offset1{}}}, {{ReplicaManager.handleOffsetOutOfRangeError}} erroneously > returns an {{OffsetOutOfRangeException}} to the client. This happens because > the system has not yet recognized the new starting offset ({{{}offset3{}}}), > leading to incorrect handling of fetch requests. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16073) Tiered Storage Bug: Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hzh0425 updated KAFKA-16073: Summary: Tiered Storage Bug: Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion (was: Tiered Storage Bug: Incorrect Handling of Offset Ranges During Segment Deletion) > Tiered Storage Bug: Kafka Tiered Storage Bug: Consumer Fetch Error Due to > Delayed localLogStartOffset Update During Segment Deletion > > > Key: KAFKA-16073 > URL: https://issues.apache.org/jira/browse/KAFKA-16073 > Project: Kafka > Issue Type: Bug > Components: core, Tiered-Storage >Affects Versions: 3.6.1 >Reporter: hzh0425 >Assignee: hzh0425 >Priority: Major > Labels: KIP-405, kip-405, tiered-storage > Fix For: 3.6.1 > > > This bug pertains to Apache Kafka's tiered storage functionality. > Specifically, it involves a timing issue in the {{UnifiedLog.deleteSegments}} > method. The method first deletes segments from memory but delays updating the > {{{}localLogStartOffset{}}}. Meanwhile, in > {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, if the fetch offset is > less than {{{}localLogStartOffset{}}}, it triggers the read remote process. > However, if it's greater, an {{OffsetOutOfRangeException}} is sent to the > client. > Consider a scenario with concurrent operations, where {{{}offset1 < offset2 < > offset3{}}}. A client requests {{offset2}} while a background thread is > deleting segments. The segments are deleted in memory, but > {{LocalLogStartOffset}} is still at {{offset1}} and not yet updated to > {{{}offset3{}}}. In this state, since {{offset2}} is greater than > {{{}offset1{}}}, {{ReplicaManager.handleOffsetOutOfRangeError}} erroneously > returns an {{OffsetOutOfRangeException}} to the client. This happens because > the system has not yet recognized the new starting offset ({{{}offset3{}}}), > leading to incorrect handling of fetch requests. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hzh0425 updated KAFKA-16073: Description: The identified bug in Apache Kafka's tiered storage feature involves a delayed update of {{localLogStartOffset}} in the {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. When segments are deleted from the log's memory state, the {{localLogStartOffset}} isn't promptly updated. Concurrently, {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka erroneously sends an {{OffsetOutOfRangeException}} to the consumer. In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a background deletion process removes segments from memory, it hasn't yet updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to incorrect handling of consumer fetch requests and potential data access errors. was: This bug pertains to Apache Kafka's tiered storage functionality. Specifically, it involves a timing issue in the {{UnifiedLog.deleteSegments}} method. The method first deletes segments from memory but delays updating the {{{}localLogStartOffset{}}}. Meanwhile, in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, if the fetch offset is less than {{{}localLogStartOffset{}}}, it triggers the read remote process. However, if it's greater, an {{OffsetOutOfRangeException}} is sent to the client. Consider a scenario with concurrent operations, where {{{}offset1 < offset2 < offset3{}}}. 
A client requests {{offset2}} while a background thread is deleting segments. The segments are deleted in memory, but {{LocalLogStartOffset}} is still at {{offset1}} and not yet updated to {{{}offset3{}}}. In this state, since {{offset2}} is greater than {{{}offset1{}}}, {{ReplicaManager.handleOffsetOutOfRangeError}} erroneously returns an {{OffsetOutOfRangeException}} to the client. This happens because the system has not yet recognized the new starting offset ({{{}offset3{}}}), leading to incorrect handling of fetch requests. > Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed > localLogStartOffset Update During Segment Deletion > > > Key: KAFKA-16073 > URL: https://issues.apache.org/jira/browse/KAFKA-16073 > Project: Kafka > Issue Type: Bug > Components: core, Tiered-Storage >Affects Versions: 3.6.1 >Reporter: hzh0425 >Assignee: hzh0425 >Priority: Major > Labels: KIP-405, kip-405, tiered-storage > Fix For: 3.6.1 > > > The identified bug in Apache Kafka's tiered storage feature involves a > delayed update of {{localLogStartOffset}} in the > {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. > When segments are deleted from the log's memory state, the > {{localLogStartOffset}} isn't promptly updated. Concurrently, > {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch > offset is less than the {{{}localLogStartOffset{}}}. If it's greater, Kafka > erroneously sends an {{OffsetOutOfRangeException}} to the consumer. > In a specific concurrent scenario, imagine sequential offsets: {{{}offset1 < > offset2 < offset3{}}}. A client requests data at {{{}offset2{}}}. While a > background deletion process removes segments from memory, it hasn't yet > updated the {{LocalLogStartOffset}} from {{offset1}} to {{{}offset3{}}}. 
> Consequently, when the fetch offset ({{{}offset2{}}}) is evaluated against > the stale {{offset1}} in {{{}ReplicaManager.handleOffsetOutOfRangeError{}}}, > it incorrectly triggers an {{{}OffsetOutOfRangeException{}}}. This issue > arises from the out-of-sync update of {{{}localLogStartOffset{}}}, leading to > incorrect handling of consumer fetch requests and potential data access > errors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14517:Implement regex subscriptions [kafka]
JimmyWang6 commented on PR #14327: URL: https://github.com/apache/kafka/pull/14327#issuecomment-1873362395 > @JimmyWang6 , I think for this part > > > What if a new topic gets created which matches the regex subscription of some member. > > we need to use the metadata update mechanism defined by `metadata.max.age.ms`. This is the interval using which the consumer will keep refreshing the metadata and do the pattern matching periodically against all topics existing at the time of check. Having said that, this looks like a client level config so not sure if it needs to be part of this PR. @dajac , WDYT? @dajac @vamossagar12 Thank you for your comments! I have added more unit tests and addressed the issues mentioned in the previous comments. In the case of a newly created topic that matches the regex subscription of certain members, I believe the method public void **onNewMetadataImage(MetadataImage newImage, MetadataDelta delta)** will be invoked. Therefore, I have modified the groupsSubscribedToTopic method to identify the consumer groups that have subscribed to topics matching the regular expression, and requested a metadata refresh for those groups. The latest code appears to be functioning well. Do you have any further comments or suggestions regarding this approach? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
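The re-evaluation step being discussed, matching a regex subscription against the topic list on each metadata refresh, can be sketched as below. This is illustrative only; `RegexSubscriptionMatcher` and `matchTopics` are made-up names, not part of the PR's actual coordinator API.

```java
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustrative: re-evaluating a regex subscription against the current
// topic list, as would happen on each metadata refresh.
public class RegexSubscriptionMatcher {
    static Set<String> matchTopics(Pattern subscription, List<String> allTopics) {
        return allTopics.stream()
                .filter(topic -> subscription.matcher(topic).matches())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Pattern subscription = Pattern.compile("orders-.*");

        // Before the new topic exists:
        System.out.println(matchTopics(subscription, List.of("orders-eu", "payments")));

        // After "orders-us" is created, the next refresh (driven by
        // metadata.max.age.ms on the client, or by onNewMetadataImage on
        // the broker side, per the discussion above) picks it up:
        System.out.println(matchTopics(subscription, List.of("orders-eu", "payments", "orders-us")));
    }
}
```

Whether the refresh is driven client-side or broker-side, the matching itself is the same: a full pass over the topics known at the time of the check.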
[jira] [Updated] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
[ https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arpit Goyal updated KAFKA-16063: Attachment: Screenshot 2024-01-01 at 10.51.03 PM-1.png > Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests > - > > Key: KAFKA-16063 > URL: https://issues.apache.org/jira/browse/KAFKA-16063 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Arpit Goyal >Priority: Major > Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot > 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, > Screenshot 2024-01-01 at 10.51.03 PM.png > > > All test extending `EndToEndAuthorizationTest` are leaking > DefaultDirectoryService objects. > This can be observed using the heap dump at > [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0] > (unzip this and you will find a hprof which can be opened with your > favourite heap analyzer) > The stack trace looks like this: > !Screenshot 2023-12-29 at 12.38.29.png! > > I suspect that the reason is because DefaultDirectoryService#startup() > registers a shutdownhook which is somehow messed up by > QuorumTestHarness#teardown(). > We need to investigate why this is leaking and fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
[ https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arpit Goyal updated KAFKA-16063: Attachment: Screenshot 2024-01-01 at 10.51.03 PM.png > Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests > - > > Key: KAFKA-16063 > URL: https://issues.apache.org/jira/browse/KAFKA-16063 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Arpit Goyal >Priority: Major > Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot > 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, > Screenshot 2024-01-01 at 10.51.03 PM.png > > > All test extending `EndToEndAuthorizationTest` are leaking > DefaultDirectoryService objects. > This can be observed using the heap dump at > [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0] > (unzip this and you will find a hprof which can be opened with your > favourite heap analyzer) > The stack trace looks like this: > !Screenshot 2023-12-29 at 12.38.29.png! > > I suspect that the reason is because DefaultDirectoryService#startup() > registers a shutdownhook which is somehow messed up by > QuorumTestHarness#teardown(). > We need to investigate why this is leaking and fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
[ https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801568#comment-17801568 ] Arpit Goyal commented on KAFKA-16063: - [~divijvaidya] I am able to reproduce the issue locally and have figured out the reason for the leak. While initializing the directory service in MiniKdc.scala (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala#L181), DirectoryService checks the shutdownHookEnabled flag, which is enabled by default, and registers a shutdown hook in the JVM's ApplicationShutdownHooks identity hash map:
{code:java}
if ( shutdownHookEnabled )
{
    Runtime.getRuntime().addShutdownHook( new Thread( new Runnable()
    {
        public void run()
        {
            try
            {
                shutdown();
            }
            catch ( Exception e )
            {
                LOG.warn( "Failed to shut down the directory service: "
                    + DefaultDirectoryService.this.instanceId, e );
            }
        }
    }, "ApacheDS Shutdown Hook (" + instanceId + ')' ) );

    LOG.info( "ApacheDS shutdown hook has been registered with the runtime." );
}
{code}
But this map is never cleared by the directory service's shutdown method, which we call from MiniKdc's stop function. I think we can disable the shutdownHookEnabled flag, since the directory service's shutdown method is already invoked from MiniKdc's stop function:
{code:java}
ds.setShutdownHookEnabled(false)
{code}
Screenshot attached for reference. !Screenshot 2024-01-01 at 10.51.03 PM.png! > Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests > - > > Key: KAFKA-16063 > URL: https://issues.apache.org/jira/browse/KAFKA-16063 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Arpit Goyal >Priority: Major > Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot > 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, > Screenshot 2024-01-01 at 10.51.03 PM.png > > > All test extending `EndToEndAuthorizationTest` are leaking > DefaultDirectoryService objects. 
> This can be observed using the heap dump at > [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0] > (unzip this and you will find a hprof which can be opened with your > favourite heap analyzer) > The stack trace looks like this: > !Screenshot 2023-12-29 at 12.38.29.png! > > I suspect that the reason is because DefaultDirectoryService#startup() > registers a shutdownhook which is somehow messed up by > QuorumTestHarness#teardown(). > We need to investigate why this is leaking and fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
[ https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801577#comment-17801577 ] Divij Vaidya commented on KAFKA-16063: -- Nice find, and yes, disabling the shutdown hook sounds like a plan. I am curious, though: if the map is not cleared during the stop function, then who clears it? As an alternative solution, should we instead clear the map (i.e. remove the hook) in the stop function? That would ensure that even if we forget to call stop, the KDC is still shut down on process exit by the hook. > Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests > - > > Key: KAFKA-16063 > URL: https://issues.apache.org/jira/browse/KAFKA-16063 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Arpit Goyal >Priority: Major > Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot > 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, > Screenshot 2024-01-01 at 10.51.03 PM.png > > > All test extending `EndToEndAuthorizationTest` are leaking > DefaultDirectoryService objects. > This can be observed using the heap dump at > [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0] > (unzip this and you will find a hprof which can be opened with your > favourite heap analyzer) > The stack trace looks like this: > !Screenshot 2023-12-29 at 12.38.29.png! > > I suspect that the reason is because DefaultDirectoryService#startup() > registers a shutdownhook which is somehow messed up by > QuorumTestHarness#teardown(). > We need to investigate why this is leaking and fix it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
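The alternative being proposed, deregistering the hook in the stop path so the JVM's ApplicationShutdownHooks map entry can be garbage-collected, can be sketched with plain JDK APIs. `MiniKdcLike` below is a made-up stand-in for the real MiniKdc, not the actual fix.

```java
// Sketch of the "remove the hook in stop()" option, using only JDK APIs.
public class MiniKdcLike {
    private Thread shutdownHook;
    private boolean stopped = false;

    public void start() {
        shutdownHook = new Thread(this::doShutdown, "ApacheDS Shutdown Hook (demo)");
        // The hook is retained in the JVM-internal ApplicationShutdownHooks map
        // until the JVM exits or the hook is explicitly removed.
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    public void stop() {
        doShutdown();
        if (shutdownHook != null) {
            // Deregistering drops the map entry, so the service (and whatever
            // it references) becomes eligible for GC after stop().
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
            shutdownHook = null;
        }
    }

    private void doShutdown() { stopped = true; }

    public boolean isStopped() { return stopped; }

    public static void main(String[] args) {
        MiniKdcLike kdc = new MiniKdcLike();
        kdc.start();
        kdc.stop(); // hook removed; nothing lingers in ApplicationShutdownHooks
        System.out.println("stopped: " + kdc.isStopped());
    }
}
```

The trade-off matches the comment above: keeping the hook registered (and removing it only in stop) preserves clean shutdown if a caller forgets to call stop, whereas disabling the hook entirely relies on stop always being called.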
[jira] [Assigned] (KAFKA-16072) Create Junit 5 extension to detect thread leak
[ https://issues.apache.org/jira/browse/KAFKA-16072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Werner reassigned KAFKA-16072: - Assignee: Dmitry Werner > Create Junit 5 extension to detect thread leak > -- > > Key: KAFKA-16072 > URL: https://issues.apache.org/jira/browse/KAFKA-16072 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: Divij Vaidya >Assignee: Dmitry Werner >Priority: Major > Labels: newbie++ > > The objective of this task is to create a Junit extension that will execute > after every test and verify that there are no lingering threads left over. > An example of how to create an extension can be found here: > [https://github.com/apache/kafka/pull/14783/files#diff-812cfc2780b6fc0e7a1648ff37912ff13aeda4189ea6b0d4d847b831f66e56d1] > An example on how to find unexpected threads is at > [https://github.com/apache/kafka/blob/d5aa341a185f4df23bf587e55bcda4f16fc511f1/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2427] > and also at > https://issues.apache.org/jira/browse/KAFKA-16052?focusedCommentId=17800978&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17800978 > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]
wernerdv opened a new pull request, #15101: URL: https://github.com/apache/kafka/pull/15101 Added LeakTestingExtension, based on TestUtils#verifyNoUnexpectedThreads, to verify that there are no lingering threads left over. The extension executes after every test. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
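The thread scan at the heart of such an extension can be sketched standalone. The real PR wires this into a JUnit 5 AfterEachCallback; the class name and the expected-prefix list below are illustrative assumptions, not the PR's actual code.

```java
import java.util.Set;
import java.util.stream.Collectors;

// Standalone sketch: after a test, report live threads whose names don't
// start with any expected prefix.
public class ThreadLeakScan {
    static Set<String> unexpectedThreads(Set<String> expectedPrefixes) {
        return Thread.getAllStackTraces().keySet().stream()
                .map(Thread::getName)
                .filter(name -> expectedPrefixes.stream().noneMatch(name::startsWith))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Hypothetical allow-list; a real extension would enumerate the JVM's
        // own housekeeping threads plus whatever the framework starts.
        Set<String> expected = Set.of("main", "Reference Handler", "Finalizer",
                "Signal Dispatcher", "Common-Cleaner", "Notification Thread",
                "Attach Listener", "process reaper", "Monitor");

        // Simulate a test that "forgot" to shut down a thread:
        Thread leaked = new Thread(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException ignored) { }
        }, "kafka-test-leaked-thread");
        leaked.setDaemon(true);
        leaked.start();

        // The scan flags the leaked thread (alongside any other
        // unlisted JVM threads on this machine).
        System.out.println(unexpectedThreads(expected));
        leaked.interrupt();
    }
}
```

An AfterEachCallback would run this scan in `afterEach` and fail the test when the set is non-empty, which is the behavior the Jira ticket asks for.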
[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
[ https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16055: Component/s: streams > Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders > > > Key: KAFKA-16055 > URL: https://issues.apache.org/jira/browse/KAFKA-16055 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Kohei Nozaki >Priority: Minor > > This was originally raised in [a kafka-users > post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol]. > There is a HashMap stored in QueryableStoreProvider#storeProviders ([code > link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39]) > which can be mutated by a KafkaStreams#removeStreamThread() call. This can > be problematic when KafkaStreams#store is called from a separate thread. > We need to somehow make this part of code thread-safe by replacing it by > ConcurrentHashMap or/and using an existing locking mechanism. -- This message was sent by Atlassian Jira (v8.20.10#820010)
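The fix direction named in the ticket, swapping the plain HashMap for a ConcurrentHashMap, can be illustrated with a simplified stand-in for the Streams internals (the class and method names below are hypothetical, not Kafka's actual API).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for QueryableStoreProvider's provider map: one thread
// may remove a provider (removeStreamThread) while another reads (store()).
public class StoreProviders {
    // ConcurrentHashMap allows concurrent reads and writes without external
    // locking; its iterators are weakly consistent and never throw
    // ConcurrentModificationException, unlike HashMap's.
    private final Map<String, Object> storeProviders = new ConcurrentHashMap<>();

    public void addProvider(String threadName, Object provider) {
        storeProviders.put(threadName, provider);
    }

    // Mutation path, e.g. from KafkaStreams#removeStreamThread()
    public void removeProvider(String threadName) {
        storeProviders.remove(threadName);
    }

    // Read path, e.g. from KafkaStreams#store() on a different thread
    public int providerCount() {
        return storeProviders.size();
    }
}
```

With a plain HashMap, the same interleaving can corrupt internal state or throw ConcurrentModificationException, which is the thread-safety hazard the ticket describes.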
[PR] MINOR: avoid unnecessary UnsupportedOperationException [kafka]
mjsax opened a new pull request, #15102: URL: https://github.com/apache/kafka/pull/15102 We did not complete KIP-714 with regard to collecting producer client instance IDs in Kafka Streams when EOSv1 is enabled. Instead of throwing an UnsupportedOperationException, we should return an empty map. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
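The behavior change can be sketched as a guard that degrades gracefully instead of throwing. The names below are illustrative, not the exact KafkaStreams API surface changed by the PR.

```java
import java.util.Collections;
import java.util.Map;

// Hedged sketch: when producer instance IDs cannot be collected (the EOSv1
// case described above), return an empty map instead of throwing.
public class ClientInstanceIds {
    static Map<String, String> producerInstanceIds(boolean eosV1Enabled,
                                                   Map<String, String> collected) {
        if (eosV1Enabled) {
            // Previously: throw new UnsupportedOperationException(...)
            return Collections.emptyMap();
        }
        return collected;
    }
}
```

Returning an empty map lets callers handle the "nothing available" case uniformly, without wrapping the call in a try/catch for a condition that is not really exceptional.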
Re: [PR] MINOR: avoid unnecessary UnsupportedOperationException [kafka]
mjsax commented on PR #15102: URL: https://github.com/apache/kafka/pull/15102#issuecomment-1873512205 We still don't have an RC for 3.7, so it would be great to close this minor gap to provide a better user experience. \cc @stanislavkozlovski for visibility -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15853: Move KafkaConfig to server module [kafka]
OmniaGM opened a new pull request, #15103: URL: https://github.com/apache/kafka/pull/15103 still - WIP ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
ijuma commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1439091103

## core/src/test/scala/unit/kafka/log/LocalLogTest.scala:

@@ -362,8 +364,8 @@ class LocalLogTest {
     }
     assertEquals(5, log.segments.numberOfSegments)
     assertNotEquals(10L, log.segments.activeSegment.baseOffset)
-    val expected = log.segments.values.asScala.toVector
-    val deleted = log.truncateFullyAndStartAt(10L)
+    val expected = new util.ArrayList(log.segments.values)
+    val deleted = StreamSupport.stream(log.truncateFullyAndStartAt(10L).spliterator(), false).collect(Collectors.toList())

Review Comment: This can be simplified if `truncateFullyAndStartAt` returns `Collection` instead of `Iterable`.

## storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:

@@ -0,0 +1,1146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.FileLogInputStream;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.Scheduler;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe.
+ */
+public class LocalLog {
+
+    /**
+     * a file that is scheduled to be deleted
+     */
+    public static final String DELETED_FILE_SUFFIX = LogFileUtils.DELETED_FILE_SUFFIX;
+
+    /**
+     * A temporary file that is being used for log cleaning
+     */
+    public static final String CLEANED_FILE_SUFFIX = ".cleaned";
+
+    /**
+     * A temporary file used when swapping files into the log
+     */
+    public static final String SWAP_FILE_SUFFIX = ".swap";
+
+    /**
+     * a directory that is scheduled to be deleted
+     */
+    public static final String DELETE_DIR_SUFFIX = "-delete";
+
+    /**
+     * a directory that is used for future partition
+     */
+    public static final String FUTURE_DIR_SUFFIX = "-future";
+    public static final String STRAY_DIR_SUFFIX = "-stray";
+
+    public static final Pattern DELETE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + DELETE_DIR_SUFFIX);
+    public static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX);
+    public static final Pattern STRAY_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + STRAY_DIR_SUFFIX);
+
+    public static final long UNKNOWN_OFFSET = -1L;
+
+    private final Logger logger;
+
+    private final LogSegments segments;
+    private final Scheduler scheduler;
+    private final Time time;
+    private final TopicPartition topicPartition;
+    private final LogDirFailureChannel logDirFailureChannel;
+
+    // Last time the log was flushed
+    private final AtomicLong lastFlushedTime;
+
+    private volatile
Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]
ijuma commented on code in PR #14034: URL: https://github.com/apache/kafka/pull/14034#discussion_r1439152043

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * (if there is one). It returns true iff the segment is deletable.
    * @return the segments ready to be deleted
    */
-  private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-    def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], upperBoundOffset: Long): Boolean = {
-      val allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
+  private[log] def deletableSegments(predicate: (LogSegment, Optional[LogSegment]) => Boolean): Iterable[LogSegment] = {

Review Comment: We should change this to return `java.util.Collection[LogSegment]` to avoid unnecessary conversions.

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }
     localLog.checkIfMemoryMappedBufferClosed()
     // remove the segments for lookups
-    localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
+    localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason)

Review Comment: This conversion can be avoided if we make the change to the method signature suggested above.

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -1564,8 +1565,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     }

     private def deleteLogStartOffsetBreachedSegments(): Int = {
-      def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-        nextSegmentOpt.exists(_.baseOffset <= (if (remoteLogEnabled()) localLogStartOffset() else logStartOffset))
+      def shouldDelete(segment: LogSegment, nextSegmentOpt: Optional[LogSegment]): Boolean = {
+        if (nextSegmentOpt.isPresent)
+          nextSegmentOpt.get().baseOffset <= (if (remoteLogEnabled()) localLogStartOffset() else logStartOffset)
+        else false

Review Comment: Nit: you can do `nextSegmentOpt.isPresent && nextSegmentOpt.get...`

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,

     private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {

Review Comment: We should change this to return `java.util.Collection` or `java.util.List` to avoid unnecessary conversions.

## core/src/main/scala/kafka/log/UnifiedLog.scala:

@@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging {
     def deleteProducerSnapshots(): Unit = {
       LocalLog.maybeHandleIOException(logDirFailureChannel, parentDir,
-        s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir") {
+        s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir", {
           snapshotsToDelete.foreach { snapshot => snapshot.deleteIfExists() }
-      }
+          return;

Review Comment: Hmm, it is a bit odd that a `return` with no value is required for scala code. Is this right?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
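The `Collection`-returning signature suggested in the review above can be sketched in isolation. This is an illustrative sketch only: the `Segment` class below is a hypothetical stand-in for `LogSegment`, not the actual API in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class TruncateSketch {
    /** Hypothetical stand-in for LogSegment (illustration only). */
    public static class Segment {
        public final long baseOffset;
        public Segment(long baseOffset) { this.baseOffset = baseOffset; }
    }

    // Returning Collection (as the review suggests) instead of Iterable lets
    // callers copy, size, or stream the result directly.
    public static Collection<Segment> truncateFullyAndStartAt(long newOffset) {
        // Pretend the truncation deleted two segments.
        return Arrays.asList(new Segment(0L), new Segment(5L));
    }

    public static void main(String[] args) {
        // The test-side conversion collapses to a plain copy constructor,
        // with no StreamSupport.stream(iterable.spliterator(), false) detour.
        List<Segment> deleted = new ArrayList<>(truncateFullyAndStartAt(10L));
        System.out.println(deleted.size()); // prints 2
    }
}
```

With an `Iterable` return type, Java callers must go through `spliterator()` and `StreamSupport` (as seen in the `LocalLogTest.scala` diff above); `Collection` exposes `size()` and the copy constructor directly.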
[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801663#comment-17801663 ] Satish Duggana commented on KAFKA-16073:

Thanks [~hzh0425@apache] for filing the JIRA with a detailed description. I am trying to summarize the scenario you described with an example; let me know if I am missing anything. Let us assume each segment holds one offset:

log start offset: 0
log end offset: 10
local log start offset: 4
fetch offset: 6
new local log start offset: 7

Deletion based on the retention configs starts and eventually updates the local log start offset to 7. The race condition is that LocalLog first updates its segments list by removing the segments for offsets 4, 5, and 6, and only afterwards updates the local-log-start-offset. A fetch at offset 6 served concurrently may throw OffsetOutOfRangeException if the in-memory segments have already been removed from LocalLog but the local-log-start-offset has not yet been updated to 7 when this [code|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1866] executes, because it fails the condition fetch offset (6) < old local-log-start-offset (4).

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
> Issue Type: Bug
> Components: core, Tiered-Storage
> Affects Versions: 3.6.1
> Reporter: hzh0425
> Assignee: hzh0425
> Priority: Major
> Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
> The identified bug in Apache Kafka's tiered storage feature involves a delayed update of {{localLogStartOffset}} in the {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. When segments are deleted from the log's memory state, the {{localLogStartOffset}} isn't promptly updated. Concurrently, {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch offset is less than the {{localLogStartOffset}}. If it's greater, Kafka erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{offset1 < offset2 < offset3}}. A client requests data at {{offset2}}. While a background deletion process removes segments from memory, it hasn't yet updated the {{localLogStartOffset}} from {{offset1}} to {{offset3}}. Consequently, when the fetch offset ({{offset2}}) is evaluated against the stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}}, it incorrectly triggers an {{OffsetOutOfRangeException}}. This issue arises from the out-of-sync update of {{localLogStartOffset}}, leading to incorrect handling of consumer fetch requests and potential data access errors.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
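The ordering hazard described above can be modeled in a few lines. This is only an illustrative toy (the class, field, and method names below are hypothetical, not the actual UnifiedLog/ReplicaManager code, and the ordering shown is one possible repair direction rather than the merged fix): publishing the new start offset before dropping the in-memory segments means a concurrent fetch is never judged against a stale lower bound while its segment is already gone.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

public class DeletionRaceSketch {
    // Toy model: in-memory segments keyed by base offset, plus the
    // local-log-start-offset that fetch validation reads.
    private final ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();
    private final AtomicLong localLogStartOffset = new AtomicLong(0L);

    public void addSegment(long baseOffset) {
        segments.put(baseOffset, "segment-" + baseOffset);
    }

    // Publish the new lower bound BEFORE removing segments, so a reader that
    // still sees the old segments list also still passes the offset check.
    public void deleteSegmentsUpTo(long newStartOffset) {
        localLogStartOffset.set(newStartOffset);   // 1. publish new bound
        segments.headMap(newStartOffset).clear();  // 2. drop in-memory segments
    }

    public long localLogStartOffset() { return localLogStartOffset.get(); }

    public long firstSegmentBaseOffset() { return segments.firstKey(); }

    // Mirrors the shape of the fetch-offset check discussed above (simplified).
    public boolean fetchOffsetBelowLocalStart(long fetchOffset) {
        return fetchOffset < localLogStartOffset.get();
    }

    public static void main(String[] args) {
        DeletionRaceSketch log = new DeletionRaceSketch();
        for (long i = 0; i < 10; i++) log.addSegment(i);
        log.deleteSegmentsUpTo(7L);
        System.out.println(log.localLogStartOffset() + " " + log.firstSegmentBaseOffset()); // prints "7 7"
    }
}
```

In the scenario from the comment (fetch offset 6, start offset moving 4 -> 7), the window in which segments 4-6 are gone but the bound still reads 4 is what produces the spurious OffsetOutOfRangeException; collapsing or reordering that window is the essence of any fix.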
[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion
[ https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801667#comment-17801667 ] hzh0425 commented on KAFKA-16073:

[~satish.duggana] Yes, it's exactly what you described; I encountered this case in a production environment. Do you have any ideas for a fix? For example, a lock could be taken when fetching localLogStartOffset.
[jira] [Commented] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
[ https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801668#comment-17801668 ] Arpit Goyal commented on KAFKA-16063:

[~divijvaidya] This appears to be a well-known leak:
https://blog.creekorful.org/2020/03/classloader-and-memory-leaks/
https://stackoverflow.com/questions/6385018/memory-leaks-with-addshutdownhook

The hooks map is cleared only when a JVM shutdown happens, at which point the registered hooks are executed and the map is nulled out:

{code:java}
static void runHooks() {
    Collection<Thread> threads;
    synchronized (ApplicationShutdownHooks.class) {
        threads = hooks.keySet();
        hooks = null;
    }

    for (Thread hook : threads) {
        hook.start();
    }
    for (Thread hook : threads) {
        while (true) {
            try {
                hook.join();
                break;
            } catch (InterruptedException ignored) {
            }
        }
    }
}
{code}

We do not have the thread reference needed to clear the hook manually: removeShutdownHook requires the Thread reference, and it is created anonymously inside the DefaultDirectoryService library code.

{code:java}
Runtime.getRuntime.removeShutdownHook(THREAD)
{code}

Per this analysis, the only feasible option is to disable the hook-enabled flag.

> Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
>
> Key: KAFKA-16063
> URL: https://issues.apache.org/jira/browse/KAFKA-16063
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Divij Vaidya
> Assignee: Arpit Goyal
> Priority: Major
> Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, Screenshot 2024-01-01 at 10.51.03 PM.png
>
> All tests extending `EndToEndAuthorizationTest` are leaking DefaultDirectoryService objects.
> This can be observed using the heap dump at [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0] (unzip this and you will find a hprof which can be opened with your favourite heap analyzer).
> The stack trace looks like this:
> !Screenshot 2023-12-29 at 12.38.29.png!
> I suspect that the reason is that DefaultDirectoryService#startup() registers a shutdown hook which is somehow messed up by QuorumTestHarness#teardown().
> We need to investigate why this is leaking and fix it.
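For contrast with the analysis above: deregistration does work when the hook reference is retained. A minimal sketch (the hook here is hypothetical, not the DefaultDirectoryService one, whose Thread is created anonymously and therefore cannot be removed this way):

```java
public class HookRemovalSketch {
    public static void main(String[] args) {
        // Keep the Thread reference -- this is exactly what the library code
        // does not do, since it registers its hook anonymously.
        Thread hook = new Thread(() -> System.out.println("cleanup"));
        Runtime.getRuntime().addShutdownHook(hook);

        // With the reference in hand, deregistration is possible; returns
        // true if the hook was previously registered and is now removed.
        boolean removed = Runtime.getRuntime().removeShutdownHook(hook);
        System.out.println(removed); // prints true
    }
}
```

Because the test harness has no such reference for the library's hook, disabling the hook-registration flag (as proposed) avoids the leak at the source instead of trying to remove the hook after the fact.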
[jira] [Comment Edited] (KAFKA-16050) consumer was removed from group,but still can poll data from kafka, data duplicate
[ https://issues.apache.org/jira/browse/KAFKA-16050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801087#comment-17801087 ] Xin edited comment on KAFKA-16050 at 1/2/24 6:45 AM:

Finally I found the reason: the consumer's rejoin operation was delayed for a long time, and another consumer had already been assigned its partitions. Can we optimize this process?

was (Author: auroraxlh): Finally Ifound the reason Consumer Rejoin operation was delay for a long time, and another consumer has been assigned the partitions Can we optimize these process?

> consumer was removed from group, but still can poll data from kafka, data duplicate
>
> Key: KAFKA-16050
> URL: https://issues.apache.org/jira/browse/KAFKA-16050
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.3.1
> Reporter: Xin
> Priority: Major
>
> I have 3 brokers: b1, b2, b3
> a topic: test, partitions: 5, replication: 3
> 3 consumers in 1 group (groupid xx): consumer1, consumer2, consumer3
> consumer1 running on b1, consumer2 on b2, consumer3 on b3:
> ./kafka-console-consumer.sh --bootstrap-server localhost:9093 --group xx --topic test --from-beginning
>
> b2's clock changed, and consumer2 was removed from group xx (reason: removing member consumer-2 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator).
> kafka-consumer-groups.sh can't see any record about consumer2:
> ./kafka-consumer-groups.sh --bootstrap-server localhost:9093 --all-topics --describe --all-groups
>
> Then the group rebalanced, and the partitions assigned to consumer2 were assigned to other consumers.
> Although consumer2 was removed from group xx, it could still poll data from kafka; kafka can't find it.
> After the rebalance, another consumer polled the same partition as consumer2. This caused data to be polled in duplicate.
Re: [PR] KAFKA-14133: Migrate stateManager mock in StoreChangelogReaderTest to Mockito [kafka]
clolov commented on code in PR #14929: URL: https://github.com/apache/kafka/pull/14929#discussion_r1439203605

## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:

@@ -946,7 +966,6 @@ public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() {
     final Map mockTasks = mock(Map.class);
     EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
     EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
-    EasyMock.expect(stateManager.taskType()).andReturn(type);

Review Comment: This setup, and the two below, were reported as unnecessary by Mockito.
[PR] KAFKA-16063: Prevent memory leak by Disabling shutdownhook [kafka]
iit2009060 opened a new pull request, #15104: URL: https://github.com/apache/kafka/pull/15104

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
[ https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801687#comment-17801687 ] Arpit Goyal commented on KAFKA-16063:

[~divijvaidya] [~showuon] PR for review: https://github.com/apache/kafka/pull/15104