Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-07 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593471183


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   Should we avoid throwing the error and return message-only metadata when the 
offset is less than the log-start-offset?
   
   While updating the 
[UnifiedLog#updateLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L535),
 the HWM also gets updated without hitting `fetchHighWatermarkMetadata` or 
`convertToOffsetMetadataOrThrow`, so the call will succeed even when 
current-log-start-offset > old-HWM.
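   
   For illustration, a minimal Java sketch of the suggested fallback (the 
standalone class shape and the placeholder lookup are assumptions; this is not 
the actual `UnifiedLog` code):
   
   ```java
   import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
   
   // Sketch only: return message-only metadata instead of throwing when the
   // requested offset precedes the log start offset.
   class ConvertSketch {
       private final long logStartOffset;
   
       ConvertSketch(long logStartOffset) { this.logStartOffset = logStartOffset; }
   
       LogOffsetMetadata convertToOffsetMetadata(long offset) {
           if (offset < logStartOffset) {
               // message-only metadata: no segment/file position is attached
               return new LogOffsetMetadata(offset);
           }
           // the real code would consult the offset index / segments here; a
           // message-only result merely keeps this sketch compilable
           return new LogOffsetMetadata(offset);
       }
   }
   ```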



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-07 Thread Lenin Joseph (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844529#comment-17844529
 ] 

Lenin Joseph edited comment on KAFKA-16656 at 5/8/24 6:47 AM:
--

Hi [~ChrisEgerton], thanks for looking into it.
We see cyclic replication for all the topics, including internal topics such as 
mm2-offset-syncs, checkpoint, mirrormaker2-cluster-status, 
mirrormaker2-cluster-configs, and mirrormaker2-cluster-offsets, but not for the 
consumer-offsets topic.

Below are the configs that we have used.
topicsPattern: ".*"

replication.policy.separator: "-"

replication.policy.class: 
org.apache.kafka.connect.mirror.DefaultReplicationPolicy


was (Author: JIRAUSER304387):
Hi [~ChrisEgerton], thanks for looking into it.
We see cyclic replication for all the topics, including internal topics such as 
mm2-offset-syncs, checkpoint, mirrormaker2-cluster-status, 
mirrormaker2-cluster-configs, and mirrormaker2-cluster-offsets, but not for the 
consumer-offsets topic.

Below are the configs that we have used.
topicsPattern: ".*"

replication.policy.separator: "-"

replication.policy.class: 
org.apache.kafka.connect.mirror.IdentityReplicationPolicy

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.1
>Reporter: Lenin Joseph
>Priority: Major
>
> Hi,
> In the case of bidirectional replication using mm2, when we tried using a 
> custom replication.policy.separator (e.g. "-") with DefaultReplicationPolicy, 
> we see cyclic replication of topics. Could you confirm whether it's mandatory 
> to use a CustomReplicationPolicy whenever we want to use a separator other 
> than "."?
> Regards, 
> Lenin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16679 merge unit test down to the class of integration test [kafka]

2024-05-07 Thread via GitHub


KevinZTW commented on code in PR #15884:
URL: https://github.com/apache/kafka/pull/15884#discussion_r1593470507


##
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java:
##
@@ -160,28 +154,26 @@ private String outputWithoutEpoch(String output) {
 int pos = output.indexOf("Epoch: ");
 return (pos > 0) ? output.substring(0, pos) : output;
 }
-}
 
-class FeatureCommandUnitTest {
 @Test
 public void testLevelToString() {
 assertEquals("5", FeatureCommand.levelToString("foo.bar", (short) 5));
 assertEquals("3.3-IV0",
-FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_3_IV0.featureLevel()));
+FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_3_IV0.featureLevel()));
 }
 
 @Test
 public void testMetadataVersionsToString() {
 assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3",
-
FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_3_IV0, 
MetadataVersion.IBP_3_3_IV3));
+
FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_3_IV0, 
MetadataVersion.IBP_3_3_IV3));
 }
 
 @Test
 public void testdowngradeType() {

Review Comment:
   thanks! Just fixed it in a new commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-07 Thread via GitHub


kamalcph commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2099844734

   > Thanks for the PR. One question: So when we temporarily set the 
high-watermark as `LogOffsetMetadata(0)` for the leader, we're waiting for the 
high-watermark to get updated after followers fetch from the leader, right?
   
   Yes, the call to 
[maybeIncrementLeaderHW](https://sourcegraph.com/github.com/apache/kafka@5f933ac840343429e911f5706ccb7cd8dc379462/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L1166)
 will succeed when the node becomes the leader for the partition. Note that if 
the current node is the only alive replica, then the high-watermark gets 
updated to the leader log-end-offset. The behavior is the same for both normal 
and remote-storage-enabled topics.
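   
   A tiny illustration of that rule (assumed shape, not the actual 
`Partition.scala` logic): the new high-watermark is the minimum log-end-offset 
across the in-sync replicas, so a lone alive replica advances it straight to 
the leader's LEO.
   
   ```java
   import java.util.Collections;
   import java.util.List;
   
   // Sketch only: the HWM advances to min(LEO over ISR), never moving backwards.
   class HwmSketch {
       static long maybeIncrementLeaderHw(long currentHw, List<Long> isrLogEndOffsets) {
           long newHw = Collections.min(isrLogEndOffsets); // one replica => leader LEO
           return Math.max(currentHw, newHw);
       }
   }
   ```
   
   With `isrLogEndOffsets = [42]` and `currentHw = 0`, the HWM jumps straight to 42.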


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-07 Thread via GitHub


jeqo opened a new pull request, #15893:
URL: https://github.com/apache/kafka/pull/15893

   [KAFKA-16691]
   
   ---
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-07 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1593390067


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
##
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
 
 // Log cleanup interval is configured to be 500 ms. We need to wait at 
least that amount of time before
 // segments eligible for deletion gets physically removed.
-public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment:
   The test is still flaky. Investigating.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16656) Using a custom replication.policy.separator with DefaultReplicationPolicy

2024-05-07 Thread Lenin Joseph (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844529#comment-17844529
 ] 

Lenin Joseph commented on KAFKA-16656:
--

Hi [~ChrisEgerton], thanks for looking into it.
We see cyclic replication for all the topics, including internal topics such as 
mm2-offset-syncs, checkpoint, mirrormaker2-cluster-status, 
mirrormaker2-cluster-configs, and mirrormaker2-cluster-offsets, but not for the 
consumer-offsets topic.

Below are the configs that we have used.
topicsPattern: ".*"

replication.policy.separator: "-"

replication.policy.class: 
org.apache.kafka.connect.mirror.IdentityReplicationPolicy

> Using a custom replication.policy.separator with DefaultReplicationPolicy
> -
>
> Key: KAFKA-16656
> URL: https://issues.apache.org/jira/browse/KAFKA-16656
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.5.1
>Reporter: Lenin Joseph
>Priority: Major
>
> Hi,
> In the case of bidirectional replication using mm2, when we tried using a 
> custom replication.policy.separator (e.g. "-") with DefaultReplicationPolicy, 
> we see cyclic replication of topics. Could you confirm whether it's mandatory 
> to use a CustomReplicationPolicy whenever we want to use a separator other 
> than "."?
> Regards, 
> Lenin



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [DON'T SEE THIS PR] debug for thread leak [kafka]

2024-05-07 Thread via GitHub


chia7712 opened a new pull request, #15891:
URL: https://github.com/apache/kafka/pull/15891

   thread leak where are you? 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Correct connector scheduled rebalance logs [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on PR #15875:
URL: https://github.com/apache/kafka/pull/15875#issuecomment-2099722116

   @yuz10 thanks for your contribution!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Correct connector scheduled rebalance logs [kafka]

2024-05-07 Thread via GitHub


chia7712 merged PR #15875:
URL: https://github.com/apache/kafka/pull/15875


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16678) Remove unimplementedquorum from EndToEndAuthorizationTest

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16678.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Remove unimplementedquorum from EndToEndAuthorizationTest
> -
>
> Key: KAFKA-16678
> URL: https://issues.apache.org/jira/browse/KAFKA-16678
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
> Fix For: 3.8.0
>
>
> `unimplementedquorum`[0] is used to skip test cases that don't support running 
> with KRaft. However, KAFKA-15219, KAFKA-14765, and KAFKA-14776 made the 
> related tests able to run with KRaft.
> In short, it is time to remove the now-unused variable :)
> [0] 
> [https://github.com/apache/kafka/blob/d76352e2151178521dc447e3406dabb8fcd4c57c/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L146]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16678: Remove variable "unimplementedquorum" [kafka]

2024-05-07 Thread via GitHub


chia7712 merged PR #15879:
URL: https://github.com/apache/kafka/pull/15879


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16679) Merge unit test down to the class of integration test

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16679:
---
Summary: Merge unit test down to the class of integration test  (was: Merge 
`DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, 
`FeatureCommandUnitTest` into FeatureCommandTest)

> Merge unit test down to the class of integration test
> -
>
> Key: KAFKA-16679
> URL: https://issues.apache.org/jira/browse/KAFKA-16679
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Cheng-Kai, Zhang
>Priority: Minor
>
> Normally, we don't put multiple test classes into a single file. Those test 
> classes can be extracted into new class files, or we can merge them into a 
> single class using the "@Test" annotation. That lets those test cases run 
> without an embedded cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16679) Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into FeatureCommandTest

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16679:
---
Summary: Merge `DeleteRecordsCommandUnitTest` into 
`DeleteRecordsCommandTest`, `FeatureCommandUnitTest` into FeatureCommandTest  
(was: Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, 
`FeatureCommandUnitTest` into )

> Merge `DeleteRecordsCommandUnitTest` into `DeleteRecordsCommandTest`, 
> `FeatureCommandUnitTest` into FeatureCommandTest
> --
>
> Key: KAFKA-16679
> URL: https://issues.apache.org/jira/browse/KAFKA-16679
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Cheng-Kai, Zhang
>Priority: Minor
>
> Normally, we don't put multiple test classes into a single file. Those test 
> classes can be extracted into new class files, or we can merge them into a 
> single class using the "@Test" annotation. That lets those test cases run 
> without an embedded cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: merge unit test down to the class of integration test [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15884:
URL: https://github.com/apache/kafka/pull/15884#discussion_r1593365819


##
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java:
##
@@ -160,28 +154,26 @@ private String outputWithoutEpoch(String output) {
 int pos = output.indexOf("Epoch: ");
 return (pos > 0) ? output.substring(0, pos) : output;
 }
-}
 
-class FeatureCommandUnitTest {
 @Test
 public void testLevelToString() {
 assertEquals("5", FeatureCommand.levelToString("foo.bar", (short) 5));
 assertEquals("3.3-IV0",
-FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_3_IV0.featureLevel()));
+FeatureCommand.levelToString(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_3_IV0.featureLevel()));
 }
 
 @Test
 public void testMetadataVersionsToString() {
 assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3",
-
FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_3_IV0, 
MetadataVersion.IBP_3_3_IV3));
+
FeatureCommand.metadataVersionsToString(MetadataVersion.IBP_3_3_IV0, 
MetadataVersion.IBP_3_3_IV3));
 }
 
 @Test
 public void testdowngradeType() {

Review Comment:
   `testDowngradeType`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-07 Thread via GitHub


kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593363337


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,10 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {

Review Comment:
   `fetchOffset` can be message-only metadata when there is a diverging epoch. 
If there is a diverged epoch in the LogReadResults, then it won't enter the 
DelayedFetch, so we can remove the check if it is not required.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   `checkLogStartOffset` will throw `OffsetOutOfRangeException` if the offset 
is less than the `logStartOffset`.
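   
   For reference, a minimal sketch of such a guard (the standalone class and 
field are assumptions, not the actual Kafka implementation):
   
   ```java
   import org.apache.kafka.common.errors.OffsetOutOfRangeException;
   
   // Sketch only: reject reads below the current log start offset.
   class LogStartGuard {
       private final long logStartOffset;
   
       LogStartGuard(long logStartOffset) { this.logStartOffset = logStartOffset; }
   
       void checkLogStartOffset(long offset) {
           if (offset < logStartOffset) {
               throw new OffsetOutOfRangeException("Requested offset " + offset
                   + " is below the log start offset " + logStartOffset);
           }
       }
   }
   ```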



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16688: Use helper method to shutdown ExecutorService [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on PR #15886:
URL: https://github.com/apache/kafka/pull/15886#issuecomment-2099705033

   @gaurav-narula thanks for your fix. Before merging this PR, I want to know 
which `timerTask` hangs during shutdown. Do you have any idea?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on PR #15821:
URL: https://github.com/apache/kafka/pull/15821#issuecomment-2099700974

   > This is used by ListConsumerGroupTest and DescribeConsumerGroupTest. I 
think we can create a new class SimpleConsumerGroupExecutorTestUtils for it. 
WDYT? Thank you.
   
   That is already addressed; see 
   
https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L92


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on PR #15715:
URL: https://github.com/apache/kafka/pull/15715#issuecomment-2099699004

   The build failed ... retriggering again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2099697982

   @m1a2st Could you please use `ConsumerGroupCommandTestUtils#generator` to 
rewrite this test? thanks!
   
   
https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L50


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: Add tags support in ClusterTestExtension [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1593356187


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -53,14 +52,15 @@ public class ClusterConfig {
 private final Map<String, String> adminClientProperties;
 private final Map<String, String> saslServerProperties;
 private final Map<String, String> saslClientProperties;
+private final ArrayList<String> tags;

Review Comment:
   Please use the interface `List<String>` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16668: Add tags support in ClusterTestExtension [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15861:
URL: https://github.com/apache/kafka/pull/15861#discussion_r1593355835


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -153,15 +148,19 @@ public Map<Integer, Map<String, String>> perBrokerOverrideProperties() {
 return perBrokerOverrideProperties;
 }
 
-public Map<String, String> nameTags() {
-Map<String, String> tags = new LinkedHashMap<>(4);
-name().ifPresent(name -> tags.put("Name", name));
-tags.put("MetadataVersion", metadataVersion.toString());
-tags.put("Security", securityProtocol.name());
-listenerName().ifPresent(listener -> tags.put("Listener", listener));
+public String[] tags() {
 return tags;
 }
 
+public Map<String, String> nameTags() {

Review Comment:
   My point was `Map<String, String> nameTags` should be changed to 
`Set<String> nameTags` (or `displayTags`). Also, it should include the "tags". 
For example:
   
   ```java
   public Set<String> nameTags() {
       Set<String> nameTags = new LinkedHashSet<>(tags);
       nameTags.add("MetadataVersion=" + metadataVersion.toString());
       nameTags.add("Security=" + securityProtocol.name());
       listenerName().ifPresent(listener -> nameTags.add("Listener=" + listener));
       return nameTags;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: use classic consumer with ZK mode for DeleteOffsetsConsumerGroupCommandIntegrationTest [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15872:
URL: https://github.com/apache/kafka/pull/15872#discussion_r1593354055


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -57,28 +56,47 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 @Tag("integration")
-@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
-@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
-@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
-})
 @ExtendWith(ClusterTestExtensions.class)
 public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
 public static final String TOPIC_PREFIX = "foo.";
 public static final String GROUP_PREFIX = "test.group.";
 private final ClusterInstance clusterInstance;
 
-private final Iterable<Map<String, String>> consumerConfigs;
-
 DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance 
clusterInstance) {
 this.clusterInstance = clusterInstance;
-this.consumerConfigs = clusterInstance.isKRaftTest()
-? 
Arrays.asList(Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name()),
-Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name()))
-: Collections.singletonList(Collections.emptyMap());
 }
 
-@ClusterTest
+private static void generator(ClusterGenerator clusterGenerator) {
+Map<String, String> serverProperties = new HashMap<>();

Review Comment:
   Please use `ConsumerGroupCommandTestUtils#generator`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16689) Move LogValidatorTest to storage module

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16689:
--

Assignee: TaiJuWu  (was: Chia-Ping Tsai)

> Move LogValidatorTest to storage module
> ---
>
> Key: KAFKA-16689
> URL: https://issues.apache.org/jira/browse/KAFKA-16689
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Minor
>
> `LogValidator` has already been moved to the storage module, but its unit test 
> is still in the core module. That is a bit weird. We ought to rewrite it in 
> Java and then move it to the storage module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16689) Move LogValidatorTest to storage module

2024-05-07 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844513#comment-17844513
 ] 

Chia-Ping Tsai commented on KAFKA-16689:


hi [~ijuma] thanks for your reminder.  

{quote}
we can create subtasks for specific tests under that if we want to break it 
down.
{quote}

https://issues.apache.org/jira/browse/KAFKA-14488 is already a subtask, so I 
added the link to it.

> Move LogValidatorTest to storage module
> ---
>
> Key: KAFKA-16689
> URL: https://issues.apache.org/jira/browse/KAFKA-16689
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> `LogValidator` has already been moved to the storage module, but its unit test 
> is still in the core module. That is a bit weird. We ought to rewrite it in 
> Java and then move it to the storage module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16684) FetchResponse#responseData could return incorrect data

2024-05-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16684:
--

Assignee: Johnny Hsu  (was: Chia-Ping Tsai)

> FetchResponse#responseData could return incorrect data
> --
>
> Key: KAFKA-16684
> URL: https://issues.apache.org/jira/browse/KAFKA-16684
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Johnny Hsu
>Priority: Minor
>
> [https://github.com/apache/kafka/commit/2b8aff58b575c199ee8372e5689420c9d77357a5]
>  made it accept input so that it returns "partial" data. The content of the 
> output is based on the input, but we cache the output ... it will return the 
> same output even though we pass different input. That is a potential bug.
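
A minimal illustration of the caching hazard described above (hypothetical 
names; not the actual FetchResponse code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch only: a lazily cached result that silently ignores later, different inputs.
class CachedResponseSketch {
    private Map<String, Long> cached; // bound to the first input forever

    Map<String, Long> responseData(Set<String> requestedTopics) {
        if (cached == null) {
            cached = new HashMap<>();
            for (String topic : requestedTopics) {
                cached.put(topic, sizeOf(topic)); // computed from the first input only
            }
        }
        return cached; // wrong: a different requestedTopics set gets the stale result
    }

    private long sizeOf(String topic) { return topic.length(); } // stand-in computation
}
```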



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] a slight change. [kafka]

2024-05-07 Thread via GitHub


gongxuanzhang commented on PR #15812:
URL: https://github.com/apache/kafka/pull/15812#issuecomment-2099682338

   @gharris1727 new PR: https://github.com/apache/kafka/pull/15890


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16643 Fix chaos modifier [kafka]

2024-05-07 Thread via GitHub


gongxuanzhang opened a new pull request, #15890:
URL: https://github.com/apache/kafka/pull/15890

   @gharris1727 https://github.com/apache/kafka/pull/15812
   I modified all the non-standard code according to the rules of ModifierOrder, 
and I divided it into multiple commits by module for easier code review.
   Additionally, I modified a Scala file to follow the Java specification. I 
don't know if that's right.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add MetadataType metric from KIP-866 #15299 [kafka]

2024-05-07 Thread via GitHub


github-actions[bot] commented on PR #15306:
URL: https://github.com/apache/kafka/pull/15306#issuecomment-2099674185

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16218: Make partition reassignment completed as soon as adding replicas are in-sync [kafka]

2024-05-07 Thread via GitHub


github-actions[bot] commented on PR #15332:
URL: https://github.com/apache/kafka/pull/15332#issuecomment-2099674143

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [No review] Repro kafka-16217 [kafka]

2024-05-07 Thread via GitHub


github-actions[bot] commented on PR #15336:
URL: https://github.com/apache/kafka/pull/15336#issuecomment-2099674110

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Global state store restore custom processor [kafka]

2024-05-07 Thread via GitHub


github-actions[bot] commented on PR #15337:
URL: https://github.com/apache/kafka/pull/15337#issuecomment-2099674079

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16689) Move LogValidatorTest to storage module

2024-05-07 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844508#comment-17844508
 ] 

Ismael Juma commented on KAFKA-16689:
-

[~chia7712] This is intentional - there is a separate Jira for moving the 
tests. The reason is that we want to minimize the chance of breaking things 
during the Java rewrite. So, we want to do the test changes separately from the 
relevant non test changes. The Jira is 
https://issues.apache.org/jira/browse/KAFKA-14488 - we can create subtasks for 
specific tests under that if we want to break it down.

> Move LogValidatorTest to storage module
> ---
>
> Key: KAFKA-16689
> URL: https://issues.apache.org/jira/browse/KAFKA-16689
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> `LogValidator` has already been moved to the storage module, but its unit test 
> is still in the core module. That is a bit weird. We ought to rewrite it in 
> Java and then move it to the storage module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-07 Thread via GitHub


showuon commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1593331212


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is lesser than the log-start-offset, then throw 
an OffsetOutOfRangeException

Review Comment:
   For case 1, it looks like we never throw `OffsetOutOfRangeException` now, do 
we?



##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -92,7 +92,10 @@ class DelayedFetch(
 // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
 // which would incorrectly be seen as an instance of Case F.
 if (endOffset.messageOffset != fetchOffset.messageOffset) {
-  if (endOffset.onOlderSegment(fetchOffset)) {
+  if (endOffset.messageOffsetOnly() || 
fetchOffset.messageOffsetOnly()) {

Review Comment:
   I can understand the `endOffset.messageOffsetOnly()` case since the leader's 
high watermark is still not updated. But when will `fetchOffset` be 
`messageOffsetOnly`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug

2024-05-07 Thread dujian (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844506#comment-17844506
 ] 

dujian commented on KAFKA-16584:


Hello [~mjsax],

Thank you very much for offering to help me create a KIP. Since this is my 
first KIP, I am worried that it will need many modifications and updates, and 
that multiple revisions would be a burden on you, so I have not provided a KIP 
doc yet.

The assignee of 'https://issues.apache.org/jira/browse/INFRA-25451' can create 
a wiki ID for me, but a PMC member of the Kafka project must send an email or 
comment on the issue. Do you know how to contact the PMC?

My email is 'dujian0...@gmail.com'

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Assignee: dujian
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary
> * either a DEBUG-level log, or
> * configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16689) Move LogValidatorTest to storage module

2024-05-07 Thread TaiJuWu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844505#comment-17844505
 ] 

TaiJuWu commented on KAFKA-16689:
-

Hi [~chia7712], I am interested in this topic. Could you assign it to me?

> Move LogValidatorTest to storage module
> ---
>
> Key: KAFKA-16689
> URL: https://issues.apache.org/jira/browse/KAFKA-16689
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> `LogValidator` has already been moved to the storage module, but its unit test 
> is still in the core module. That is a bit weird. We ought to rewrite it in 
> Java and then move it to the storage module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16684) FetchResponse#responseData could return incorrect data

2024-05-07 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844496#comment-17844496
 ] 

Johnny Hsu commented on KAFKA-16684:


Hi [~chia7712],
May I know if you are working on this? If not, I am happy to help :)

> FetchResponse#responseData could return incorrect data
> --
>
> Key: KAFKA-16684
> URL: https://issues.apache.org/jira/browse/KAFKA-16684
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> [https://github.com/apache/kafka/commit/2b8aff58b575c199ee8372e5689420c9d77357a5]
>  made it accept input so that it returns "partial" data. The content of the 
> output is based on the input, but we cache the output ... it will return the 
> same output even though we pass different input. That is a potential bug.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-07 Thread via GitHub


emitskevich-blp commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1593310007


##
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##
@@ -106,44 +106,55 @@ public String toString() {
 
 public abstract double combine(List<Sample> samples, MetricConfig config, long now);
 
-/* Timeout any windows that have expired in the absence of any events */
-protected void purgeObsoleteSamples(MetricConfig config, long now) {
+/**
+ * Purges any windows that started before the configured period.
+ * Returns the end of the latest purged window.
+ */
+protected long purgeObsoleteSamples(MetricConfig config, long now) {
 long expireAge = config.samples() * config.timeWindowMs();
+long purgedUpToMs = 0;
 for (Sample sample : samples) {
-if (now - sample.lastWindowMs >= expireAge)
+if (now - sample.startTimeMs >= expireAge) {
+purgedUpToMs = Math.max(purgedUpToMs, 
sample.endTimeMs(config));
 sample.reset(now);
+}
 }
+return purgedUpToMs;
 }
 
 protected static class Sample {
 public double initialValue;
 public long eventCount;
-public long lastWindowMs;
+public long startTimeMs;

Review Comment:
   It has always played the role of the start time; this is just a rename, no 
functional changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix rate metric spikes [kafka]

2024-05-07 Thread via GitHub


emitskevich-blp commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1593308659


##
clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java:
##
@@ -64,4 +67,31 @@ public void testRateWithNoPriorAvailableSamples(int 
numSample, int sampleWindowS
 double expectedRatePerSec = sampleValue / windowSize;
 assertEquals(expectedRatePerSec, observedRate, EPS);
 }
+
+
+// Record an event every 100 ms on average, shifting by about 1 ms back or
+// forth for fine-grained window control. The expected rate, hence, is 10-11
+// events/sec depending on the moment of measurement. Start assertions from
+// the second window.
+@Test
+public void testRateIsConsistentAfterTheFirstWindow() {
+MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2);
+List<Integer> steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 100, 100, 100, 100);
+
+// start the first window and record events at 0,99,199,...,999 ms 
+for (int stepMs : steps) {
+time.sleep(stepMs);
+rate.record(config, 1, time.milliseconds());
+}
+
+// making a gap of 100 ms between windows
+time.sleep(101);
+
+// start the second window and record events at 0,99,199,...,999 ms
+for (int stepMs : steps) {
+time.sleep(stepMs);
+rate.record(config, 1, time.milliseconds());
+double observedRate = rate.measure(config, time.milliseconds());
+assertTrue(observedRate >= 10 && observedRate <= 11);

Review Comment:
   The last measurement here was affected by the `Rate` bug and showed ~5 
instead of ~10. Now it's consistent.
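   
   (Arithmetic under the test's setup: roughly 10 events land in the current 
~1 s window, but before the fix the denominator spanned the full two-window 
interval of ~2 s, so the reported rate dropped to about 10 / 2 = 5 events/sec 
instead of ~10.)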



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Fix Rate window size [kafka]

2024-05-07 Thread via GitHub


emitskevich-blp opened a new pull request, #15889:
URL: https://github.com/apache/kafka/pull/15889

   The `Rate` metric reports a value of the form `sumOrCount`/`timePeriod`. It 
has a bug in the `timePeriod` calculation, which leads to spikes in the 
reported values (real example: the `io-wait-ratio` metric shows ~100% -> ~50% 
spikes):
   (screenshot: https://github.com/apache/kafka/assets/135832807/24800c04-b754-4c60-891d-5174823890dd)
   
   `timePeriod` consists of several time windows (see 
[SampledStat](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java)
 for reference). A time window ends after a fixed time following its first 
event. A new window starts with the next event after the previous window ends, 
so there is always some gap between windows during which no event is reported. 
   
   Before reporting a value, `Rate` purges (`sumOrCount := 0`) all windows that 
started before the configured interval. However, this is only reflected in the 
numerator; the denominator is always the whole interval.
   
   Consider the picture corresponding to the default config (`numWindows = 2`):
   (screenshot: https://github.com/apache/kafka/assets/135832807/b5d2bebb-b158-4489-a490-0910c5c460f4)
   
   When a report is requested (`measure()`) at the end of the right window, the 
left window is purged for the numerator but not for the denominator. That is 
the reason for the spikes on the chart above: when we enter such a zone, the 
numerator is halved while the denominator remains the same.
   
   This PR fixes that behavior: if a window is purged for the numerator, the 
denominator is reduced as well.
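   
   A standalone sketch of the corrected accounting (the names and the 
simplified window representation are assumptions; the real change lives in 
`SampledStat`/`Rate`):
   
   ```java
   // Sketch only: purging an expired window removes it from the numerator AND
   // shortens the measured period used as the denominator.
   final class WindowedRateSketch {
       static double ratePerSec(long[] windowStartMs, double[] windowSum,
                                long nowMs, long windowMs, int numWindows) {
           long expireAge = (long) numWindows * windowMs;
           double sum = 0.0;
           long purgedUpToMs = 0; // end of the latest purged window
           for (int i = 0; i < windowSum.length; i++) {
               if (nowMs - windowStartMs[i] >= expireAge) {
                   // expired: drop from the numerator, remember how far we purged
                   purgedUpToMs = Math.max(purgedUpToMs, windowStartMs[i] + windowMs);
               } else {
                   sum += windowSum[i];
               }
           }
           // the denominator starts where purging ended, not at the full interval
           long periodMs = nowMs - Math.max(purgedUpToMs, nowMs - expireAge);
           return sum / (periodMs / 1000.0);
       }
   }
   ```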
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16691) Support for nested structures: TimestampConverter

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya reassigned KAFKA-16691:


Assignee: Jorge Esteban Quilcate Otoya

> Support for nested structures: TimestampConverter
> -
>
> Key: KAFKA-16691
> URL: https://issues.apache.org/jira/browse/KAFKA-16691
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16690) Support for nested structures: HeaderFrom

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya reassigned KAFKA-16690:


Assignee: Jorge Esteban Quilcate Otoya

> Support for nested structures: HeaderFrom
> -
>
> Key: KAFKA-16690
> URL: https://issues.apache.org/jira/browse/KAFKA-16690
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16691) Support for nested structures: TimestampConverter

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-16691:


 Summary: Support for nested structures: TimestampConverter
 Key: KAFKA-16691
 URL: https://issues.apache.org/jira/browse/KAFKA-16691
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jorge Esteban Quilcate Otoya






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16690) Support for nested structures: HeaderFrom

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-16690:


 Summary: Support for nested structures: HeaderFrom
 Key: KAFKA-16690
 URL: https://issues.apache.org/jira/browse/KAFKA-16690
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jorge Esteban Quilcate Otoya






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14226) Introduce support for nested structures

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-14226:
-
Description: 
Abstraction for FieldPath and initial SMTs:
 * ExtractField

  was:
Abstraction for FieldPath and initial SMTs:
 * ExtractField
 * HeaderFrom
 * TimestampConverter


> Introduce support for nested structures
> ---
>
> Key: KAFKA-14226
> URL: https://issues.apache.org/jira/browse/KAFKA-14226
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Abstraction for FieldPath and initial SMTs:
>  * ExtractField



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14226) Introduce support for nested structures

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya resolved KAFKA-14226.
--
Resolution: Fixed

Merged: https://github.com/apache/kafka/pull/15379

> Introduce support for nested structures
> ---
>
> Key: KAFKA-14226
> URL: https://issues.apache.org/jira/browse/KAFKA-14226
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Abstraction for FieldPath and initial SMTs:
>  * ExtractField
>  * HeaderFrom
>  * TimestampConverter



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16264) Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration

2024-05-07 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844484#comment-17844484
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-16264:
--

Thanks [~jolshan]! Finally got some time to look into this one. Please have a 
look at [https://github.com/apache/kafka/pull/15888]

> Expose `producer.id.expiration.check.interval.ms` as dynamic broker 
> configuration
> -
>
> Key: KAFKA-16264
> URL: https://issues.apache.org/jira/browse/KAFKA-16264
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Dealing with a scenario where too many producer ids lead to issues (e.g. high 
> CPU utilization, see KAFKA-16229) puts operators in need of flushing producer 
> ids more promptly than usual.
> Currently, only the expiration timeout `producer.id.expiration.ms` is exposed 
> as a dynamic config. This is helpful (e.g. by reducing the timeout, fewer 
> producers would eventually be kept in memory), but not enough if the 
> evaluation frequency is not short enough to flush producer ids before they 
> become an issue. Only by tuning both can the issue be worked around.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] a slight change. [kafka]

2024-05-07 Thread via GitHub


gongxuanzhang commented on PR #15812:
URL: https://github.com/apache/kafka/pull/15812#issuecomment-2099569696

   @gharris1727 I requested a Jira account (username: gongxuanzhang, email: 
gongxuanzhangm...@gmail.com). It needs a confirmation; can I get that from you?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: enable test for ensureInternalEndpointIsSecured [kafka]

2024-05-07 Thread via GitHub


FrankYang0529 commented on code in PR #15868:
URL: https://github.com/apache/kafka/pull/15868#discussion_r1593273181


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java:
##
@@ -115,11 +112,11 @@ public void ensureInternalEndpointIsSecured() throws 
Throwable {
 // Try again, but with an invalid signature
 log.info(
 "Making a POST request to the {} endpoint with no connector 
started and an invalid signature header; "
-+ "expecting 403 error response",
++ "expecting 503 error response",
 connectorTasksEndpoint
 );
 assertEquals(
-FORBIDDEN.getStatusCode(),
+SERVICE_UNAVAILABLE.getStatusCode(),

Review Comment:
   CI can't pass. Not sure whether some connector was not closed properly. The 
error is:
   
   ```
   java.lang.AssertionError: expected:<503> but was:<403>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:647)
at org.junit.Assert.assertEquals(Assert.java:633)
at 
org.apache.kafka.connect.integration.SessionedProtocolIntegrationTest.ensureInternalEndpointIsSecured(SessionedProtocolIntegrationTest.java:118)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] a slight change. [kafka]

2024-05-07 Thread via GitHub


gongxuanzhang commented on PR #15812:
URL: https://github.com/apache/kafka/pull/15812#issuecomment-2099566070

   > @gongxuanzhang Do you want to merge this PR as-is, or do you want to add 
more fixes to this?
   
   I will create a new PR and link it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16689) Move LogValidatorTest to storage module

2024-05-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16689:
--

 Summary: Move LogValidatorTest to storage module
 Key: KAFKA-16689
 URL: https://issues.apache.org/jira/browse/KAFKA-16689
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


`LogValidator` has already been moved to the storage module, but its unit test is 
still in the core module. That is a bit weird. We ought to rewrite it in Java and 
then move it to the storage module.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1593260374


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -404,9 +414,7 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-val expectedOffsetOfMaxTimestamp = 1
-assertEquals(expectedOffsetOfMaxTimestamp, 
validatingResults.offsetOfMaxTimestampMs,
-  s"Offset of max timestamp should be 1")
+assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
   for another, `LogValidator` has already been moved to the storage module, but 
its unit test is still in the core module. That is a bit weird. We can rewrite it 
in Java with the bug fix and then move it to the storage module. I have filed 
https://issues.apache.org/jira/browse/KAFKA-16689



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] a slight change. [kafka]

2024-05-07 Thread via GitHub


gongxuanzhang closed pull request #15812: a slight change.
URL: https://github.com/apache/kafka/pull/15812


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16264: Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration [kafka]

2024-05-07 Thread via GitHub


jeqo opened a new pull request, #15888:
URL: https://github.com/apache/kafka/pull/15888

   Expose the expiration check interval configuration as a dynamic config so that 
the producer state checker can be re-scheduled when it changes.
   
   See commits for more details.
   
   [KAFKA-16264]
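   
   As a rough illustration (not part of this PR), once the config is dynamic an 
   operator could adjust it at runtime with the standard `Admin` 
   `incrementalAlterConfigs` API; the broker entity and value below are made up:
   
   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Map;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;
   
   public class DynamicCheckInterval {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // "" selects the cluster-wide default entity for all brokers.
               ConfigResource brokers =
                       new ConfigResource(ConfigResource.Type.BROKER, "");
               AlterConfigOp op = new AlterConfigOp(
                       new ConfigEntry("producer.id.expiration.check.interval.ms",
                               "60000"),
                       AlterConfigOp.OpType.SET);
               Map<ConfigResource, Collection<AlterConfigOp>> configs =
                       Collections.singletonMap(brokers,
                               Collections.singletonList(op));
               admin.incrementalAlterConfigs(configs).all().get();
           }
       }
   }
   ```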
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] a slight change. [kafka]

2024-05-07 Thread via GitHub


gongxuanzhang commented on PR #15812:
URL: https://github.com/apache/kafka/pull/15812#issuecomment-2099540536

   > @gongxuanzhang Do you want to merge this PR as-is, or do you want to add 
more fixes to this?
   
   I choose the latter and will do more.
   Thank you for your trust.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15045: (KIP-924) New interfaces and stubbed utility classes for pluggable TaskAssignors. [kafka]

2024-05-07 Thread via GitHub


apourchet opened a new pull request, #15887:
URL: https://github.com/apache/kafka/pull/15887

   This is the first PR in a sequence to support custom task assignors in Kafka 
Streams, as described in KIP-924. It creates and exposes all of the interfaces 
that will need to be implemented during the refactor of the current task 
assignment logic.
   
   This PR does not include actual logic, and so has no tests included beyond 
the typical style and compile checks.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1593241011


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -404,9 +414,7 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-val expectedOffsetOfMaxTimestamp = 1
-assertEquals(expectedOffsetOfMaxTimestamp, 
validatingResults.offsetOfMaxTimestampMs,
-  s"Offset of max timestamp should be 1")
+assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
   Yep, that is a typo, but it does not change the test. We do pass `NONE` when 
creating `LogValidator`, so it will run the `assignOffsetsNonCompressed` path 
   
https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala#L377
   
   However, I do observe a potential bug. 
   
   **context**
   1. Those batches can have different compression
   2. We take the compression from last batch
   
https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/log/UnifiedLog.scala#L1180
   
   **potential bug**
   
   topic-level compression = GZIP
   batch_0 = NONE
   batch_1 = GZIP
   
   In this case, we don't rebuild the records according to the topic-level 
compression, since the compression of the "last batch" is equal to `GZIP`. 
Hence, batch_0 ends up with incorrect compression.
   
   This bug does not produce corrupt records, so we could add comments/docs to 
describe the issue. Or we can fix it by changing `sourceCompression` to be a 
collection of all batches' compression types, and then do the conversion if any 
of them is mismatched.
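   
   To make that concrete, a small standalone sketch (against the 3.7 clients 
   API, not the broker code path) showing that checking only the last batch's 
   compression misses the mismatch in batch_0:
   
   ```java
   import org.apache.kafka.common.record.CompressionType;
   import org.apache.kafka.common.record.MemoryRecords;
   import org.apache.kafka.common.record.MutableRecordBatch;
   import org.apache.kafka.common.record.SimpleRecord;
   
   public class MixedBatchCompression {
       public static void main(String[] args) {
           // batch_0 = NONE, batch_1 = GZIP, topic-level compression = GZIP
           MemoryRecords batch0 = MemoryRecords.withRecords(CompressionType.NONE,
                   new SimpleRecord("k0".getBytes(), "v0".getBytes()));
           MemoryRecords batch1 = MemoryRecords.withRecords(CompressionType.GZIP,
                   new SimpleRecord("k1".getBytes(), "v1".getBytes()));
   
           CompressionType topicCompression = CompressionType.GZIP;
           CompressionType lastBatchCompression = null;
           boolean anyMismatch = false;
           for (MemoryRecords records : new MemoryRecords[]{batch0, batch1}) {
               for (MutableRecordBatch batch : records.batches()) {
                   lastBatchCompression = batch.compressionType();
                   anyMismatch |= batch.compressionType() != topicCompression;
               }
           }
           // The last-batch check concludes "no rebuild needed" even though
           // scanning every batch finds the mismatched batch_0.
           System.out.println(lastBatchCompression == topicCompression); // true
           System.out.println(anyMismatch); // true
       }
   }
   ```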



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16688: Use helper method to shutdown ExecutorService [kafka]

2024-05-07 Thread via GitHub


gaurav-narula commented on PR #15886:
URL: https://github.com/apache/kafka/pull/15886#issuecomment-2099506624

   CC: @soarez @chia7712 
   
   I came across this in the [CI pipeline 
run](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15885/1/tests)
 for #15885 and it might have been the one @chia7712 referred to in #15836 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16688: Use helper method to shutdown ExecutorService [kafka]

2024-05-07 Thread via GitHub


gaurav-narula opened a new pull request, #15886:
URL: https://github.com/apache/kafka/pull/15886

   We observe some thread leaks in CI which point to the executor service 
thread. This change shuts down the executor service using the helper method in 
`ThreadUtils`.
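   
   For context, a minimal sketch of the pattern (timeout chosen arbitrarily for 
   illustration):
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import org.apache.kafka.common.utils.ThreadUtils;
   
   public class ExecutorShutdownSketch {
       public static void main(String[] args) {
           ExecutorService executor = Executors.newSingleThreadExecutor();
           // Instead of leaking the thread, the helper shuts the executor down
           // and waits up to the given timeout for it to terminate.
           ThreadUtils.shutdownExecutorServiceQuietly(executor, 5,
                   TimeUnit.SECONDS);
       }
   }
   ```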
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16688) SystemTimer leaks resources on close

2024-05-07 Thread Gaurav Narula (Jira)
Gaurav Narula created KAFKA-16688:
-

 Summary: SystemTimer leaks resources on close
 Key: KAFKA-16688
 URL: https://issues.apache.org/jira/browse/KAFKA-16688
 Project: Kafka
  Issue Type: Test
Affects Versions: 3.8.0
Reporter: Gaurav Narula
Assignee: Gaurav Narula


We observe some thread leaks with thread name {{executor-client-metrics}}.

This may happen because {{SystemTimer}} doesn't attempt to shut down its 
executor service properly.

Refer to 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15885/1/tests
 and the tests with {{initializationError}} in them for the stack trace.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Various cleanups in clients tests [kafka]

2024-05-07 Thread via GitHub


lianetm commented on code in PR #15877:
URL: https://github.com/apache/kafka/pull/15877#discussion_r1593116655


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -106,10 +103,9 @@ public void setup() {
 commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
 offsetsRequestManager = testBuilder.offsetsRequestManager;
 coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-memberhipsManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
+HeartbeatRequestManager heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);

Review Comment:
   btw, removing all those unused fields might help us remove the 
`ClassDataAbstractionCoupling` suppression; worth checking



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]

2024-05-07 Thread via GitHub


jolshan commented on code in PR #15837:
URL: https://github.com/apache/kafka/pull/15837#discussion_r1593105981


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -3037,14 +3062,71 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion(): Unit = {
+  def 
shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion_allowedWithAlterCluster():
 Unit = {

Review Comment:
   nit: we typically don't use underscores in method names like this. Can we 
stick to camel case here?
   Also can we parameterize this test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in clients tests [kafka]

2024-05-07 Thread via GitHub


lianetm commented on code in PR #15877:
URL: https://github.com/apache/kafka/pull/15877#discussion_r1593098515


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -106,10 +103,9 @@ public void setup() {
 commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
 offsetsRequestManager = testBuilder.offsetsRequestManager;
 coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-memberhipsManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
+HeartbeatRequestManager heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);

Review Comment:
   I don't think they are used to test the existence of the managers here; I 
would say they were just left unused, so we should remove them. Managers are 
retrieved in this way in many other tests (e.g. 
[HeartbeatRequestManagerTest](https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L119)),
 but only when needed.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]

2024-05-07 Thread via GitHub


lianetm commented on code in PR #15408:
URL: https://github.com/apache/kafka/pull/15408#discussion_r1591377324


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala:
##
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.api.{AbstractConsumerTest, BaseConsumerTest}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener}
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.stream.Stream
+
+/**
+ * Integration tests for the consumer that cover interaction with the consumer 
from within callbacks
+ * and listeners.
+ */
+class PlaintextConsumerCallbackTest extends AbstractConsumerTest {
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerRebalanceListenerAssignOnPartitionsAssigned(quorum: String, 
groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0);
+triggerOnPartitionsAssigned { (consumer, _) =>
+  val e: Exception = assertThrows(classOf[IllegalStateException], () => 
consumer.assign(Collections.singletonList(tp)))
+  assertEquals(e.getMessage, "Subscription to topics, partitions and 
pattern are mutually exclusive")
+}
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerRebalanceListenerAssignmentOnPartitionsAssigned(quorum: 
String, groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0);
+triggerOnPartitionsAssigned { (consumer, _) =>
+  assertTrue(consumer.assignment().contains(tp));
+}
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def 
testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned(quorum: 
String, groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0);
+triggerOnPartitionsAssigned { (consumer, _) =>
+  val map = consumer.beginningOffsets(Collections.singletonList(tp))
+  assertTrue(map.containsKey(tp))
+  assertEquals(0, map.get(tp))
+}
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerRebalanceListenerAssignOnPartitionsRevoked(quorum: String, 
groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0);
+triggerOnPartitionsRevoked { (consumer, _) =>
+  val e: Exception = assertThrows(classOf[IllegalStateException], () => 
consumer.assign(Collections.singletonList(tp)))
+  assertEquals(e.getMessage, "Subscription to topics, partitions and 
pattern are mutually exclusive")

Review Comment:
   ~~ditto~~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16287: Implement example tests for common rebalance callback [kafka]

2024-05-07 Thread via GitHub


lianetm commented on code in PR #15408:
URL: https://github.com/apache/kafka/pull/15408#discussion_r1592977058


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala:
##
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.api.{AbstractConsumerTest, BaseConsumerTest}
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener}
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.util
+import java.util.Arrays.asList
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.stream.Stream
+
+/**
+ * Integration tests for the consumer that cover interaction with the consumer 
from within callbacks
+ * and listeners.
+ */
+class PlaintextConsumerCallbackTest extends AbstractConsumerTest {
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testConsumerRebalanceListenerAssignOnPartitionsAssigned(quorum: String, 
groupProtocol: String): Unit = {
+val tp = new TopicPartition(topic, 0);
+triggerOnPartitionsAssigned { (consumer, _) =>
+  val e: Exception = assertThrows(classOf[IllegalStateException], () => 
consumer.assign(Collections.singletonList(tp)))
+  assertEquals(e.getMessage, "Subscription to topics, partitions and 
pattern are mutually exclusive")

Review Comment:
   Nope, just adding a message to the assertThrows so that the failure comes out 
specific to calling assign after subscribe (and then we would end up with a 
similar assertThrows for calling subscribe(Pattern)). But re-thinking it, it's 
clear enough in case of failure, since it would most probably always fail in the 
call to assign/subscribePattern itself. No concerns then. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-07 Thread via GitHub


ableegoldman commented on PR #15601:
URL: https://github.com/apache/kafka/pull/15601#issuecomment-2099136409

   Thanks Greg. If no one's had time to look at this by next week, we'll assign 
a reviewer during the next Kafka Streams hangout


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] a slight change. [kafka]

2024-05-07 Thread via GitHub


gharris1727 commented on PR #15812:
URL: https://github.com/apache/kafka/pull/15812#issuecomment-2099128163

   @gongxuanzhang Do you want to merge this PR as-is, or do you want to add 
more fixes to this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-05-07 Thread via GitHub


gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1592924655


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -187,17 +157,25 @@ public void process(final Record record) {
 
context().forward(record.withValue(joiner.apply(record.key(), record.value(), 
null)));
 } else {
 sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
-outerJoinStore.ifPresent(store -> store.put(
-TimestampedKeyAndJoinSide.make(isLeftSide, 
record.key(), inputRecordTimestamp),
-LeftOrRightValue.make(isLeftSide, 
record.value(;
+putInOuterJoinStore(record, inputRecordTimestamp);

Review Comment:
   This can just take `record` and call `record.timestamp()` itself.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -221,41 +199,28 @@ private void emitNonJoinedOuterRecords(
 // reset to MAX_VALUE in case the store is empty
 sharedTimeTracker.minTime = Long.MAX_VALUE;
 
-try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
+try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
-boolean outerJoinLeftWindowOpen = false;
-boolean outerJoinRightWindowOpen = false;
 while (it.hasNext()) {
-if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) {
-// if windows are open for both joinSides we can break 
since there are no more candidates to emit
+final KeyValue, 
LeftOrRightValue> nextKeyValue = it.next();
+final TimestampedKeyAndJoinSide 
timestampedKeyAndJoinSide = nextKeyValue.key;
+sharedTimeTracker.minTime = 
timestampedKeyAndJoinSide.getTimestamp();
+if 
(isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, true) && 
isOuterJoinWindowOpenForSide(timestampedKeyAndJoinSide, false)) {

Review Comment:
   Can this condition ever fire?
   If timestampedKeyAndJoinSide.isLeftSide() is true, then only the first 
condition can be true.
   If timestampedKeyAndJoinSide.isLeftSide() is false, then only the second 
condition can be true.
   
   These were state variables shared across multiple iterations and incorporated 
multiple timestampedKeyAndJoinSide before; now it's a function of just a single 
timestampedKeyAndJoinSide. Without knowing the context here, I would guess that 
would change the behavior.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -275,22 +240,34 @@ private void emitNonJoinedOuterRecords(
 }
 }
 
-@SuppressWarnings("unchecked")
-private VOut getNullJoinedValue(
-final K key, 
-final LeftOrRightValue leftOrRightValue) {
-// depending on the JoinSide fill in the joiner key and joiner 
values
-if (isLeftSide) {
-return joiner.apply(key,
-leftOrRightValue.getLeftValue(),
-leftOrRightValue.getRightValue());
-} else {
-return joiner.apply(key,
-(V1) leftOrRightValue.getRightValue(),
-(V2) leftOrRightValue.getLeftValue());
+private boolean isOuterJoinWindowOpenForSide(final 
TimestampedKeyAndJoinSide timestampedKeyAndJoinSide, final boolean 
isLeftSide) {
+if (isOuterJoinWindowOpen(timestampedKeyAndJoinSide)) {
+// there are no more candidates to emit on left-outerJoin-side
+return timestampedKeyAndJoinSide.isLeftSide() == isLeftSide;
 }
+return false;
+}
+
+private void forwardNonJoinedOuterRecords(final Record record, 
final KeyValue, ? extends 
LeftOrRightValue> nextKeyValue) {
+final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = 
nextKeyValue.key;
+final K key = timestampedKeyAndJoinSide.getKey();
+final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
+final LeftOrRightValue leftOrRightValue = 
nextKeyValue.value;
+final VThis thisValue = getThisValue(leftOrRightValue);
+final VOther otherValue = getOtherValue(leftOrRightValue);
+final VOut nullJoinedValue = joiner.apply(key, thisValue, 
otherValue);
+context().forward(
+
record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp)
+);
+}
+
+private boolean isOuterJoinWindowOpen(final 
TimestampedKeyAndJoinSide timestampedKeyAndJoinSide) {
+final lo

Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]

2024-05-07 Thread via GitHub


gharris1727 commented on code in PR #15469:
URL: https://github.com/apache/kafka/pull/15469#discussion_r1592918790


##
connect/api/src/main/java/org/apache/kafka/connect/data/Values.java:
##
@@ -766,135 +852,23 @@ protected static boolean 
canParseSingleTokenLiteral(Parser parser, boolean embed
 protected static SchemaAndValue parse(Parser parser, boolean embedded) 
throws NoSuchElementException {

Review Comment:
   I decided that pulling this class apart into standalone classes is possible, 
but a little bit risky compatibility-wise, and more complexity than I wanted to 
take on in this PR. I've moved the static methods accepting Parser to be 
instance methods of an inner class, so that they can still call into Values to 
do conversions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]

2024-05-07 Thread via GitHub


gharris1727 commented on PR #15469:
URL: https://github.com/apache/kafka/pull/15469#issuecomment-2099075512

   Here's the Values test coverage changes:
   
   State | Class % | Method % | Line %
   -- | -- | -- | --
   Initial | 100% (4/4) | 81% (40/49) | 78% (464/589)
   Added tests | 100% (4/4) | 97% (48/49) | 85% (502/589)
   Refactored | 100% (6/6) | 93% (77/82) | 84% (565/669)
   
   There are more classes, methods, and lines, but the percentage coverage went 
up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-07 Thread via GitHub


junrao commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1592890872


##
metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java:
##
@@ -283,7 +283,7 @@ private static Stream 
metadataVersionsForTestPartitionRegistration()
 return Stream.of(
 MetadataVersion.IBP_3_7_IV1,
 MetadataVersion.IBP_3_7_IV2,
-MetadataVersion.IBP_3_8_IV0
+MetadataVersion.IBP_3_8_IV1

Review Comment:
   Should we make the same change in PartitionChangeBuilderTest?



##
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##
@@ -717,7 +717,7 @@ public void testUnregisterBroker() throws Throwable {
 setBrokerId(0).
 setClusterId(active.clusterId()).
 
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_8_IV0)).
+setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_8_IV1)).

Review Comment:
   Should we change IBP_3_8_IV0 to IBP_3_8_IV1 in 
`testUncleanShutdownBroker()`? It is testing ELR, but we want to make sure ELR 
works under the latest version too, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2024-05-07 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15018.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Potential tombstone offsets corruption for exactly-once source connectors
> -
>
> Key: KAFKA-15018
> URL: https://issues.apache.org/jira/browse/KAFKA-15018
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.8.0
>
>
> When exactly-once support is enabled for source connectors, source offsets 
> can potentially be written to two different offsets topics: a topic specific 
> to the connector, and the global offsets topic (which was used for all 
> connectors prior to KIP-618 / version 3.3.0).
> Precedence is given to offsets in the per-connector offsets topic, but if 
> none are found for a given partition, then the global offsets topic is used 
> as a fallback.
> When committing offsets, a transaction is used to ensure that source records 
> and source offsets are written to the Kafka cluster targeted by the source 
> connector. This transaction only includes the connector-specific offsets 
> topic. Writes to the global offsets topic take place after writes to the 
> connector-specific offsets topic have completed successfully, and if they 
> fail, a warning message is logged, but no other action is taken.
> Normally, this ensures that, for offsets committed by exactly-once-supported 
> source connectors, the per-connector offsets topic is at least as up-to-date 
> as the global offsets topic, and sometimes even ahead.
> However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
> is successfully written to the per-connector offsets topic, but cannot be 
> written to the global offsets topic, then the global offsets topic will still 
> contain that source offset, but the per-connector topic will not. Due to the 
> fallback-on-global logic used by the worker, if a task requests offsets for 
> one of the tombstoned partitions, the worker will provide it with the offsets 
> present in the global offsets topic, instead of indicating to the task that 
> no offsets can be found.
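
A rough sketch of the fallback described above (names are illustrative, not the 
actual worker code):

```java
import java.util.Map;

public class OffsetFallbackSketch {
    // Per-connector offsets take precedence; the global topic is the fallback.
    static Long offsetFor(String partition,
                          Map<String, Long> connectorOffsets,
                          Map<String, Long> globalOffsets) {
        if (connectorOffsets.containsKey(partition))
            return connectorOffsets.get(partition);
        // If a tombstone reached only the per-connector topic, the key is gone
        // here once compacted away, but it is still present in the global map,
        // so a stale offset is returned instead of "no offsets found".
        return globalOffsets.get(partition);
    }
}
```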



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-05-07 Thread via GitHub


C0urante merged PR #13801:
URL: https://github.com/apache/kafka/pull/13801


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16667) KRaftMigrationDriver gets stuck after successive failovers

2024-05-07 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16667:
-
Fix Version/s: 3.8.0
   3.7.1

> KRaftMigrationDriver gets stuck after successive failovers
> --
>
> Key: KAFKA-16667
> URL: https://issues.apache.org/jira/browse/KAFKA-16667
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, migration
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> This is a continuation of KAFKA-16171.
> It turns out that the active KRaftMigrationDriver can get a stale read from 
> ZK after becoming the active controller in ZK (i.e., writing to 
> "/controller").
> Because ZooKeeper only offers linearizability on writes to a given ZNode, it 
> is possible that we get a stale read on the "/migration" ZNode after writing 
> to "/controller" (and "/controller_epoch") when becoming active. 
>  
> The history looks like this:
>  # Node B becomes leader in the Raft layer. KRaftLeaderEvents are enqueued on 
> all KRaftMigrationDrivers
>  # Node A writes some state to ZK, updates "/migration", and checks 
> "/controller_epoch" in one transaction. This happens before B claims 
> controller leadership in ZK. The "/migration" state is updated from X to Y
>  # Node B claims leadership by updating "/controller" and 
> "/controller_epoch". Leader B reads "/migration" state X
>  # Node A tries to write some state, fails on "/controller_epoch" check op.
>  # Node A processes new leader and becomes inactive
>  
> This does not violate consistency guarantees made by ZooKeeper.
>  
> > Write operations in ZooKeeper are {_}linearizable{_}. In other words, each 
> > {{write}} will appear to take effect atomically at some point between when 
> > the client issues the request and receives the corresponding response.
> and 
> > Read operations in ZooKeeper are _not linearizable_ since they can return 
> > potentially stale data. This is because a {{read}} in ZooKeeper is not a 
> > quorum operation and a server will respond immediately to a client that is 
> > performing a {{{}read{}}}.
>  
> --- 
>  
> The impact of this stale read is the same as KAFKA-16171. The 
> KRaftMigrationDriver never gets past SYNC_KRAFT_TO_ZK because it has a stale 
> zkVersion for the "/migration" ZNode. The result is brokers never learn about 
> the new controller and cannot update any partition state.
> The workaround for this bug is to re-elect the controller by shutting down 
> the active KRaft controller. 
> This bug was found during a migration where the KRaft controller was rapidly 
> failing over due to an excess of metadata. 
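
A compact sketch of the check-and-write pattern described in step 2 (the method 
shape is illustrative; the paths match the ones above):

```java
import java.util.Arrays;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooKeeper;

public class MigrationWriteSketch {
    // Writes are linearizable, so node A's transaction fails cleanly once node B
    // has bumped /controller_epoch (step 4). The stale read happens on B's side:
    // a plain read of /migration is served locally by B's ZK server and may not
    // yet reflect the state A committed in step 2.
    static void writeMigrationState(ZooKeeper zk, byte[] state,
                                    int migrationZkVersion, int epochZkVersion)
            throws KeeperException, InterruptedException {
        zk.multi(Arrays.asList(
                Op.check("/controller_epoch", epochZkVersion),
                Op.setData("/migration", state, migrationZkVersion)));
    }
}
```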



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Remove dev_version parameter from streams tests [kafka]

2024-05-07 Thread via GitHub


mjsax commented on PR #15874:
URL: https://github.com/apache/kafka/pull/15874#issuecomment-2099036837

   Triggered a system test run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder-test/20/ 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-05-07 Thread via GitHub


C0urante merged PR #15379:
URL: https://github.com/apache/kafka/pull/15379


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1592877739


##
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/field/SingleFieldPathTest.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class SingleFieldPathTest {
+
+@Test void shouldFindField() {
+SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);
+Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+assertEquals(barSchema.field("bar"), 
pathV2("foo.bar").fieldFrom(schema));
+assertEquals(schema.field("foo"), pathV2("foo").fieldFrom(schema));
+}
+
+@Test void shouldReturnNullFieldWhenFieldNotFound() {
+SchemaBuilder barSchema = SchemaBuilder.struct().field("bar", 
Schema.INT32_SCHEMA);
+Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+
+assertNull(pathV2("un.known").fieldFrom(schema));
+assertNull(pathV2("foo.unknown").fieldFrom(schema));
+assertNull(pathV2("unknown").fieldFrom(schema));
+assertNull(pathV2("test").fieldFrom(null));
+}
+
+@Test void shouldFindValueInMap() {
+Map foo = new HashMap<>();
+foo.put("bar", 42);
+foo.put("baz", null);
+Map map = new HashMap<>();
+map.put("foo", foo);
+
+assertEquals(42, pathV2("foo.bar").valueFrom(map));
+assertNull(pathV2("foo.baz").valueFrom(map));
+}
+
+@Test void shouldReturnNullValueWhenFieldNotFoundInMap() {
+Map foo = new HashMap<>();
+foo.put("bar", 42);
+foo.put("baz", null);
+Map map = new HashMap<>();
+map.put("foo", foo);
+
+assertNull(new SingleFieldPath("un.known", 
FieldSyntaxVersion.V2).valueFrom(map));
+assertNull(new SingleFieldPath("foo.unknown", 
FieldSyntaxVersion.V2).valueFrom(map));
+assertNull(new SingleFieldPath("unknown", 
FieldSyntaxVersion.V2).valueFrom(map));
+assertNull(new SingleFieldPath("foo.baz", 
FieldSyntaxVersion.V2).valueFrom(map));
+assertNull(new SingleFieldPath("foo.baz.inner", 
FieldSyntaxVersion.V2).valueFrom(map));
+}
+
+@Test void shouldFindValueInStruct() {
+SchemaBuilder bazSchema = SchemaBuilder.struct()
+.field("inner", Schema.STRING_SCHEMA);
+SchemaBuilder barSchema = SchemaBuilder.struct()
+.field("bar", Schema.INT32_SCHEMA)
+.field("baz", bazSchema.optional());
+Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+Struct foo = new Struct(barSchema)
+.put("bar", 42)
+.put("baz", null);
+Struct struct = new Struct(schema).put("foo", foo);
+
+assertEquals(42, pathV2("foo.bar").valueFrom(struct));
+assertNull(pathV2("foo.baz").valueFrom(struct));
+}
+
+@Test void shouldReturnNullValueWhenFieldNotFoundInStruct() {
+SchemaBuilder bazSchema = SchemaBuilder.struct()
+.field("inner", Schema.STRING_SCHEMA);
+SchemaBuilder barSchema = SchemaBuilder.struct()
+.field("bar", Schema.INT32_SCHEMA)
+.field("baz", bazSchema.optional());
+Schema schema = SchemaBuilder.struct().field("foo", barSchema).build();
+Struct foo = new Struct(barSchema)
+.put("bar", 42)
+.put("baz", null);
+Struct struct = new Struct(schema).put("foo", foo);
+
+assertNull(new SingleFieldPath("un.known", 
FieldSyntaxVersion.V2).valueFrom(struct));
+assertNull(new SingleFieldPath("foo.unknown", 
FieldSyntaxVersion.V2).valueFrom(struct));
+assertNull(new SingleFieldPath("unknown", 
FieldSyntaxVersion.V2).valueFrom(struct));
+assertNull(ne

Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1592860284


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -773,6 +773,41 @@ public void 
testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E
 connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, 
"Connector wasn't deleted in time");
 }
 
+@Test
+public void testPatchConnectorConfig() throws Exception {
+connect = connectBuilder.build();
+// start the clusters
+connect.start();
+
+connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+"Initial group of workers did not start in time.");
+
+connect.kafka().createTopic(TOPIC_NAME);
+
+Map props = defaultSinkConnectorProps(TOPIC_NAME);
+props.put("unaffected-key", "unaffected-value");
+props.put("to-be-deleted-key", "value");
+props.put(TASKS_MAX_CONFIG, "1");
+
+Map patch = new HashMap<>();
+patch.put(TASKS_MAX_CONFIG, "2");  // this plays as a value to be 
changed
+patch.put("to-be-added-key", "value");
+patch.put("to-be-deleted-key", null);
+
+connect.configureConnector(CONNECTOR_NAME, props);
+connect.patchConnectorConfig(CONNECTOR_NAME, patch);
+
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 2,
+"connector and tasks did not start in time");
+
+Map expectedConfig = new HashMap<>(props);
+expectedConfig.put("name", CONNECTOR_NAME);
+expectedConfig.put("to-be-added-key", "value");
+expectedConfig.put(TASKS_MAX_CONFIG, "2");
+expectedConfig.remove("to-be-deleted-key");
+assertEquals(expectedConfig, 
connect.connectorInfo(CONNECTOR_NAME).config());

Review Comment:
   I think it's possible for poor timing (which Jenkins is notorious for...) to 
create flakiness here. The connector and both of its tasks may be started, but 
it's possible that the worker we hit with this request won't have read the 
patched connector config from the config topic yet if it's not the leader of 
the cluster.
   
   As a quick hack, we could tweak the order of operations and rely on existing 
retry logic in `ConnectAssertions::assertConnectorAndExactlyNumTasksAreRunning` 
to prevent this:
   
   1. Configure connector with `tasks.max = 2`
   2. Ensure connector is started and 2 tasks are running
   3. Patch connector, including changing `tasks.max` to `3`
   4. Ensure connector is started and 3 tasks are running
   5. Perform the assertion on this line (i.e., that the connector config as 
reported by an arbitrary worker in the cluster matches the expected patch 
config)
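   
   Roughly, under those steps the test body would look like this (a sketch 
   reusing the fixtures from the quoted test, not a drop-in patch):
   
   ```java
   Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
   props.put(TASKS_MAX_CONFIG, "2");
   connect.configureConnector(CONNECTOR_NAME, props);
   connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
           CONNECTOR_NAME, 2, "connector and tasks did not start in time");
   
   Map<String, String> patch = new HashMap<>();
   patch.put(TASKS_MAX_CONFIG, "3");
   connect.patchConnectorConfig(CONNECTOR_NAME, patch);
   // The retry loop in this assertion gives non-leader workers time to read
   // the patched config from the config topic before we inspect it.
   connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
           CONNECTOR_NAME, 3, "connector and tasks did not start in time");
   
   Map<String, String> expectedConfig = new HashMap<>(props);
   expectedConfig.putAll(patch);
   expectedConfig.put("name", CONNECTOR_NAME);
   assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config());
   ```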



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-07 Thread via GitHub


C0urante commented on PR #6934:
URL: https://github.com/apache/kafka/pull/6934#issuecomment-2098991457

   Thanks @ivanyu, this is really close. One other thing--can you update the 
description with a brief overview of the PR (probably enough to just mention 
the new endpoint and its behavior), and remove the italicized template?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1592860284


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -773,6 +773,41 @@ public void 
testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E
 connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, 
"Connector wasn't deleted in time");
 }
 
+@Test
+public void testPatchConnectorConfig() throws Exception {
+connect = connectBuilder.build();
+// start the clusters
+connect.start();
+
+connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+"Initial group of workers did not start in time.");
+
+connect.kafka().createTopic(TOPIC_NAME);
+
+Map props = defaultSinkConnectorProps(TOPIC_NAME);
+props.put("unaffected-key", "unaffected-value");
+props.put("to-be-deleted-key", "value");
+props.put(TASKS_MAX_CONFIG, "1");
+
+Map patch = new HashMap<>();
+patch.put(TASKS_MAX_CONFIG, "2");  // this plays as a value to be 
changed
+patch.put("to-be-added-key", "value");
+patch.put("to-be-deleted-key", null);
+
+connect.configureConnector(CONNECTOR_NAME, props);
+connect.patchConnectorConfig(CONNECTOR_NAME, patch);
+
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 2,
+"connector and tasks did not start in time");
+
+Map expectedConfig = new HashMap<>(props);
+expectedConfig.put("name", CONNECTOR_NAME);
+expectedConfig.put("to-be-added-key", "value");
+expectedConfig.put(TASKS_MAX_CONFIG, "2");
+expectedConfig.remove("to-be-deleted-key");
+assertEquals(expectedConfig, 
connect.connectorInfo(CONNECTOR_NAME).config());

Review Comment:
   I think it's possible for poor timing (which Jenkins is notorious for...) to 
create flakiness here. The connector and both of its tasks may be started, but 
it's possible that the worker we hit with this request won't have read the 
patched connector config from the config topic yet.
   
   As a quick hack, we could tweak the order of operations and rely on existing 
retry logic in `ConnectAssertions::assertConnectorAndExactlyNumTasksAreRunning` 
to prevent this:
   
   1. Configure connector with `tasks.max = 2`
   2. Ensure connector is started and 2 tasks are running
   3. Patch connector, and change `tasks.max` to `3`
   4. Ensure connector is started and 3 tasks are running
   5. Perform the assertion on this line (i.e., that the connector config as 
reported by an arbitrary worker in the cluster matches the expected patch 
config)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1592860284


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -773,6 +773,41 @@ public void 
testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws E
 connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, 
"Connector wasn't deleted in time");
 }
 
+@Test
+public void testPatchConnectorConfig() throws Exception {
+connect = connectBuilder.build();
+// start the clusters
+connect.start();
+
+connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+"Initial group of workers did not start in time.");
+
+connect.kafka().createTopic(TOPIC_NAME);
+
+Map props = defaultSinkConnectorProps(TOPIC_NAME);
+props.put("unaffected-key", "unaffected-value");
+props.put("to-be-deleted-key", "value");
+props.put(TASKS_MAX_CONFIG, "1");
+
+Map patch = new HashMap<>();
+patch.put(TASKS_MAX_CONFIG, "2");  // this plays as a value to be 
changed
+patch.put("to-be-added-key", "value");
+patch.put("to-be-deleted-key", null);
+
+connect.configureConnector(CONNECTOR_NAME, props);
+connect.patchConnectorConfig(CONNECTOR_NAME, patch);
+
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
 2,
+"connector and tasks did not start in time");
+
+Map expectedConfig = new HashMap<>(props);
+expectedConfig.put("name", CONNECTOR_NAME);
+expectedConfig.put("to-be-added-key", "value");
+expectedConfig.put(TASKS_MAX_CONFIG, "2");
+expectedConfig.remove("to-be-deleted-key");
+assertEquals(expectedConfig, 
connect.connectorInfo(CONNECTOR_NAME).config());

Review Comment:
   I think it's possible for poor timing (which Jenkins is notorious for...) to 
create flakiness here. The connector and both of its tasks may be started, but 
it's possible that the worker we hit with this request won't have read the 
patched connector config from the config topic yet if it's not the leader of 
the cluster.
   
   As a quick hack, we could tweak the order of operations and rely on existing 
retry logic in `ConnectAssertions::assertConnectorAndExactlyNumTasksAreRunning` 
to prevent this:
   
   1. Configure connector with `tasks.max = 2`
   2. Ensure connector is started and 2 tasks are running
   3. Patch connector, and change `tasks.max` to `3`
   4. Ensure connector is started and 3 tasks are running
   5. Perform the assertion on this line (i.e., that the connector config as 
reported by an arbitrary worker in the cluster matches the expected patch 
config)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-05-07 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1592859538


##
core/src/test/scala/unit/kafka/log/LogValidatorTest.scala:
##
@@ -404,9 +414,7 @@ class LogValidatorTest {
 assertEquals(now + 1, validatingResults.maxTimestampMs,
   s"Max timestamp should be ${now + 1}")
 
-val expectedOffsetOfMaxTimestamp = 1
-assertEquals(expectedOffsetOfMaxTimestamp, 
validatingResults.offsetOfMaxTimestampMs,
-  s"Offset of max timestamp should be 1")
+assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)

Review Comment:
   @chia7712 : There seems to be an existing bug. The method is 
`checkNonCompressed()`, but in line 370, we set the compression codec to GZIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1592847527


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
 return response.entity(createdInfo.result()).build();
 }
 
+@PATCH
+@Path("/{connector}/config")
+public Response patchConnectorConfig(final @PathParam("connector") String 
connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean 
forward,
+ final Map 
connectorConfigPatch) throws Throwable {
+FutureCallback> cb = new 
FutureCallback<>();
+herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
+Herder.Created createdInfo = 
requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + 
"/config",
+"PATCH", headers, connectorConfigPatch, new 
TypeReference() { }, new CreatedConnectorInfoTranslator(), 
forward);
+return Response.ok().entity(createdInfo.result()).build();

Review Comment:
   Yep, LGTM 👍
   
   Can you add that to the KIP?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -2336,6 +2336,133 @@ public void testPutConnectorConfig() throws Exception {
         verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
     }
 
+    @Test
+    public void testPatchConnectorConfigNotFound() {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(0, Collections.emptyList(), Collections.emptyList(), true);
+        when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        ClusterConfigState clusterConfigState = new ClusterConfigState(
+                0,
+                null,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(clusterConfigState);
+
+        Map<String, String> connConfigPatch = new HashMap<>();
+        connConfigPatch.put("foo1", "baz1");
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get);
+        assertInstanceOf(NotFoundException.class, exception.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfigNotALeader() {
+        when(member.memberId()).thenReturn("not-leader");
+
+        // The connector is pre-existing due to the mocks.
+        ClusterConfigState originalSnapshot = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 0),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
+        expectConfigRefreshAndSnapshot(originalSnapshot);
+        when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+
+        // Patch the connector config.
+
+        expectMemberEnsureActive();
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), false);
+
+        FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+        herder.patchConnectorConfig(CONN1, new HashMap<>(), patchCallback);
+        herder.tick();
+        assertTrue(patchCallback.isDone());
+        ExecutionException fencingException = assertThrows(ExecutionException.class, patchCallback::get);
+        assertInstanceOf(ConnectException.class, fencingException.getCause());
+    }
+
+    @Test
+    public void testPatchConnectorConfig() throws Exception {
+        when(member.memberId()).thenReturn("leader");
+        expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
+        when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+        Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+        originalConnConfig.put("foo0", "unaffected");
+        originalConnConfig.put("foo1", "will-be-changed");
+        originalConnConfig.put("foo2", "will-be-removed");
+
+        // The connector is pre-existing due to the mocks.
+
+        ClusterConfigState originalSnapshot = new Cl

[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread FTR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844408#comment-17844408
 ] 

FTR commented on KAFKA-16687:
-

Actually, this Kafka client is used in a web application. I simply use JMeter 
to stress test it.

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
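
As background for the [NMT Internal] finding quoted above: memory obtained 
through sun.misc.Unsafe.allocateMemory bypasses the Java heap, so it is 
invisible to -Xmx and jmap -heap and shows up only under Native Memory 
Tracking. A minimal standalone sketch of that allocation pattern (demo code, 
not Kafka's); run it with -XX:NativeMemoryTracking=detail and inspect it with 
jcmd <pid> VM.native_memory detail.diff:

    import java.lang.reflect.Field;
    import sun.misc.Unsafe;

    public class NativeAllocationDemo {
        public static void main(String[] args) throws Exception {
            // sun.misc.Unsafe has no public constructor; grab the singleton via
            // reflection (may print illegal-access warnings on recent JDKs).
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            Unsafe unsafe = (Unsafe) field.get(null);

            // Native allocation: NMT reports this under [Internal] (or [Other]
            // on newer JDKs); -Xmx and jmap -heap never see it.
            long address = unsafe.allocateMemory(1024L * 1024L); // 1 MiB

            // Omitting this call is exactly the leak pattern under discussion:
            // native memory is returned only by an explicit free, never by GC.
            unsafe.freeMemory(address);
        }
    }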


Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1592838802


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName, final Map<String, String>
         );
     }
 
+    @Override
+    public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
+        log.trace("Submitting connector config patch request {}", connName);
+        addRequest(() -> {

Review Comment:
   Yep, looks great!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: set test global timeout as 10 mins [kafka]

2024-05-07 Thread via GitHub


gaurav-narula commented on PR #15065:
URL: https://github.com/apache/kafka/pull/15065#issuecomment-2098952056

   Bumping this thread for more visibility.
   
   It would be nice if we gave this a shot, since we still run into builds which 
[time out](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15836/12/pipeline/9) 
out of the blue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844400#comment-17844400
 ] 

Philip Nee commented on KAFKA-16687:


Thanks for the information, this is massively helpful. How do you stress test 
your application? Could you add a bit more information here?

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread FTR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844397#comment-17844397
 ] 

FTR commented on KAFKA-16687:
-

For the Kafka client, I have used KafkaConsumer and AdminClient. But for this 
stress test case for the native memory leak, I only exercise the KafkaConsumer 
part.

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844395#comment-17844395
 ] 

Philip Nee commented on KAFKA-16687:


Your application uses only KafkaConsumer, right?

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread FTR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844394#comment-17844394
 ] 

FTR edited comment on KAFKA-16687 at 5/7/24 5:12 PM:
-

I am stress testing my Java application, which was just recompiled with 3.6.0.
Observing with NMT, [Internal] memory is still growing greatly (on the GB 
scale) - a native memory leak.


was (Author: JIRAUSER305339):
I am stress testing my Java application.
Observing with NMT, [Internal] memory is still growing greatly (on the GB 
scale) - a native memory leak.

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread FTR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844394#comment-17844394
 ] 

FTR commented on KAFKA-16687:
-

I am stress testing my Java application.
Observing with NMT, [Internal] memory is still growing greatly (on the GB 
scale) - a native memory leak.

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-9401: Reduce contention for Fetch requests [kafka]

2024-05-07 Thread via GitHub


soarez commented on PR #15836:
URL: https://github.com/apache/kafka/pull/15836#issuecomment-2098918354

   `:core:test` timed out only on JDK 8 with Scala 2.12. Restarted the build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread FTR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844390#comment-17844390
 ] 

FTR commented on KAFKA-16687:
-

Sure, let me experiment with 3.6 and then report back here.

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844387#comment-17844387
 ] 

Philip Nee commented on KAFKA-16687:


But this is a native memory leak, so I suspect it's a different issue from a 
heap leak. Would you mind experimenting with 3.6? We didn't see the leak using 3.6.

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread FTR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844382#comment-17844382
 ] 

FTR commented on KAFKA-16687:
-

BTW, before that I had also encountered a heap leak issue in Kafka client 2.8.2. 
After upgrading to 3.7.0 (>3.4.0) and disabling the JMX reporter, the heap leak 
issue was resolved.

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-16687:
--

Assignee: Philip Nee

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread FTR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844381#comment-17844381
 ] 

FTR commented on KAFKA-16687:
-

That's it: there are some topics with 30 partitions. I have tested Kafka client 
2.8.2 before, with the same issue.

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-07 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844379#comment-17844379
 ] 

Philip Nee commented on KAFKA-16687:


Hey [~fortherightous] - we also observed a (possible) memory leak. The symptom 
is particularly obvious when working with a large number of partitions. Have 
you tried Kafka client version 3.6 or earlier?

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Priority: Major
>
> I am building a Java project which uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application's logic is to use a Kafka consumer to poll a Kafka broker 
> topic continuously. 
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there is 16G of physical memory on my virtual machine. 
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space. 
> I checked it with jmap -heap pid, and found that heap memory usage is OK. 
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I 
> found that it's caused by [NMT Internal] memory, which is created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code: it has code which uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. 
> 
> Could you help to check it? How can I stop this growing native memory and 
> avoid my system hanging?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

