[jira] [Created] (KAFKA-16559) Support to define the number of data folders in ClusterTest

2024-04-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16559:
--

 Summary: Support to define the number of data folders in 
ClusterTest
 Key: KAFKA-16559
 URL: https://issues.apache.org/jira/browse/KAFKA-16559
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


That allows us to run the reassignment tests.





[jira] [Updated] (KAFKA-16559) Support to define the number of data folders in ClusterTest

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16559:
---
Issue Type: Test  (was: Improvement)

> Support to define the number of data folders in ClusterTest
> ---
>
> Key: KAFKA-16559
> URL: https://issues.apache.org/jira/browse/KAFKA-16559
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> That allows us to run the reassignment tests.





Re: [PR] MINOR: Various cleanups in server and server-common [kafka]

2024-04-16 Thread via GitHub


chia7712 merged PR #15710:
URL: https://github.com/apache/kafka/pull/15710





Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]

2024-04-16 Thread via GitHub


chia7712 merged PR #15575:
URL: https://github.com/apache/kafka/pull/15575





Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on PR #15575:
URL: https://github.com/apache/kafka/pull/15575#issuecomment-2058418540

   The failed tests pass on my local machine.
   ```
   ./gradlew cleanTest \
     :tools:test --tests DescribeConsumerGroupTest.testDescribeMembersOfExistingGroupWithNoMembers \
       --tests MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful \
     :storage:test --tests TransactionsWithTieredStoreTest.testFencingOnSend \
     :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers \
       --tests QuorumControllerTest.testBootstrapZkMigrationRecord \
       --tests QuorumControllerTest.testDelayedConfigurationOperations \
     :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
     :connect:mirror:test --tests MirrorConnectorsIntegrationSSLTest.testSyncTopicConfigs \
     :core:test --tests PlaintextConsumerAssignTest.testAssignAndConsumeFromCommittedOffsets \
       --tests ConsumerBounceTest.testConsumptionWithBrokerFailures \
       --tests UserQuotaTest.testThrottledProducerConsumer \
       --tests UserQuotaTest.testQuotaOverrideDelete \
       --tests SaslSslConsumerTest.testCoordinatorFailover \
     :clients:test --tests CooperativeStickyAssignorTest.testLargeAssignmentWithMultipleConsumersLeavingAndRandomSubscription \
       --tests SaslAuthenticatorFailureNoDelayTest.testInvalidPasswordSaslScram
   ```





Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1566868413


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -190,10 +192,10 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
 
 ClusterConfig config = builder.build();

Review Comment:
   I have filed a jira https://issues.apache.org/jira/browse/KAFKA-16560






[jira] [Created] (KAFKA-16561) Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config

2024-04-16 Thread Yangkun Ai (Jira)
Yangkun Ai created KAFKA-16561:
--

 Summary: Disable `allow.auto.create.topics` in MirrorMaker2 
Consumer Config
 Key: KAFKA-16561
 URL: https://issues.apache.org/jira/browse/KAFKA-16561
 Project: Kafka
  Issue Type: Improvement
Reporter: Yangkun Ai


While using MirrorMaker 2.0 (MM2), I noticed that the consumer used by the 
connector does not disable the ALLOW_AUTO_CREATE_TOPICS_CONFIG option. As a 
result, a topic may be immediately recreated if I delete it from the source 
cluster while MirrorMaker 2.0 is running. I believe that automatically creating 
new topics in this scenario is unreasonable, hence I think it is necessary to 
explicitly disable this option in the code.
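
As an illustration, a minimal sketch (not MM2's actual source; the method and map names are hypothetical) of explicitly disabling the option on the consumer side:
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Sketch: force-disable automatic topic creation for the consumers the connector builds.
static Map<String, Object> sourceConsumerProps(Map<String, Object> baseProps) {
    Map<String, Object> props = new HashMap<>(baseProps);
    // "allow.auto.create.topics" defaults to true; disabling it prevents a deleted
    // source topic from being recreated through the consumer's metadata requests
    // (auto-creation also requires auto.create.topics.enable=true on the broker).
    props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
    return props;
}
{code}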





[jira] [Created] (KAFKA-16560) Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig

2024-04-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16560:
--

 Summary: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig
 Key: KAFKA-16560
 URL: https://issues.apache.org/jira/browse/KAFKA-16560
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


origin discussion: 
https://github.com/apache/kafka/pull/15715#discussion_r1564660916

It seems to me this jira should address the following tasks (a sketch follows the list):

1. Make them immutable. We have adopted the builder pattern, so all changes 
should be completed in the builder phase.

2. Make all `Builder#build()` methods accept no arguments. Instead, we should add 
new setters for those arguments.
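
A minimal sketch of the target shape, assuming hypothetical class and field names (not the actual test-kit code):
{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Immutable value object: every field is final and only the builder mutates state.
public final class ClusterConfig {
    private final int brokers;
    private final Map<String, String> serverProperties;

    private ClusterConfig(Builder builder) {
        this.brokers = builder.brokers;
        this.serverProperties = Collections.unmodifiableMap(new HashMap<>(builder.serverProperties));
    }

    public static final class Builder {
        private int brokers = 1;
        private final Map<String, String> serverProperties = new HashMap<>();

        public Builder setBrokers(int brokers) {
            this.brokers = brokers;
            return this;
        }

        public Builder setServerProperty(String key, String value) {
            serverProperties.put(key, value);
            return this;
        }

        // build() takes no arguments; everything is configured through setters.
        public ClusterConfig build() {
            return new ClusterConfig(this);
        }
    }
}
{code}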





[jira] [Commented] (KAFKA-16560) Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig

2024-04-16 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837553#comment-17837553
 ] 

Kuan Po Tseng commented on KAFKA-16560:
---

Thanks [~chia7712]. In https://github.com/apache/kafka/pull/15715 I'm currently 
working on making ClusterConfig immutable. As you mentioned in the PR comment, 
there are still plenty of places, like BrokerNode and ControllerNode, that are 
not immutable. We can continue the work in this JIRA.

> Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig
> 
>
> Key: KAFKA-16560
> URL: https://issues.apache.org/jira/browse/KAFKA-16560
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> origin discussion: 
> https://github.com/apache/kafka/pull/15715#discussion_r1564660916
> It seems to me this jira should address the following tasks.
> 1. Make them immutable. We have adopted the builder pattern, so all changes 
> should be completed in the builder phase.
> 2. Make all `Builder#build()` methods accept no arguments. Instead, we should 
> add new setters for those arguments.





Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-04-16 Thread via GitHub


brandboat commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1566877392


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -190,10 +192,10 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
 
 ClusterConfig config = builder.build();

Review Comment:
   Thanks @chia7712! In this PR I'm currently working on making ClusterConfig 
immutable. Other places you mentioned, like `BrokerNode` and `ControllerNode`, 
are still not immutable. We can continue that work in the JIRA you just filed.






Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1566884610


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -190,10 +192,10 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
 
 ClusterConfig config = builder.build();

Review Comment:
   It seems this PR includes changes that modify `ControllerNode` directly, 
and that is why I feel the refactor should be completed first.






[jira] [Assigned] (KAFKA-16560) Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16560:
--

Assignee: Kuan Po Tseng  (was: Chia-Ping Tsai)

> Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig
> 
>
> Key: KAFKA-16560
> URL: https://issues.apache.org/jira/browse/KAFKA-16560
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
>
> origin discussion: 
> https://github.com/apache/kafka/pull/15715#discussion_r1564660916
> It seems to me this jira should address the following tasks.
> 1. Make them immutable. We have adopted the builder pattern, so all changes 
> should be completed in the builder phase.
> 2. Make all `Builder#build()` methods accept no arguments. Instead, we should 
> add new setters for those arguments.






Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-04-16 Thread via GitHub


brandboat commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1566887858


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -190,10 +192,10 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
 
 ClusterConfig config = builder.build();

Review Comment:
   OK, I'll also modify the `ControllerNode` part, thanks!






Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-04-16 Thread via GitHub


brandboat commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1566898370


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -190,10 +192,10 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
 
 ClusterConfig config = builder.build();

Review Comment:
   I'd prefer doing the refactoring part in 
https://issues.apache.org/jira/browse/KAFKA-16560 since it requires a bunch of 
work.






Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-04-16 Thread via GitHub


brandboat commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1566900542


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -190,10 +192,10 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
 
 ClusterConfig config = builder.build();

Review Comment:
   I'll address the other comments you mentioned, except the refactoring part, 
tonight. Thanks for your patience.






Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1566895408


##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -190,10 +192,10 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
 
 ClusterConfig config = builder.build();

Review Comment:
   > OK, I'll also modify the ControllerNode part, thanks!
   
   Do you want to do the refactor in this PR? Or you can complete the refactor in 
https://issues.apache.org/jira/browse/KAFKA-16560 first. If you prefer to 
address them all at once, I'm OK with closing 
https://issues.apache.org/jira/browse/KAFKA-16560.






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1566938172


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -81,8 +82,7 @@ public BrokerNode build(
         logDataDirectories = Collections.
             singletonList(String.format("combined_%d", id));
     } else {
-        logDataDirectories = Collections.
-            singletonList(String.format("broker_%d_data0", id));
+        logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment:
   I have created https://issues.apache.org/jira/browse/KAFKA-16559 






[jira] [Updated] (KAFKA-16559) Support to define the number of data folders in ClusterTest

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16559:
---
Description: 
That allows us to run the reassignment tests.

Also, we should enhance setNumBrokerNodes 
(https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81)
 to accept an extra argument defining the number of folders (via setLogDirectories).

  was:That allows us to run the reassignment tests.


> Support to define the number of data folders in ClusterTest
> ---
>
> Key: KAFKA-16559
> URL: https://issues.apache.org/jira/browse/KAFKA-16559
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> That allows us to run the reassignment tests.
> Also, we should enhance setNumBrokerNodes 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81)
>  to accept an extra argument defining the number of folders (via 
> setLogDirectories)





[jira] [Updated] (KAFKA-16559) Support to define the number of data folders in ClusterTest

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16559:
---
Description: 
from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409
That allows us to run the reassignment tests.

Also, we should enhance setNumBrokerNodes 
(https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81)
 to accept an extra argument defining the number of folders (via setLogDirectories).

  was:
That allows us to run the reassignment tests.

Also, we should enhance setNumBrokerNodes 
(https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81)
 to accept an extra argument defining the number of folders (via setLogDirectories).


> Support to define the number of data folders in ClusterTest
> ---
>
> Key: KAFKA-16559
> URL: https://issues.apache.org/jira/browse/KAFKA-16559
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409
> That allows us to run the reassignment tests.
> Also, we should enhance setNumBrokerNodes 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81)
>  to accept an extra argument defining the number of folders (via 
> setLogDirectories)





Re: [PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-16 Thread via GitHub


mimaison commented on code in PR #15718:
URL: https://github.com/apache/kafka/pull/15718#discussion_r1566953505


##
docs/ops.html:
##
@@ -2113,6 +2113,22 @@ 

[jira] [Created] (KAFKA-16562) Install the ginkgo to tools folder

2024-04-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16562:
--

 Summary: Install the ginkgo to tools folder
 Key: KAFKA-16562
 URL: https://issues.apache.org/jira/browse/KAFKA-16562
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


https://github.com/apache/yunikorn-k8shim/blob/master/scripts/run-e2e-tests.sh#L111

`run-e2e-tests.sh` installs ginkgo into the `GOBIN` folder. However, our `make 
e2e_test` assumes all tools are installed under yunikorn-k8shim/tools, and hence 
we see a `Command 'ginkgo' not found` error if we don't export `GOBIN`.

It seems to me this jira should include the following changes:

1. Move the ginkgo installation from run-e2e-tests.sh to the makefile, so that 
`make tools` installs all required tools.
2. The makefile should install ginkgo into the tools folder. The tools folder is 
added to PATH, so users can run the e2e tests more easily.





[jira] [Resolved] (KAFKA-16562) Install the ginkgo to tools folder

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16562.

Resolution: Invalid

Sorry, this jira is for yunikorn ...

> Install the ginkgo to tools folder
> --
>
> Key: KAFKA-16562
> URL: https://issues.apache.org/jira/browse/KAFKA-16562
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> https://github.com/apache/yunikorn-k8shim/blob/master/scripts/run-e2e-tests.sh#L111
> `run-e2e-tests.sh` installs ginkgo into the `GOBIN` folder. However, our 
> `make e2e_test` assumes all tools are installed under yunikorn-k8shim/tools, and 
> hence we see a `Command 'ginkgo' not found` error if we don't export `GOBIN`.
> It seems to me this jira should include the following changes:
> 1. Move the ginkgo installation from run-e2e-tests.sh to the makefile, so that 
> `make tools` installs all required tools.
> 2. The makefile should install ginkgo into the tools folder. The tools folder is 
> added to PATH, so users can run the e2e tests more easily.





[PR] KAFKA-16561 Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]

2024-04-16 Thread via GitHub


aaron-ai opened a new pull request, #15728:
URL: https://github.com/apache/kafka/pull/15728

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR : Replaced the while loop with TestUtils.waitForCondition [kafka]

2024-04-16 Thread via GitHub


chia7712 merged PR #15678:
URL: https://github.com/apache/kafka/pull/15678





Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on PR #15684:
URL: https://github.com/apache/kafka/pull/15684#issuecomment-2058593968

   @OmniaGM Could you please fix those conflicts? thanks!





Re: [PR] KAFKA-15685: added compatibility for MinGW [kafka]

2024-04-16 Thread via GitHub


mimaison commented on PR #13321:
URL: https://github.com/apache/kafka/pull/13321#issuecomment-2058597443

   To run the `GetOffsetShell` tool you should use the dedicated script 
`bin/kafka-get-offsets.sh`.
   
   The error you get is because the class was moved; it's now 
`org.apache.kafka.tools.GetOffsetShell`.





Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-16 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1566918622


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.info("Cannot upgrade classic group {} to consumer group because the online upgrade is disabled.",
+                classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.info("Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.",
+                classicGroup.groupId());
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.info("Cannot upgrade classic group {} to consumer group because the group size exceeds the consumer group maximum size.",
+                classicGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ConsumerGroup corresponding to the given classic group.
+     *
+     * @param classicGroup  The ClassicGroup to convert.
+     * @param records       The list of Records.
+     * @return  The created ConsumerGroup.
+     */
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) {
+        // The upgrade is always triggered by a new member joining the classic group, which always results in
+        // updatedMember.subscribedTopicNames changing, the group epoch being bumped, and triggering a new rebalance.
+        // If the ClassicGroup is rebalancing, inform the awaiting consumers of another ongoing rebalance
+        // so that they will rejoin for the new rebalance.
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        classicGroup.createGroupTombstoneRecords(records);
+        ConsumerGroup consumerGroup;
+        try {
+            consumerGroup = ConsumerGroup.fromClassicGroup(
+                snapshotRegistry,
+                metrics,
+                classicGroup,
+                metadataImage.topics(),
+                log
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot upgrade the classic group " + classicGroup.groupId() + ": fail to parse " +

Review Comment:
   nit: `log.warn("Cannot upgrade the classic group " + classicGroup.groupId() + " to consumer group because the embedded consumer protocol is malformed: " + e.getMessage() + ".", e);`



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
+     *
+     * @param classicGroup A ClassicGroup.
+     * @return the boolean indicating whether it's valid to online upgrade the classic group.
+     */
+    private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
+            log.info("Cannot upgrade classic group {} to consumer group because the online upgrade is disabled.",
+                classicGroup.groupId());
+            return false;
+        } else if (!classicGroup.usesConsumerGroupProtocol()) {
+            log.info("Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.",
+                classicGroup.groupId());
+            return false;
+        } else if (classicGroup.size() > consumerGroupMaxSize) {
+            log.info("Cannot upgrade classic group {} to consumer group because the group size exceeds the consumer group maximum size.",
+                classicGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ConsumerGroup corresponding to the given classic group.
+     *
+     * @param classicGroup  The ClassicGroup to convert.
+     * @param records       The list of Records.
+     * @return  The created ConsumerGroup.

Review Comment:
   nit: There is an extra space after `return`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +778,72 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online upgrade if the 

[jira] [Commented] (KAFKA-16467) Add README to docs folder

2024-04-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837617#comment-17837617
 ] 

ASF GitHub Bot commented on KAFKA-16467:


FrankYang0529 commented on PR #596:
URL: https://github.com/apache/kafka-site/pull/596#issuecomment-2058627999

   Hi @showuon, thanks for reviewing. I've addressed all comments and added you 
as co-author.




> Add README to docs folder
> -
>
> Key: KAFKA-16467
> URL: https://issues.apache.org/jira/browse/KAFKA-16467
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> We don't have a guide in the project root folder or docs folder showing how to 
> run the website locally. It would be good to provide a way to run the 
> documentation with the kafka-site repository.
>  
> Option 1: Add links to the wiki pages 
> [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes]
>  and 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. 
> Option 2: Show how to run the documentation within a container. For example: moving 
> `site-docs` from kafka to the kafka-site repository and running `./start-preview.sh`.





Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-16 Thread via GitHub


lucasbru commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1567025166


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestState.java:
##
@@ -66,6 +67,7 @@ public RequestState(final LogContext logContext,
      * and the backoff is restored to its minimal configuration.
      */
     public void reset() {
+        this.requestInFlight = false;
         this.lastSentMs = -1;

Review Comment:
   what's the purpose of `lastSentMs` now? Can we remove it?






Re: [PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-16 Thread via GitHub


soarez commented on code in PR #15718:
URL: https://github.com/apache/kafka/pull/15718#discussion_r1567042897


##
docs/ops.html:
##
@@ -2113,6 +2113,22 @@ 

[jira] [Created] (KAFKA-16563) migration to KRaft hanging after KeeperException

2024-04-16 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16563:
-

 Summary: migration to KRaft hanging after KeeperException
 Key: KAFKA-16563
 URL: https://issues.apache.org/jira/browse/KAFKA-16563
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Luke Chen
Assignee: Luke Chen


When running the ZK to KRaft migration process, we encountered an issue where the 
migration hangs and the `ZkMigrationState` cannot move to the `MIGRATION` 
state. After investigation, the root cause is that pollEvent didn't 
retry on a retriable KeeperException when it should have.

 
{code:java}
2024-04-11 21:27:55,393 INFO [KRaftMigrationDriver id=5] Encountered ZooKeeper error during event PollEvent. Will retry. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [controller-5-migration-driver-event-handler]
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /migration
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
    at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:570)
    at kafka.zk.KafkaZkClient.createInitialMigrationState(KafkaZkClient.scala:1701)
    at kafka.zk.KafkaZkClient.getOrCreateMigrationState(KafkaZkClient.scala:1689)
    at kafka.zk.ZkMigrationClient.$anonfun$getOrCreateMigrationRecoveryState$1(ZkMigrationClient.scala:109)
    at kafka.zk.ZkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationClient.scala:69)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.recoverMigrationStateFromZK(KRaftMigrationDriver.java:169)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$1900(KRaftMigrationDriver.java:62)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver$PollEvent.run(KRaftMigrationDriver.java:794)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}





Re: [PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-16 Thread via GitHub


mimaison commented on code in PR #15718:
URL: https://github.com/apache/kafka/pull/15718#discussion_r1567073072


##
docs/ops.html:
##
@@ -2113,6 +2113,22 @@ https://github.com/apache/kafka-site)
  - extract `core/build/distribution/kafka*-site-docs.tgz` and copy its content to `kafka-site/3.7`
   - run `./start-preview.sh` in `kafka-site` and open localhost:8080






Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-04-16 Thread via GitHub


mimaison merged PR #15558:
URL: https://github.com/apache/kafka/pull/15558





Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-16 Thread via GitHub


fvaleri commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2058739860

   @OmniaGM please rebase, hopefully for the last time :)





[PR] MINOR adds '-parameters' compiler option for :core tests compilation [kafka]

2024-04-16 Thread via GitHub


nizhikov opened a new pull request, #15729:
URL: https://github.com/apache/kafka/pull/15729

   All `core` module tests are compiled as Scala tests, but the `-parameters` 
compiler option required by JUnit was added only to the Java test compile 
options. This PR fixes it.
   
   Before fix:
   ```
   ❯ ./gradlew cleanTest :core:test --tests BootstrapControllersIntegrationTest.testDescribeCluster
   
   > Configure project :
   Starting build with version 3.8.0-SNAPSHOT (commit id e5a85607) using Gradle 8.7, Java 21 and Scala 2.13.12
   Build properties: maxParallelForks=10, maxScalacThreads=8, maxTestRetries=0
   
   > Task :core:test
   
   Gradle Test Run :core:test > Gradle Test Executor 105 > BootstrapControllersIntegrationTest > testDescribeCluster(boolean) > "testDescribeCluster(boolean).false" PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 105 > BootstrapControllersIntegrationTest > testDescribeCluster(boolean) > "testDescribeCluster(boolean).true" PASSED
   ```
   
   After fix (please note the difference in test names):
   ```
   ❯ ./gradlew cleanTest :core:test --tests BootstrapControllersIntegrationTest.testDescribeCluster
   
   > Configure project :
   Starting build with version 3.8.0-SNAPSHOT (commit id e5a85607) using Gradle 8.7, Java 21 and Scala 2.13.12
   Build properties: maxParallelForks=10, maxScalacThreads=8, maxTestRetries=0
   
   > Task :core:compileTestScala
   ...
   > Task :core:test
   
   Gradle Test Run :core:test > Gradle Test Executor 107 > BootstrapControllersIntegrationTest > testDescribeCluster(boolean) > "testDescribeCluster(boolean).usingBootstrapControllers=false" PASSED
   
   Gradle Test Run :core:test > Gradle Test Executor 107 > BootstrapControllersIntegrationTest > testDescribeCluster(boolean) > "testDescribeCluster(boolean).usingBootstrapControllers=true" PASSED
   
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR adds '-parameters' compiler option for :core tests compilation [kafka]

2024-04-16 Thread via GitHub


nizhikov commented on PR #15729:
URL: https://github.com/apache/kafka/pull/15729#issuecomment-2058801469

   Hello @chia7712 
   
   I found a small bug in #15667.
   Currently, we don't add the '-parameters' compiler option, so the test names 
are created incorrectly.





[PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-16 Thread via GitHub


gaurav-narula opened a new pull request, #15730:
URL: https://github.com/apache/kafka/pull/15730

   Overloads `TestKitNodes.Builder::setNumBrokerNodes` to accept a 
`disksPerBroker` argument. We keep the existing semantics for the case when 
`disksPerBroker == 1`.
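   
   A minimal sketch of the described overload (illustrative only; the real builder carries more state):
   ```
   // Hypothetical excerpt of TestKitNodes.Builder showing the overload's shape.
   public Builder setNumBrokerNodes(int numBrokerNodes) {
       return setNumBrokerNodes(numBrokerNodes, 1); // existing semantics: one disk per broker
   }

   public Builder setNumBrokerNodes(int numBrokerNodes, int disksPerBroker) {
       if (disksPerBroker <= 0) {
           throw new IllegalArgumentException("disksPerBroker must be positive");
       }
       this.numBrokerNodes = numBrokerNodes;
       this.disksPerBroker = disksPerBroker;
       return this;
   }
   ```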
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-16 Thread via GitHub


gaurav-narula commented on PR #15730:
URL: https://github.com/apache/kafka/pull/15730#issuecomment-2058820310

   CC: @chia7712 @soarez 





Re: [PR] MINOR adds '-parameters' compiler option for :core tests [kafka]

2024-04-16 Thread via GitHub


nizhikov commented on PR #15729:
URL: https://github.com/apache/kafka/pull/15729#issuecomment-2058836659

   @FrankYang0529 Can you please take a look?





[PR] KAFKA-15309: Add custom error handler to Producer [kafka]

2024-04-16 Thread via GitHub


aliehsaeedii opened a new pull request, #15731:
URL: https://github.com/apache/kafka/pull/15731

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-16111) Implement tests for tricky rebalance callback scenarios

2024-04-16 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16111:
--

Assignee: (was: Lucas Brutschy)

> Implement tests for tricky rebalance callback scenarios
> ---
>
> Key: KAFKA-16111
> URL: https://issues.apache.org/jira/browse/KAFKA-16111
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Major
>  Labels: callback, consumer-threading-refactor, integration-tests
> Fix For: 3.8.0
>
>
> There is justified concern that the new threading model may not play well 
> with "tricky" {{ConsumerRebalanceListener}} callbacks. We need to provide 
> some assurance that it will support complicated patterns.
>  # Design and implement test scenarios
>  # Update and document any design changes with the callback sub-system where 
> needed
>  # Provide fix(es) to the {{AsyncKafkaConsumer}} implementation to abide by 
> said design





Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-16 Thread via GitHub


showuon commented on PR #15732:
URL: https://github.com/apache/kafka/pull/15732#issuecomment-2058930123

   @cmccabe @mumrah , call for review. Thanks.





Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]

2024-04-16 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1567240139


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java:
##
@@ -0,0 +1,153 @@
+package org.apache.kafka.jmh.group_coordinator;

Review Comment:
   I see where you're coming from. I think the builder could be considered 
part of the assignor benchmarks. How about moving the client one to the 
existing `consumer` package and keeping the remaining ones in a `coordinator` 
package? We usually don't use package names with `_`, so we should change it.






[PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-16 Thread via GitHub


showuon opened a new pull request, #15732:
URL: https://github.com/apache/kafka/pull/15732

   When running the ZK to KRaft migration process, we encountered an issue where 
the migration hangs and the `ZkMigrationState` cannot move to the `MIGRATION` 
state. This is because pollEvent didn't retry on the retriable 
`MigrationClientException` (ZK client retriable errors) when it should have. 
This PR fixes it and adds a test. A sketch of the retry shape follows.
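   
   A minimal sketch, assuming hypothetical helper names (`scheduleRetry` is not the real event-queue API):
   ```
   // Illustrative only: on a retriable migration-client error, the poll event
   // must be re-enqueued instead of silently dropping out of the poll loop.
   void runPollEvent() {
       try {
           recoverMigrationStateFromZK();      // may hit transient ZooKeeper errors
       } catch (MigrationClientException e) {  // retriable ZK client errors
           log.info("Encountered ZooKeeper error during event PollEvent. Will retry.", e);
           scheduleRetry(new PollEvent());     // hypothetical: re-enqueue the event after a backoff
       }
   }
   ```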
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16563) migration to KRaft hanging after KeeperException

2024-04-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16563:
--
Description: 
When running the ZK to KRaft migration process, we encountered an issue where the 
migration hangs and the `ZkMigrationState` cannot move to the `MIGRATION` 
state. After investigation, the root cause is that pollEvent didn't 
retry on the retriable `MigrationClientException` (i.e. ZK client retriable 
errors) when it should have. Because of this, the poll event will not poll 
anymore, which means the KRaftMigrationDriver cannot work as expected.

 
{code:java}
2024-04-11 21:27:55,393 INFO [KRaftMigrationDriver id=5] Encountered ZooKeeper error during event PollEvent. Will retry. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [controller-5-migration-driver-event-handler]
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /migration
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
    at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:570)
    at kafka.zk.KafkaZkClient.createInitialMigrationState(KafkaZkClient.scala:1701)
    at kafka.zk.KafkaZkClient.getOrCreateMigrationState(KafkaZkClient.scala:1689)
    at kafka.zk.ZkMigrationClient.$anonfun$getOrCreateMigrationRecoveryState$1(ZkMigrationClient.scala:109)
    at kafka.zk.ZkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationClient.scala:69)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.recoverMigrationStateFromZK(KRaftMigrationDriver.java:169)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$1900(KRaftMigrationDriver.java:62)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver$PollEvent.run(KRaftMigrationDriver.java:794)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}

  was:
When running the ZK to KRaft migration process, we encountered an issue where the 
migration hangs and the `ZkMigrationState` cannot move to the `MIGRATION` 
state. After investigation, the root cause is that pollEvent didn't 
retry on a retriable KeeperException when it should have.

 
{code:java}
2024-04-11 21:27:55,393 INFO [KRaftMigrationDriver id=5] Encountered ZooKeeper error during event PollEvent. Will retry. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [controller-5-migration-driver-event-handler]
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /migration
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
    at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:570)
    at kafka.zk.KafkaZkClient.createInitialMigrationState(KafkaZkClient.scala:1701)
    at kafka.zk.KafkaZkClient.getOrCreateMigrationState(KafkaZkClient.scala:1689)
    at kafka.zk.ZkMigrationClient.$anonfun$getOrCreateMigrationRecoveryState$1(ZkMigrationClient.scala:109)
    at kafka.zk.ZkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationClient.scala:69)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.recoverMigrationStateFromZK(KRaftMigrationDriver.java:169)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$1900(KRaftMigrationDriver.java:62)
    at org.apache.kafka.metadata.migration.KRaftMigrationDriver$PollEvent.run(KRaftMigrationDriver.java:794)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}


> migration to KRaft hanging after KeeperException
> 
>
> Key: KAFKA-16563
> URL: https://issues.apache.org/jira/browse/KAFKA-16563
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> When running the ZK to KRaft migration process, we encountered an issue where the 
> migration hangs and the `ZkMigrationState` cannot move to `

[jira] [Updated] (KAFKA-16563) migration to KRaft hanging after MigrationClientException

2024-04-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16563:
--
Summary: migration to KRaft hanging after MigrationClientException  (was: 
migration to KRaft hanging after KeeperException)

> migration to KRaft hanging after MigrationClientException
> -
>
> Key: KAFKA-16563
> URL: https://issues.apache.org/jira/browse/KAFKA-16563
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> When running the ZK to KRaft migration process, we encountered an issue where the 
> migration hangs and the `ZkMigrationState` cannot move to the `MIGRATION` 
> state. After investigation, the root cause is that pollEvent didn't 
> retry on the retriable `MigrationClientException` (i.e. ZK client retriable 
> errors) when it should have. Because of this, the poll event will not poll 
> anymore, which means the KRaftMigrationDriver cannot work as expected.
>  
> {code:java}
> 2024-04-11 21:27:55,393 INFO [KRaftMigrationDriver id=5] Encountered ZooKeeper error during event PollEvent. Will retry. (org.apache.kafka.metadata.migration.KRaftMigrationDriver) [controller-5-migration-driver-event-handler]
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /migration
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>     at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
>     at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:570)
>     at kafka.zk.KafkaZkClient.createInitialMigrationState(KafkaZkClient.scala:1701)
>     at kafka.zk.KafkaZkClient.getOrCreateMigrationState(KafkaZkClient.scala:1689)
>     at kafka.zk.ZkMigrationClient.$anonfun$getOrCreateMigrationRecoveryState$1(ZkMigrationClient.scala:109)
>     at kafka.zk.ZkMigrationClient.getOrCreateMigrationRecoveryState(ZkMigrationClient.scala:69)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver.recoverMigrationStateFromZK(KRaftMigrationDriver.java:169)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$1900(KRaftMigrationDriver.java:62)
>     at org.apache.kafka.metadata.migration.KRaftMigrationDriver$PollEvent.run(KRaftMigrationDriver.java:794)
>     at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>     at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>     at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> {code}





Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-16 Thread via GitHub


showuon commented on code in PR #15719:
URL: https://github.com/apache/kafka/pull/15719#discussion_r1567254740


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -835,6 +836,7 @@ object KafkaConfig {
   .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
   .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
   .define(LogMessageDownConversionEnableProp, BOOLEAN, LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, LogMessageDownConversionEnableDoc)
+  .defineInternal(LogInitialTaskDelayMsProp, LONG, LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS, LOW)

Review Comment:
   Also, we can set `atLeast(0)` in the defineInternal method, so that we don't 
need an additional validator below.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -835,6 +836,7 @@ object KafkaConfig {
   .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
   .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
   .define(LogMessageDownConversionEnableProp, BOOLEAN, LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, LogMessageDownConversionEnableDoc)
+  .defineInternal(LogInitialTaskDelayMsProp, LONG, LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS, LOW)

Review Comment:
   Could we add a doc in the last parameter so other developers know what this 
config is for? 
   Ex: 
   The initial task delay in milliseconds when initializing tasks in LogManager. 
This should be used for testing only.
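   
   Combining the two suggestions above, the define could look like this (a sketch, assuming the `defineInternal` overload that takes a validator and a doc string):
   ```
   .defineInternal(LogInitialTaskDelayMsProp, LONG, LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS, atLeast(0), LOW,
     "The initial task delay in milliseconds when initializing tasks in LogManager. This should be used for testing only.")
   ```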



##
core/src/main/java/kafka/server/builders/LogManagerBuilder.java:
##
@@ -179,6 +179,7 @@ public LogManager build() {
   logDirFailureChannel,
   time,
   keepPartitionMetadataFile,
-  remoteStorageSystemEnable);
+  remoteStorageSystemEnable,
+  LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS);

Review Comment:
   I know we don't currently use LogManagerBuilder in the tests, but I still 
think we should add an `initialTaskDelayMs` setting and set its default value to 
`LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS`.
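
   A minimal sketch of that idea (the setter name is hypothetical, and the 
default constant is assumed to mirror the PR's LogConfig addition):

   ```java
   // Trimmed-down sketch of the suggested builder change.
   public class LogManagerBuilderSketch {
       // Assumed to mirror LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS (30 seconds).
       private static final long DEFAULT_INITIAL_TASK_DELAY_MS = 30_000L;

       private long initialTaskDelayMs = DEFAULT_INITIAL_TASK_DELAY_MS;

       // Hypothetical setter: tests can override the delay while existing
       // callers keep the current default behaviour.
       public LogManagerBuilderSketch setInitialTaskDelayMs(long initialTaskDelayMs) {
           this.initialTaskDelayMs = initialTaskDelayMs;
           return this;
       }
   }
   ```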



##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -413,7 +413,7 @@ class LogManagerTest {
 assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, 
"Check we have the expected number of segments.")
 
 // this cleanup shouldn't find any expired segments but should delete some 
to reduce size
-time.sleep(logManager.InitialTaskDelayMs)
+time.sleep(logManager.initialTaskDelayMs)
 assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 
segments")
 time.sleep(log.config.fileDeleteDelayMs + 1)

Review Comment:
   Could we create a test in LogManagerTest to verify that the logManager starts 
these tasks after the customized `initialTaskDelayMs`? We could add a simple test 
like what we see here, or maybe directly add verification inside these tests. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]

2024-04-16 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1567241790


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+import static 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ClientSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+COOPERATIVE_STICKY(new CooperativeStickyAssignor());
+
+private final ConsumerPartitionAssignor assignor;
+
+AssignorType(ConsumerPartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public ConsumerPartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"1000", "1"})
+private int memberCount;
+
+@Param({"10", "50"})
+private int partitionsPerTopicCount;
+
+@Param({"100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "COOPERATIVE_STICKY"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>();
+
+private ConsumerPartitionAssignor.GroupSubscription groupSubscription;
+
+private static final int numberOfRacks = 3;
+
+private static final int replicationFactor = 2;

Review Comment:
   As they are constants now, let's rename them to `NUMBER_OF_RACKS` and 
`REPLICATION_FACTOR`.



##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, softwar

Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]

2024-04-16 Thread via GitHub


dajac commented on PR #15717:
URL: https://github.com/apache/kafka/pull/15717#issuecomment-2058988823

   @rreddy-22 Thanks for the update. Could you please create a jira for this 
task?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15730:
URL: https://github.com/apache/kafka/pull/15730#discussion_r1567289057


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -74,7 +74,7 @@ class KRaftClusterTest {
   def testCreateClusterAndClose(): Unit = {
 val cluster = new KafkaClusterTestKit.Builder(
   new TestKitNodes.Builder().
-setNumBrokerNodes(1).
+setNumBrokerNodes(1, 2).

Review Comment:
   Why do we need this change?



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -79,9 +82,16 @@ public Builder setNumControllerNodes(int numControllerNodes) 
{
 }
 
 public Builder setNumBrokerNodes(int numBrokerNodes) {
+return setNumBrokerNodes(numBrokerNodes, 1);
+}
+
+public Builder setNumBrokerNodes(int numBrokerNodes, int 
disksPerBroker) {

Review Comment:
   As it accepts another argument now, maybe we should rename it to 
`setBrokerNodes`



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -90,9 +100,17 @@ public Builder setNumBrokerNodes(int numBrokerNodes) {
 if (!brokerNodeBuilders.isEmpty()) {
 nextId = brokerNodeBuilders.lastKey() + 1;
 }
-brokerNodeBuilders.put(nextId,
-new BrokerNode.Builder().
-setId(nextId));
+BrokerNode.Builder brokerNodeBuilder = new 
BrokerNode.Builder().setId(nextId);
+// Keeping consistent behaviour for existing tests when 
disksPerBroker == 1
+if (disksPerBroker > 1) {
+int brokerId = nextId; // this is because the lambda below 
requires a final
+List logDataDirectories = IntStream
+.range(0, disksPerBroker)
+.mapToObj(i -> String.format("broker_%d_data%d", 
brokerId, i))
+.collect(Collectors.toList());
+brokerNodeBuilder.setLogDirectories(logDataDirectories);

Review Comment:
   could you add a unit test for this change? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-16 Thread via GitHub


gaurav-narula commented on code in PR #15730:
URL: https://github.com/apache/kafka/pull/15730#discussion_r1567295806


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -74,7 +74,7 @@ class KRaftClusterTest {
   def testCreateClusterAndClose(): Unit = {
 val cluster = new KafkaClusterTestKit.Builder(
   new TestKitNodes.Builder().
-setNumBrokerNodes(1).
+setNumBrokerNodes(1, 2).

Review Comment:
   I was trying to be cheeky to avoid another unit test :) Will add a separate 
one



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR adds '-parameters' compiler option for :core tests [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15729:
URL: https://github.com/apache/kafka/pull/15729#discussion_r1567300055


##
build.gradle:
##
@@ -1239,6 +1239,9 @@ project(':core') {
   }
 }
   }
+  // -parameters generates arguments with parameter names in 
TestInfo#getDisplayName.
+  // ref: 
https://github.com/junit-team/junit5/blob/4c0dddad1b96d4a20e92a2cd583954643ac56ac0/junit-jupiter-params/src/main/java/org/junit/jupiter/params/ParameterizedTest.java#L161-L164
+  compileTestScala.options.compilerArgs.add "-parameters"

Review Comment:
   Instead of adding the args to the core module directly, could we define the 
compile args in `ScalaCompile`? We can add a condition check to make sure 
`-parameters` is added to tests only. For example:
   
   ```
   if (name == "compileTestScala")
 options.compilerArgs << "-parameters"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16564) Apply `Xlint` to java code in core module

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16564:
--

Assignee: Chia-Ping Tsai

> Apply `Xlint` to java code in core module
> -
>
> Key: KAFKA-16564
> URL: https://issues.apache.org/jira/browse/KAFKA-16564
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16556: SubscriptionState should not prematurely reset 'pending' partitions [kafka]

2024-04-16 Thread via GitHub


lucasbru merged PR #15724:
URL: https://github.com/apache/kafka/pull/15724


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16564) Apply `Xlint` to java code in core module

2024-04-16 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16564:
--

 Summary: Apply `Xlint` to java code in core module
 Key: KAFKA-16564
 URL: https://issues.apache.org/jira/browse/KAFKA-16564
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-16 Thread via GitHub


soarez commented on code in PR #15730:
URL: https://github.com/apache/kafka/pull/15730#discussion_r1567303386


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -90,9 +100,17 @@ public Builder setNumBrokerNodes(int numBrokerNodes) {
 if (!brokerNodeBuilders.isEmpty()) {
 nextId = brokerNodeBuilders.lastKey() + 1;
 }
-brokerNodeBuilders.put(nextId,
-new BrokerNode.Builder().
-setId(nextId));
+BrokerNode.Builder brokerNodeBuilder = new 
BrokerNode.Builder().setId(nextId);
+// Keeping consistent behaviour for existing tests when 
disksPerBroker == 1
+if (disksPerBroker > 1) {
+int brokerId = nextId; // this is because the lambda below 
requires a final
+List logDataDirectories = IntStream
+.range(0, disksPerBroker)
+.mapToObj(i -> String.format("broker_%d_data%d", 
brokerId, i))
+.collect(Collectors.toList());
+brokerNodeBuilder.setLogDirectories(logDataDirectories);
+}

Review Comment:
   Should this be part of `BrokerNode.Builder`, perhaps a new 
`.setNumLogDirectories()`?
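
   For illustration, such a method could look like this sketch (the method name 
is hypothetical; the directory naming mirrors the inline code above):

   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;

   // Sketch: moving the log-dir generation into the broker node builder.
   public class BrokerNodeBuilderSketch {
       private int id;
       private List<String> logDataDirectories;

       public BrokerNodeBuilderSketch setId(int id) {
           this.id = id;
           return this;
       }

       // Hypothetical method: derives per-broker dir names from the broker id,
       // matching the "broker_%d_data%d" pattern used in TestKitNodes.
       public BrokerNodeBuilderSketch setNumLogDirectories(int numLogDirectories) {
           this.logDataDirectories = IntStream.range(0, numLogDirectories)
               .mapToObj(i -> String.format("broker_%d_data%d", id, i))
               .collect(Collectors.toList());
           return this;
       }
   }
   ```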



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16564) Apply `Xlint` to java code in core module

2024-04-16 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16564:
---
Description: 
the java code in the core module is in a scala block, so the jvm compiler arguments 
are defined by the `ScalaCompile` type. The `xlint`-related arguments defined in 
`JavaCompile` don't work with the java code in the core module.

Personally, I think we should apply the same `xlint` to the java code in the core 
module before we complete the code migration.

> Apply `Xlint` to java code in core module
> -
>
> Key: KAFKA-16564
> URL: https://issues.apache.org/jira/browse/KAFKA-16564
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> the java code in the core module is in a scala block, so the jvm compiler 
> arguments are defined by the `ScalaCompile` type. The `xlint`-related arguments 
> defined in `JavaCompile` don't work with the java code in the core module.
> Personally, I think we should apply the same `xlint` to the java code in the 
> core module before we complete the code migration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16564) Apply `Xlint` to java code in core module

2024-04-16 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837699#comment-17837699
 ] 

Chia-Ping Tsai commented on KAFKA-16564:


[~ijuma] Could you please take a look at this jira, as you are an expert on Gradle?

> Apply `Xlint` to java code in core module
> -
>
> Key: KAFKA-16564
> URL: https://issues.apache.org/jira/browse/KAFKA-16564
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> the java code in the core module is in a scala block, so the jvm compiler 
> arguments are defined by the `ScalaCompile` type. The `xlint`-related arguments 
> defined in `JavaCompile` don't work with the java code in the core module.
> Personally, I think we should apply the same `xlint` to the java code in the 
> core module before we complete the code migration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR adds '-parameters' compiler option for :core tests [kafka]

2024-04-16 Thread via GitHub


nizhikov commented on code in PR #15729:
URL: https://github.com/apache/kafka/pull/15729#discussion_r1567309609


##
build.gradle:
##
@@ -1239,6 +1239,9 @@ project(':core') {
   }
 }
   }
+  // -parameters generates arguments with parameter names in 
TestInfo#getDisplayName.
+  // ref: 
https://github.com/junit-team/junit5/blob/4c0dddad1b96d4a20e92a2cd583954643ac56ac0/junit-jupiter-params/src/main/java/org/junit/jupiter/params/ParameterizedTest.java#L161-L164
+  compileTestScala.options.compilerArgs.add "-parameters"

Review Comment:
   Reworked as you suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16559: allow defining number of disks per broker in ClusterTest [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15730:
URL: https://github.com/apache/kafka/pull/15730#discussion_r1567313085


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -90,9 +100,17 @@ public Builder setNumBrokerNodes(int numBrokerNodes) {
 if (!brokerNodeBuilders.isEmpty()) {
 nextId = brokerNodeBuilders.lastKey() + 1;
 }
-brokerNodeBuilders.put(nextId,
-new BrokerNode.Builder().
-setId(nextId));
+BrokerNode.Builder brokerNodeBuilder = new 
BrokerNode.Builder().setId(nextId);
+// Keeping consistent behaviour for existing tests when 
disksPerBroker == 1
+if (disksPerBroker > 1) {
+int brokerId = nextId; // this is because the lambda below 
requires a final
+List logDataDirectories = IntStream
+.range(0, disksPerBroker)
+.mapToObj(i -> String.format("broker_%d_data%d", 
brokerId, i))
+.collect(Collectors.toList());
+brokerNodeBuilder.setLogDirectories(logDataDirectories);
+}

Review Comment:
   Note that we should consider the `combined` mode. See 
https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/BrokerNode.java#L80



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR adds '-parameters' compiler option for :core tests [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15729:
URL: https://github.com/apache/kafka/pull/15729#discussion_r1567314494


##
build.gradle:
##
@@ -1239,6 +1239,9 @@ project(':core') {
   }
 }
   }
+  // -parameters generates arguments with parameter names in 
TestInfo#getDisplayName.
+  // ref: 
https://github.com/junit-team/junit5/blob/4c0dddad1b96d4a20e92a2cd583954643ac56ac0/junit-jupiter-params/src/main/java/org/junit/jupiter/params/ParameterizedTest.java#L161-L164
+  compileTestScala.options.compilerArgs.add "-parameters"

Review Comment:
   @nizhikov could you update the changes of #15663 for consistency?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] JMH Benchmarks for Server Side Rebalances: KIP_848 [kafka]

2024-04-16 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1567316600


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"1000", "1"})
+private int memberCount;
+
+@Param({"10", "50"})
+private int partitionsPerTopicCount;
+
+@Param({"100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int numberOfRacks = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec(topicMetadata);
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map<Uuid, TopicMetadata> createTopicMetadata() {
+Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+Map<Integer, Set<String>> partitionRacks = isRackAw

Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2059036032

   > hopefully for the last time :)
   
   I feel #15684 will cause many conflicts with this PR, and vice versa :(


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in clients [kafka]

2024-04-16 Thread via GitHub


mimaison commented on PR #15705:
URL: https://github.com/apache/kafka/pull/15705#issuecomment-2059036435

   None of the test failures seem related, merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in clients [kafka]

2024-04-16 Thread via GitHub


mimaison merged PR #15705:
URL: https://github.com/apache/kafka/pull/15705


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-16 Thread via GitHub


mimaison commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2059050678

   Let's just let Omnia decide which PR she prefers doing first. Then we can 
all review that PR, and once it's merged move onto the next one. This would avoid 
unnecessary churn, as I expect she's getting tired of rebasing all of these PRs 
every day!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


mwesterby opened a new pull request, #15733:
URL: https://github.com/apache/kafka/pull/15733

   Prevents the storage tool from crashing when some of the directories are 
unavailable, but others are still available and have not yet been formatted.
   
   Should all directories be unavailable, however, the tool will fail.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR adds '-parameters' compiler option for :core tests [kafka]

2024-04-16 Thread via GitHub


nizhikov commented on code in PR #15729:
URL: https://github.com/apache/kafka/pull/15729#discussion_r1567359782


##
build.gradle:
##
@@ -1239,6 +1239,9 @@ project(':core') {
   }
 }
   }
+  // -parameters generates arguments with parameter names in 
TestInfo#getDisplayName.
+  // ref: 
https://github.com/junit-team/junit5/blob/4c0dddad1b96d4a20e92a2cd583954643ac56ac0/junit-jupiter-params/src/main/java/org/junit/jupiter/params/ParameterizedTest.java#L161-L164
+  compileTestScala.options.compilerArgs.add "-parameters"

Review Comment:
   I created a function `addParametersForTests` to add the `-parameters` arg if 
required and used it for both the Scala and Java tasks. Please take a look.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-16 Thread via GitHub


lianetm commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1567368858


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java:
##
@@ -48,4 +50,51 @@ public void testRequestStateSimple() {
 state.reset();
 assertTrue(state.canSendRequest(200));
 }
+
+@Test
+public void testTrackInflightOnSuccessfulAttempt() {
+testTrackInflight(RequestState::onSuccessfulAttempt);
+}
+
+@Test
+public void testTrackInflightOnFailedAttempt() {
+testTrackInflight(RequestState::onFailedAttempt);
+}
+
+/**
+ * In some cases, the network layer is very fast and can send out 
a second request within the same
+ * millisecond timestamp as receiving the first response.
+ *
+ * 
+ *
+ * The previous logic for tracking inflight status used timestamps: if the 
timestamp from the last received
+ * response was less than the timestamp from the last sent 
request, we'd interpret that as having an
+ * inflight request. However, this approach would incorrectly return 
false from
+ * {@link RequestState#requestInFlight()} if the two timestamps were 
equal.
+ */

Review Comment:
   nit: Do we think it's helpful to explain so much about a "previous logic" 
that was wrong here? I find it can be confusing for others. Also, here we're 
actually just testing the approach of checking inflights based on a flag (the 
reasoning for why we chose that approach makes more sense to me in the 
`requestInflight()` func itself). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-16 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1567372730


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
 );
 return describedGroup;
 }
+
+/**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry  The SnapshotRegistry.
+ * @param metrics   The GroupCoordinatorMetricsShard.
+ * @param classicGroup  The converted classic group.
+ * @param topicsImage   The TopicsImage for topic id and topic name 
conversion.
+ * @param log   The logger to use.
+ * @return  The created ConsumerGroup.
+ */
+public static ConsumerGroup fromClassicGroup(
+SnapshotRegistry snapshotRegistry,
+GroupCoordinatorMetricsShard metrics,
+ClassicGroup classicGroup,
+TopicsImage topicsImage,
+Logger log
+) {
+String groupId = classicGroup.groupId();
+ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+consumerGroup.setGroupEpoch(classicGroup.generationId());
+consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+classicGroup.allMembers().forEach(classicGroupMember -> {
+// The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+// in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+// If the consumer's real assigned partitions haven't been updated 
according to
+// classicGroupMember.assignment(), it will retry the request.
+ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+ByteBuffer.wrap(classicGroupMember.assignment()));

Review Comment:
   nit: Should we move the closing parenthesis to a new line to be consistent?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1088,133 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
 );
 return describedGroup;
 }
+
+/**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry  The SnapshotRegistry.
+ * @param metrics   The GroupCoordinatorMetricsShard.
+ * @param classicGroup  The converted classic group.
+ * @param topicsImage   The TopicsImage for topic id and topic name 
conversion.
+ * @param log   The logger to use.
+ * @return  The created ConsumerGroup.
+ */
+public static ConsumerGroup fromClassicGroup(
+SnapshotRegistry snapshotRegistry,
+GroupCoordinatorMetricsShard metrics,
+ClassicGroup classicGroup,
+TopicsImage topicsImage,
+Logger log
+) {
+String groupId = classicGroup.groupId();
+ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+consumerGroup.setGroupEpoch(classicGroup.generationId());
+consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+classicGroup.allMembers().forEach(classicGroupMember -> {
+// The new ConsumerGroupMember's assignedPartitions and 
targetAssignmentSet need to be the same
+// in order to keep it stable. Thus, both of them are set to be 
classicGroupMember.assignment().
+// If the consumer's real assigned partitions haven't been updated 
according to
+// classicGroupMember.assignment(), it will retry the request.
+ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+ByteBuffer.wrap(classicGroupMember.assignment()));
+Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get(;

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16262) Add IQv2 to Kafka Streams documentation

2024-04-16 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-16262:


Assignee: (was: Walter Hernandez)

> Add IQv2 to Kafka Streams documentation
> ---
>
> Key: KAFKA-16262
> URL: https://issues.apache.org/jira/browse/KAFKA-16262
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, newbie
>
> The new IQv2 API was added many releases ago. While it is still not feature 
> complete, we should add it to the docs 
> ([https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html])
>  to make users aware of the new API so they can start to try it out, report 
> issues, and provide feedback / feature requests.
> We might still state that IQv2 is not yet feature complete, but should change 
> the docs in a way that positions it as the "new API", and have code examples.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16262) Add IQv2 to Kafka Streams documentation

2024-04-16 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-16262:


Assignee: Suprem Vanam

> Add IQv2 to Kafka Streams documentation
> ---
>
> Key: KAFKA-16262
> URL: https://issues.apache.org/jira/browse/KAFKA-16262
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>Reporter: Matthias J. Sax
>Assignee: Suprem Vanam
>Priority: Minor
>  Labels: beginner, newbie
>
> The new IQv2 API was added many releases ago. While it is still not feature 
> complete, we should add it to the docs 
> ([https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html])
>  to make users aware of the new API so they can start to try it out, report 
> issues, and provide feedback / feature requests.
> We might still state that IQv2 is not yet feature complete, but should change 
> the docs in a way that positions it as the "new API", and have code examples.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16561) Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config

2024-04-16 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-16561:


Assignee: Yangkun Ai

> Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config
> --
>
> Key: KAFKA-16561
> URL: https://issues.apache.org/jira/browse/KAFKA-16561
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Yangkun Ai
>Assignee: Yangkun Ai
>Priority: Major
>
> While using MirrorMaker 2.0 (MM2), I noticed that the consumer used by the 
> connector does not disable the ALLOW_AUTO_CREATE_TOPICS_CONFIG option. This 
> leads to the possibility of a topic being immediately recreated if I attempt 
> to delete it from the source cluster while MirrorMaker 2.0 is running. I 
> believe that automatic creation of new topics in this scenario is 
> unreasonable, hence I think it is necessary to explicitly disable this option 
> in the code.
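
For reference, the client-side setting in question is shown below (a sketch; 
where exactly MM2 should apply it is up to the fix):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class Mm2ConsumerConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // "allow.auto.create.topics" defaults to true on the consumer;
        // disabling it stops fetches from re-creating deleted topics when
        // the broker permits client-driven auto-creation.
        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
        return props;
    }
}
{code}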



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-16 Thread via GitHub


lianetm commented on PR #15723:
URL: https://github.com/apache/kafka/pull/15723#issuecomment-2059130202

   Thanks for the changes @kirktrue , LGTM. 
   
   Just thinking out loud to share a concern and my reasoning about it. The 
flag-based approach always makes me wonder whether we would break managers out 
there that could be using `canSend` (which returns false based on the flag) but 
not the other funcs that flip the flag when a response is 
received/not-received/succeeds/fails. My reasoning then was that, given that the 
previous approach was based on numeric variables that were being set in the same 
places where the flag is flipped now, we can expect that no managers will be 
affected, so we should be good with it. Thanks for the changes!
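
   As a rough illustration of the flag-based approach being discussed (a minimal 
sketch, not the actual `RequestState` implementation):

   ```java
   // Sketch: a boolean set on send and cleared on either outcome, which
   // sidesteps the equal-millisecond-timestamp ambiguity described above.
   public class RequestStateSketch {
       private boolean requestInFlight = false;

       public void onSendAttempt() {
           requestInFlight = true;
       }

       public void onSuccessfulAttempt(long currentTimeMs) {
           requestInFlight = false;
       }

       public void onFailedAttempt(long currentTimeMs) {
           requestInFlight = false;
       }

       public boolean requestInFlight() {
           return requestInFlight;
       }

       public boolean canSendRequest(long currentTimeMs) {
           // The real code also applies exponential backoff; omitted here.
           return !requestInFlight;
       }
   }
   ```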


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16554) Online downgrade triggering and group type conversion

2024-04-16 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-16554:


Assignee: Dongnuo Lyu

> Online downgrade triggering and group type conversion
> -
>
> Key: KAFKA-16554
> URL: https://issues.apache.org/jira/browse/KAFKA-16554
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15615) Improve handling of fetching during metadata updates

2024-04-16 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-15615:


Assignee: appchemist

> Improve handling of fetching during metadata updates
> 
>
> Key: KAFKA-15615
> URL: https://issues.apache.org/jira/browse/KAFKA-15615
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: appchemist
>Priority: Major
>  Labels: consumer-threading-refactor, fetcher
> Fix For: 3.8.0
>
>
> [During a review of the new 
> fetcher|https://github.com/apache/kafka/pull/14406#discussion_r193941], 
> [~junrao] found what appears to be an opportunity for optimization.
> When a fetch response receives an error about partition leadership, fencing, 
> etc. a metadata refresh is triggered. However, it takes time for that refresh 
> to occur, and in the interim, it appears that the consumer will blindly 
> attempt to fetch data for the partition again, in kind of a "definition of 
> insanity" type of way. Ideally, the consumer would have a way to temporarily 
> ignore those partitions, in a way somewhat like the "pausing" approach so 
> that they are skipped until the metadata refresh response is fully processed.
> This affects both the existing KafkaConsumer and the new 
> PrototypeAsyncConsumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-16 Thread via GitHub


soarez commented on code in PR #15718:
URL: https://github.com/apache/kafka/pull/15718#discussion_r1567397892


##
docs/ops.html:
##
@@ -2113,6 +2113,22 @@ https://github.com/apache/kafka/assets/1357510/f5ac55bc-2262-40fd-8913-9a83deb4e543";>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1567406313


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends 
BaseRequestTest {
 assertEquals(Errors.KAFKA_STORAGE_ERROR, 
findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+val partitionNum = 1
+
+// Alter replica dir before topic creation
+val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+val partitionDirs1 = (0 until partitionNum).map(partition => new 
TopicPartition(topic, partition) -> logDir1).toMap
+val alterReplicaLogDirsResponse1 = 
sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all 
partitions
+val tp = new TopicPartition(topic, 0)
+assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+val topicProperties = new Properties()
+topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+createTopic(topic, partitionNum, 1, topicProperties)
+assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+// send enough records to trigger log rolling
+(0 until 20).foreach { _ =>
+  TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+}
+TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new 
TopicPartition(topic, 0)).get.numberOfSegments > 1,
+  "timed out waiting for log segment to roll")
+
+// Wait for log segment retention. LogManager#InitialTaskDelayMs is 30 
seconds.
+// The first retention task is executed after 30 seconds, so waiting for 
35 seconds should be enough.
+TestUtils.waitUntilTrue(() => {
+  new File(logDir1, 
tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX))
 > 0
+}, "timed out waiting for log segment to retention", 35000)

Review Comment:
   Yeah, it takes a long time for the first retention. I will update the case after 
https://github.com/apache/kafka/pull/15719 is merged. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-16 Thread via GitHub


mimaison commented on code in PR #15718:
URL: https://github.com/apache/kafka/pull/15718#discussion_r1567410332


##
docs/ops.html:
##
@@ -2113,6 +2113,22 @@ 

Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1567414173


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
 try {
 if (delete.execute())
 LOGGER.info("Deleted {} {}.", fileType, 
file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new 
directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid 
orphan file.
+Pattern dirPattern = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+Matcher dirMatcher = dirPattern.matcher(file.getParent());
+if (dirMatcher.matches()) {
+String topicPartitionAbsolutePath = dirMatcher.group(1) + 
"-" + dirMatcher.group(2);
+File fallbackFile = new File(topicPartitionAbsolutePath, 
file.getName());
+if (fallbackFile.exists() && fallbackFile.delete()) {

Review Comment:
   No, I use `"^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)"` as the regular 
expression, so the parent directory name can end with `-delete` or `-future`. 
The `.delete()` function here will return true if the file is successfully 
deleted.
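
   To make the matching concrete, here is a small illustration (the path is 
hypothetical; `-delete` is the suffix appended to a partition directory that is 
scheduled for async deletion):

   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   public class DeletedDirPatternSketch {
       public static void main(String[] args) {
           Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
           // Example parent dir of a segment file scheduled for deletion:
           Matcher m = dirPattern.matcher("/tmp/kafka-logs/topic-0.93f4a2c1-delete");
           if (m.matches()) {
               // Prints "/tmp/kafka-logs/topic-0", the original partition dir.
               System.out.println(m.group(1) + "-" + m.group(2));
           }
       }
   }
   ```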



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1567414802


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1223,6 +1223,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 s"but we only have log segments starting from offset: 
$logStartOffset.")
   }
 
+  private def checkLocalLogStartOffset(offset: Long): Unit = {

Review Comment:
   It seems reading records between [logStartOffset, localLogStartOffset] is 
dangerous since the segment won't be on local disk. That is a bit chaotic to me, 
as `UnifiedLog` presents a unified view of local and tiered log segments 
(https://github.com/apache/kafka/blob/fccd7fec666d6570758e0b7891771099240ceee8/core/src/main/scala/kafka/log/UnifiedLog.scala#L59).
 The check looks like a limitation, in that we can't "view" data from tiered log segments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-16 Thread via GitHub


soarez commented on code in PR #15718:
URL: https://github.com/apache/kafka/pull/15718#discussion_r1567417492


##
docs/ops.html:
##
@@ -2113,6 +2113,22 @@ 

[jira] [Assigned] (KAFKA-16417) When initializeResources throws an exception in TopicBasedRemoteLogMetadataManager.scala, initializationFailed needs to be set to true

2024-04-16 Thread Walter Hernandez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walter Hernandez reassigned KAFKA-16417:


Assignee: zhaobo

> When initializeResources throws an exception in 
> TopicBasedRemoteLogMetadataManager.scala, initializationFailed needs to be 
> set to true
> --
>
> Key: KAFKA-16417
> URL: https://issues.apache.org/jira/browse/KAFKA-16417
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.2.0
>Reporter: zhaobo
>Assignee: zhaobo
>Priority: Major
>
> If the initializing producer/consumer fails, the broker cannot actually read 
> and write the remote storage system normally. We need to mark 
> initializationFailed= true to sense this event.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-16 Thread via GitHub


soarez commented on code in PR #15719:
URL: https://github.com/apache/kafka/pull/15719#discussion_r1567422789


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1524,7 +1524,8 @@ object TestUtils extends Logging {
logDirFailureChannel = new 
LogDirFailureChannel(logDirs.size),
keepPartitionMetadataFile = true,
interBrokerProtocolVersion = interBrokerProtocolVersion,
-   remoteStorageSystemEnable = remoteStorageSystemEnable)
+   remoteStorageSystemEnable = remoteStorageSystemEnable,
+   initialTaskDelayMs = 
LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS)

Review Comment:
   If the goal is to speed up tests, shouldn't we use a lower value here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-16 Thread via GitHub


brandboat commented on code in PR #15719:
URL: https://github.com/apache/kafka/pull/15719#discussion_r1567447761


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -413,7 +413,7 @@ class LogManagerTest {
 assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, 
"Check we have the expected number of segments.")
 
 // this cleanup shouldn't find any expired segments but should delete some 
to reduce size
-time.sleep(logManager.InitialTaskDelayMs)
+time.sleep(logManager.initialTaskDelayMs)
 assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 
segments")
 time.sleep(log.config.fileDeleteDelayMs + 1)

Review Comment:
   > Could we create a test in LogManagerTest to verify that the logManager 
starts these tasks after the customized initialTaskDelayMs
   
   Huge thanks! I'm a little confused about the comment above; the test in 
LogManagerTest itself verifies that tasks like log cleanup and log flushing are 
triggered after sleeping initialTaskDelayMs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-16 Thread via GitHub


mimaison merged PR #15718:
URL: https://github.com/apache/kafka/pull/15718


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]

2024-04-16 Thread via GitHub


emitskevich-blp commented on PR #15722:
URL: https://github.com/apache/kafka/pull/15722#issuecomment-2059223960

   The test failures are unrelated to the PR changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15733:
URL: https://github.com/apache/kafka/pull/15733#discussion_r1567431572


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -440,8 +440,12 @@ object StorageTool extends Logging {
 "Use --ignore-formatted to ignore this directory and format the 
others.")
 }
 if (!copier.errorLogDirs().isEmpty) {
-  val firstLogDir = copier.errorLogDirs().iterator().next()
-  throw new TerseFailure(s"I/O error trying to read log directory 
$firstLogDir.")
+  copier.errorLogDirs().forEach(errorLogDir => {
+stream.println(s"I/O error trying to read log directory $errorLogDir. 
Ignoring...")
+  })
+  if(metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
+throw new TerseFailure("No available log directories to format.")

Review Comment:
   Do we need to check `logDirProps` before throwing this exception?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]

2024-04-16 Thread via GitHub


emitskevich-blp commented on PR #15722:
URL: https://github.com/apache/kafka/pull/15722#issuecomment-2059230580

   This is potentially related to [KIP-773 
changes](https://github.com/apache/kafka/pull/11302)
   @jlprat 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-16 Thread via GitHub


brandboat commented on code in PR #15719:
URL: https://github.com/apache/kafka/pull/15719#discussion_r1567464576


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1524,7 +1524,8 @@ object TestUtils extends Logging {
logDirFailureChannel = new 
LogDirFailureChannel(logDirs.size),
keepPartitionMetadataFile = true,
interBrokerProtocolVersion = interBrokerProtocolVersion,
-   remoteStorageSystemEnable = remoteStorageSystemEnable)
+   remoteStorageSystemEnable = remoteStorageSystemEnable,
+   initialTaskDelayMs = 
LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS)

Review Comment:
   Thanks for your comment @soarez :smiley:! If I understand correctly, 
TestUtils#createLogManager uses MockTime, and the clock advances immediately 
after invoking the time#sleep method in the test, jumping straight to the 
corresponding sleep time. The goal in this PR is to introduce a new internal 
config to speed up tests like e2e/integration tests, which can't 
use MockTime as above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-16 Thread via GitHub


brandboat commented on code in PR #15719:
URL: https://github.com/apache/kafka/pull/15719#discussion_r1567464576


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1524,7 +1524,8 @@ object TestUtils extends Logging {
logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
keepPartitionMetadataFile = true,
interBrokerProtocolVersion = interBrokerProtocolVersion,
-   remoteStorageSystemEnable = remoteStorageSystemEnable)
+   remoteStorageSystemEnable = remoteStorageSystemEnable,
+   initialTaskDelayMs = LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS)

Review Comment:
   Thanks for your comment @soarez :smiley: ! If I understand correctly, TestUtils#createLogManager uses MockTime, so the clock advances immediately after the test invokes time#sleep, jumping straight to the target sleep time. The goal of this PR is to introduce a new internal config that speeds up tests, such as e2e/integration tests, which can't use MockTime as described above.
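   A tiny, self-contained sketch of that MockTime behavior (MockTime here is the real org.apache.kafka.common.utils.MockTime):

   ```java
   import org.apache.kafka.common.utils.MockTime;

   public class MockTimeSleepDemo {
       public static void main(String[] args) {
           MockTime time = new MockTime();
           long before = time.milliseconds();
           time.sleep(30_000); // returns immediately; only the mock clock advances
           // Prints 30000 without any real 30-second wait.
           System.out.println(time.milliseconds() - before);
       }
   }
   ```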



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-16 Thread via GitHub


soarez commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1567464913


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -391,13 +391,20 @@ void enqueueMetadataChangeEvent(
 
 // Events handled by Migration Driver.
 abstract class MigrationEvent implements EventQueue.Event {
+// Use no-op handler by default because the handleException will be overridden if needed
+private Consumer<Throwable> retryHandler = NO_OP_HANDLER;
+
+public void retryHandler(Consumer<Throwable> retryHandler) {
+this.retryHandler = retryHandler;
+}

Review Comment:
   Did you consider simply defining an empty `public void retryHandler(Throwable thrown)` that `PollEvent` can override?
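   For reference, a bare-bones sketch of that alternative (names follow the snippet above; EventQueue.Event is elided so it compiles standalone, and this is not the actual Kafka code):

   ```java
   abstract class MigrationEvent {
       // No-op by default; only events that need retry behavior override it.
       public void retryHandler(Throwable thrown) { }
   }

   class PollEvent extends MigrationEvent {
       @Override
       public void retryHandler(Throwable thrown) {
           // e.g. schedule another poll attempt for retriable errors
       }
   }
   ```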



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Various cleanups in connect [kafka]

2024-04-16 Thread via GitHub


mimaison opened a new pull request, #15734:
URL: https://github.com/apache/kafka/pull/15734

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-16 Thread via GitHub


lucasbru commented on code in PR #15723:
URL: https://github.com/apache/kafka/pull/15723#discussion_r1567484668


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestStateTest.java:
##
@@ -48,4 +50,51 @@ public void testRequestStateSimple() {
 state.reset();
 assertTrue(state.canSendRequest(200));
 }
+
+@Test
+public void testTrackInflightOnSuccessfulAttempt() {
+testTrackInflight(RequestState::onSuccessfulAttempt);
+}
+
+@Test
+public void testTrackInflightOnFailedAttempt() {
+testTrackInflight(RequestState::onFailedAttempt);
+}
+
+/**
+ * In some cases, the network layer is very fast and can send out a second request within the same
+ * millisecond timestamp as receiving the first response.
+ *
+ * 
+ *
+ * The previous logic for tracking inflight status used timestamps: if the timestamp from the last received
+ * response was less than the timestamp from the last sent request, we'd interpret that as having an
+ * inflight request. However, this approach would incorrectly return false from
+ * {@link RequestState#requestInFlight()} if the two timestamps were equal.
+ */

Review Comment:
   Yeah, I also noticed this. I think this paragraph fits best in the commit message / PR description.
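   For anyone skimming the thread, the equal-timestamp case boils down to something like this (a standalone illustration, not the actual RequestState code):

   ```java
   public class InflightTimestampDemo {
       public static void main(String[] args) {
           // A response received and the next request sent within the same
           // millisecond carry equal timestamps, so a strict '<' comparison
           // wrongly reports that nothing is inflight.
           long lastReceivedMs = 100; // response for request #1 arrives at t=100
           long lastSentMs = 100;     // request #2 goes out in the same millisecond
           System.out.println(lastReceivedMs < lastSentMs); // false, though #2 is inflight
       }
   }
   ```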



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-16 Thread via GitHub


mimaison commented on PR #15718:
URL: https://github.com/apache/kafka/pull/15718#issuecomment-2059263155

   Backported to 3.7 too: 
https://github.com/apache/kafka/commit/5d132256d3d8650f16ea3fa83b12b2676b30eed7


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-16 Thread via GitHub


soarez commented on PR #15733:
URL: https://github.com/apache/kafka/pull/15733#issuecomment-2059274214

   @chia7712 Thanks for reviewing this.
   
   > It seems LogManager can't accept failed directories in startup, right?
   
   LogManager can accept failed directories at startup. That method is a bit confusing, but a few lines below the one you pointed to, the exception is caught, the directory is recorded as failed, and it is not added to `liveLogDirs`.
   
   
https://github.com/apache/kafka/blob/fccd7fec666d6570758e0b7891771099240ceee8/core/src/main/scala/kafka/log/LogManager.scala#L201-L204
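   In other words, the shape of the logic is roughly this (a Java sketch for illustration only; the real code is the Scala linked above, and these names are assumptions):

   ```java
   import java.io.File;
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;

   public class LiveLogDirsSketch {
       // Stand-in for the real validation, which may hit an I/O error.
       static void validate(File dir) throws IOException {
           if (!dir.isDirectory() || !dir.canRead()) {
               throw new IOException("inaccessible: " + dir);
           }
       }

       public static void main(String[] args) {
           List<File> configured = List.of(new File("/tmp/kafka-logs-0"), new File("/tmp/kafka-logs-1"));
           List<File> liveLogDirs = new ArrayList<>();
           for (File dir : configured) {
               try {
                   validate(dir);
                   liveLogDirs.add(dir);
               } catch (IOException e) {
                   // Startup continues: the failure is recorded and the dir
                   // is simply left out of liveLogDirs.
                   System.err.println("Marking log dir offline: " + e.getMessage());
               }
           }
           System.out.println("live dirs: " + liveLogDirs);
       }
   }
   ```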


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-16 Thread via GitHub


soarez commented on PR #15718:
URL: https://github.com/apache/kafka/pull/15718#issuecomment-2059275719

   Thank you @mimaison 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-16 Thread via GitHub


soarez commented on code in PR #15719:
URL: https://github.com/apache/kafka/pull/15719#discussion_r1567497368


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1524,7 +1524,8 @@ object TestUtils extends Logging {
logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
keepPartitionMetadataFile = true,
interBrokerProtocolVersion = interBrokerProtocolVersion,
-   remoteStorageSystemEnable = remoteStorageSystemEnable)
+   remoteStorageSystemEnable = remoteStorageSystemEnable,
+   initialTaskDelayMs = LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS)

Review Comment:
   I see. Thanks for clarifying



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-16 Thread via GitHub


brandboat commented on code in PR #15719:
URL: https://github.com/apache/kafka/pull/15719#discussion_r1567499221


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1524,7 +1524,8 @@ object TestUtils extends Logging {
logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
keepPartitionMetadataFile = true,
interBrokerProtocolVersion = interBrokerProtocolVersion,
-   remoteStorageSystemEnable = remoteStorageSystemEnable)
+   remoteStorageSystemEnable = remoteStorageSystemEnable,
+   initialTaskDelayMs = LogConfig.DEFAULT_INITIAL_TASK_DELAY_MS)

Review Comment:
   No problem, thanks for reviewing the PR!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


