[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2024-04-03 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833816#comment-17833816
 ] 

Arpit Goyal commented on KAFKA-12506:
-

[~kebab-mai-haddi] You can reassign this ticket back to yourself if you want to 
work on it. 
[~ableegoldman] Could you help me find some newbie tickets that would help me 
get started on Kafka Streams?

> Expand AdjustStreamThreadCountTest
> --
>
> Key: KAFKA-12506
> URL: https://issues.apache.org/jira/browse/KAFKA-12506
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: newbie, newbie++
>
> Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
> consumes a single input topic, and doesn't produce any data to this topic. 
> Some of the complex concurrency bugs that we've found only showed up when we 
> had some actual data to process and a stateful topology: 
> [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500
> See the umbrella ticket for the list of improvements we need to make this a 
> more effective integration test



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Test minor gh review action - this is a test PR, do not review [kafka]

2024-04-03 Thread via GitHub


github-actions[bot] commented on PR #15123:
URL: https://github.com/apache/kafka/pull/15123#issuecomment-2036109173

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16081 Limit number of ssl connections in brokers [kafka]

2024-04-03 Thread via GitHub


github-actions[bot] commented on PR #15126:
URL: https://github.com/apache/kafka/pull/15126#issuecomment-2036109157

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16211) Inconsistent config values in CreateTopicsResult and DescribeConfigsResult

2024-04-03 Thread Dung Ha (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833809#comment-17833809
 ] 

Dung Ha commented on KAFKA-16211:
-

My guess from reading the code is that describeConfigs() will send the request 
to the specified broker if the config resource is a broker 
(ConfigResource.Type.BROKER). If the config resource is a topic 
(ConfigResource.Type.TOPIC), then a broker is chosen via 
LeastLoadBrokerOrActiveKController() (in KafkaAdminClient), which in this 
situation picks the "least loaded" broker. I have tested this and, indeed, each 
time I call describeConfigs() with a ConfigResource of type TOPIC, a different 
broker's static configuration may be returned. My question is: is this how 
describeConfigs() is supposed to be used with a ConfigResource of type 
ConfigResource.Type.TOPIC? Or, even, are we supposed to use describeConfigs() 
with ConfigResource.Type.TOPIC at all, rather than strictly with 
ConfigResource.Type.BROKER?
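
For reference, a minimal sketch (not from this thread) of the two call patterns 
being compared; the topic name, broker id and bootstrap address are assumptions:

{code:java}
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeConfigsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // TOPIC resource: the request is routed to the "least loaded" broker, so
            // static broker defaults reported in the result may differ between calls.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // BROKER resource: the request is sent to the broker whose id is given here.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Map<ConfigResource, Config> configs =
                    admin.describeConfigs(Arrays.asList(topic, broker)).all().get();
            configs.forEach((resource, config) ->
                    System.out.println(resource + " -> " + config.get("segment.bytes")));
        }
    }
}
{code}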

> Inconsistent config values in CreateTopicsResult and DescribeConfigsResult
> --
>
> Key: KAFKA-16211
> URL: https://issues.apache.org/jira/browse/KAFKA-16211
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: Gantigmaa Selenge
>Assignee: Dung Ha
>Priority: Minor
>
> When creating a topic in KRaft cluster, a config value returned in 
> CreateTopicsResult is different than what you get from describe topic 
> configs, if the config was set in broker.properties or controller.properties 
> or in both but with different values. 
>  
> For example, start a broker with `segment.bytes` set to 573741824 in the 
> properties file and then create a topic, the CreateTopicsResult contains:
> ConfigEntry(name=segment.bytes, value=1073741824, source=DEFAULT_CONFIG, 
> isSensitive=false, isReadOnly=false, synonyms=[], type=INT, 
> documentation=null)
>  because the controller was started without setting this config. 
> However when you describe configurations for the same topic, the config value 
> set by the broker is returned:
> ConfigEntry(name=segment.bytes, value=573741824, 
> source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, 
> synonyms=[], type=null, documentation=null)
>  
> Vice versa, if the controller is started with this config set to a different 
> value, the create topic request returns the value set by the controller, and 
> when you describe the config for the same topic, you get the value set 
> by the broker. This makes it confusing to understand which value is being 
> used.
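
A hedged sketch of the reproduction described above; the topic name, partition 
count and bootstrap address are assumptions, not taken from the report:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;

public class CreateVsDescribeConfigs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("repro-topic", 1, (short) 1);
            // Config returned alongside the create response (resolved by the controller).
            Config atCreate = admin.createTopics(Collections.singleton(topic))
                    .config("repro-topic").get();
            // Config returned by a later describe (resolved by a broker).
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "repro-topic");
            Config atDescribe = admin.describeConfigs(Collections.singleton(resource))
                    .all().get().get(resource);
            System.out.println("createTopics:    " + atCreate.get("segment.bytes"));
            System.out.println("describeConfigs: " + atDescribe.get("segment.bytes"));
        }
    }
}
{code}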



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16293: Test log directory failure in Kraft [kafka]

2024-04-03 Thread via GitHub


showuon commented on PR #15409:
URL: https://github.com/apache/kafka/pull/15409#issuecomment-2036106373

   > > @gaurav-narula , the new added KRaft tests are failing in our test env. 
The logs are 
[here](https://drive.google.com/file/d/1CbxgH8eswEXX0YDEJpZn7B9Vd4x-gfo4/view?usp=sharing).
 Could you help check and fix them? Thanks.
   > 
   > I believe this pr depends on #15335
   
   OK, let's wait until #15335 is merged and then check again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2024-04-03 Thread Aviral Srivastava (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833804#comment-17833804
 ] 

Aviral Srivastava commented on KAFKA-12506:
---

[~ableegoldman] , I want to work on this. I am sorry this was abandoned. I am 
back. Thank you!

> Expand AdjustStreamThreadCountTest
> --
>
> Key: KAFKA-12506
> URL: https://issues.apache.org/jira/browse/KAFKA-12506
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: newbie, newbie++
>
> Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
> consumes a single input topic, and doesn't produce any data to this topic. 
> Some of the complex concurrency bugs that we've found only showed up when we 
> had some actual data to process and a stateful topology: 
> [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500
> See the umbrella ticket for the list of improvements we need to make this a 
> more effective integration test



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2024-04-03 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833758#comment-17833758
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12506:


[~goyarpit] yes, it seems this ticket has been abandoned. Feel free to pick it 
up (and let me know if you have any questions)

 

[~kebab-mai-haddi] if you still want to work on this, just let me know, I'm 
sure there are multiple improvements that could be made here in parallel

> Expand AdjustStreamThreadCountTest
> --
>
> Key: KAFKA-12506
> URL: https://issues.apache.org/jira/browse/KAFKA-12506
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: newbie, newbie++
>
> Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
> consumes a single input topic, and doesn't produce any data to this topic. 
> Some of the complex concurrency bugs that we've found only showed up when we 
> had some actual data to process and a stateful topology: 
> [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500
> See the umbrella ticket for the list of improvements we need to make this a 
> more effective integration test



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2024-04-03 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12506:
--

Assignee: Arpit Goyal  (was: Aviral Srivastava)

> Expand AdjustStreamThreadCountTest
> --
>
> Key: KAFKA-12506
> URL: https://issues.apache.org/jira/browse/KAFKA-12506
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: newbie, newbie++
>
> Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
> consumes a single input topic, and doesn't produce any data to this topic. 
> Some of the complex concurrency bugs that we've found only showed up when we 
> had some actual data to process and a stateful topology: 
> [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500
> See the umbrella ticket for the list of improvements we need to make this a 
> more effective integration test



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16458) Add contains method in KeyValue store interface

2024-04-03 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833749#comment-17833749
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16458:


This could be nice to have, but unfortunately given the overly complex way that 
state stores are implemented and the multi-layer hierarchy, something that 
should be simple – i.e. just adding a basic API to the StateStore interface – 
actually ends up being a massive amount of work.

We do plan to overhaul the state store hierarchy at some point in order to 
simplify the codebase, both for maintenance and new features, although there's 
no clear roadmap or promise for this to happen anytime soon. That said, I would 
personally suggest we hold off on adding any new APIs that don't add 
strictly-new functionality until after we've simplified the state store 
implementation. 

Of course, if this is something you really want, you're always free to kick off 
a KIP discussion whenever. Just wanted to provide some context and warn that 
this would not be as straightforward as it might seem to actually implement.

To your final question: I do think in some sense the reality is that yes, this 
API is not offered on purpose, in order to keep the interface as simple as 
possible. But this in itself would be less of a concern if the state store 
hierarchy was not such a hassle to expand and maintain, which is why I think 
the community would be open to it after we can get around to cleaning up the 
store implementation.

> Add contains method in KeyValue store interface
> ---
>
> Key: KAFKA-16458
> URL: https://issues.apache.org/jira/browse/KAFKA-16458
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Ayoub Omari
>Priority: Minor
>  Labels: needs-kip
>
> In some stream processors, we sometimes just want to check if a key exists in 
> the state store or not.
>  
> I find calling .get() and checking if the return value is null a little bit 
> verbose
> {code:java}
> if (store.get(key) != null) {
> }{code}
>  
> But I am not sure if it is on purpose that we would like to keep the store 
> interface simple.
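
For illustration only, a minimal sketch of the kind of helper being wished for, 
written as a hypothetical default method (no such interface exists in Kafka 
Streams today):

{code:java}
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Hypothetical sketch, not part of the Kafka Streams API.
public interface ContainsKeyStore<K, V> extends ReadOnlyKeyValueStore<K, V> {

    // True if the key currently maps to a non-null value in this store.
    default boolean contains(final K key) {
        return get(key) != null;
    }
}
{code}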



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-04-03 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833744#comment-17833744
 ] 

Philip Nee commented on KAFKA-16389:


[^consumer.log]

It seems like the consumer receives empty topicPartitions after the assignment. 
 One suspicious thing that I see is that there's no send in between the 
successive Receives.  I wonder if this is a race condition:

First: Assignments received

 
{code:java}
7997 [2024-04-03 19:48:49,445] DEBUG [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Received 
CONSUMER_GROUP_HEARTBEAT response from node 2147483646 for request with header 
RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, 
clientId=consumer-test_group_id-1, correlationId=35, headerVersion=2): 
ConsumerGroupHeartbeatResponseData(throttleTimeMs=0, errorCode=0, 
errorMessage=null, memberId='pBC-jWhKQ7yr0y9MXysT2g', memberEpoch=1, 
heartbeatIntervalMs=5000, 
assignment=Assignment(topicPartitions=[TopicPartitions(topicId=TcUeldqLQae7xsWQo2WjPA,
 partitions=[0, 1, 2, 3, 4, 5])])) (org.apache.kafka.clients.NetworkClient)

7999 [2024-04-03 19:48:49,450] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Member 
pBC-jWhKQ7yr0y9MXysT2g with epoch 1 transitioned from JOINING to RECONCILING. 
(org.apache.kafka.clients.consumer.internals.MembershipManagerImpl){code}
 

 

Assignments completed:

 
{code:java}
8005 [2024-04-03 19:48:49,454] INFO [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Updating assignment 
with local epoch 0
8006         Assigned partitions:                       [test_topic-0, 
test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5]
8007         Current owned partitions:                  []
8008         Added partitions (assigned - owned):       [test_topic-0, 
test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5]
8009         Revoked partitions (owned - assigned):     []
8010  (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl) {code}
 

 

Then receive another heartbeat:
{code:java}
8021 [2024-04-03 19:48:49,486] DEBUG [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Received 
CONSUMER_GROUP_HEARTBEAT response from node 2147483646 for request with header 
RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, 
clientId=consumer-test_group_id-1, correlationId=36, headerVersion=2): 
ConsumerGroupHeartbeatResponseData(throttleTimeMs=0, errorCode=0, 
errorMessage=null, memberId='HhILLGoPQ3i7Rt6IINJbRA', memberEpoch=2, 
heartbeatIntervalMs=5000, assignment=Assignment(topicPartitions=[])) 
(org.apache.kafka.clients.NetworkClient)

Which causes revocation

Updating assignment with local epoch 1
8223         Assigned partitions:                       []
8224         Current owned partitions:                  [test_topic-0, 
test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5]
8225         Added partitions (assigned - owned):       []
8226         Revoked partitions (owned - assigned):     [test_topic-0, 
test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5]
8227  (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl) {code}
 

 

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch, consumer.log
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> 

Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-03 Thread via GitHub


OmniaGM commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2035559842

   @fvaleri, @nizhikov, @mimaison: I just rebased the PR and addressed the 
feedback. I will wait for this one to be merged before rebasing the other PR for 
KafkaConfig.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-04-03 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16389:
---
Attachment: consumer.log

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch, consumer.log
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
> num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])]
> {code}
> To reproduce, create a system test suite file named 
> {{test_valid_assignment.yml}} with these contents:
> {code:yaml}
> failures:
>   - 
> 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}'
> {code}
> Then set the {{TC_PATHS}} environment variable to include that test suite 
> file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16469) Metadata Schema Checker

2024-04-03 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-16469:


 Summary: Metadata Schema Checker
 Key: KAFKA-16469
 URL: https://issues.apache.org/jira/browse/KAFKA-16469
 Project: Kafka
  Issue Type: New Feature
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1550393556


##
clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java:
##
@@ -245,4 +246,23 @@ public interface RecordBatch extends Iterable<Record> {
  * @return Whether this is a batch containing control records
  */
 boolean isControlBatch();
+
+/**
+ * iterate all records to find the offset of max timestamp.
+ * noted:
+ * 1) that the earliest offset will return if there are multi records 
having same (max) timestamp
+ * 2) it always return -1 if the {@link RecordBatch#magic()} is equal to 
{@link RecordBatch#MAGIC_VALUE_V0}

Review Comment:
   return => returns



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-04-03 Thread via GitHub


kirktrue closed pull request #15650: KAFKA-16200: Enforce that RequestManager 
implementations respect user-provided timeout
URL: https://github.com/apache/kafka/pull/15650


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]

2024-04-03 Thread via GitHub


kirktrue opened a new pull request, #15650:
URL: https://github.com/apache/kafka/pull/15650

   The intention of the CompletableApplicationEvent is for a Consumer to block 
waiting for the event to complete. The application thread will block for the 
timeout, but there is not yet a consistent manner in which events are timed out.
   
   Enforce at the request manager layer that timeouts are respected per the 
design in [KAFKA-15848](https://issues.apache.org/jira/browse/KAFKA-15848).
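   
   Not the actual RequestManager API, but a minimal sketch of the general 
pattern (hypothetical helper name): derive a deadline from the user-provided 
timeout and never block past it.

{code:java}
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper illustrating the idea, not code from this PR.
public final class Deadlines {

    // Block on an event's future only for what remains of the caller-supplied timeout.
    public static <T> T awaitWithinTimeout(CompletableFuture<T> eventFuture, Duration timeout)
            throws Exception {
        long remainingNanos = timeout.toNanos();
        if (remainingNanos <= 0) {
            throw new TimeoutException("User-provided timeout already expired");
        }
        // Throws TimeoutException if the event does not complete in time.
        return eventFuture.get(remainingNanos, TimeUnit.NANOSECONDS);
    }
}
{code}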
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1550273635


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -391,4 +420,15 @@ private String[] addBootstrapServer(String... args) {
 
 return newArgs.toArray(new String[0]);
 }
+
+private void retryUntilEqual(List<String> expected, Supplier<List<String>> 
outputSupplier) {
+try {
+TestUtils.waitForCondition(
+() -> expected.equals(outputSupplier.get()),
+"TopicOffsets did not match. Expected: " + 
expectedTestTopicOffsets() + ", but was: " + outputSupplier.get()

Review Comment:
   I have a similar error in another PR 
(https://github.com/apache/kafka/pull/15621) :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-03 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1550279550


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional<ConsumerRebalanceListener> listener) {

Review Comment:
   This ties back to the comment 
(https://github.com/apache/kafka/pull/15585#issuecomment-2021526537) that 
made you introduce the while loop:
   
   > there is a race condition bug where the metadata is not updated but the 
heartbeat request is already created, but it lacks required info 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1550273635


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -391,4 +420,15 @@ private String[] addBootstrapServer(String... args) {
 
 return newArgs.toArray(new String[0]);
 }
+
+private void retryUntilEqual(List<String> expected, Supplier<List<String>> 
outputSupplier) {
+try {
+TestUtils.waitForCondition(
+() -> expected.equals(outputSupplier.get()),
+"TopicOffsets did not match. Expected: " + 
expectedTestTopicOffsets() + ", but was: " + outputSupplier.get()

Review Comment:
   I have the same error in another PR 
(https://github.com/apache/kafka/pull/15621) :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1550271336


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -56,11 +60,38 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   @AfterEach
   override def tearDown(): Unit = {
-setOldMessageFormat = false
+version = RecordBatch.MAGIC_VALUE_V2
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testListMaxTimestampWithEmptyLog(quorum: String): Unit = {
+val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp(), topicName)
+assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, 
maxTimestampOffset.offset())
+assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 
maxTimestampOffset.timestamp())
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk"))
+  def testListVersion0(quorum: String): Unit = {

Review Comment:
   @junrao this is the new test case for version 0: we should get `-1` 
if the magic value is 0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1550270774


##
clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java:
##
@@ -245,4 +246,23 @@ public interface RecordBatch extends Iterable<Record> {
  * @return Whether this is a batch containing control records
  */
 boolean isControlBatch();
+
+/**
+ * iterate all records to find the offset of max timestamp.
+ * noted:
+ * 1) that the earliest offset will return if there are multi records 
having same (max) timestamp
+ * 2) it always return -1 if the {@link RecordBatch#magic()} is equal to 
{@link RecordBatch#MAGIC_VALUE_V0}
+ * @return offset of max timestamp
+ */
+default Optional offsetOfMaxTimestamp() {
+if (magic() == RecordBatch.MAGIC_VALUE_V0) return Optional.empty();

Review Comment:
   @junrao the short-circuit is added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1550270383


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+latestTimestampSegment.log.batchesFrom(position.position).asScala

Review Comment:
   I have addressed the comment by 
https://github.com/apache/kafka/pull/15621/commits/4785371c54e2fc2c540895ffe2f94829449937e6



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2035323125

   > There were 69 test failures and quite a few of them related to ListOffset
   
   There is another PR (https://github.com/apache/kafka/pull/15489) that 
encounters the same error where listing offsets returns an incorrect offset. I'm 
digging into it ...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer

2024-04-03 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833704#comment-17833704
 ] 

Philip Nee commented on KAFKA-16389:


Hi [~kirktrue], thanks for the initial investigation. I think your approach 
makes sense, but I do think we need to rewrite verifiable_consumer.py's event 
handler, as the state transitions don't necessarily match the behavior of the 
current consumer. I think that's why there's still some flakiness in the patch 
you submitted. See my notes below:

 

I'm still occasionally getting errors like: "ducktape.errors.TimeoutError: 
expected valid assignments of 6 partitions when num_started 1: 
[('ducker@ducker11', [])]"

 

This seems to be caused by some weird reconciliation state. For example, here 
we can see consumer1 got assigned 6 partitions and then immediately gave up all 
of them. It is unclear why onPartitionsRevoked is triggered. 

 
{code:java}
  1 node  
wait for member
idx  1 partiton assigned [{'topic': 'test_topic', 'partition': 0}, {'topic': 
'test_topic', 'partition': 1}, {'topic': 'test_topic', 'partition': 2}, 
{'topic': 'test_topic', 'partition': 3}, {'topic': 'test_topic', 'partition': 
4}, {'topic': 'test_topic', 'partition': 5}]
idx  1 partiton revoked [{'topic': 'test_topic', 'partition': 0}, {'topic': 
'test_topic', 'partition': 1}, {'topic': 'test_topic', 'partition': 2}, 
{'topic': 'test_topic', 'partition': 3}, {'topic': 'test_topic', 'partition': 
4}, {'topic': 'test_topic', 'partition': 5}]
node: ducker11
Current assignment: {: []}
idx  1 partiton assigned []
[WARNING - 2024-04-03 11:05:34,587 - service_registry - stop_all - lineno:53]: 
Error stopping service : 

[WARNING - 2024-04-03 11:06:09,128 - service_registry - clean_all - lineno:67]: 
Error cleaning service : 

[INFO:2024-04-03 11:06:09,134]: RunnerClient: 
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer.group_remote_assignor=range:
 FAIL: TimeoutError("expected valid assignments of 6 partitions when 
num_started 1: [('ducker@ducker11', [])]")
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
    data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
583, in test_valid_assignment
    wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
consumer.current_assignment()),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
58, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when 
num_started 1: [('ducker@ducker11', [])] {code}
 

> consumer_test.py’s test_valid_assignment fails with new consumer
> 
>
> Key: KAFKA-16389
> URL: https://issues.apache.org/jira/browse/KAFKA-16389
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> Attachments: KAFKA-16389.patch
>
>
> The following error is reported when running the {{test_valid_assignment}} 
> test from {{consumer_test.py}}:
>  {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
> data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
> return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 
> 584, in test_valid_assignment
> wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, 
> consumer.current_assignment()),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: expected valid assignments of 6 partitions w

Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-03 Thread via GitHub


dongnuo123 commented on code in PR #15593:
URL: https://github.com/apache/kafka/pull/15593#discussion_r1550242669


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
 ));
 }
 
+/**
+ * Convert the current classic group to a consumer group.
+ * Add the records for the conversion.
+ *
+ * @param consumerGroup The converted consumer group.
+ * @param records   The list to which the new records are added.
+ *
+ * @throws GroupIdNotFoundException if any of the group's members doesn't 
support the consumer protocol.
+ */
+public void convertToConsumerGroup(
+ConsumerGroup consumerGroup,
+List<Record> records,
+TopicsImage topicsImage
+) throws GroupIdNotFoundException {
+consumerGroup.setGroupEpoch(generationId);
+consumerGroup.setTargetAssignmentEpoch(generationId);
+
+records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
generationId));
+// SubscriptionMetadata will be computed in the following 
consumerGroupHeartbeat
+
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), 
Collections.emptyMap()));
+records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
generationId));
+
+members.forEach((memberId, member) -> {
+ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));
+
+ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(memberId)
+.setMemberEpoch(generationId)
+.setPreviousMemberEpoch(generationId)
+.setInstanceId(member.groupInstanceId().orElse(null))
+.setRackId(subscription.rackId().orElse(null))
+.setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+.setClientId(member.clientId())
+.setClientHost(member.clientHost())
+.setSubscribedTopicNames(subscription.topics())
+.setAssignedPartitions(partitions)
+.build();
+consumerGroup.updateMember(newMember);
+
+records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
newMember));
+records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
newMember));
+records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), 
memberId, partitions));
+});

Review Comment:
   Need to schedule session timeouts



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]

2024-04-03 Thread via GitHub


apoorvmittal10 commented on code in PR #15532:
URL: https://github.com/apache/kafka/pull/15532#discussion_r1550228079


##
build.gradle:
##
@@ -1435,10 +1454,10 @@ project(':clients') {
 implementation libs.opentelemetryProto
 
 // libraries which should be added as runtime dependencies in generated 
pom.xml should be defined here:
-shadow libs.zstd
-shadow libs.lz4
-shadow libs.snappy
-shadow libs.slf4jApi
+shadowed libs.zstd
+shadowed libs.lz4
+shadowed libs.snappy
+shadowed libs.slf4jApi

Review Comment:
   Hi @mimaison @showuon, I have made the suggested change and also added the 
version along with the group and name. I have verified the pom, jar and manifest 
file, and they are correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-03 Thread via GitHub


OmniaGM commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2035219431

   > @OmniaGM I think this is almost ready, but there are a couple of minor 
comments to address and some conflicts to fix.
   
   Will be addressing this soon, after another PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-03 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1550191029


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {

Review Comment:
   All good then, just wanted to make sure we're not losing it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]

2024-04-03 Thread via GitHub


gharris1727 commented on PR #15639:
URL: https://github.com/apache/kafka/pull/15639#issuecomment-2035199919

   Hi @chia7712 @omkreddy I have delayed backporting either change to 3.6 as 
we're currently in an ongoing release. If you'd like me to backport it now, I 
can do that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-03 Thread via GitHub


lianetm commented on PR #15585:
URL: https://github.com/apache/kafka/pull/15585#issuecomment-2035194359

   Hey @Phuc-Hong-Tran, thanks for the updates! Left some comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16293: Test log directory failure in Kraft [kafka]

2024-04-03 Thread via GitHub


OmniaGM commented on PR #15409:
URL: https://github.com/apache/kafka/pull/15409#issuecomment-2035191271

   > @gaurav-narula , the new added KRaft tests are failing in our test env. 
The logs are 
[here](https://drive.google.com/file/d/1CbxgH8eswEXX0YDEJpZn7B9Vd4x-gfo4/view?usp=sharing).
 Could you help check and fix them? Thanks.
   
   I believe this pr depends on https://github.com/apache/kafka/pull/15335 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-03 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1550178486


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, 
Optional<ConsumerRebalanceListener> listener) {

Review Comment:
   When we get to this point 
(https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466)
 it means that, with the latest metadata, we discovered new topics that made 
the subscription change. So only at that point do we need to create the 
`SubscriptionChangeEvent`, I would say (and yes, then we also call 
`requestUpdateForNewTopics`).
   
   So echoing my first comment on this thread, it seems to me that we shouldn't 
rely on any metadata object/version check here, as it might not be accurate 
(removing 
[this](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1757-L1761)),
 and we should just make sure that we send the `SubscriptionChangeEvent` only 
when we know that the subscription changed, which is inside the 
`updatePatternSubscription` 
[if](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1465).
   
   Makes sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on PR #15639:
URL: https://github.com/apache/kafka/pull/15639#issuecomment-2035163301

   @gharris1727 @omkreddy 
   
   #15597 and this one have backported to 3.7. see 
https://github.com/apache/kafka/commit/d9674c6c9a99c0d8a9c32386b0cceaceac92969c 
and 
https://github.com/apache/kafka/commit/cfc97a137f19912313fc73776121d70b0e3f7a88
   
   They were also backported to 3.5 (we don't need to push to 3.5, and that 
seems to have been a mistake). See 
https://github.com/apache/kafka/commit/d29d21e6bec61994d30c487298bbb7c6eb6c1e41 
and 
https://github.com/apache/kafka/commit/046821905477a8dbd1dc7991eeb1c416b5622c81
   
   However, branch 3.6 does not have those commits, so @gharris1727 please do 
the backport for 3.6, thanks
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16439: Update replication_replica_failure_test.py to support KIP-848’s group protocol config [kafka]

2024-04-03 Thread via GitHub


wcarlson5 merged PR #15629:
URL: https://github.com/apache/kafka/pull/15629


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16440: Update security_test.py to support KIP-848’s group protocol config [kafka]

2024-04-03 Thread via GitHub


wcarlson5 merged PR #15628:
URL: https://github.com/apache/kafka/pull/15628


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16438: Update consumer_test.py’s static tests to support KIP-848’s group protocol config [kafka]

2024-04-03 Thread via GitHub


wcarlson5 merged PR #15627:
URL: https://github.com/apache/kafka/pull/15627


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config [kafka]

2024-04-03 Thread via GitHub


wcarlson5 merged PR #15626:
URL: https://github.com/apache/kafka/pull/15626


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-04-03 Thread via GitHub


florin-akermann commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-2035119703

   Hi @mjsax , thanks for the flag.
   Yes I'll push the necessary changes by the end of the week (Sunday).
   I hope that's ok.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-03 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1550094782


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {

Review Comment:
   This appears in the PR because I didn't rebase correctly. You've actually 
moved the test here: 
https://github.com/apache/kafka/blob/21479a31bdff0e15cfe7ee0a4e509232ed064b41/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala#L261



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1550055646


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
 // Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
 // constant time access while being safe to use with concurrent 
collections unlike `toArray`.
-val segmentsCopy = logSegments.asScala.toBuffer
-val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
-val latestTimestampAndOffset = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
-
-Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp,
-  latestTimestampAndOffset.offset,
-  latestEpochAsOptional(leaderEpochCache)))
+val latestTimestampSegment = 
logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar)
+// cache the timestamp and offset
+val maxTimestampSoFar = 
latestTimestampSegment.readMaxTimestampAndOffsetSoFar
+// lookup the position of batch to avoid extra I/O
+val position = 
latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset)
+val lpc = latestEpochAsOptional(leaderEpochCache)
+latestTimestampSegment.log.batchesFrom(position.position).asScala

Review Comment:
   In the case of magic=0, we will find latestTimestampSegment with 
NO_TIMESTAMP. If we go through the rest of the logic, it seems that we will 
return the first offset instead of -1. Perhaps we should short-circuit if 
latestTimestampSegment is NO_TIMESTAMP?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-03 Thread via GitHub


fvaleri commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2035032922

   @OmniaGM I think this is almost ready, but there are a couple of minor 
comments to address and some conflicts to fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]

2024-04-03 Thread via GitHub


gharris1727 commented on PR #15639:
URL: https://github.com/apache/kafka/pull/15639#issuecomment-2034976891

   I'll handle the backports later today.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]

2024-04-03 Thread via GitHub


omkreddy commented on PR #15639:
URL: https://github.com/apache/kafka/pull/15639#issuecomment-2034952210

   lets also merge to 3.7 and 3.6


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #12174:
URL: https://github.com/apache/kafka/pull/12174#discussion_r1549916567


##
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##
@@ -260,9 +274,12 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
   }
 
   def killBroker(index: Int): Unit = {

Review Comment:
   maybe we should keep the original implementation, since it is expected to await shutdown.



##
core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java:
##
@@ -323,7 +324,7 @@ public void rollingBrokerRestart() {
 throw new IllegalStateException("Tried to restart brokers but 
the cluster has not been started!");
 }
 for (int i = 0; i < clusterReference.get().brokerCount(); i++) {
-clusterReference.get().killBroker(i);
+clusterReference.get().killBroker(i, Duration.ofSeconds(5));

Review Comment:
   what is the purpose of this change?



##
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##
@@ -197,12 +198,11 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 verifyNonDaemonThreadsStatus()
   }
 
-  @Disabled
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("kraft"))
   def testCleanShutdownWithKRaftControllerUnavailable(quorum: String): Unit = {

Review Comment:
   a shutdown with a timeout is a kind of dirty shutdown, so we should rename 
the test to `testDirtyShutdownWithKRaftControllerUnavailable`



##
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##
@@ -260,9 +274,12 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
   }
 
   def killBroker(index: Int): Unit = {
-if (alive(index)) {
-  _brokers(index).shutdown()
-  _brokers(index).awaitShutdown()
+killBroker(index, Duration.ofSeconds(5))
+  }
+
+  def killBroker(index: Int, timeout: Duration): Unit = {

Review Comment:
   we need to document how this variant differs from the no-arg version.
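
   For example, a short scaladoc along these lines (wording is only a suggestion) would make the contract explicit:
   ```scala
   /**
    * Shut down the broker at `index`, waiting at most `timeout` for the shutdown to complete.
    * Unlike the no-arg `killBroker(index)`, which awaits the full shutdown, this variant may
    * return while the broker is still shutting down, which makes it closer to a dirty shutdown.
    */
   def killBroker(index: Int, timeout: Duration): Unit
   ```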



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]

2024-04-03 Thread via GitHub


soarez commented on code in PR #12174:
URL: https://github.com/apache/kafka/pull/12174#discussion_r1549909706


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -623,9 +623,12 @@ class BrokerServer(
 }
   }
 
-  override def shutdown(): Unit = {
+  override def shutdown(): Unit = shutdown(TimeUnit.MINUTES.toMillis(5))

Review Comment:
   Yup, that makes sense. Changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]

2024-04-03 Thread via GitHub


soarez commented on code in PR #12174:
URL: https://github.com/apache/kafka/pull/12174#discussion_r1549909351


##
core/src/main/scala/kafka/server/KafkaBroker.scala:
##
@@ -93,6 +93,7 @@ trait KafkaBroker extends Logging {
   def startup(): Unit
   def awaitShutdown(): Unit
   def shutdown(): Unit
+  def shutdown(timeoutMs: Long): Unit

Review Comment:
   Good suggestion. Applied



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]

2024-04-03 Thread via GitHub


lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1549895965


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -159,6 +160,224 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0, startingTimestamp = startingTimestamp)
   }
 
+  /**
+   * Verifies that pattern subscription performs as expected.
+   * The pattern matches the topics 'topic' and 'tblablac', but not 'tblablak' 
or 'tblab1'.
+   * It is expected that the consumer is subscribed to all partitions of 
'topic' and
+   * 'tblablac' after the subscription when metadata is refreshed.
+   * When a new topic 'tsomec' is added afterwards, it is expected that upon 
the next
+   * metadata refresh the consumer becomes subscribed to this new topic and 
all partitions
+   * of that topic are assigned to it.
+   */
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testPatternSubscription(quorum: String, groupProtocol: String): Unit = {
+val numRecords = 1
+val producer = createProducer()
+sendRecords(producer, numRecords, tp)
+
+val topic1 = "tblablac" // matches subscribed pattern
+createTopic(topic1, 2, brokerCount)
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
+
+val topic2 = "tblablak" // does not match subscribed pattern
+createTopic(topic2, 2, brokerCount)
+sendRecords(producer,numRecords = 1000, new TopicPartition(topic2, 0))
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1))
+
+val topic3 = "tblab1" // does not match subscribed pattern
+createTopic(topic3, 2, brokerCount)
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0))
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1))
+
+val consumer = createConsumer()
+assertEquals(0, consumer.assignment().size)
+
+val pattern = Pattern.compile("t.*c")
+consumer.subscribe(pattern, new TestConsumerReassignmentListener)
+
+var assignment = Set(
+  new TopicPartition(topic, 0),
+  new TopicPartition(topic, 1),
+  new TopicPartition(topic1, 0),
+  new TopicPartition(topic1, 1))
+awaitAssignment(consumer, assignment)
+
+val topic4 = "tsomec" // matches subscribed pattern
+createTopic(topic4, 2, brokerCount)
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0))
+sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1))
+
+assignment ++= Set(
+  new TopicPartition(topic4, 0),
+  new TopicPartition(topic4, 1))
+awaitAssignment(consumer, assignment)
+
+consumer.unsubscribe()
+assertEquals(0, consumer.assignment().size)
+  }
+
+  /**
+   * Verifies that a second call to pattern subscription succeeds and performs 
as expected.
+   * The initial subscription is to a pattern that matches two topics 'topic' 
and 'foo'.
+   * The second subscription is to a pattern that matches 'foo' and a new 
topic 'bar'.
+   * It is expected that the consumer is subscribed to all partitions of 
'topic' and 'foo' after
+   * the first subscription, and to all partitions of 'foo' and 'bar' after 
the second.
+   * The metadata refresh interval is intentionally increased to a large 
enough value to guarantee
+   * that it is the subscription call that triggers a metadata refresh, and 
not the timeout.
+   */
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubsequentPatternSubscription(quorum: String, groupProtocol: 
String): Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 
"3")
+val consumer = createConsumer()
+
+val numRecords = 1
+val producer = createProducer()
+sendRecords(producer, numRecords = numRecords, tp)
+
+// the first topic ('topic')  matches first subscription pattern only
+
+val fooTopic = "foo" // matches both subscription patterns
+createTopic(fooTopic, 1, brokerCount)
+sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0))
+
+assertEquals(0, consumer.assignment().size)
+
+val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match 
this
+consumer.subscribe(pattern1, new TestConsumerReassignmentListener)
+
+var assignment = Set(
+  new TopicPartition(topic, 0),
+  new TopicPartition(topic, 1),
+  new TopicPartition(fooTopic, 0))
+awaitAssignment(consumer, assignment)
+
+val barTopic = "bar" // matches the next subscription pattern
+createTopic(barTopic, 1, brokerCount)
+sendRecords(producer, numRecords = 1000,

Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-03 Thread via GitHub


lianetm commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2034772501

   Hey @philipnee, thanks for the updates, just one minor comment left above. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-03 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1549857291


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {

Review Comment:
   Is this one being removed intentionally? The suggestion was only to move it 
to the `PlainTextConsumerCommit` file, where all tests related to committing 
offsets now live. It's OK with me if you think it's not worth keeping, but I just want to 
make sure it's intentional.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16070: move setReadOnly to Headers [kafka]

2024-04-03 Thread via GitHub


LiangliangSui closed pull request #15097: KAFKA-16070: move setReadOnly to 
Headers
URL: https://github.com/apache/kafka/pull/15097


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549840889


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -177,6 +201,78 @@ class RaftManagerTest {
 assertFalse(fileLocked(lockPath))
   }
 
+  @Test
+  def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
+val logDir = Some(TestUtils.tempDir().toPath)
+val metadataDir = Some(TestUtils.tempDir().toPath)
+val nodeId = 1
+val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, 
metadataDir)
+val raftManager = createRaftManager(
+  new TopicPartition("__cluster_metadata", 0),
+  config
+)
+raftManager.shutdown()
+
+try {
+  KafkaRaftManager.maybeDeleteMetadataLogDir(config)
+  
assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+} catch {
+  case err: Throwable => fail("Failed to delete metadata log", err)
+}
+assertTrue(Files.exists(metadataDir.get))
+  }
+
+  @Test
+  def testNonMigratingZkBrokerDeletesMetadataLog(): Unit = {
+val logDir = Some(TestUtils.tempDir().toPath)
+val metadataDir = Some(TestUtils.tempDir().toPath)
+val nodeId = 1
+// Use this config to create the directory
+val config1 = createZkBrokerConfig(migrationEnabled = true, nodeId, 
logDir, metadataDir)
+val raftManager = createRaftManager(
+  new TopicPartition("__cluster_metadata", 0),
+  config1
+)
+raftManager.shutdown()
+
+val config2 = createZkBrokerConfig(migrationEnabled = false, nodeId, 
logDir, metadataDir)
+try {
+  KafkaRaftManager.maybeDeleteMetadataLogDir(config2)
+  fail("Should have not deleted the metadata log")
+} catch {
+  case err: Throwable =>
+assertEquals("Not deleting metadata log dir since migrations are not 
enabled.", err.getMessage)
+
assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+}
+assertTrue(Files.exists(metadataDir.get))
+  }
+
+  @Test
+  def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = {
+val logDir = Some(TestUtils.tempDir().toPath)
+val metadataDir = Some(TestUtils.tempDir().toPath)
+val nodeId = 1
+val config = createConfig(
+  Set(ProcessRole.BrokerRole),
+  nodeId,
+  logDir,
+  metadataDir
+)
+val raftManager = createRaftManager(
+  new TopicPartition("__cluster_metadata", 0),
+  config
+)
+raftManager.shutdown()
+
+try {

Review Comment:
   we can use `assertThrow` to simplify the code. for example:
   ```scala
   assertThrows(classOf[RuntimeException], () => 
KafkaRaftManager.maybeDeleteMetadataLogDir(config))
   assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
   assertTrue(Files.exists(metadataDir.get))
   ```



##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +70,51 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Test if the configured metadata log dir is one of the data log dirs.
+   */
+  def hasDifferentLogDir(config: KafkaConfig): Boolean = {
+!config
+  .logDirs
+  .map(Paths.get(_).toAbsolutePath)
+  .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
+  }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is 
safe to do on brokers and it
+   * makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Unit = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  throw new RuntimeException("Not deleting metadata log dir since this 
node is in KRaft mode.")
+} else if (!config.migrationEnabled) {
+  throw new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled.")
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val logDirName = 
UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)
+  val metadataPartitionDir = KafkaRaftManager.createLogDirectory(new 
File(config.metadataLogDir), logDirName)

Review Comment:
   `new File(config.metadataLogDir)` can be replaced by `metadataDir`
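
   i.e. something like:
   ```scala
   val metadataPartitionDir = KafkaRaftManager.createLogDirectory(metadataDir, logDirName)
   ```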



##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -480,6 +480,81 @@ class ZkMigrationIntegrationTest {
 }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = 
MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+new ClusterConfigProperty(key = "inter.broker.listener.name", value = 
"EXTERNAL"),
+new ClusterConfigProperty(key = "listeners", value = 
"PLAINTEXT://localhos

Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on PR #15648:
URL: https://github.com/apache/kafka/pull/15648#issuecomment-2034709602

   I'm going to work on a ducktape test as well. Hopefully I can get that done 
today


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549818474


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -420,6 +420,12 @@ class KafkaServer(
 isZkBroker = true,
 logManager.directoryIdsSet)
 
+  // For ZK brokers in migration mode, always delete the metadata 
partition on startup.
+  KafkaRaftManager.maybeDeleteMetadataLogDir(config) match {
+case Some(err) => logger.error("Could not delete local metadata 
log dir. This is non-fatal, so continuing with startup.", err)

Review Comment:
   I updated this to let maybeDeleteMetadataLogDir throw and fail startup



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549817561


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +69,36 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is 
safe to do on brokers and it
+   * makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @returnAn error wrapped as an Option, if an error occurred. None 
otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  Some(new RuntimeException("Not deleting metadata log dir since this node 
is in KRaft mode."))
+} else if (!config.migrationEnabled) {
+  Some(new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled."))
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+  try {
+Utils.delete(metadataDir)

Review Comment:
   Ok, code has been updated to just delete the `__cluster_metadata-0` 
directory. I got confused by our naming 😅 
   
   metadataLogDir is actually the directory in which the metadata log (which is 
a directory) exists :)
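   
   For reference, a sketch of what the narrower deletion looks like, using only names that already appear in this diff (not necessarily the exact final code):
   ```scala
   // Sketch: resolve the __cluster_metadata-0 directory inside metadataLogDir and
   // delete only that partition directory, leaving the rest of the log dir in place.
   val metadataDir = new File(config.metadataLogDir)
   val logDirName = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION) // "__cluster_metadata-0"
   val metadataPartitionDir = new File(metadataDir, logDirName)
   if (metadataPartitionDir.exists()) {
     Utils.delete(metadataPartitionDir)
   }
   ```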



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549817561


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -69,6 +69,36 @@ object KafkaRaftManager {
 
 lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode 
of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is 
safe to do on brokers and it
+   * makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @returnAn error wrapped as an Option, if an error occurred. None 
otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+// These constraints are enforced in KafkaServer, but repeating them here 
to guard against future callers
+if (config.processRoles.nonEmpty) {
+  Some(new RuntimeException("Not deleting metadata log dir since this node 
is in KRaft mode."))
+} else if (!config.migrationEnabled) {
+  Some(new RuntimeException("Not deleting metadata log dir since 
migrations are not enabled."))
+} else {
+  val metadataDir = new File(config.metadataLogDir)
+  val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+  try {
+Utils.delete(metadataDir)

Review Comment:
   Ok, code has been updated to just delete the `__cluster_metadata-0` 
directory. I got confused by our naming.. metadataLogDir is actually the 
directory in which the metadata log (which is a directory) exists 😅 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549813667


##
core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala:
##
@@ -389,7 +389,12 @@ class NodeToControllerRequestThread(
   debug("Controller isn't cached, looking for local metadata changes")
   controllerInformation.node match {
 case Some(controllerNode) =>
-  info(s"Recorded new controller, from now on will use node 
$controllerNode")
+  val controllerType = if (controllerInformation.isZkController) {
+"ZK"
+  } else {
+"KRaft"
+  }
+  info(s"Recorded new $controllerType controller, from now on will use 
node $controllerNode")

Review Comment:
   Unrelated change, but helped when debugging the integration test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


mumrah commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549786977


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -420,6 +420,12 @@ class KafkaServer(
 isZkBroker = true,
 logManager.directoryIdsSet)
 
+  // For ZK brokers in migration mode, always delete the metadata 
partition on startup.
+  KafkaRaftManager.maybeDeleteMetadataLogDir(config) match {
+case Some(err) => logger.error("Could not delete local metadata 
log dir. This is non-fatal, so continuing with startup.", err)

Review Comment:
   My rationale here was that the deletion is not strictly required, but rather 
an optimization for the revert-to-ZK case. 
   
   I assume RaftManager would also fail if there was some underlying I/O 
problem, but failing here is probably okay.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16468) Listener not found error in SendRPCsToBrokersEvent

2024-04-03 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16468:
-
Description: 
During the ZK to KRaft migration, the controller will send RPCs to the ZK 
brokers using the configured "control.plane.listener.name" or more commonly, 
the "inter.broker.listener.name". If a ZK broker did not register with this 
listener, we get an error at the time of sending the first RPC to a broker.

{code}
[2024-04-03 09:28:59,043] ERROR Encountered nonFatalFaultHandler fault: 
Unhandled error in SendRPCsToBrokersEvent 
(org.apache.kafka.server.fault.MockFaultHandler:44)
kafka.common.BrokerEndPointNotAvailableException: End point with listener name 
EXTERNAL not found for broker 0
at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94)
at scala.Option.getOrElse(Option.scala:201)
at kafka.cluster.Broker.node(Broker.scala:93)
at 
kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122)
at 
kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105)
at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:98)
at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:98)
at scala.collection.immutable.Set$Set3.foreach(Set.scala:261)
at 
kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:98)
at 
kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:219)
at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:777)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
at java.base/java.lang.Thread.run(Thread.java:833)
{code}

At this point, the KRaft controller has already migrated the metadata. Recovery 
at this point is possible by restarting the brokers with the correct listener 
names, but we can catch this much sooner in the process.

When a ZK broker registers with the KRaft controller, we should reject the 
registration if the expected listener name is not present. This will prevent 
the migration from starting.

  was:
During the ZK to KRaft migration, the controller will send RPCs using the 
configured "control.plane.listener.name" or more commonly, the 
"inter.broker.listener.name". If a ZK broker did not register with this 
listener, we get an error at the time of sending the first RPC to a broker.

{code}
[2024-04-03 09:28:59,043] ERROR Encountered nonFatalFaultHandler fault: 
Unhandled error in SendRPCsToBrokersEvent 
(org.apache.kafka.server.fault.MockFaultHandler:44)
kafka.common.BrokerEndPointNotAvailableException: End point with listener name 
EXTERNAL not found for broker 0
at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94)
at scala.Option.getOrElse(Option.scala:201)
at kafka.cluster.Broker.node(Broker.scala:93)
at 
kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122)
at 
kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105)
at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:98)
at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:98)
at scala.collection.immutable.Set$Set3.foreach(Set.scala:261)
at 
kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:98)
at 
kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:219)
at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:777)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
at java.base/java.lang.Thread.run(Thread.java:833)
{code}

At this point, the KRaft controller has already migrated the metadata. Recovery 
at this point is possible by restarting the brokers with the correct listener 
names, but we can catch this much sooner in the process.

When a ZK broker registers with the KRaft controller, we should reject the 
registration if the expected listener name is not present. This will prevent 
the migration from starting.


> Listener not found error in SendRPCsToBrokersEvent
> --

[jira] [Updated] (KAFKA-16468) Listener not found error in SendRPCsToBrokersEvent

2024-04-03 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-16468:
-
Description: 
During the ZK to KRaft migration, the controller will send RPCs to the ZK 
brokers using the configured "control.plane.listener.name" or more commonly, 
the "inter.broker.listener.name". If a ZK broker did not register with this 
listener, we get an error at the time of sending the first RPC to a broker.

{code}
[2024-04-03 09:28:59,043] ERROR Encountered nonFatalFaultHandler fault: 
Unhandled error in SendRPCsToBrokersEvent 
(org.apache.kafka.server.fault.MockFaultHandler:44)
kafka.common.BrokerEndPointNotAvailableException: End point with listener name 
EXTERNAL not found for broker 0
at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94)
at scala.Option.getOrElse(Option.scala:201)
at kafka.cluster.Broker.node(Broker.scala:93)
at 
kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122)
at 
kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105)
at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:98)
at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:98)
at scala.collection.immutable.Set$Set3.foreach(Set.scala:261)
at 
kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:98)
at 
kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:219)
at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:777)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
at java.base/java.lang.Thread.run(Thread.java:833)
{code}

This is pretty late to be detecting this mis-configuration. By this point, the 
KRaft controller is the active controller and has already migrated the 
metadata. Recovery is possible by restarting the brokers with the correct 
listener names, but we can catch this much sooner in the process.

When a ZK broker registers with the KRaft controller, we should reject the 
registration if the expected listener name is not present. This will prevent 
the migration from starting.

  was:
During the ZK to KRaft migration, the controller will send RPCs to the ZK 
brokers using the configured "control.plane.listener.name" or more commonly, 
the "inter.broker.listener.name". If a ZK broker did not register with this 
listener, we get an error at the time of sending the first RPC to a broker.

{code}
[2024-04-03 09:28:59,043] ERROR Encountered nonFatalFaultHandler fault: 
Unhandled error in SendRPCsToBrokersEvent 
(org.apache.kafka.server.fault.MockFaultHandler:44)
kafka.common.BrokerEndPointNotAvailableException: End point with listener name 
EXTERNAL not found for broker 0
at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94)
at scala.Option.getOrElse(Option.scala:201)
at kafka.cluster.Broker.node(Broker.scala:93)
at 
kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122)
at 
kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105)
at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:98)
at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:98)
at scala.collection.immutable.Set$Set3.foreach(Set.scala:261)
at 
kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:98)
at 
kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:219)
at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:777)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
at java.base/java.lang.Thread.run(Thread.java:833)
{code}

At this point, the KRaft controller has already migrated the metadata. Recovery 
at this point is possible by restarting the brokers with the correct listener 
names, but we can catch this much sooner in the process.

When a ZK broker registers with the KRaft controller, we should reject the 
registration if the expected listener name is not present. This will prevent 
the migration from starting.


[jira] [Created] (KAFKA-16468) Listener not found error in SendRPCsToBrokersEvent

2024-04-03 Thread David Arthur (Jira)
David Arthur created KAFKA-16468:


 Summary: Listener not found error in SendRPCsToBrokersEvent
 Key: KAFKA-16468
 URL: https://issues.apache.org/jira/browse/KAFKA-16468
 Project: Kafka
  Issue Type: Bug
  Components: controller, migration
Reporter: David Arthur
 Fix For: 3.8.0


During the ZK to KRaft migration, the controller will send RPCs using the 
configured "control.plane.listener.name" or more commonly, the 
"inter.broker.listener.name". If a ZK broker did not register with this 
listener, we get an error at the time of sending the first RPC to a broker.

{code}
[2024-04-03 09:28:59,043] ERROR Encountered nonFatalFaultHandler fault: 
Unhandled error in SendRPCsToBrokersEvent 
(org.apache.kafka.server.fault.MockFaultHandler:44)
kafka.common.BrokerEndPointNotAvailableException: End point with listener name 
EXTERNAL not found for broker 0
at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94)
at scala.Option.getOrElse(Option.scala:201)
at kafka.cluster.Broker.node(Broker.scala:93)
at 
kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122)
at 
kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105)
at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:98)
at 
kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:98)
at scala.collection.immutable.Set$Set3.foreach(Set.scala:261)
at 
kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:98)
at 
kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:219)
at 
org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:777)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
at java.base/java.lang.Thread.run(Thread.java:833)
{code}

At this point, the KRaft controller has already migrated the metadata. Recovery 
at this point is possible by restarting the brokers with the correct listener 
names, but we can catch this much sooner in the process.

When a ZK broker registers with the KRaft controller, we should reject the 
registration if the expected listener name is not present. This will prevent 
the migration from starting.
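
A rough sketch of the kind of check intended, on the controller side of broker registration (the names and exact placement below are hypothetical, written in Scala for brevity):

{code:scala}
// Hypothetical sketch: when a migrating ZK broker registers, verify that it advertises
// the listener the controller will later use for RPCs, and reject the registration otherwise.
val requiredListener = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
if (registration.isMigratingZkBroker && !registration.listeners.exists(_.name == requiredListener.value)) {
  throw new IllegalStateException(s"Rejecting registration of ZK broker ${registration.brokerId}: " +
    s"listener ${requiredListener.value} is required for the ZK to KRaft migration but was not advertised.")
}
{code}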



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on code in PR #12174:
URL: https://github.com/apache/kafka/pull/12174#discussion_r1549747480


##
core/src/main/scala/kafka/server/KafkaBroker.scala:
##
@@ -93,6 +93,7 @@ trait KafkaBroker extends Logging {
   def startup(): Unit
   def awaitShutdown(): Unit
   def shutdown(): Unit
+  def shutdown(timeoutMs: Long): Unit

Review Comment:
   How about using `Duration` instead of a long? Also, we can rename it from `timeoutMs` to `timeout`.



##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -623,9 +623,12 @@ class BrokerServer(
 }
   }
 
-  override def shutdown(): Unit = {
+  override def shutdown(): Unit = shutdown(TimeUnit.MINUTES.toMillis(5))

Review Comment:
   How about adding a default implementation to the parent class?
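
   Concretely, something along these lines (sketch only):
   ```scala
   trait KafkaBroker extends Logging {
     // ...
     def shutdown(timeout: Duration): Unit
     // default: delegate with a generous timeout so existing callers keep working
     def shutdown(): Unit = shutdown(Duration.ofMinutes(5))
   }
   ```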



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Refactor SSL/SASL admin integration tests to not use a custom authorizer [kafka]

2024-04-03 Thread via GitHub


tinaselenge commented on code in PR #15377:
URL: https://github.com/apache/kafka/pull/15377#discussion_r1549736126


##
core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala:
##
@@ -259,4 +275,22 @@ class SslAdminIntegrationTest extends 
SaslSslAdminIntegrationTest {
 assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: 
${allMetrics.keySet.map(_.getMBeanName)}")
 metrics.map(_.asInstanceOf[Gauge[Int]].value).sum
   }
+
+  override def createAdminClient: Admin = {

Review Comment:
   This was necessary because the admin client is created with different security 
configurations for the SSL tests than for the SASL and plain ones. We could remove this and 
override the createConfig() method instead, but some override is needed either way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13907) Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable

2024-04-03 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833568#comment-17833568
 ] 

Igor Soarez commented on KAFKA-13907:
-

[~chia7712] I've re-opened and updated the PR

> Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable
> --
>
> Key: KAFKA-13907
> URL: https://issues.apache.org/jira/browse/KAFKA-13907
> Project: Kafka
>  Issue Type: Bug
>Reporter: Deng Ziming
>Assignee: Igor Soarez
>Priority: Major
>  Labels: newbie
>
> ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable will hang 
> waiting for the controlled shutdown; there may be some bug related to it.
> Since this bug can be reproduced locally, it won't be hard to investigate.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]

2024-04-03 Thread via GitHub


soarez opened a new pull request, #12174:
URL: https://github.com/apache/kafka/pull/12174

   When a controlled shutdown is requested the broker tries to communicate the 
state change to the controller via a heartbeat request. [1]
   
   In this test, the controller is not available so the request will fail. The 
current timeout behavior in a heartbeat request is to just keep retrying — 
which generally makes sense, just not in the context of a controlled shutdown.
   
   When a heartbeat request times out, if we are in the middle of a controlled 
shutdown, we shouldn't just retry forever but rather just give up on trying to 
contact the controller and proceed with the controlled shutdown.
   
   [1] 
https://github.com/apache/kafka/blob/f2d6282668a31b9a554563338f9178e2bba2833f/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala#L217
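   
   Conceptually the change is small; a rough sketch of the idea (the helper names below are illustrative, not the actual BrokerLifecycleManager API):
   
   ```scala
   // Sketch: on a heartbeat timeout, keep retrying during normal operation, but if a
   // controlled shutdown is already in progress, stop trying to reach the controller
   // and let the shutdown proceed.
   private def handleHeartbeatTimeout(): Unit = {
     if (controlledShutdownInProgress) {   // illustrative flag
       info("Controller unreachable during controlled shutdown; proceeding with shutdown.")
       completeControlledShutdown()        // illustrative helper
     } else {
       scheduleNextHeartbeat()             // illustrative helper: normal retry path
     }
   }
   ```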
   
   *Summary of testing strategy*
   The test no longer fails
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig

2024-04-03 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833566#comment-17833566
 ] 

Chia-Ping Tsai commented on KAFKA-6527:
---

I looped the tests 300 times and they all passed. We can re-enable it. [~gharris1727] FYI

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> 
>
> Key: KAFKA-6527
> URL: https://issues.apache.org/jira/browse/KAFKA-6527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
>  Labels: flakey, flaky-test
> Fix For: 3.8.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16413) Add tests for FileLock

2024-04-03 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16413.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Add tests for FileLock
> --
>
> Key: KAFKA-16413
> URL: https://issues.apache.org/jira/browse/KAFKA-16413
> Project: Kafka
>  Issue Type: Test
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 3.8.0
>
>
> Ref: [https://github.com/apache/kafka/pull/15568#pullrequestreview-1950676267]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16413: add FileLockTest [kafka]

2024-04-03 Thread via GitHub


chia7712 merged PR #15624:
URL: https://github.com/apache/kafka/pull/15624


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16413: add FileLockTest [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on PR #15624:
URL: https://github.com/apache/kafka/pull/15624#issuecomment-2034576975

   ```
   ./gradlew cleanTest :tools:test --tests 
MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful --tests 
JmxToolTest.initializationError :connect:runtime:test --tests 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId
 :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated 
:connect:mirror:test --tests 
IdentityReplicationIntegrationTest.testReplicateFromLatest --tests 
MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs --tests 
MirrorConnectorsIntegrationSSLTest.testReplicateSourceDefault --tests 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigs 
:core:test --tests 
DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails
 --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests 
ZkMigrationIntegrationTest.testMigrateTopicDeletions --tests 
UserQuotaTest.testQuotaOverrideDelete --tests 
SocketServerTest.testControlPlaneTakePrecedenceOverInterBrokerListenerAsPrivilegedListener --tests 
SocketServerTest.testZeroMaxConnectionsPerIp --tests 
SocketServerTest.testStagedListenerShutdownWhenConnectionQueueIsFull --tests 
SocketServerTest.testStagedListenerStartup --tests 
SocketServerTest.testControlPlaneAsPrivilegedListener --tests 
SocketServerTest.testInterBrokerListenerAsPrivilegedListener --tests 
LogDirFailureTest.testIOExceptionDuringLogRoll --tests 
LogDirFailureTest.testIOExceptionDuringCheckpoint :clients:test --tests 
SelectorTest.testConnectionsByClientMetric --tests 
Tls12SelectorTest.testConnectionsByClientMetric --tests 
SelectorTest.testInboundConnectionsCountInConnectionCreationMetric --tests 
Tls12SelectorTest.testInboundConnectionsCountInConnectionCreationMetric --tests 
SelectorTest.testMuteOnOOM --tests Tls12SelectorTest.testMuteOnOOM --tests 
Tls13SelectorTest.testConnectionsByClientMetric --tests 
Tls13SelectorTest.testInboundConnectionsCountInConnectionCreationMetric --tests 
Tls13SelectorTest.testMuteOnOOM
   ```
   they pass on my local.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-03 Thread via GitHub


soarez commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1549709937


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { log =>
+  val tp = log.topicPartition
+
+  log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+  log.removeLogMetrics()
+  futureLogs.remove(tp)
+
+  currentLogs.put(tp, log)
+  log.newMetrics()
+
+  info(s"Successfully renamed abandoned future log for $tp")
+}
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: 
TopicsImage): Iterable[UnifiedLog] = {
+futureLogs.values.flatMap { log =>
+  val topicId = log.topicId.getOrElse {
+throw new RuntimeException(s"The log dir $log does not have a topic 
ID, " +
+  "which is not allowed when running in KRaft mode.")
+  }
+  val partitionId = log.topicPartition.partition()
+  Option(newTopicsImage.getPartition(topicId, partitionId))
+.filter(pr => 
directoryId(log.parentDir).contains(pr.directory(brokerId)))
+.map(_ => log)

Review Comment:
   If we didn't know whether the future log was caught up or not, then I'd prefer 
(a), but at this point I can't conceive of a scenario – other than a 
failure during replica promotion – that would cause the future log to be in the 
directory assigned in the metadata. The two logs will likely be 
either caught up or very close, so I agree it makes more sense to do (b): 
promote the future log and delete the main one.
   
   We can still run into trouble if the directory with the main replica is 
offline. At some point that will cause a crash if the directory ever comes back 
online. But there's nothing we can do about that here. Maybe future work could 
improve how the broker handles loading conflicting logs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-03 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1549695928


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { log =>
+  val tp = log.topicPartition
+
+  log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+  log.removeLogMetrics()
+  futureLogs.remove(tp)
+
+  currentLogs.put(tp, log)
+  log.newMetrics()
+
+  info(s"Successfully renamed abandoned future log for $tp")
+}
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: 
TopicsImage): Iterable[UnifiedLog] = {
+futureLogs.values.flatMap { log =>
+  val topicId = log.topicId.getOrElse {
+throw new RuntimeException(s"The log dir $log does not have a topic 
ID, " +
+  "which is not allowed when running in KRaft mode.")
+  }
+  val partitionId = log.topicPartition.partition()
+  Option(newTopicsImage.getPartition(topicId, partitionId))
+.filter(pr => 
directoryId(log.parentDir).contains(pr.directory(brokerId)))
+.map(_ => log)

Review Comment:
   Thanks for the feedback.
   
   For (2), we have a couple of options. We can either:
   
   (a) ignore the future replica (say in dir2) if the main replica exists in an 
online log dir (say dir1) or,
   (b) promote the future replica (in dir2)  and remove the main replica (in 
dir1).
   
   (a) would result in ReplicaManager spawning a replicaAlterLogDir thread for 
the future replica and correcting the assignment to dir1, only for it to be 
changed back again to dir2 when the replicaAlterLogDir thread finishes its job. 
See 
https://github.com/apache/kafka/blob/acecd370cc3b25f12926e7a4664a2648f08c6c9a/core/src/main/scala/kafka/server/ReplicaManager.scala#L2734
 and 
https://github.com/apache/kafka/blob/acecd370cc3b25f12926e7a4664a2648f08c6c9a/core/src/main/scala/kafka/server/ReplicaManager.scala#L2745
   
   Since in these scenarios, the future replica is almost caught up with the 
main replica, I'm leaning towards option (b) to avoid more reassignments. 
Please let me know if you feel otherwise.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on PR #15639:
URL: https://github.com/apache/kafka/pull/15639#issuecomment-2034397405

   ```
   ./gradlew cleanTest :tools:test --tests 
MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful 
:connect:runtime:test --tests 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
 :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers 
:trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated 
:server:test --tests 
AssignmentsManagerTest.testRequeuesFailedAssignmentPropagations 
:connect:mirror:test --tests 
MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow
 --tests MirrorConnectorsIntegrationSSLTest.testReplicateFromLatest --tests 
IdentityReplicationIntegrationTest.testReplicateSourceDefault --tests 
MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault :core:test 
--tests 
DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails
 --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests 
LogDirFailureTest.testIOExceptionDuringLogRoll --tests LogDirFailureTest.testIOExceptionDuringCheckpoint
   ```
   all pass on my local. will merge it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]

2024-04-03 Thread via GitHub


chia7712 merged PR #15639:
URL: https://github.com/apache/kafka/pull/15639


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-04-03 Thread via GitHub


OmniaGM commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1549552216


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -289,13 +289,17 @@ class BrokerMetadataPublisher(
 try {
   // Start log manager, which will perform (potentially lengthy)
   // recovery-from-unclean-shutdown if required.
-  logManager.startup(metadataCache.getAllTopics())
-
-  // Delete partition directories which we're not supposed to have. We have
-  // to do this before starting ReplicaManager, so that the stray replicas
-  // don't block creation of new ones with different IDs but the same 
names.
-  // See KAFKA-14616 for details.
-  logManager.deleteStrayKRaftReplicas(brokerId, newImage.topics())
+  logManager.startup(
+metadataCache.getAllTopics(),
+isStray = (topicId, partition) => {
+  val tid = topicId.getOrElse {
+throw new RuntimeException(s"Partition $partition does not have a 
topic ID, " +
+  "which is not allowed when running in KRaft mode.")
+  }
+  Option(newImage.topics().getPartition(tid, partition.partition()))
+.exists(_.replicas.contains(brokerId))

Review Comment:
   You are right, the new `isStrayKraftReplica` finds both cases, but I guess I took 
Igor's suggestion without giving it a second thought 
(https://github.com/apache/kafka/pull/15335#discussion_r1512748010). Will fix this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16457 Useless import class [kafka]

2024-04-03 Thread via GitHub


chia7712 merged PR #15646:
URL: https://github.com/apache/kafka/pull/15646


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16467) Add README to docs folder

2024-04-03 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833498#comment-17833498
 ] 

Chia-Ping Tsai commented on KAFKA-16467:


Have you checked option 1? I'm not sure whether it is still valid.

Option 2 seems simpler than option 1 to me.

> Add README to docs folder
> -
>
> Key: KAFKA-16467
> URL: https://issues.apache.org/jira/browse/KAFKA-16467
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> We don't have a guide in the project root folder or the docs folder showing how 
> to run the website locally. It would be good to provide a way to run the 
> documentation with the kafka-site repository.
>  
> Option 1: Add links to wiki page 
> [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes]
>  and 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. 
> Option 2: Show how to run the documentation within a container. For example: move 
> `site-docs` from kafka to the kafka-site repository and run `./start-preview.sh`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16430) The group-metadata-manager thread is always in a loading state and occupies one CPU, unable to end.

2024-04-03 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833497#comment-17833497
 ] 

Chia-Ping Tsai commented on KAFKA-16430:


{quote}
what you mean?  Is the newer kafka script referring to the use of the new 
version of the kafka-consumer-group.sh client script? But now there is a 
problem with the kafka broker server side.
{quote}

Please ignore my previous comment :(

{quote}
At the same time, I found through the top command that the 
"group-metadata-manager-0" thread was constantly consuming 100% of the CPU 
resources. This loop could not be broken, resulting in the inability to consume 
topic partition data on that node. At this point, I suspected that the issue 
may be related to the __consumer_offsets partition data file loaded by this 
thread.
{quote}

Could you share more details? For example, the thread dump or the hot path you observed.
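
If it helps, here is a minimal sketch (assuming the JDK's jcmd tool is available on the broker host and that you know the broker's PID; the helper name is made up) that simply shells out to "jcmd <pid> Thread.print", so the stack of the busy group-metadata-manager-0 thread can be attached here:

{code:scala}
import scala.sys.process._

// Illustrative helper: capture a full thread dump of the broker process via jcmd.
// Grep the output for "group-metadata-manager" to see what the busy thread is doing.
def dumpBrokerThreads(brokerPid: Long): String =
  Seq("jcmd", brokerPid.toString, "Thread.print").!!
{code}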

{quote}
We encountered this issue in our production environment using Kafka versions 
2.2.1 and 2.4.0, and I believe it may also affect other versions.
{quote}

As Kafka 2.x is EOL, is it possible for your team to use Kafka 3.x to reproduce the issue?

> The group-metadata-manager thread is always in a loading state and occupies 
> one CPU, unable to end.
> ---
>
> Key: KAFKA-16430
> URL: https://issues.apache.org/jira/browse/KAFKA-16430
> Project: Kafka
>  Issue Type: Bug
>  Components: group-coordinator
>Affects Versions: 2.4.0
>Reporter: Gao Fei
>Priority: Blocker
>
> I deployed three broker instances and suddenly found that the client was 
> unable to consume data from certain topic partitions. I first tried to log in 
> to the broker corresponding to the group and used the following command to 
> view the consumer group:
> {code:java}
> ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --describe 
> --group mygroup{code}
> and found the following error:
> {code:java}
> Error: Executing consumer group command failed due to 
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.{code}
> I then discovered that the broker may be stuck in a loop, which is constantly 
> in a loading state. At the same time, I found through the top command that 
> the "group-metadata-manager-0" thread was constantly consuming 100% of the 
> CPU resources. This loop could not be broken, resulting in the inability to 
> consume topic partition data on that node. At this point, I suspected that 
> the issue may be related to the __consumer_offsets partition data file loaded 
> by this thread.
> Finally, after restarting the broker instance, everything was back to normal. 
> It's very strange that if there was an issue with the __consumer_offsets 
> partition data file, the broker should have failed to start. Why was it able 
> to automatically recover after a restart? And why did this continuous loop 
> loading of the __consumer_offsets partition data occur?
> We encountered this issue in our production environment using Kafka versions 
> 2.2.1 and 2.4.0, and I believe it may also affect other versions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]

2024-04-03 Thread via GitHub


chia7712 commented on PR #15621:
URL: https://github.com/apache/kafka/pull/15621#issuecomment-2034230452

   The failed tests pass on my local machine.
   ```
   ./gradlew cleanTest :streams:test --tests 
SlidingWindowedKStreamIntegrationTest.shouldRestoreAfterJoinRestart 
:storage:test --tests 
TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout 
:metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers 
:trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated 
:connect:mirror:test --tests 
DedicatedMirrorIntegrationTest.testMultiNodeCluster --tests 
MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault --tests 
MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithFrequentOffsetSyncs
 --tests MirrorConnectorsIntegrationExactlyOnceTest.testSyncTopicConfigs 
:core:test --tests 
ListOffsetsIntegrationTest.testThreeRecordsInSeparateBatchHavingDifferentCompressionTypeWithServer
 --tests ListOffsetsIntegrationTest.testThreeNonCompressedRecordsInOneBatch 
--tests 
ListOffsetsIntegrationTest.testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer
 --tests ListOffsetsIntegrationTest.testThreeCompressedRecordsInOneBatch --tests 
ListOffsetsIntegrationTest.testThreeCompressedRecordsInSeparateBatch --tests 
ListOffsetsIntegrationTest.testThreeNonCompressedRecordsInSeparateBatch --tests 
DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithoutDescribeAclViaAssign
 --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests 
ConsumerBounceTest.testCloseDuringRebalance --tests 
LogDirFailureTest.testIOExceptionDuringLogRoll --tests 
LogDirFailureTest.testIOExceptionDuringCheckpoint :clients:test --tests 
KafkaAdminClientTest.testClientSideTimeoutAfterFailureToReceiveResponse
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Kafka capabilities

2024-04-03 Thread Kafka Life
Hi Kafka users,

Does anyone have a document or PPT that showcases the capabilities of
Kafka, along with any cost management capability?
I have a customer who is still using IBM MQM and RabbitMQ, and I want the
client to consider Kafka for messaging and data streaming. I wanted to seek
your expert help: if you have any document or PPT I could propose as an
example, could you please share it?

Thanks and regards,
KrisG


Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]

2024-04-03 Thread via GitHub


Vaibhav-Nazare commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-2034128217

   @divijvaidya can you also have a look at this when you get a chance: 
https://lists.apache.org/thread/f3yj7o5nfskz1onr59kmodm73kvtsktk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12399: Deprecate Log4J Appender [kafka]

2024-04-03 Thread via GitHub


mimaison commented on PR #10244:
URL: https://github.com/apache/kafka/pull/10244#issuecomment-2034028295

   Yes, ideally we need the deprecation to go into 3.8 so we can delete the appender in 4.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]

2024-04-03 Thread via GitHub


soarez commented on code in PR #15648:
URL: https://github.com/apache/kafka/pull/15648#discussion_r1549319336


##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -420,6 +420,12 @@ class KafkaServer(
 isZkBroker = true,
 logManager.directoryIdsSet)
 
+  // For ZK brokers in migration mode, always delete the metadata partition on startup.
+  KafkaRaftManager.maybeDeleteMetadataLogDir(config) match {
+    case Some(err) => logger.error("Could not delete local metadata log dir. This is non-fatal, so continuing with startup.", err)

Review Comment:
   Should this really be non-fatal? What's the thinking behind this decision?
   If there is an I/O failure on the metadata log dir, the broker should not continue.
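
   For illustration, a minimal sketch of the fail-fast alternative (the helper name is hypothetical, and the Option-of-Throwable result shape simply mirrors the `case Some(err)` match in the diff above):

   ```scala
   // Illustrative sketch: abort startup if the metadata log dir could not be deleted,
   // instead of logging the error and continuing.
   def failIfMetadataLogDirDeletionFailed(deletionResult: Option[Throwable]): Unit =
     deletionResult.foreach { err =>
       throw new RuntimeException(
         "Could not delete local metadata log dir; aborting startup instead of continuing.",
         err)
     }

   // e.g. failIfMetadataLogDirDeletionFailed(KafkaRaftManager.maybeDeleteMetadataLogDir(config))
   ```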



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16293: Test log directory failure in Kraft [kafka]

2024-04-03 Thread via GitHub


showuon commented on PR #15409:
URL: https://github.com/apache/kafka/pull/15409#issuecomment-2034010084

   @gaurav-narula, the newly added KRaft tests are failing in our test environment. The logs are 
[here](https://drive.google.com/file/d/1CbxgH8eswEXX0YDEJpZn7B9Vd4x-gfo4/view?usp=sharing).
 Could you help check and fix them? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]

2024-04-03 Thread via GitHub


showuon commented on code in PR #15532:
URL: https://github.com/apache/kafka/pull/15532#discussion_r1549312174


##
build.gradle:
##
@@ -1435,10 +1454,10 @@ project(':clients') {
 implementation libs.opentelemetryProto
 
 // libraries which should be added as runtime dependencies in generated pom.xml should be defined here:
-shadow libs.zstd
-shadow libs.lz4
-shadow libs.snappy
-shadow libs.slf4jApi
+shadowed libs.zstd
+shadowed libs.lz4
+shadowed libs.snappy
+shadowed libs.slf4jApi

Review Comment:
   @apoorvmittal10, is there any update on the suggestion from @mimaison?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org