Re: [PR] KAFKA-15816: Fix leaked sockets in runtime tests [kafka]

2024-01-01 Thread via GitHub


divijvaidya commented on PR #14764:
URL: https://github.com/apache/kafka/pull/14764#issuecomment-1873267580

   The test failures below are unrelated; all tests modified in this PR pass.
   ```
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_8_and_Scala_2_12___testResetSinkConnectorOffsetsZombieSinkTasks/)
   [Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsBounceTest.testWithGroupMetadata()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_8_and_Scala_2_12___testWithGroupMetadata__/)
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_8_and_Scala_2_12___testConfigurationOperations__/)
   [Build / JDK 21 and Scala 2.13 / org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsetsZombieSinkTasks](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.connect.integration/OffsetsApiIntegrationTest/Build___JDK_21_and_Scala_2_13___testResetSinkConnectorOffsetsZombieSinkTasks/)
   [Build / JDK 21 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testConfigResourceExistenceChecker()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_21_and_Scala_2_13___testConfigResourceExistenceChecker__/)
   [Build / JDK 21 and Scala 2.13 / org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_21_and_Scala_2_13___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__kraft/)
   [Build / JDK 21 and Scala 2.13 / org.apache.kafka.tools.TopicCommandIntegrationTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String).kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.tools/TopicCommandIntegrationTest/Build___JDK_21_and_Scala_2_13___testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress_String__kraft_2/)
   [Build / JDK 11 and Scala 2.13 / kafka.api.PlaintextConsumerTest.testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/PlaintextConsumerTest/Build___JDK_11_and_Scala_2_13___testCoordinatorFailover_String__String__quorum_zk_groupProtocol_generic/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_11_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated__/)
   [Build / JDK 17 and Scala 2.13 / kafka.api.SaslPlaintextConsumerTest.testCoordinatorFailover(String, String).quorum=zk.groupProtocol=generic](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/SaslPlaintextConsumerTest/Build___JDK_17_and_Scala_2_13___testCoordinatorFailover_String__String__quorum_zk_groupProtocol_generic/)
   [Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_17_and_Scala_2_13___testWithGroupId__/)
   [Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14764/9/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_17_and_Scala_2_13___testWithGroupId___2/)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15816: Fix leaked sockets in runtime tests [kafka]

2024-01-01 Thread via GitHub


divijvaidya merged PR #14764:
URL: https://github.com/apache/kafka/pull/14764





[jira] [Created] (KAFKA-16073) Tiered Storage Bug: Incorrect Handling of Offset Ranges During Segment Deletion

2024-01-01 Thread hzh0425 (Jira)
hzh0425 created KAFKA-16073:
---

 Summary: Tiered Storage Bug: Incorrect Handling of Offset Ranges 
During Segment Deletion
 Key: KAFKA-16073
 URL: https://issues.apache.org/jira/browse/KAFKA-16073
 Project: Kafka
  Issue Type: Bug
  Components: core, Tiered-Storage
Affects Versions: 3.6.1
Reporter: hzh0425
Assignee: hzh0425
 Fix For: 3.6.1


This bug pertains to Apache Kafka's tiered storage functionality. Specifically, 
it involves a timing issue in the {{UnifiedLog.deleteSegments}} method. The 
method first deletes segments from memory but delays updating the 
{{localLogStartOffset}}. Meanwhile, in 
{{ReplicaManager.handleOffsetOutOfRangeError}}, if the fetch offset is less 
than {{localLogStartOffset}}, it triggers the read remote process. However, 
if it's greater, an {{OffsetOutOfRangeException}} is sent to the client.

Consider a scenario with concurrent operations, where {{offset1 < offset2 < 
offset3}}. A client requests {{offset2}} while a background thread is deleting 
segments. The segments are deleted in memory, but {{localLogStartOffset}} is 
still at {{offset1}} and not yet updated to {{offset3}}. In this state, since 
{{offset2}} is greater than {{offset1}}, 
{{ReplicaManager.handleOffsetOutOfRangeError}} erroneously returns an 
{{OffsetOutOfRangeException}} to the client. This happens because the system 
has not yet recognized the new starting offset ({{offset3}}), leading to 
incorrect handling of fetch requests.
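The check order described above can be sketched as a tiny standalone model (illustrative names only; the real branch lives in ReplicaManager.handleOffsetOutOfRangeError and the real offsets are per-partition state):

```java
// Minimal model of the race: segments are deleted before localLogStartOffset
// is advanced, so a fetch between the stale and new start offsets is judged
// against the stale value and rejected instead of being routed to remote read.
public class OffsetRaceSketch {
    static long localLogStartOffset = 100L;  // offset1: stale, not yet advanced

    // Mirrors the described branch: below the local start offset -> remote
    // read; otherwise -> OffsetOutOfRangeException back to the client.
    static String handleFetch(long fetchOffset) {
        return fetchOffset < localLogStartOffset ? "READ_REMOTE" : "OFFSET_OUT_OF_RANGE";
    }

    public static void main(String[] args) {
        long fetchOffset = 200L;                      // offset2, already deleted locally
        System.out.println(handleFetch(fetchOffset)); // erroneous OFFSET_OUT_OF_RANGE
        localLogStartOffset = 300L;                   // offset3: the delayed update lands
        System.out.println(handleFetch(fetchOffset)); // now correctly READ_REMOTE
    }
}
```

Once the delayed update lands, the same fetch falls below the local start offset and is correctly routed to the remote read path.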


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-01 Thread hzh0425 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hzh0425 updated KAFKA-16073:

Summary: Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
localLogStartOffset Update During Segment Deletion  (was: Tiered Storage Bug: 
Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
localLogStartOffset Update During Segment Deletion)

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
>
> This bug pertains to Apache Kafka's tiered storage functionality. 
> Specifically, it involves a timing issue in the {{UnifiedLog.deleteSegments}} 
> method. The method first deletes segments from memory but delays updating the 
> {{localLogStartOffset}}. Meanwhile, in 
> {{ReplicaManager.handleOffsetOutOfRangeError}}, if the fetch offset is less 
> than {{localLogStartOffset}}, it triggers the read remote process. However, 
> if it's greater, an {{OffsetOutOfRangeException}} is sent to the client.
> Consider a scenario with concurrent operations, where {{offset1 < offset2 < 
> offset3}}. A client requests {{offset2}} while a background thread is 
> deleting segments. The segments are deleted in memory, but 
> {{localLogStartOffset}} is still at {{offset1}} and not yet updated to 
> {{offset3}}. In this state, since {{offset2}} is greater than {{offset1}}, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} erroneously returns an 
> {{OffsetOutOfRangeException}} to the client. This happens because the system 
> has not yet recognized the new starting offset ({{offset3}}), leading to 
> incorrect handling of fetch requests.





[jira] [Updated] (KAFKA-16073) Tiered Storage Bug: Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-01 Thread hzh0425 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hzh0425 updated KAFKA-16073:

Summary: Tiered Storage Bug: Kafka Tiered Storage Bug: Consumer Fetch Error 
Due to Delayed localLogStartOffset Update During Segment Deletion  (was: Tiered 
Storage Bug: Incorrect Handling of Offset Ranges During Segment Deletion)

> Tiered Storage Bug: Kafka Tiered Storage Bug: Consumer Fetch Error Due to 
> Delayed localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
>
> This bug pertains to Apache Kafka's tiered storage functionality. 
> Specifically, it involves a timing issue in the {{UnifiedLog.deleteSegments}} 
> method. The method first deletes segments from memory but delays updating the 
> {{localLogStartOffset}}. Meanwhile, in 
> {{ReplicaManager.handleOffsetOutOfRangeError}}, if the fetch offset is less 
> than {{localLogStartOffset}}, it triggers the read remote process. However, 
> if it's greater, an {{OffsetOutOfRangeException}} is sent to the client.
> Consider a scenario with concurrent operations, where {{offset1 < offset2 < 
> offset3}}. A client requests {{offset2}} while a background thread is 
> deleting segments. The segments are deleted in memory, but 
> {{localLogStartOffset}} is still at {{offset1}} and not yet updated to 
> {{offset3}}. In this state, since {{offset2}} is greater than {{offset1}}, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} erroneously returns an 
> {{OffsetOutOfRangeException}} to the client. This happens because the system 
> has not yet recognized the new starting offset ({{offset3}}), leading to 
> incorrect handling of fetch requests.





[jira] [Updated] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-01 Thread hzh0425 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hzh0425 updated KAFKA-16073:

Description: 
The identified bug in Apache Kafka's tiered storage feature involves a delayed 
update of {{localLogStartOffset}} in the {{UnifiedLog.deleteSegments}} method, 
impacting consumer fetch operations. When segments are deleted from the log's 
memory state, the {{localLogStartOffset}} isn't promptly updated. Concurrently, 
{{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
offset is less than the {{localLogStartOffset}}. If it's greater, Kafka 
erroneously sends an {{OffsetOutOfRangeException}} to the consumer.

In a specific concurrent scenario, imagine sequential offsets: {{offset1 < 
offset2 < offset3}}. A client requests data at {{offset2}}. While a background 
deletion process removes segments from memory, it hasn't yet updated the 
{{localLogStartOffset}} from {{offset1}} to {{offset3}}. Consequently, when 
the fetch offset ({{offset2}}) is evaluated against the stale {{offset1}} in 
{{ReplicaManager.handleOffsetOutOfRangeError}}, it incorrectly triggers an 
{{OffsetOutOfRangeException}}. This issue arises from the out-of-sync update 
of {{localLogStartOffset}}, leading to incorrect handling of consumer fetch 
requests and potential data access errors.

  was:
This bug pertains to Apache Kafka's tiered storage functionality. Specifically, 
it involves a timing issue in the {{UnifiedLog.deleteSegments}} method. The 
method first deletes segments from memory but delays updating the 
{{localLogStartOffset}}. Meanwhile, in 
{{ReplicaManager.handleOffsetOutOfRangeError}}, if the fetch offset is less 
than {{localLogStartOffset}}, it triggers the read remote process. However, 
if it's greater, an {{OffsetOutOfRangeException}} is sent to the client.

Consider a scenario with concurrent operations, where {{offset1 < offset2 < 
offset3}}. A client requests {{offset2}} while a background thread is deleting 
segments. The segments are deleted in memory, but {{localLogStartOffset}} is 
still at {{offset1}} and not yet updated to {{offset3}}. In this state, since 
{{offset2}} is greater than {{offset1}}, 
{{ReplicaManager.handleOffsetOutOfRangeError}} erroneously returns an 
{{OffsetOutOfRangeException}} to the client. This happens because the system 
has not yet recognized the new starting offset ({{offset3}}), leading to 
incorrect handling of fetch requests.


> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{localLogStartOffset}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{offset1 < 
> offset2 < offset3}}. A client requests data at {{offset2}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{localLogStartOffset}} from {{offset1}} to {{offset3}}. 
> Consequently, when the fetch offset ({{offset2}}) is evaluated against the 
> stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}}, it 
> incorrectly triggers an {{OffsetOutOfRangeException}}. This issue arises 
> from the out-of-sync update of {{localLogStartOffset}}, leading to incorrect 
> handling of consumer fetch requests and potential data access errors.





Re: [PR] KAFKA-14517:Implement regex subscriptions [kafka]

2024-01-01 Thread via GitHub


JimmyWang6 commented on PR #14327:
URL: https://github.com/apache/kafka/pull/14327#issuecomment-1873362395

   > @JimmyWang6 , I think for this part
   > 
   > > What if a new topic gets created which matches the regex subscription of 
some member.
   > 
   > we need to use the metadata update mechanism defined by 
`metadata.max.age.ms`. This is the interval using which the consumer will keep 
refreshing the metadata and do the pattern matching periodically against all 
topics existing at the time of check. Having said that, this looks like a 
client level config so not sure if it needs to be part of this PR. @dajac , 
WDYT?
   
   @dajac @vamossagar12 
   Thank you for your comments! I have added more unit tests and addressed the 
issues mentioned in the previous comments. In the case of a newly created topic 
that matches the regex subscription of certain members, I believe the method 
**onNewMetadataImage(MetadataImage newImage, MetadataDelta delta)** will be 
invoked. Therefore, I have modified the groupsSubscribedToTopic method to 
identify the consumer groups whose regex subscriptions match the new topic, and 
to request a metadata refresh for those groups.
   
   The latest code appears to be functioning well. Do you have any further 
comments or suggestions regarding this approach?
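For illustration, the matching step discussed above could look roughly like this (simplified stand-in types, not the actual group coordinator internals from the PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Sketch of groupsSubscribedToTopic extended for regex subscriptions: when a
// metadata update reports a new topic, find every group whose pattern matches
// it so a metadata refresh can be requested for those groups.
public class RegexSubscriptionMatcher {
    public record Group(String id, Pattern subscription) {}

    public static List<String> groupsSubscribedToTopic(String newTopic, List<Group> groups) {
        List<String> matched = new ArrayList<>();
        for (Group g : groups) {
            if (g.subscription().matcher(newTopic).matches()) {
                matched.add(g.id());
            }
        }
        return matched;  // groups whose regex subscription covers the new topic
    }
}
```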
   





[jira] [Updated] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests

2024-01-01 Thread Arpit Goyal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arpit Goyal updated KAFKA-16063:

Attachment: Screenshot 2024-01-01 at 10.51.03 PM-1.png

> Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
> -
>
> Key: KAFKA-16063
> URL: https://issues.apache.org/jira/browse/KAFKA-16063
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Arpit Goyal
>Priority: Major
> Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot 
> 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, 
> Screenshot 2024-01-01 at 10.51.03 PM.png
>
>
> All tests extending `EndToEndAuthorizationTest` are leaking 
> DefaultDirectoryService objects.
> This can be observed using the heap dump at 
> [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0]
>  (unzip this and you will find a hprof which can be opened with your 
> favourite heap analyzer)
> The stack trace looks like this:
> !Screenshot 2023-12-29 at 12.38.29.png!
>  
> I suspect that the reason is because DefaultDirectoryService#startup() 
> registers a shutdownhook which is somehow messed up by 
> QuorumTestHarness#teardown().
> We need to investigate why this is leaking and fix it.





[jira] [Updated] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests

2024-01-01 Thread Arpit Goyal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arpit Goyal updated KAFKA-16063:

Attachment: Screenshot 2024-01-01 at 10.51.03 PM.png

> Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
> -
>
> Key: KAFKA-16063
> URL: https://issues.apache.org/jira/browse/KAFKA-16063
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Arpit Goyal
>Priority: Major
> Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot 
> 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, 
> Screenshot 2024-01-01 at 10.51.03 PM.png
>
>
> All tests extending `EndToEndAuthorizationTest` are leaking 
> DefaultDirectoryService objects.
> This can be observed using the heap dump at 
> [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0]
>  (unzip this and you will find a hprof which can be opened with your 
> favourite heap analyzer)
> The stack trace looks like this:
> !Screenshot 2023-12-29 at 12.38.29.png!
>  
> I suspect that the reason is because DefaultDirectoryService#startup() 
> registers a shutdownhook which is somehow messed up by 
> QuorumTestHarness#teardown().
> We need to investigate why this is leaking and fix it.





[jira] [Commented] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests

2024-01-01 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801568#comment-17801568
 ] 

Arpit Goyal commented on KAFKA-16063:
-

[~divijvaidya] I am able to reproduce the issue locally and have also figured 
out the reason for the leak.

While initializing the directory service in MiniKDC.scala 
(https://github.com/apache/kafka/blob/trunk/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala#L181), 
DirectoryService checks the shutdownHookEnabled flag, which is enabled by 
default, and adds an entry to the ApplicationShutdownHooks identity HashMap:

{code:java}
if ( shutdownHookEnabled )
{
    Runtime.getRuntime().addShutdownHook( new Thread( new Runnable()
    {
        public void run()
        {
            try
            {
                shutdown();
            }
            catch ( Exception e )
            {
                LOG.warn( "Failed to shut down the directory service: "
                    + DefaultDirectoryService.this.instanceId, e );
            }
        }
    }, "ApacheDS Shutdown Hook (" + instanceId + ')' ) );

    LOG.info( "ApacheDS shutdown hook has been registered with the runtime." );
}
{code}

But this map is never cleared by the DirectoryService shutdown method, which we 
call from the MiniKDC stop function.

I think we can disable the shutdownHookEnabled flag, since the directory 
service's shutdown method is already being called in the MiniKDC stop function:
{code:java}
ds.setShutdownHookEnabled(false)
{code}
Attached screenshot for reference: !Screenshot 2024-01-01 at 10.51.03 PM.png!



> Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
> -
>
> Key: KAFKA-16063
> URL: https://issues.apache.org/jira/browse/KAFKA-16063
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Arpit Goyal
>Priority: Major
> Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot 
> 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, 
> Screenshot 2024-01-01 at 10.51.03 PM.png
>
>
> All test extending `EndToEndAuthorizationTest` are leaking 
> DefaultDirectoryService objects.
> This can be observed using the heap dump at 
> [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0]
>  (unzip this and you will find a hprof which can be opened with your 
> favourite heap analyzer)
> The stack trace looks like this:
> !Screenshot 2023-12-29 at 12.38.29.png!
>  
> I suspect that the reason is because DefaultDirectoryService#startup() 
> registers a shutdownhook which is somehow messed up by 
> QuorumTestHarness#teardown().
> We need to investigate why this is leaking and fix it.





[jira] [Commented] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests

2024-01-01 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801577#comment-17801577
 ] 

Divij Vaidya commented on KAFKA-16063:
--

Nice find, and yes, disabling the shutdown hook sounds like a plan.

I am curious, though: if the map is not cleared during the stop function, who 
clears it? As an alternative solution, should we instead clear the map in the 
stop function? That would ensure that even if we forget to call stop, the KDC 
will still be shut down on process exit via the hooks.
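The alternative could look roughly like the following sketch (a hypothetical wrapper, not the actual ApacheDS API; the point is deregistering the hook once an explicit stop has run, while keeping the hook as a safety net otherwise):

```java
// Sketch: register a shutdown hook at startup, but remove it again when the
// service is stopped explicitly, so the JVM's ApplicationShutdownHooks map
// does not retain the instance. If stop() is never called, the hook still
// fires at JVM exit and shuts the service down.
public class HookManagedService {
    private Thread shutdownHook;  // hypothetical field; ApacheDS keeps its own state

    public void startup() {
        shutdownHook = new Thread(this::shutdown, "Service Shutdown Hook");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    public void stop() {
        shutdown();
        if (shutdownHook != null) {
            // Deregister so the hook map no longer holds this instance.
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
            shutdownHook = null;
        }
    }

    public boolean hookRegistered() {
        return shutdownHook != null;
    }

    private void shutdown() { /* release resources */ }
}
```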

> Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
> -
>
> Key: KAFKA-16063
> URL: https://issues.apache.org/jira/browse/KAFKA-16063
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Arpit Goyal
>Priority: Major
> Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot 
> 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, 
> Screenshot 2024-01-01 at 10.51.03 PM.png
>
>
> All test extending `EndToEndAuthorizationTest` are leaking 
> DefaultDirectoryService objects.
> This can be observed using the heap dump at 
> [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0]
>  (unzip this and you will find a hprof which can be opened with your 
> favourite heap analyzer)
> The stack trace looks like this:
> !Screenshot 2023-12-29 at 12.38.29.png!
>  
> I suspect that the reason is because DefaultDirectoryService#startup() 
> registers a shutdownhook which is somehow messed up by 
> QuorumTestHarness#teardown().
> We need to investigate why this is leaking and fix it.





[jira] [Assigned] (KAFKA-16072) Create Junit 5 extension to detect thread leak

2024-01-01 Thread Dmitry Werner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Werner reassigned KAFKA-16072:
-

Assignee: Dmitry Werner

> Create Junit 5 extension to detect thread leak
> --
>
> Key: KAFKA-16072
> URL: https://issues.apache.org/jira/browse/KAFKA-16072
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Divij Vaidya
>Assignee: Dmitry Werner
>Priority: Major
>  Labels: newbie++
>
> The objective of this task is to create a Junit extension that will execute 
> after every test and verify that there are no lingering threads left over. 
> An example of how to create an extension can be found here:
> [https://github.com/apache/kafka/pull/14783/files#diff-812cfc2780b6fc0e7a1648ff37912ff13aeda4189ea6b0d4d847b831f66e56d1]
> An example on how to find unexpected threads is at 
> [https://github.com/apache/kafka/blob/d5aa341a185f4df23bf587e55bcda4f16fc511f1/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2427]
>  and also at 
> https://issues.apache.org/jira/browse/KAFKA-16052?focusedCommentId=17800978&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17800978
>  





[PR] KAFKA-16072: JUnit 5 extension to detect thread leak [kafka]

2024-01-01 Thread via GitHub


wernerdv opened a new pull request, #15101:
URL: https://github.com/apache/kafka/pull/15101

   Added a LeakTestingExtension based on TestUtils#verifyNoUnexpectedThreads to 
verify that there are no lingering threads left over. The extension executes 
after every test.
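The core check such an extension runs after each test could look like this (illustrative allow-list only; the actual PR delegates to TestUtils#verifyNoUnexpectedThreads):

```java
import java.util.ArrayList;
import java.util.List;

// Core of a thread-leak check that a JUnit 5 AfterEachCallback can invoke:
// scan all live threads and report any whose name is not on an allow-list of
// threads the JVM or the test runner is expected to own.
public class ThreadLeakCheck {
    // Illustrative prefixes only; Kafka's real list lives in TestUtils.
    static final String[] EXPECTED_PREFIXES = {
        "main", "Reference Handler", "Finalizer", "Signal Dispatcher",
        "Common-Cleaner", "Notification Thread", "Attach Listener", "ForkJoinPool"
    };

    static boolean isExpected(String name) {
        for (String prefix : EXPECTED_PREFIXES) {
            if (name.startsWith(prefix)) return true;
        }
        return false;
    }

    // Names of live threads that look like leaks left behind by a test.
    static List<String> unexpectedThreads() {
        List<String> leaked = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && !isExpected(t.getName())) {
                leaked.add(t.getName());
            }
        }
        return leaked;
    }
}
```

A JUnit 5 extension would call `unexpectedThreads()` from `afterEach` and fail the test if the list is non-empty.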
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-01-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16055:

Component/s: streams

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Priority: Minor
>
> This was originally raised in [a kafka-users 
> post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code 
> link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
>  which can be mutated by a KafkaStreams#removeStreamThread() call. This can 
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to make this part of the code thread-safe by replacing the map with a 
> ConcurrentHashMap and/or using an existing locking mechanism.
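A minimal sketch of the suggested direction (illustrative names, not the actual Streams internals): back the registry with a ConcurrentHashMap so that removal from removeStreamThread() and lookup from store() can safely race.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: a ConcurrentHashMap-backed provider registry. Iteration over a
// ConcurrentHashMap is weakly consistent, so a concurrent remove never throws
// ConcurrentModificationException the way iterating a plain HashMap can.
public class StoreProviderRegistry {
    private final Map<String, Object> storeProviders = new ConcurrentHashMap<>();

    public void addProvider(String threadName, Object provider) {
        storeProviders.put(threadName, provider);
    }

    // Called when a stream thread is removed (the removeStreamThread path).
    public void removeProvider(String threadName) {
        storeProviders.remove(threadName);
    }

    // Called from an arbitrary user thread (the KafkaStreams#store path).
    public int providerCount() {
        return storeProviders.size();
    }
}
```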





[PR] MINOR: avoid unnecessary UnsupportedOperationException [kafka]

2024-01-01 Thread via GitHub


mjsax opened a new pull request, #15102:
URL: https://github.com/apache/kafka/pull/15102

   We did not complete KIP-714 with regard to collecting producer client 
instance IDs in Kafka Streams when EOSv1 is enabled. Instead of throwing an 
UnsupportedOperationException, we should return an empty map.





Re: [PR] MINOR: avoid unnecessary UnsupportedOperationException [kafka]

2024-01-01 Thread via GitHub


mjsax commented on PR #15102:
URL: https://github.com/apache/kafka/pull/15102#issuecomment-1873512205

   We still don't have an RC for 3.7, so it would be great to close this minor 
gap to provide a better user experience. cc @stanislavkozlovski for visibility.





[PR] KAFKA-15853: Move KafkaConfig to server module [kafka]

2024-01-01 Thread via GitHub


OmniaGM opened a new pull request, #15103:
URL: https://github.com/apache/kafka/pull/15103

   still - WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-01 Thread via GitHub


ijuma commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1439091103


##
core/src/test/scala/unit/kafka/log/LocalLogTest.scala:
##
@@ -362,8 +364,8 @@ class LocalLogTest {
 }
 assertEquals(5, log.segments.numberOfSegments)
 assertNotEquals(10L, log.segments.activeSegment.baseOffset)
-val expected = log.segments.values.asScala.toVector
-val deleted = log.truncateFullyAndStartAt(10L)
+val expected = new util.ArrayList(log.segments.values)
+val deleted = 
StreamSupport.stream(log.truncateFullyAndStartAt(10L).spliterator(), 
false).collect(Collectors.toList())

Review Comment:
   This can be simplified if `truncateFullyAndStartAt` returns `Collection` 
instead of `Iterable`.
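To make the suggestion concrete, here is a minimal, self-contained Java sketch (the method names are hypothetical stand-ins, not the Kafka API) contrasting what callers must do for an `Iterable` return type versus a `Collection`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class IterableVsCollection {
    // Hypothetical stand-ins for the two candidate return types of
    // truncateFullyAndStartAt.
    static Iterable<String> deletedAsIterable() {
        return Arrays.asList("seg0", "seg1");
    }

    static Collection<String> deletedAsCollection() {
        return Arrays.asList("seg0", "seg1");
    }

    public static void main(String[] args) {
        // With Iterable, callers need the StreamSupport dance seen in the test:
        List<String> viaIterable = StreamSupport
                .stream(deletedAsIterable().spliterator(), false)
                .collect(Collectors.toList());
        // With Collection, a plain copy constructor suffices:
        List<String> viaCollection = new ArrayList<>(deletedAsCollection());
        System.out.println(viaIterable.equals(viaCollection)); // true
    }
}
```

Returning the more specific `Collection` type keeps the copy to a one-liner on the caller side.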



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##
@@ -0,0 +1,1146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.FileLogInputStream;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.Scheduler;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe.
+ */
+public class LocalLog {
+
+/**
+ * a file that is scheduled to be deleted
+ */
+public static final String DELETED_FILE_SUFFIX = LogFileUtils.DELETED_FILE_SUFFIX;
+
+/**
+ * A temporary file that is being used for log cleaning
+ */
+public static final String CLEANED_FILE_SUFFIX = ".cleaned";
+
+/**
+ * A temporary file used when swapping files into the log
+ */
+public static final String SWAP_FILE_SUFFIX = ".swap";
+
+/**
+ * a directory that is scheduled to be deleted
+ */
+public static final String DELETE_DIR_SUFFIX = "-delete";
+
+/**
+ * a directory that is used for future partition
+ */
+public static final String FUTURE_DIR_SUFFIX = "-future";
+public static final String STRAY_DIR_SUFFIX = "-stray";
+
+public static final Pattern DELETE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + DELETE_DIR_SUFFIX);
+public static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX);
+public static final Pattern STRAY_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + STRAY_DIR_SUFFIX);
+
+public static final long UNKNOWN_OFFSET = -1L;
+
+private final Logger logger;
+
+private final LogSegments segments;
+private final Scheduler scheduler;
+private final Time time;
+private final TopicPartition topicPartition;
+private final LogDirFailureChannel logDirFailureChannel;
+
+// Last time the log was flushed
+private final AtomicLong lastFlushedTime;
+
+private volatile

Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-01 Thread via GitHub


ijuma commented on code in PR #14034:
URL: https://github.com/apache/kafka/pull/14034#discussion_r1439152043


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1441,9 +1442,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*  (if there is one). It returns true iff the segment is 
deletable.
* @return the segments ready to be deleted
*/
-  private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
-    def isSegmentEligibleForDeletion(nextSegmentOpt: Option[LogSegment], upperBoundOffset: Long): Boolean = {
-      val allowDeletionDueToLogStartOffsetIncremented = nextSegmentOpt.isDefined && logStartOffset >= nextSegmentOpt.get.baseOffset
+  private[log] def deletableSegments(predicate: (LogSegment, Optional[LogSegment]) => Boolean): Iterable[LogSegment] = {

Review Comment:
   We should change this to return `java.util.Collection[LogSegment]` to avoid 
unnecessary conversions.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1511,7 +1512,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 }
 localLog.checkIfMemoryMappedBufferClosed()
 // remove the segments for lookups
-    localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
+    localLog.removeAndDeleteSegments(segmentsToDelete.toList.asJava, true, reason)

Review Comment:
   This conversion can be avoided if we make the change to the method signature 
suggested above.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1564,8 +1565,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
-    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
-      nextSegmentOpt.exists(_.baseOffset <= (if (remoteLogEnabled()) localLogStartOffset() else logStartOffset))
+    def shouldDelete(segment: LogSegment, nextSegmentOpt: Optional[LogSegment]): Boolean = {
+      if (nextSegmentOpt.isPresent)
+        nextSegmentOpt.get().baseOffset <= (if (remoteLogEnabled()) localLogStartOffset() else logStartOffset)
+      else false

Review Comment:
   Nit: you can do `nextSegmentOpt.isPresent && nextSegmentOpt.get...`
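For illustration, a small self-contained sketch of the nit (using a hypothetical `Optional<Long>` in place of the segment's base offset), showing the `isPresent && get` form is equivalent to the if/else in the diff:

```java
import java.util.Optional;

public class OptionalNit {
    // Condensed form of the shouldDelete check from the diff; the
    // parameter types and threshold are hypothetical simplifications.
    static boolean shouldDelete(Optional<Long> nextBaseOffset, long logStartOffset) {
        // Equivalent to: if (isPresent) { get() <= threshold } else false
        return nextBaseOffset.isPresent() && nextBaseOffset.get() <= logStartOffset;
    }

    public static void main(String[] args) {
        System.out.println(shouldDelete(Optional.of(3L), 5L));   // next segment starts at/below log start
        System.out.println(shouldDelete(Optional.empty(), 5L));  // no next segment
    }
}
```

An alternative one-liner with the same behavior is `nextBaseOffset.filter(b -> b <= logStartOffset).isPresent()`.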



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1245,7 +1246,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {

Review Comment:
   We should change this to return `java.util.Collection` or `java.util.List` 
to avoid unnecessary conversions.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2265,11 +2268,12 @@ object UnifiedLog extends Logging {
 def deleteProducerSnapshots(): Unit = {
   LocalLog.maybeHandleIOException(logDirFailureChannel,
 parentDir,
-        s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir") {
+        s"Error while deleting producer state snapshots for $topicPartition in dir $parentDir", {
 snapshotsToDelete.foreach { snapshot =>
   snapshot.deleteIfExists()
 }
-  }
+  return;

Review Comment:
   Hmm, it is a bit odd that a `return` with no value is required for Scala 
code. Is this right?






[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-01 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801663#comment-17801663
 ] 

Satish Duggana commented on KAFKA-16073:


Thanks [~hzh0425@apache] for filing the JIRA with a detailed description. 

I am trying to summarize the scenario that you mentioned earlier in the JIRA 
description with an example. Let me know if I am missing anything here. 
Let us assume each segment has one offset in this example.

log start offset:           0
log end offset:             10
local log start offset:     4
fetch offset:               6
new local log start offset: 7

Deletion based on retention configs is started, eventually updating the local 
log start offset to 7.

There is a race condition here: the segments for offsets 4, 5, and 6 are first 
removed from the segments list in LocalLog, and only then is the 
local-log-start-offset updated. A fetch at offset 6 may be served concurrently, 
and it can throw OffsetOutOfRangeException if the in-memory segments have 
already been removed in LocalLog but the local-log-start-offset has not yet 
been updated to 7 when it executes the 
[code|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1866],
 as the check fails because the condition fetch offset (6) < old 
local-log-start-offset (4) does not hold.
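The summary above can be modelled with a toy, single-threaded Java sketch (all names are hypothetical, not Kafka code): segments are removed before the local-log-start-offset is advanced, so a fetch landing between the two steps is served by neither the local log nor the remote path.

```java
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

public class StaleOffsetRace {
    // One "segment" per offset, as in the example above.
    static final TreeMap<Long, String> segments = new TreeMap<>();
    static final AtomicLong localLogStartOffset = new AtomicLong(4);

    // Mirrors the check: only fetches below local-log-start-offset
    // are routed to tiered storage.
    static boolean routedToRemote(long fetchOffset) {
        return fetchOffset < localLogStartOffset.get();
    }

    public static void main(String[] args) {
        for (long o = 4; o <= 9; o++) segments.put(o, "segment-" + o);

        // Deletion step 1: drop segments 4..6 from the in-memory list.
        for (long o = 4; o <= 6; o++) segments.remove(o);

        // A concurrent fetch at offset 6 lands here, between the two steps:
        boolean remote = routedToRemote(6);            // stale offset is still 4
        boolean inLocalLog = segments.containsKey(6L); // segment already gone
        // Neither path can serve the fetch -> OffsetOutOfRangeException.
        System.out.println(remote + " " + inLocalLog);

        // Deletion step 2 (too late for the fetch above):
        localLogStartOffset.set(7);
        System.out.println(routedToRemote(6)); // now correctly routed to remote
    }
}
```

Updating the offset before (or atomically with) the segment removal would close this window.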


> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{localLogStartOffset}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{offset1 < 
> offset2 < offset3}}. A client requests data at {{offset2}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{offset3}}. 
> Consequently, when the fetch offset ({{offset2}}) is evaluated against 
> the stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}}, 
> it incorrectly triggers an {{OffsetOutOfRangeException}}. This issue 
> arises from the out-of-sync update of {{localLogStartOffset}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16073) Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed localLogStartOffset Update During Segment Deletion

2024-01-01 Thread hzh0425 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801667#comment-17801667
 ] 

hzh0425 commented on KAFKA-16073:
-

[~satish.duggana] Yes, it's exactly what you described. I actually encountered 
this case in a production environment.
Do you have any ideas for a fix? For example, should a lock be taken when 
fetching localLogStartOffset?

> Kafka Tiered Storage Bug: Consumer Fetch Error Due to Delayed 
> localLogStartOffset Update During Segment Deletion
> 
>
> Key: KAFKA-16073
> URL: https://issues.apache.org/jira/browse/KAFKA-16073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: hzh0425
>Assignee: hzh0425
>Priority: Major
>  Labels: KIP-405, kip-405, tiered-storage
> Fix For: 3.6.1
>
>
> The identified bug in Apache Kafka's tiered storage feature involves a 
> delayed update of {{localLogStartOffset}} in the 
> {{UnifiedLog.deleteSegments}} method, impacting consumer fetch operations. 
> When segments are deleted from the log's memory state, the 
> {{localLogStartOffset}} isn't promptly updated. Concurrently, 
> {{ReplicaManager.handleOffsetOutOfRangeError}} checks if a consumer's fetch 
> offset is less than the {{localLogStartOffset}}. If it's greater, Kafka 
> erroneously sends an {{OffsetOutOfRangeException}} to the consumer.
> In a specific concurrent scenario, imagine sequential offsets: {{offset1 < 
> offset2 < offset3}}. A client requests data at {{offset2}}. While a 
> background deletion process removes segments from memory, it hasn't yet 
> updated the {{LocalLogStartOffset}} from {{offset1}} to {{offset3}}. 
> Consequently, when the fetch offset ({{offset2}}) is evaluated against 
> the stale {{offset1}} in {{ReplicaManager.handleOffsetOutOfRangeError}}, 
> it incorrectly triggers an {{OffsetOutOfRangeException}}. This issue 
> arises from the out-of-sync update of {{localLogStartOffset}}, leading to 
> incorrect handling of consumer fetch requests and potential data access 
> errors.





[jira] [Commented] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests

2024-01-01 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801668#comment-17801668
 ] 

Arpit Goyal commented on KAFKA-16063:
-

[~divijvaidya] It seems to be a well-known leak, widely discussed on the internet:
https://blog.creekorful.org/2020/03/classloader-and-memory-leaks/
https://stackoverflow.com/questions/6385018/memory-leaks-with-addshutdownhook

The map only gets cleared when JVM shutdown happens, at which point the 
registered hooks are executed and the map is set to null.

{code:java}
static void runHooks() {
    Collection<Thread> threads;
    synchronized (ApplicationShutdownHooks.class) {
        threads = hooks.keySet();
        hooks = null;
    }

    for (Thread hook : threads) {
        hook.start();
    }
    for (Thread hook : threads) {
        while (true) {
            try {
                hook.join();
                break;
            } catch (InterruptedException ignored) {
            }
        }
    }
}
{code}

We do not have the thread reference needed to clear it manually: 
removeShutdownHook requires the thread reference, and the hook thread is 
created anonymously within the DefaultDirectoryService library code.

{code:java}
Runtime.getRuntime().removeShutdownHook(THREAD)
{code}
The only feasible option, as far as this analysis goes, is to disable the 
hook-enabled flag.
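A small Java sketch of why the thread reference matters: `Runtime.removeShutdownHook` only deregisters the exact `Thread` instance that was registered, so a hook created anonymously inside a library cannot be removed by the caller.

```java
public class HookReferenceDemo {
    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();

        // Keeping the Thread reference allows deregistration later:
        Thread hook = new Thread(() -> { });
        rt.addShutdownHook(hook);
        System.out.println(rt.removeShutdownHook(hook)); // true

        // A hook registered anonymously (as DefaultDirectoryService does)
        // stays in ApplicationShutdownHooks until JVM shutdown, because
        // removeShutdownHook needs that exact Thread instance.
        rt.addShutdownHook(new Thread(() -> { }));
    }
}
```

Hence disabling the hook registration up front is the practical workaround when the library does not expose the thread.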


> Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
> -
>
> Key: KAFKA-16063
> URL: https://issues.apache.org/jira/browse/KAFKA-16063
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Arpit Goyal
>Priority: Major
> Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot 
> 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, 
> Screenshot 2024-01-01 at 10.51.03 PM.png
>
>
> All test extending `EndToEndAuthorizationTest` are leaking 
> DefaultDirectoryService objects.
> This can be observed using the heap dump at 
> [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0]
>  (unzip this and you will find a hprof which can be opened with your 
> favourite heap analyzer)
> The stack trace looks like this:
> !Screenshot 2023-12-29 at 12.38.29.png!
>  
> I suspect that the reason is because DefaultDirectoryService#startup() 
> registers a shutdownhook which is somehow messed up by 
> QuorumTestHarness#teardown().
> We need to investigate why this is leaking and fix it.





[jira] [Comment Edited] (KAFKA-16050) consumer was removed from group,but still can poll data from kafka, data duplicate

2024-01-01 Thread Xin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801087#comment-17801087
 ] 

Xin edited comment on KAFKA-16050 at 1/2/24 6:45 AM:
-

Finally I found the reason.

The consumer's rejoin operation was delayed for a long time, and another 
consumer had already been assigned the partitions.

Can we optimize this process?


was (Author: auroraxlh):
Finally Ifound the reason

Consumer Rejoin operation was delay for a long time, and another consumer has 
been assigned the partitions

 
 
Can we optimize these process?

> consumer was removed from group,but still can  poll data from kafka, data 
> duplicate
> ---
>
> Key: KAFKA-16050
> URL: https://issues.apache.org/jira/browse/KAFKA-16050
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.1
>Reporter: Xin
>Priority: Major
>
> I have 3 brokers: b1, b2, b3
> a topic: test, with 5 partitions and replication factor 3
> 3 consumers in 1 group: consumer1, consumer2, consumer3
> group id: xx
>  
> consumer1 running on b1
> consumer2 running on b2
> consumer3 running on b3
> ./kafka-console-consumer.sh --bootstrap-server localhost:9093 --group xx  
> --topic test   --from-beginning
>  
> b2's clock changed, and consumer2 was removed from group xx (reason: removing 
> member consumer-2 on heartbeat expiration) 
> (kafka.coordinator.group.GroupCoordinator)
> kafka-consumer-groups.sh can't see any record about consumer2:
> ./kafka-consumer-groups.sh  --bootstrap-server localhost:9093 --all-topics 
> --describe --all-groups
>  
> Then the consumers rebalanced, and the partitions assigned to consumer2 were 
> assigned to other consumers.
> Although consumer2 was removed from group xx, it still polls data from Kafka, 
> and Kafka can't find it.
> After the rebalance, another consumer polls the same partitions as consumer2.
> This causes data to be polled in duplicate.





Re: [PR] KAFKA-14133: Migrate stateManager mock in StoreChangelogReaderTest to Mockito [kafka]

2024-01-01 Thread via GitHub


clolov commented on code in PR #14929:
URL: https://github.com/apache/kafka/pull/14929#discussion_r1439203605


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -946,7 +966,6 @@ public void 
shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() {
 final Map mockTasks = mock(Map.class);
 
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
 
EasyMock.expect(mockTasks.containsKey(null)).andReturn(true).anyTimes();
-EasyMock.expect(stateManager.taskType()).andReturn(type);

Review Comment:
   This setup, and the two below, were reported as unnecessary by Mockito.






[PR] KAFKA-16063: Prevent memory leak by Disabling shutdownhook [kafka]

2024-01-01 Thread via GitHub


iit2009060 opened a new pull request, #15104:
URL: https://github.com/apache/kafka/pull/15104

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16063) Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests

2024-01-01 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801687#comment-17801687
 ] 

Arpit Goyal commented on KAFKA-16063:
-

[~divijvaidya] [~showuon] PR for review.
https://github.com/apache/kafka/pull/15104

> Fix leaked ApplicationShutdownHooks in EndToEndAuthorizationTests
> -
>
> Key: KAFKA-16063
> URL: https://issues.apache.org/jira/browse/KAFKA-16063
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Arpit Goyal
>Priority: Major
> Attachments: Screenshot 2023-12-29 at 12.38.29.png, Screenshot 
> 2023-12-31 at 7.19.25 AM.png, Screenshot 2024-01-01 at 10.51.03 PM-1.png, 
> Screenshot 2024-01-01 at 10.51.03 PM.png
>
>
> All test extending `EndToEndAuthorizationTest` are leaking 
> DefaultDirectoryService objects.
> This can be observed using the heap dump at 
> [https://www.dropbox.com/scl/fi/4jaq8rowkmijaoj7ec1nm/GradleWorkerMain_10311_27_12_2023_13_37_08_Leak_Suspects.zip?rlkey=minkbvopb0c65m5wryqw234xb&dl=0]
>  (unzip this and you will find a hprof which can be opened with your 
> favourite heap analyzer)
> The stack trace looks like this:
> !Screenshot 2023-12-29 at 12.38.29.png!
>  
> I suspect that the reason is because DefaultDirectoryService#startup() 
> registers a shutdownhook which is somehow messed up by 
> QuorumTestHarness#teardown().
> We need to investigate why this is leaking and fix it.


