[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest
[ https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833816#comment-17833816 ] Arpit Goyal commented on KAFKA-12506: - [~kebab-mai-haddi] You can reassign this ticket back to yourself if you want to work on it. [~ableegoldman] Can you help me find some newbie tickets that would help me get started on Kafka Streams? > Expand AdjustStreamThreadCountTest > -- > > Key: KAFKA-12506 > URL: https://issues.apache.org/jira/browse/KAFKA-12506 > Project: Kafka > Issue Type: Sub-task > Components: streams, unit tests >Reporter: A. Sophie Blee-Goldman >Assignee: Arpit Goyal >Priority: Major > Labels: newbie, newbie++ > > Right now the AdjustStreamThreadCountTest runs a minimal topology that just > consumes a single input topic, and doesn't produce any data to this topic. > Some of the complex concurrency bugs that we've found only showed up when we > had some actual data to process and a stateful topology: > [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500 > See the umbrella ticket for the list of improvements we need to make this a > more effective integration test -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Test minor gh review action - this is a test PR, do not review [kafka]
github-actions[bot] commented on PR #15123: URL: https://github.com/apache/kafka/pull/15123#issuecomment-2036109173 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16081 Limit number of ssl connections in brokers [kafka]
github-actions[bot] commented on PR #15126: URL: https://github.com/apache/kafka/pull/15126#issuecomment-2036109157 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16211) Inconsistent config values in CreateTopicsResult and DescribeConfigsResult
[ https://issues.apache.org/jira/browse/KAFKA-16211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833809#comment-17833809 ] Dung Ha commented on KAFKA-16211: - My guess from reading the code is that describeConfigs() will send the request to the specified broker if the config resource is a broker (ConfigResource.Type.BROKER). If the config resource is a topic (ConfigResource.Type.TOPIC), then a broker is chosen by LeastLoadedBrokerOrActiveKController() (in KafkaAdminClient), which in this situation picks the "least loaded" broker. I have tested this and, indeed, each time I use describeConfigs() with a ConfigResource of type TOPIC, a different broker's static configuration may be returned. My question is: is this the way describeConfigs() is supposed to be used with ConfigResource.Type.TOPIC? Or are we supposed to use describeConfigs() only with ConfigResource.Type.BROKER? > Inconsistent config values in CreateTopicsResult and DescribeConfigsResult > -- > > Key: KAFKA-16211 > URL: https://issues.apache.org/jira/browse/KAFKA-16211 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: Gantigmaa Selenge >Assignee: Dung Ha >Priority: Minor > > When creating a topic in a KRaft cluster, the config value returned in > CreateTopicsResult is different from what you get from describing the topic > configs, if the config was set in broker.properties or controller.properties > or in both but with different values. > > For example, start a broker with `segment.bytes` set to 573741824 in the > properties file and then create a topic; the CreateTopicsResult contains: > ConfigEntry(name=segment.bytes, value=1073741824, source=DEFAULT_CONFIG, > isSensitive=false, isReadOnly=false, synonyms=[], type=INT, > documentation=null) > because the controller was started without setting this config. > However, when you describe configurations for the same topic, the config value > set by the broker is returned: > ConfigEntry(name=segment.bytes, value=573741824, > source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, > synonyms=[], type=null, documentation=null) > > Vice versa, if the controller is started with this config set to a different > value, the create topic request returns the value set by the controller, and > when you describe the config for the same topic, you get the value set > by the broker. This makes it confusing to understand which value is being > used. -- This message was sent by Atlassian Jira (v8.20.10#820010)
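For readers who want to reproduce the inconsistency, a short admin-client program along the following lines should be enough to compare the value returned by createTopics() with the one returned by describeConfigs(). The bootstrap address and topic name below are placeholders, not taken from the ticket.

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;

public class SegmentBytesComparison {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            // 1. Create the topic and read segment.bytes from the CreateTopicsResult
            //    (the value reported by the controller, per the ticket).
            CreateTopicsResult created = admin.createTopics(
                    Collections.singleton(new NewTopic("my-topic", 2, (short) 1)));
            Config fromCreate = created.config("my-topic").get();
            System.out.println("createTopics:    " + fromCreate.get("segment.bytes"));

            // 2. Describe the same topic's configs (served by whichever broker the
            //    admin client routes the request to) and compare the value and its source.
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            Config fromDescribe = admin.describeConfigs(Collections.singleton(topicResource))
                    .all().get().get(topicResource);
            System.out.println("describeConfigs: " + fromDescribe.get("segment.bytes"));
        }
    }
}
{code}

Running this against a cluster where segment.bytes is set statically on the brokers but not on the controller (or vice versa) should show the two calls disagreeing on the value and its source.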
Re: [PR] KAFKA-16293: Test log directory failure in Kraft [kafka]
showuon commented on PR #15409: URL: https://github.com/apache/kafka/pull/15409#issuecomment-2036106373 > > @gaurav-narula , the new added KRaft tests are failing in our test env. The logs are [here](https://drive.google.com/file/d/1CbxgH8eswEXX0YDEJpZn7B9Vd4x-gfo4/view?usp=sharing). Could you help check and fix them? Thanks. > > I believe this pr depends on #15335 OK, let's wait until #15335 is merged and then check again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest
[ https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833804#comment-17833804 ] Aviral Srivastava commented on KAFKA-12506: --- [~ableegoldman] , I want to work on this. I am sorry this was abandoned. I am back. Thank you! > Expand AdjustStreamThreadCountTest > -- > > Key: KAFKA-12506 > URL: https://issues.apache.org/jira/browse/KAFKA-12506 > Project: Kafka > Issue Type: Sub-task > Components: streams, unit tests >Reporter: A. Sophie Blee-Goldman >Assignee: Arpit Goyal >Priority: Major > Labels: newbie, newbie++ > > Right now the AdjustStreamThreadCountTest runs a minimal topology that just > consumes a single input topic, and doesn't produce any data to this topic. > Some of the complex concurrency bugs that we've found only showed up when we > had some actual data to process and a stateful topology: > [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500 > See the umbrella ticket for the list of improvements we need to make this a > more effective integration test -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest
[ https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833758#comment-17833758 ] A. Sophie Blee-Goldman commented on KAFKA-12506: [~goyarpit] yes, it seems this ticket has been abandoned. Feel free to pick it up (and let me know if you have any questions) [~kebab-mai-haddi] if you still want to work on this, just let me know, I'm sure there are multiple improvements that could be made here in parallel > Expand AdjustStreamThreadCountTest > -- > > Key: KAFKA-12506 > URL: https://issues.apache.org/jira/browse/KAFKA-12506 > Project: Kafka > Issue Type: Sub-task > Components: streams, unit tests >Reporter: A. Sophie Blee-Goldman >Assignee: Arpit Goyal >Priority: Major > Labels: newbie, newbie++ > > Right now the AdjustStreamThreadCountTest runs a minimal topology that just > consumes a single input topic, and doesn't produce any data to this topic. > Some of the complex concurrency bugs that we've found only showed up when we > had some actual data to process and a stateful topology: > [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500 > See the umbrella ticket for the list of improvements we need to make this a > more effective integration test -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-12506) Expand AdjustStreamThreadCountTest
[ https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-12506: -- Assignee: Arpit Goyal (was: Aviral Srivastava) > Expand AdjustStreamThreadCountTest > -- > > Key: KAFKA-12506 > URL: https://issues.apache.org/jira/browse/KAFKA-12506 > Project: Kafka > Issue Type: Sub-task > Components: streams, unit tests >Reporter: A. Sophie Blee-Goldman >Assignee: Arpit Goyal >Priority: Major > Labels: newbie, newbie++ > > Right now the AdjustStreamThreadCountTest runs a minimal topology that just > consumes a single input topic, and doesn't produce any data to this topic. > Some of the complex concurrency bugs that we've found only showed up when we > had some actual data to process and a stateful topology: > [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] KAFKA-12500 > See the umbrella ticket for the list of improvements we need to make this a > more effective integration test -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16458) Add contains method in KeyValue store interface
[ https://issues.apache.org/jira/browse/KAFKA-16458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833749#comment-17833749 ] A. Sophie Blee-Goldman commented on KAFKA-16458: This could be nice to have, but unfortunately given the overly complex way that state stores are implemented and the multi-layer hierarchy, something that should be simple – ie just adding a basic API to the StateStore interface – actually ends up being a massive amount of work. We do plan to overhaul the state store hierarchy at some point in order to simplify the codebase, both for maintenance and new features, although there's no clear roadmap or promise for this to happen anytime soon. That said, I would personally suggest we hold off on adding any new APIs that don't add strictly-new functionality until after we've simplified the state store implementation. Of course, if this is something you really want, you're always free to kick off a KIP discussion whenever. Just wanted to provide some context and warn that this would not be as straightforward as it might seem to actually implement. To your final question: I do think in some sense the reality is that yes, this API is not offered on purpose, in order to keep the interface as simple as possible. But this in itself would be less of a concern if the state store hierarchy was not such a hassle to expand and maintain, which is why I think the community would be open to it after we can get around to cleaning up the store implementation. > Add contains method in KeyValue store interface > --- > > Key: KAFKA-16458 > URL: https://issues.apache.org/jira/browse/KAFKA-16458 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: Ayoub Omari >Priority: Minor > Labels: needs-kip > > In some stream processors, we sometimes just want to check if a key exists in > the state store or not. > > I find calling .get() and checking if the return value is null a little bit > verbose > {code:java} > if (store.get(key) != null) { > }{code} > > But I am not sure if it is on purpose that we would like to keep the store > interface simple. -- This message was sent by Atlassian Jira (v8.20.10#820010)
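Until such an API exists, the null-check the ticket finds verbose can at least be hidden behind a small helper. The sketch below is purely illustrative of what a contains-style check could look like and is not part of the current Streams API.

{code:java}
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public final class StoreUtils {

    private StoreUtils() { }

    // Hypothetical helper mirroring what a default contains() on the key-value
    // store interface could do: delegate to get() and test the result for null.
    public static <K, V> boolean contains(ReadOnlyKeyValueStore<K, V> store, K key) {
        return store.get(key) != null;
    }
}
{code}

In a processor this turns store.get(key) != null into StoreUtils.contains(store, key); a first-class method would still need a KIP and, as noted above, touches the whole store hierarchy.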
[jira] [Commented] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833744#comment-17833744 ] Philip Nee commented on KAFKA-16389: [^consumer.log] It seems like the consumer receives empty topicPartitions after the assignment. One suspicious thing that I see is that there's no send in between the successive Receives. I wonder if this is a race condition: First: Assignments received {code:java} 7997 [2024-04-03 19:48:49,445] DEBUG [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Received CONSUMER_GROUP_HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, clientId=consumer-test_group_id-1, correlationId=35, headerVersion=2): ConsumerGroupHeartbeatResponseData(t hrottleTimeMs=0, errorCode=0, errorMessage=null, memberId='pBC-jWhKQ7yr0y9MXysT2g', memberEpoch=1, heartbeatIntervalMs=5000, assignment=Assignment(topicPartitions=[TopicPartitions(topicId=TcUeldqLQae7xsWQo2WjPA, partitions=[0, 1, 2, 3, 4, 5])])) (org.apache.kafka.clients.NetworkClient) 7999 [2024-04-03 19:48:49,450] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Member pBC-jWhKQ7yr0y9MXysT2g with epoch 1 transitioned from JOINING to RECONCILING. (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl){code} Assignments completed: {code:java} 8005 [2024-04-03 19:48:49,454] INFO [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Updating assignment with local epoch 0 8006 Assigned partitions: [test_topic-0, test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5] 8007 Current owned partitions: [] 8008 Added partitions (assigned - owned): [test_topic-0, test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5] 8009 Revoked partitions (owned - assigned): [] 8010 (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl) {code} Then receive another heartbeat: {code:java} 8021 [2024-04-03 19:48:49,486] DEBUG [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Received CONSUMER_GROUP_HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, clientId=consumer-test_group_id-1, correlationId=36, headerVersion=2): ConsumerGroupHeartbeatResponseData(t hrottleTimeMs=0, errorCode=0, errorMessage=null, memberId='HhILLGoPQ3i7Rt6IINJbRA', memberEpoch=2, heartbeatIntervalMs=5000, assignment=Assignment(topicPartitions=[])) (org.apache.kafka.clients.NetworkClient) Which causes revocation Updating assignment with local epoch 1 8223 Assigned partitions: [] 8224 Current owned partitions: [test_topic-0, test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5] 8225 Added partitions (assigned - owned): [] 8226 Revoked partitions (owned - assigned): [test_topic-0, test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5] 8227 (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl) {code} > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch, consumer.log > > > The following error is reported when running the {{test_valid_assignment}} > test 
from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception >
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on PR #15569: URL: https://github.com/apache/kafka/pull/15569#issuecomment-2035559842 @fvaleri, @nizhikov, @mimaison: just rebased the PR and addressed the feedback. I will wait for this one to be merged before rebasing the other PR for KafkaConfig. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16389: --- Attachment: consumer.log > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch, consumer.log > > > The following error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when > num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])] > {code} > To reproduce, create a system test suite file named > {{test_valid_assignment.yml}} with these contents: > {code:yaml} > failures: > - > 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}' > {code} > Then set the the {{TC_PATHS}} environment variable to include that test suite > file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16469) Metadata Schema Checker
Colin McCabe created KAFKA-16469: Summary: Metadata Schema Checker Key: KAFKA-16469 URL: https://issues.apache.org/jira/browse/KAFKA-16469 Project: Kafka Issue Type: New Feature Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1550393556 ## clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java: ## @@ -245,4 +246,23 @@ public interface RecordBatch extends Iterable { * @return Whether this is a batch containing control records */ boolean isControlBatch(); + +/** + * iterate all records to find the offset of max timestamp. + * noted: + * 1) that the earliest offset will return if there are multi records having same (max) timestamp + * 2) it always return -1 if the {@link RecordBatch#magic()} is equal to {@link RecordBatch#MAGIC_VALUE_V0} Review Comment: return => returns -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
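For readers following the review, the semantics being documented (the earliest offset wins when several records share the max timestamp, and magic v0 batches, which carry no timestamps, yield nothing) correspond roughly to a scan like the one below. This is an illustrative sketch, not the code in the PR.

{code:java}
import java.util.OptionalLong;

import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

public final class MaxTimestampOffsets {

    private MaxTimestampOffsets() { }

    // Walk the batch and keep the FIRST offset seen at the maximum timestamp.
    public static OptionalLong offsetOfMaxTimestamp(RecordBatch batch) {
        if (batch.magic() == RecordBatch.MAGIC_VALUE_V0)
            return OptionalLong.empty(); // v0 records have no timestamps
        long maxTimestamp = RecordBatch.NO_TIMESTAMP;
        long offset = -1L;
        for (Record record : batch) {
            // Strictly greater-than keeps the earliest offset when several
            // records share the same (max) timestamp.
            if (record.timestamp() > maxTimestamp) {
                maxTimestamp = record.timestamp();
                offset = record.offset();
            }
        }
        return offset < 0 ? OptionalLong.empty() : OptionalLong.of(offset);
    }
}
{code}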
Re: [PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue closed pull request #15650: KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout URL: https://github.com/apache/kafka/pull/15650 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout [kafka]
kirktrue opened a new pull request, #15650: URL: https://github.com/apache/kafka/pull/15650 The intention of the CompletableApplicationEvent is for a Consumer to block waiting for the event to complete. The application thread will block for the timeout, but there is not yet a consistent manner in which events are timed out. Enforce at the request manager layer that timeouts are respected per the design in [KAFKA-15848](https://issues.apache.org/jira/browse/KAFKA-15848). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
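The pattern described above, where the application thread blocks on a completable event while the background thread enforces the user-supplied timeout, can be sketched roughly as follows. The class and method names are illustrative stand-ins, not the consumer's actual internals.

{code:java}
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.errors.TimeoutException;

// Illustrative stand-in for a completable application event that carries a deadline.
final class DeadlineAwareEvent<T> {
    final CompletableFuture<T> future = new CompletableFuture<>();
    private final long timeoutMs;
    private final long deadlineMs;

    DeadlineAwareEvent(long nowMs, long timeoutMs) {
        this.timeoutMs = timeoutMs;
        this.deadlineMs = nowMs + timeoutMs;
    }

    // Called by the background thread on each pass over its pending events: once the
    // user-provided timeout has elapsed, fail the event so the application thread,
    // which is blocked on the future, is released instead of waiting indefinitely.
    boolean maybeExpire(long nowMs) {
        if (nowMs >= deadlineMs && !future.isDone()) {
            future.completeExceptionally(
                    new TimeoutException("Event was not completed within " + timeoutMs + " ms"));
            return true;
        }
        return false;
    }
}
{code}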
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1550273635 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -391,4 +420,15 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + +private void retryUntilEqual(List expected, Supplier> outputSupplier) { +try { +TestUtils.waitForCondition( +() -> expected.equals(outputSupplier.get()), +"TopicOffsets did not match. Expected: " + expectedTestTopicOffsets() + ", but was: " + outputSupplier.get() Review Comment: I have the similar error in another PR (https://github.com/apache/kafka/pull/15621) :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
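As an aside for readers, the helper quoted above follows a common pattern in these tests: keep polling the command's output until it equals the expected list. A variant that defers building the failure message until the wait actually times out might look like the sketch below, assuming the Supplier-based overload and DEFAULT_MAX_WAIT_MS constant of org.apache.kafka.test.TestUtils.

{code:java}
import java.util.List;
import java.util.function.Supplier;

import org.apache.kafka.test.TestUtils;

final class RetryAssertions {

    private RetryAssertions() { }

    // Retry until the supplier's output equals the expected list; the failure
    // message is only computed if the condition never becomes true.
    static <T> void retryUntilEqual(List<T> expected, Supplier<List<T>> outputSupplier) throws InterruptedException {
        TestUtils.waitForCondition(
                () -> expected.equals(outputSupplier.get()),
                TestUtils.DEFAULT_MAX_WAIT_MS,
                () -> "Output did not match. Expected: " + expected + ", but was: " + outputSupplier.get());
    }
}
{code}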
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1550279550 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) { Review Comment: This relates to the comment (https://github.com/apache/kafka/pull/15585#issuecomment-2021526537) that made you introduce the while loop: > there is a race condition bug where the metadata is not updated but the heartbeat request is already created, but it lacks required info -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1550273635 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -391,4 +420,15 @@ private String[] addBootstrapServer(String... args) { return newArgs.toArray(new String[0]); } + +private void retryUntilEqual(List expected, Supplier> outputSupplier) { +try { +TestUtils.waitForCondition( +() -> expected.equals(outputSupplier.get()), +"TopicOffsets did not match. Expected: " + expectedTestTopicOffsets() + ", but was: " + outputSupplier.get() Review Comment: I have the same error in another PR (https://github.com/apache/kafka/pull/15621) :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1550271336 ## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ## @@ -56,11 +60,38 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { @AfterEach override def tearDown(): Unit = { -setOldMessageFormat = false +version = RecordBatch.MAGIC_VALUE_V2 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") super.tearDown() } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testListMaxTimestampWithEmptyLog(quorum: String): Unit = { +val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topicName) +assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, maxTimestampOffset.offset()) +assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, maxTimestampOffset.timestamp()) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk")) + def testListVersion0(quorum: String): Unit = { Review Comment: @junrao this is the new test case for the version 0 that we should get `-1` if the magic value is 0 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1550270774 ## clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java: ## @@ -245,4 +246,23 @@ public interface RecordBatch extends Iterable { * @return Whether this is a batch containing control records */ boolean isControlBatch(); + +/** + * iterate all records to find the offset of max timestamp. + * noted: + * 1) that the earliest offset will return if there are multi records having same (max) timestamp + * 2) it always return -1 if the {@link RecordBatch#magic()} is equal to {@link RecordBatch#MAGIC_VALUE_V0} + * @return offset of max timestamp + */ +default Optional offsetOfMaxTimestamp() { +if (magic() == RecordBatch.MAGIC_VALUE_V0) return Optional.empty(); Review Comment: @junrao the short-circuit is added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1550270383 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +val lpc = latestEpochAsOptional(leaderEpochCache) +latestTimestampSegment.log.batchesFrom(position.position).asScala Review Comment: I have addressed the comment by https://github.com/apache/kafka/pull/15621/commits/4785371c54e2fc2c540895ffe2f94829449937e6 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2035323125 > There were 69 test failures and quite a few of them related to ListOffset There is another PR (https://github.com/apache/kafka/pull/15489) that encounters the same error, where listing offsets returns an incorrect offset. I'm digging into it ... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833704#comment-17833704 ] Philip Nee commented on KAFKA-16389: Hi [~kirktrue] Thanks for the initial investigation. I think your approach makes sense but I do think we need to rewrite the verifiable_consumer.py's event handler. As the states transition doesn't necessary match the behavior of the current consumer. And I think that's why there's still some flakiness in the patch you submitted. See my notes below: I'm still occasionally getting errors like: "ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when num_started 1: [('ducker@ducker11', [])]" This seems to be caused by some weird reconciliation state. For example: Here We can see consumer1 got assigned 6 partitions and then immediately gave up all of them. It is unclear why onPartitionsRevoke is triggered. {code:java} 1 node wait for member idx 1 partiton assigned [{'topic': 'test_topic', 'partition': 0}, {'topic': 'test_topic', 'partition': 1}, {'topic': 'test_topic', 'partition': 2}, {'topic': 'test_topic', 'partition': 3}, {'topic': 'test_topic', 'partition': 4}, {'topic': 'test_topic', 'partition': 5}] idx 1 partiton revoked [{'topic': 'test_topic', 'partition': 0}, {'topic': 'test_topic', 'partition': 1}, {'topic': 'test_topic', 'partition': 2}, {'topic': 'test_topic', 'partition': 3}, {'topic': 'test_topic', 'partition': 4}, {'topic': 'test_topic', 'partition': 5}] node: ducker11 Current assignment: {: []} idx 1 partiton assigned [] [WARNING - 2024-04-03 11:05:34,587 - service_registry - stop_all - lineno:53]: Error stopping service : [WARNING - 2024-04-03 11:06:09,128 - service_registry - clean_all - lineno:67]: Error cleaning service : [INFO:2024-04-03 11:06:09,134]: RunnerClient: kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer.group_remote_assignor=range: FAIL: TimeoutError("expected valid assignments of 6 partitions when num_started 1: [('ducker@ducker11', [])]") Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 583, in test_valid_assignment wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()), File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when num_started 1: [('ducker@ducker11', [])] {code} > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch > > > The following 
error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: expected valid assignments of 6 partitions w
Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15593: URL: https://github.com/apache/kafka/pull/15593#discussion_r1550242669 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java: ## @@ -1300,6 +1341,68 @@ public Map groupAssignment() { )); } +/** + * Convert the current classic group to a consumer group. + * Add the records for the conversion. + * + * @param consumerGroup The converted consumer group. + * @param records The list to which the new records are added. + * + * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol. + */ +public void convertToConsumerGroup( +ConsumerGroup consumerGroup, +List records, +TopicsImage topicsImage +) throws GroupIdNotFoundException { +consumerGroup.setGroupEpoch(generationId); +consumerGroup.setTargetAssignmentEpoch(generationId); + +records.add(RecordHelpers.newGroupEpochRecord(groupId(), generationId)); +// SubscriptionMetadata will be computed in the following consumerGroupHeartbeat + records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap())); +records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), generationId)); + +members.forEach((memberId, member) -> { +ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment())); +Map> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); +ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata())); + +ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(memberId) +.setMemberEpoch(generationId) +.setPreviousMemberEpoch(generationId) +.setInstanceId(member.groupInstanceId().orElse(null)) +.setRackId(subscription.rackId().orElse(null)) +.setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) +.setClientId(member.clientId()) +.setClientHost(member.clientHost()) +.setSubscribedTopicNames(subscription.topics()) +.setAssignedPartitions(partitions) +.build(); +consumerGroup.updateMember(newMember); + +records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember)); +records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember)); +records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), memberId, partitions)); +}); Review Comment: Need to schedule session timeouts -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]
apoorvmittal10 commented on code in PR #15532: URL: https://github.com/apache/kafka/pull/15532#discussion_r1550228079 ## build.gradle: ## @@ -1435,10 +1454,10 @@ project(':clients') { implementation libs.opentelemetryProto // libraries which should be added as runtime dependencies in generated pom.xml should be defined here: -shadow libs.zstd -shadow libs.lz4 -shadow libs.snappy -shadow libs.slf4jApi +shadowed libs.zstd +shadowed libs.lz4 +shadowed libs.snappy +shadowed libs.slf4jApi Review Comment: Hi @mimaison @showuon, I have made the suggested change and added the version along with the group and name. I have verified the pom, jar, and manifest file; they are correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on PR #15569: URL: https://github.com/apache/kafka/pull/15569#issuecomment-2035219431 > @OmniaGM I think this is almost ready, but there are a couple of minor comments to address and some conflicts to fix. Will be addressing this soon after another pr -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lianetm commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1550191029 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = { Review Comment: All good then, just wanting to make sure we're not losing it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]
gharris1727 commented on PR #15639: URL: https://github.com/apache/kafka/pull/15639#issuecomment-2035199919 Hi @chia7712 @omkreddy I have delayed backporting either change to 3.6 as we're currently in an ongoing release. If you'd like me to backport it now, I can do that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on PR #15585: URL: https://github.com/apache/kafka/pull/15585#issuecomment-2035194359 Hey @Phuc-Hong-Tran, thanks for the updates! Left some comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16293: Test log directory failure in Kraft [kafka]
OmniaGM commented on PR #15409: URL: https://github.com/apache/kafka/pull/15409#issuecomment-2035191271 > @gaurav-narula , the new added KRaft tests are failing in our test env. The logs are [here](https://drive.google.com/file/d/1CbxgH8eswEXX0YDEJpZn7B9Vd4x-gfo4/view?usp=sharing). Could you help check and fix them? Thanks. I believe this pr depends on https://github.com/apache/kafka/pull/15335 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1550178486 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1750,8 +1753,14 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) { Review Comment: If we get to [this point](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1466) it means that, with the latest metadata, we discovered new topics that made the subscription change. So only at that point do we need to create the `SubscriptionChangeEvent`, I would say (and yes, then we also call `requestUpdateForNewTopics`). So echoing my first comment on this thread, it seems to me that we shouldn't rely on any metadata object/version check here as it may not be accurate (removing [this](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1757-L1761)), and we should just make sure that we send the `SubscriptionChangeEvent` only when we know that the subscription changed, which is inside the `updatePatternSubscription` [if](https://github.com/apache/kafka/blob/cf2874a2db845daaed80808ce42b50fc05584fdb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1465). Makes sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
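The condition being described here, namely acting only when the set of topics matching the pattern actually differs from what is currently subscribed, can be modelled in isolation as below. This is a self-contained illustration of the gating logic, not an excerpt from AsyncKafkaConsumer.

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Minimal model of "only act when the matched topic set actually changed".
final class PatternSubscriptionTracker {
    private final Pattern pattern;
    private Set<String> subscribed = new HashSet<>();

    PatternSubscriptionTracker(Pattern pattern) {
        this.pattern = pattern;
    }

    // Returns true only when the topics matching the pattern differ from the
    // previously subscribed set; this is the point at which a SubscriptionChangeEvent
    // (and a metadata update for the new topics) would be warranted.
    boolean updateFromClusterTopics(Set<String> clusterTopics) {
        Set<String> matched = clusterTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toSet());
        if (matched.equals(subscribed))
            return false;
        subscribed = matched;
        return true;
    }
}
{code}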
Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]
chia7712 commented on PR #15639: URL: https://github.com/apache/kafka/pull/15639#issuecomment-2035163301 @gharris1727 @omkreddy #15597 and this one have been backported to 3.7; see https://github.com/apache/kafka/commit/d9674c6c9a99c0d8a9c32386b0cceaceac92969c and https://github.com/apache/kafka/commit/cfc97a137f19912313fc73776121d70b0e3f7a88 They were also backported to 3.5 (we don't need to push to 3.5, and it seems to be a mistake); see https://github.com/apache/kafka/commit/d29d21e6bec61994d30c487298bbb7c6eb6c1e41 and https://github.com/apache/kafka/commit/046821905477a8dbd1dc7991eeb1c416b5622c81 However, branch 3.6 does not have those commits, so @gharris1727 please do the backport for 3.6, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16439: Update replication_replica_failure_test.py to support KIP-848’s group protocol config [kafka]
wcarlson5 merged PR #15629: URL: https://github.com/apache/kafka/pull/15629 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16440: Update security_test.py to support KIP-848’s group protocol config [kafka]
wcarlson5 merged PR #15628: URL: https://github.com/apache/kafka/pull/15628 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16438: Update consumer_test.py’s static tests to support KIP-848’s group protocol config [kafka]
wcarlson5 merged PR #15627: URL: https://github.com/apache/kafka/pull/15627 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config [kafka]
wcarlson5 merged PR #15626: URL: https://github.com/apache/kafka/pull/15626 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
florin-akermann commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-2035119703 Hi @mjsax , thanks for the flag. Yes I'll push the necessary changes by the end of the week (Sunday). I hope that's ok. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1550094782 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = { Review Comment: This appears in the PR because i didn't rebase correctly. You've actually moved the test to here: https://github.com/apache/kafka/blob/21479a31bdff0e15cfe7ee0a4e509232ed064b41/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala#L261 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1550055646 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1337,13 +1337,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides // constant time access while being safe to use with concurrent collections unlike `toArray`. -val segmentsCopy = logSegments.asScala.toBuffer -val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) -val latestTimestampAndOffset = latestTimestampSegment.readMaxTimestampAndOffsetSoFar - -Some(new TimestampAndOffset(latestTimestampAndOffset.timestamp, - latestTimestampAndOffset.offset, - latestEpochAsOptional(leaderEpochCache))) +val latestTimestampSegment = logSegments.asScala.toBuffer.maxBy[Long](_.maxTimestampSoFar) +// cache the timestamp and offset +val maxTimestampSoFar = latestTimestampSegment.readMaxTimestampAndOffsetSoFar +// lookup the position of batch to avoid extra I/O +val position = latestTimestampSegment.offsetIndex.lookup(maxTimestampSoFar.offset) +val lpc = latestEpochAsOptional(leaderEpochCache) +latestTimestampSegment.log.batchesFrom(position.position).asScala Review Comment: In the case of magic=0, we will find latestTimestampSegment with NO_TIMESTAMP. If we go through the rest of the logic, it seems that we will return the first offset instead of -1. Perhaps we should short-circuit if latestTimestampSegment is NO_TIMESTAMP? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
fvaleri commented on PR #15569: URL: https://github.com/apache/kafka/pull/15569#issuecomment-2035032922 @OmniaGM I think this is almost ready, but there are a couple of minor comments to address and some conflicts to fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]
gharris1727 commented on PR #15639: URL: https://github.com/apache/kafka/pull/15639#issuecomment-2034976891 I'll handle the backports later today. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]
omkreddy commented on PR #15639: URL: https://github.com/apache/kafka/pull/15639#issuecomment-2034952210 Let's also merge to 3.7 and 3.6. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]
chia7712 commented on code in PR #12174: URL: https://github.com/apache/kafka/pull/12174#discussion_r1549916567 ## core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala: ## @@ -260,9 +274,12 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { } def killBroker(index: Int): Unit = { Review Comment: maybe we should keep origin implementation since it expect to await shutdown. ## core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java: ## @@ -323,7 +324,7 @@ public void rollingBrokerRestart() { throw new IllegalStateException("Tried to restart brokers but the cluster has not been started!"); } for (int i = 0; i < clusterReference.get().brokerCount(); i++) { -clusterReference.get().killBroker(i); +clusterReference.get().killBroker(i, Duration.ofSeconds(5)); Review Comment: what is the purpose of this change? ## core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala: ## @@ -197,12 +198,11 @@ class ServerShutdownTest extends KafkaServerTestHarness { verifyNonDaemonThreadsStatus() } - @Disabled @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("kraft")) def testCleanShutdownWithKRaftControllerUnavailable(quorum: String): Unit = { Review Comment: the shutdown with timeout is a kind of dirty shutdown so we should rename the test to `testDirtyShutdownWithKRaftControllerUnavailable` ## core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala: ## @@ -260,9 +274,12 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { } def killBroker(index: Int): Unit = { -if (alive(index)) { - _brokers(index).shutdown() - _brokers(index).awaitShutdown() +killBroker(index, Duration.ofSeconds(5)) + } + + def killBroker(index: Int, timeout: Duration): Unit = { Review Comment: we need to document the difference of this variety. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]
soarez commented on code in PR #12174: URL: https://github.com/apache/kafka/pull/12174#discussion_r1549909706 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -623,9 +623,12 @@ class BrokerServer( } } - override def shutdown(): Unit = { + override def shutdown(): Unit = shutdown(TimeUnit.MINUTES.toMillis(5)) Review Comment: Yup, that makes sense. Changed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]
soarez commented on code in PR #12174: URL: https://github.com/apache/kafka/pull/12174#discussion_r1549909351 ## core/src/main/scala/kafka/server/KafkaBroker.scala: ## @@ -93,6 +93,7 @@ trait KafkaBroker extends Logging { def startup(): Unit def awaitShutdown(): Unit def shutdown(): Unit + def shutdown(timeoutMs: Long): Unit Review Comment: Good suggestion. Applied -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15538: Client support for java regex based subscription [kafka]
lianetm commented on code in PR #15585: URL: https://github.com/apache/kafka/pull/15585#discussion_r1549895965 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -159,6 +160,224 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0, startingTimestamp = startingTimestamp) } + /** + * Verifies that pattern subscription performs as expected. + * The pattern matches the topics 'topic' and 'tblablac', but not 'tblablak' or 'tblab1'. + * It is expected that the consumer is subscribed to all partitions of 'topic' and + * 'tblablac' after the subscription when metadata is refreshed. + * When a new topic 'tsomec' is added afterwards, it is expected that upon the next + * metadata refresh the consumer becomes subscribed to this new topic and all partitions + * of that topic are assigned to it. + */ + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testPatternSubscription(quorum: String, groupProtocol: String): Unit = { +val numRecords = 1 +val producer = createProducer() +sendRecords(producer, numRecords, tp) + +val topic1 = "tblablac" // matches subscribed pattern +createTopic(topic1, 2, brokerCount) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0)) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1)) + +val topic2 = "tblablak" // does not match subscribed pattern +createTopic(topic2, 2, brokerCount) +sendRecords(producer,numRecords = 1000, new TopicPartition(topic2, 0)) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1)) + +val topic3 = "tblab1" // does not match subscribed pattern +createTopic(topic3, 2, brokerCount) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0)) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1)) + +val consumer = createConsumer() +assertEquals(0, consumer.assignment().size) + +val pattern = Pattern.compile("t.*c") +consumer.subscribe(pattern, new TestConsumerReassignmentListener) + +var assignment = Set( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1)) +awaitAssignment(consumer, assignment) + +val topic4 = "tsomec" // matches subscribed pattern +createTopic(topic4, 2, brokerCount) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0)) +sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1)) + +assignment ++= Set( + new TopicPartition(topic4, 0), + new TopicPartition(topic4, 1)) +awaitAssignment(consumer, assignment) + +consumer.unsubscribe() +assertEquals(0, consumer.assignment().size) + } + + /** + * Verifies that a second call to pattern subscription succeeds and performs as expected. + * The initial subscription is to a pattern that matches two topics 'topic' and 'foo'. + * The second subscription is to a pattern that matches 'foo' and a new topic 'bar'. + * It is expected that the consumer is subscribed to all partitions of 'topic' and 'foo' after + * the first subscription, and to all partitions of 'foo' and 'bar' after the second. + * The metadata refresh interval is intentionally increased to a large enough value to guarantee + * that it is the subscription call that triggers a metadata refresh, and not the timeout. 
+ */ + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testSubsequentPatternSubscription(quorum: String, groupProtocol: String): Unit = { +this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "3") +val consumer = createConsumer() + +val numRecords = 1 +val producer = createProducer() +sendRecords(producer, numRecords = numRecords, tp) + +// the first topic ('topic') matches first subscription pattern only + +val fooTopic = "foo" // matches both subscription patterns +createTopic(fooTopic, 1, brokerCount) +sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0)) + +assertEquals(0, consumer.assignment().size) + +val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match this +consumer.subscribe(pattern1, new TestConsumerReassignmentListener) + +var assignment = Set( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(fooTopic, 0)) +awaitAssignment(consumer, assignment) + +val barTopic = "bar" // matches the next subscription pattern +createTopic(barTopic, 1, brokerCount) +sendRecords(producer, numRecords = 1000,
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lianetm commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2034772501 Hey @philipnee, thanks for the updates, just one minor comment left above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lianetm commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1549857291 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest { @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = { Review Comment: Is this one being removed intentionally? the suggestion was only to move it to the `PlainTextConsumerCommit` file, where all tests related to committing offsets are now. Ok for me if you think it's not worth keeping, but just to make sure it's intentional. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16070: move setReadOnly to Headers [kafka]
LiangliangSui closed pull request #15097: KAFKA-16070: move setReadOnly to Headers URL: https://github.com/apache/kafka/pull/15097 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
chia7712 commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549840889 ## core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala: ## @@ -177,6 +201,78 @@ class RaftManagerTest { assertFalse(fileLocked(lockPath)) } + @Test + def testMigratingZkBrokerDeletesMetadataLog(): Unit = { +val logDir = Some(TestUtils.tempDir().toPath) +val metadataDir = Some(TestUtils.tempDir().toPath) +val nodeId = 1 +val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, metadataDir) +val raftManager = createRaftManager( + new TopicPartition("__cluster_metadata", 0), + config +) +raftManager.shutdown() + +try { + KafkaRaftManager.maybeDeleteMetadataLogDir(config) + assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0"))) +} catch { + case err: Throwable => fail("Failed to delete metadata log", err) +} +assertTrue(Files.exists(metadataDir.get)) + } + + @Test + def testNonMigratingZkBrokerDeletesMetadataLog(): Unit = { +val logDir = Some(TestUtils.tempDir().toPath) +val metadataDir = Some(TestUtils.tempDir().toPath) +val nodeId = 1 +// Use this config to create the directory +val config1 = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, metadataDir) +val raftManager = createRaftManager( + new TopicPartition("__cluster_metadata", 0), + config1 +) +raftManager.shutdown() + +val config2 = createZkBrokerConfig(migrationEnabled = false, nodeId, logDir, metadataDir) +try { + KafkaRaftManager.maybeDeleteMetadataLogDir(config2) + fail("Should have not deleted the metadata log") +} catch { + case err: Throwable => +assertEquals("Not deleting metadata log dir since migrations are not enabled.", err.getMessage) + assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0"))) +} +assertTrue(Files.exists(metadataDir.get)) + } + + @Test + def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = { +val logDir = Some(TestUtils.tempDir().toPath) +val metadataDir = Some(TestUtils.tempDir().toPath) +val nodeId = 1 +val config = createConfig( + Set(ProcessRole.BrokerRole), + nodeId, + logDir, + metadataDir +) +val raftManager = createRaftManager( + new TopicPartition("__cluster_metadata", 0), + config +) +raftManager.shutdown() + +try { Review Comment: we can use `assertThrow` to simplify the code. for example: ```scala assertThrows(classOf[RuntimeException], () => KafkaRaftManager.maybeDeleteMetadataLogDir(config)) assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0"))) assertTrue(Files.exists(metadataDir.get)) ``` ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -69,6 +70,51 @@ object KafkaRaftManager { lock } + + /** + * Test if the configured metadata log dir is one of the data log dirs. + */ + def hasDifferentLogDir(config: KafkaConfig): Boolean = { +!config + .logDirs + .map(Paths.get(_).toAbsolutePath) + .contains(Paths.get(config.metadataLogDir).toAbsolutePath) + } + + /** + * Obtain the file lock and delete the metadata log directory completely. + * + * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration. + * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it + * it makes recovery from a failed migration much easier. See KAFKA-16463. 
+ * + * @param config The broker config + */ + def maybeDeleteMetadataLogDir(config: KafkaConfig): Unit = { +// These constraints are enforced in KafkaServer, but repeating them here to guard against future callers +if (config.processRoles.nonEmpty) { + throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.") +} else if (!config.migrationEnabled) { + throw new RuntimeException("Not deleting metadata log dir since migrations are not enabled.") +} else { + val metadataDir = new File(config.metadataLogDir) + val logDirName = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION) + val metadataPartitionDir = KafkaRaftManager.createLogDirectory(new File(config.metadataLogDir), logDirName) Review Comment: `new File(config.metadataLogDir)` can be replaced by `metadataDir` ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -480,6 +480,81 @@ class ZkMigrationIntegrationTest { } } + @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( +new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"), +new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhos
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on PR #15648: URL: https://github.com/apache/kafka/pull/15648#issuecomment-2034709602 I'm going to work on a ducktape test as well. Hopefully I can get that done today -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549818474 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -420,6 +420,12 @@ class KafkaServer( isZkBroker = true, logManager.directoryIdsSet) + // For ZK brokers in migration mode, always delete the metadata partition on startup. + KafkaRaftManager.maybeDeleteMetadataLogDir(config) match { +case Some(err) => logger.error("Could not delete local metadata log dir. This is non-fatal, so continuing with startup.", err) Review Comment: I updated this to let maybeDeleteMetadataLogDir throw and fail startup -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549817561 ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -69,6 +69,36 @@ object KafkaRaftManager { lock } + + /** + * Obtain the file lock and delete the metadata log directory completely. + * + * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration. + * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it + * it makes recovery from a failed migration much easier. See KAFKA-16463. + * + * @param config The broker config + * @returnAn error wrapped as an Option, if an error occurred. None otherwise + */ + def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = { +// These constraints are enforced in KafkaServer, but repeating them here to guard against future callers +if (config.processRoles.nonEmpty) { + Some(new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")) +} else if (!config.migrationEnabled) { + Some(new RuntimeException("Not deleting metadata log dir since migrations are not enabled.")) +} else { + val metadataDir = new File(config.metadataLogDir) + val deletionLock = KafkaRaftManager.lockDataDir(metadataDir) + try { +Utils.delete(metadataDir) Review Comment: Ok, code has been updated to just delete the `__cluster_metadata-0` directory. I got confused by our naming 😅 metadataLogDir is actually the directory in which the metadata log (which is a directory) exists :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549817561 ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -69,6 +69,36 @@ object KafkaRaftManager { lock } + + /** + * Obtain the file lock and delete the metadata log directory completely. + * + * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration. + * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it + * it makes recovery from a failed migration much easier. See KAFKA-16463. + * + * @param config The broker config + * @returnAn error wrapped as an Option, if an error occurred. None otherwise + */ + def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = { +// These constraints are enforced in KafkaServer, but repeating them here to guard against future callers +if (config.processRoles.nonEmpty) { + Some(new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")) +} else if (!config.migrationEnabled) { + Some(new RuntimeException("Not deleting metadata log dir since migrations are not enabled.")) +} else { + val metadataDir = new File(config.metadataLogDir) + val deletionLock = KafkaRaftManager.lockDataDir(metadataDir) + try { +Utils.delete(metadataDir) Review Comment: Ok, code has been updated to just delete the `__cluster_metadata-0` directory. I got confused by our naming.. metadataLogDir is actually the directory in which the metadata log (which is a directory) exists 😅 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
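For readers following along, deleting only the metadata partition directory (rather than the whole metadata log dir) could look roughly like the sketch below. The `__cluster_metadata-0` directory name follows the tests in this thread; the helper itself is illustrative, not the PR's implementation.

```scala
import java.nio.file.{Files, Path, Paths}
import java.util.Comparator

// Sketch: remove only the __cluster_metadata-0 subdirectory inside the configured
// metadata log dir, leaving the log dir itself (and anything else in it) untouched.
def deleteMetadataPartitionDir(metadataLogDir: String): Unit = {
  val partitionDir: Path = Paths.get(metadataLogDir, "__cluster_metadata-0")
  if (Files.exists(partitionDir)) {
    // Depth-first delete: children before the directory itself.
    Files.walk(partitionDir)
      .sorted(Comparator.reverseOrder[Path]())
      .forEach(p => Files.delete(p))
  }
}
```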
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549813667 ## core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala: ## @@ -389,7 +389,12 @@ class NodeToControllerRequestThread( debug("Controller isn't cached, looking for local metadata changes") controllerInformation.node match { case Some(controllerNode) => - info(s"Recorded new controller, from now on will use node $controllerNode") + val controllerType = if (controllerInformation.isZkController) { +"ZK" + } else { +"KRaft" + } + info(s"Recorded new $controllerType controller, from now on will use node $controllerNode") Review Comment: Unrelated change, but helped when debugging the integration test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549786977 ## core/src/main/scala/kafka/server/KafkaServer.scala: ## @@ -420,6 +420,12 @@ class KafkaServer( isZkBroker = true, logManager.directoryIdsSet) + // For ZK brokers in migration mode, always delete the metadata partition on startup. + KafkaRaftManager.maybeDeleteMetadataLogDir(config) match { +case Some(err) => logger.error("Could not delete local metadata log dir. This is non-fatal, so continuing with startup.", err) Review Comment: My rationale here was that the deletion is not strictly required, but rather an optimization for the revert-to-ZK case. I assume RaftManager would also fail if there was some underlying I/O problem, but failing here is probably okay. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16468) Listener not found error in SendRPCsToBrokersEvent
[ https://issues.apache.org/jira/browse/KAFKA-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16468: - Description: During the ZK to KRaft migration, the controller will send RPCs to the ZK brokers using the configured "control.plane.listener.name" or more commonly, the "inter.broker.listener.name". If a ZK broker did not register with this listener, we get a error at the time of sending the first RPC to a broker. {code} [2024-04-03 09:28:59,043] ERROR Encountered nonFatalFaultHandler fault: Unhandled error in SendRPCsToBrokersEvent (org.apache.kafka.server.fault.MockFaultHandler:44) kafka.common.BrokerEndPointNotAvailableException: End point with listener name EXTERNAL not found for broker 0 at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94) at scala.Option.getOrElse(Option.scala:201) at kafka.cluster.Broker.node(Broker.scala:93) at kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122) at kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105) at kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:98) at kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:98) at scala.collection.immutable.Set$Set3.foreach(Set.scala:261) at kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:98) at kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:219) at org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:777) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.base/java.lang.Thread.run(Thread.java:833) {code} At this point, the KRaft controller has already migrated the metadata. Recovery at this point is possible by restarting the brokers with the correct listener names, but we can catch this much sooner in the process. When a ZK broker registers with the KRaft controller, we should reject the registration if the expected listener name is not present. This will prevent the migration from starting. was: During the ZK to KRaft migration, the controller will send RPCs using the configured "control.plane.listener.name" or more commonly, the "inter.broker.listener.name". If a ZK broker did not register with this listener, we get a error at the time of sending the first RPC to a broker. 
{code} [2024-04-03 09:28:59,043] ERROR Encountered nonFatalFaultHandler fault: Unhandled error in SendRPCsToBrokersEvent (org.apache.kafka.server.fault.MockFaultHandler:44) kafka.common.BrokerEndPointNotAvailableException: End point with listener name EXTERNAL not found for broker 0 at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94) at scala.Option.getOrElse(Option.scala:201) at kafka.cluster.Broker.node(Broker.scala:93) at kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122) at kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105) at kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:98) at kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:98) at scala.collection.immutable.Set$Set3.foreach(Set.scala:261) at kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:98) at kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:219) at org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:777) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.base/java.lang.Thread.run(Thread.java:833) {code} At this point, the KRaft controller has already migrated the metadata. Recovery at this point is possible by restarting the brokers with the correct listener names, but we can catch this much sooner in the process. When a ZK broker registers with the KRaft controller, we should reject the registration if the expected listener name is not present. This will prevent the migration from starting. > Listener not found error in SendRPCsToBrokersEvent > --
[jira] [Updated] (KAFKA-16468) Listener not found error in SendRPCsToBrokersEvent
[ https://issues.apache.org/jira/browse/KAFKA-16468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-16468: - Description: During the ZK to KRaft migration, the controller will send RPCs to the ZK brokers using the configured "control.plane.listener.name" or more commonly, the "inter.broker.listener.name". If a ZK broker did not register with this listener, we get a error at the time of sending the first RPC to a broker. {code} [2024-04-03 09:28:59,043] ERROR Encountered nonFatalFaultHandler fault: Unhandled error in SendRPCsToBrokersEvent (org.apache.kafka.server.fault.MockFaultHandler:44) kafka.common.BrokerEndPointNotAvailableException: End point with listener name EXTERNAL not found for broker 0 at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94) at scala.Option.getOrElse(Option.scala:201) at kafka.cluster.Broker.node(Broker.scala:93) at kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122) at kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105) at kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:98) at kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:98) at scala.collection.immutable.Set$Set3.foreach(Set.scala:261) at kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:98) at kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:219) at org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:777) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.base/java.lang.Thread.run(Thread.java:833) {code} This is pretty late to be detecting this mis-configuration. By this point, the KRaft controller is the active controller and has already migrated the metadata. Recovery is possible by restarting the brokers with the correct listener names, but we can catch this much sooner in the process. When a ZK broker registers with the KRaft controller, we should reject the registration if the expected listener name is not present. This will prevent the migration from starting. was: During the ZK to KRaft migration, the controller will send RPCs to the ZK brokers using the configured "control.plane.listener.name" or more commonly, the "inter.broker.listener.name". If a ZK broker did not register with this listener, we get a error at the time of sending the first RPC to a broker. 
{code} [2024-04-03 09:28:59,043] ERROR Encountered nonFatalFaultHandler fault: Unhandled error in SendRPCsToBrokersEvent (org.apache.kafka.server.fault.MockFaultHandler:44) kafka.common.BrokerEndPointNotAvailableException: End point with listener name EXTERNAL not found for broker 0 at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94) at scala.Option.getOrElse(Option.scala:201) at kafka.cluster.Broker.node(Broker.scala:93) at kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122) at kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105) at kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:98) at kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:98) at scala.collection.immutable.Set$Set3.foreach(Set.scala:261) at kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:98) at kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:219) at org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:777) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.base/java.lang.Thread.run(Thread.java:833) {code} At this point, the KRaft controller has already migrated the metadata. Recovery at this point is possible by restarting the brokers with the correct listener names, but we can catch this much sooner in the process. When a ZK broker registers with the KRaft controller, we should reject the registration if the expected listener name is not present. This will prevent the migration from starting.
[jira] [Created] (KAFKA-16468) Listener not found error in SendRPCsToBrokersEvent
David Arthur created KAFKA-16468: Summary: Listener not found error in SendRPCsToBrokersEvent Key: KAFKA-16468 URL: https://issues.apache.org/jira/browse/KAFKA-16468 Project: Kafka Issue Type: Bug Components: controller, migration Reporter: David Arthur Fix For: 3.8.0 During the ZK to KRaft migration, the controller will send RPCs using the configured "control.plane.listener.name" or more commonly, the "inter.broker.listener.name". If a ZK broker did not register with this listener, we get a error at the time of sending the first RPC to a broker. {code} [2024-04-03 09:28:59,043] ERROR Encountered nonFatalFaultHandler fault: Unhandled error in SendRPCsToBrokersEvent (org.apache.kafka.server.fault.MockFaultHandler:44) kafka.common.BrokerEndPointNotAvailableException: End point with listener name EXTERNAL not found for broker 0 at kafka.cluster.Broker.$anonfun$node$1(Broker.scala:94) at scala.Option.getOrElse(Option.scala:201) at kafka.cluster.Broker.node(Broker.scala:93) at kafka.controller.ControllerChannelManager.addNewBroker(ControllerChannelManager.scala:122) at kafka.controller.ControllerChannelManager.addBroker(ControllerChannelManager.scala:105) at kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2(MigrationPropagator.scala:98) at kafka.migration.MigrationPropagator.$anonfun$publishMetadata$2$adapted(MigrationPropagator.scala:98) at scala.collection.immutable.Set$Set3.foreach(Set.scala:261) at kafka.migration.MigrationPropagator.publishMetadata(MigrationPropagator.scala:98) at kafka.migration.MigrationPropagator.sendRPCsToBrokersFromMetadataImage(MigrationPropagator.scala:219) at org.apache.kafka.metadata.migration.KRaftMigrationDriver$SendRPCsToBrokersEvent.run(KRaftMigrationDriver.java:777) at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.base/java.lang.Thread.run(Thread.java:833) {code} At this point, the KRaft controller has already migrated the metadata. Recovery at this point is possible by restarting the brokers with the correct listener names, but we can catch this much sooner in the process. When a ZK broker registers with the KRaft controller, we should reject the registration if the expected listener name is not present. This will prevent the migration from starting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
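The proposed fix is a registration-time guard. As a rough illustration only, with simplified stand-in types rather than Kafka's actual registration classes, the check could look like this:

```scala
// Simplified stand-ins for illustration; not Kafka's real registration types.
final case class EndpointInfo(listenerName: String, host: String, port: Int)
final case class RegistrationRequest(brokerId: Int, endpoints: Seq[EndpointInfo])

/**
 * Reject a ZK broker registration during migration if it does not advertise the listener
 * the controller will later use to send RPCs (control plane or inter-broker listener).
 */
def validateMigratingBrokerRegistration(
  registration: RegistrationRequest,
  requiredListener: String
): Either[String, RegistrationRequest] = {
  if (registration.endpoints.exists(_.listenerName == requiredListener))
    Right(registration)
  else
    Left(s"Broker ${registration.brokerId} did not register listener '$requiredListener'; " +
      "refusing registration so the migration does not start in a bad state.")
}
```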
Re: [PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]
chia7712 commented on code in PR #12174: URL: https://github.com/apache/kafka/pull/12174#discussion_r1549747480 ## core/src/main/scala/kafka/server/KafkaBroker.scala: ## @@ -93,6 +93,7 @@ trait KafkaBroker extends Logging { def startup(): Unit def awaitShutdown(): Unit def shutdown(): Unit + def shutdown(timeoutMs: Long): Unit Review Comment: How about using `Duration` instead of long type? Also, we can rename it from `timeoutMs` to `timeout` ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -623,9 +623,12 @@ class BrokerServer( } } - override def shutdown(): Unit = { + override def shutdown(): Unit = shutdown(TimeUnit.MINUTES.toMillis(5)) Review Comment: How about adding default implementation to parent class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
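Putting the two suggestions together, a minimal sketch of the resulting trait shape (member names follow the diff; the default timeout value is an assumption):

```scala
import java.time.Duration

// Sketch of the suggested shape: a Duration-based shutdown plus a no-arg overload with a
// default implementation in the parent trait, so existing call sites keep working.
// Unrelated members are elided.
trait KafkaBrokerSketch {
  def awaitShutdown(): Unit

  /** Timeout-bounded shutdown: gives up waiting on the controller after `timeout`. */
  def shutdown(timeout: Duration): Unit

  /** The existing no-arg shutdown keeps its old meaning by delegating with a generous default. */
  def shutdown(): Unit = shutdown(Duration.ofMinutes(5))
}
```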
Re: [PR] Refactor SSL/SASL admin integration tests to not use a custom authorizer [kafka]
tinaselenge commented on code in PR #15377: URL: https://github.com/apache/kafka/pull/15377#discussion_r1549736126 ## core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala: ## @@ -259,4 +275,22 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}") metrics.map(_.asInstanceOf[Gauge[Int]].value).sum } + + override def createAdminClient: Admin = { Review Comment: This was necessary because admin client is created with different security configurations for SSL tests than SASL and Plain. We could remove this and override the createConfig() method instead but the override is needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
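For context, an override of this kind typically just assembles the Admin client from the SSL properties the test harness already holds. A hedged sketch, with an illustrative helper name and parameters rather than the PR's code:

```scala
import java.util.Properties
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}

// Sketch: build an Admin client from the SSL settings the test class already carries.
// The sslProps argument stands in for whatever truststore/keystore config the harness provides.
def createSslAdminClient(bootstrapServers: String, sslProps: Properties): Admin = {
  val props = new Properties()
  props.putAll(sslProps)
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
  props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL")
  Admin.create(props)
}
```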
[jira] [Commented] (KAFKA-13907) Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable
[ https://issues.apache.org/jira/browse/KAFKA-13907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833568#comment-17833568 ] Igor Soarez commented on KAFKA-13907: - [~chia7712] I've re-opened and updated the PR > Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable > -- > > Key: KAFKA-13907 > URL: https://issues.apache.org/jira/browse/KAFKA-13907 > Project: Kafka > Issue Type: Bug >Reporter: Deng Ziming >Assignee: Igor Soarez >Priority: Major > Labels: newbie > > ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable will hang > up waiting for controlled shutdown, there may be some bug related to it. > since this bug can be reproduced locally, it won't be hard to investigated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-13907: Fix hanging ServerShutdownTest.testCleanShutdownWithKRaftControllerUnavailable [kafka]
soarez opened a new pull request, #12174: URL: https://github.com/apache/kafka/pull/12174 When a controlled shutdown is requested the broker tries to communicate the state change to the controller via a heartbeat request. [1] In this test, the controller is not available so the request will fail. The current timeout behavior in a heartbeat request is to just keep retrying — which generally makes sense, just not in the context of a controlled shutdown. When a heartbeat request times out, if we are in the middle of a controlled shutdown, we shouldn't just retry forever but rather just give up on trying to contact the controller and proceed with the controlled shutdown. [1] https://github.com/apache/kafka/blob/f2d6282668a31b9a554563338f9178e2bba2833f/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala#L217 *Summary of testing strategy* The test no longer fails ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
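A toy sketch of the behaviour change described above: keep retrying heartbeats in normal operation, but give up and proceed when a controlled shutdown is already in progress. Names are illustrative and do not reflect the actual BrokerLifecycleManager code:

```scala
// Illustrative only: not the actual BrokerLifecycleManager implementation.
sealed trait HeartbeatOutcome
case object Retry extends HeartbeatOutcome
case object ProceedWithShutdown extends HeartbeatOutcome

def onHeartbeatTimeout(controlledShutdownInProgress: Boolean): HeartbeatOutcome = {
  if (controlledShutdownInProgress) {
    // The controller is unreachable; stop announcing the shutdown and continue locally.
    ProceedWithShutdown
  } else {
    // Outside of shutdown, retrying until the controller comes back is the right default.
    Retry
  }
}
```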
[jira] [Commented] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
[ https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833566#comment-17833566 ] Chia-Ping Tsai commented on KAFKA-6527: --- loop the tests 300 times, and all pass. we can re-enable it [~gharris1727] FYI > Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig > > > Key: KAFKA-6527 > URL: https://issues.apache.org/jira/browse/KAFKA-6527 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > Labels: flakey, flaky-test > Fix For: 3.8.0 > > > {code:java} > java.lang.AssertionError: Log segment size increase not applied > at kafka.utils.TestUtils$.fail(TestUtils.scala:355) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865) > at > kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16413) Add tests for FileLock
[ https://issues.apache.org/jira/browse/KAFKA-16413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16413. Fix Version/s: 3.8.0 Resolution: Fixed > Add tests for FileLock > -- > > Key: KAFKA-16413 > URL: https://issues.apache.org/jira/browse/KAFKA-16413 > Project: Kafka > Issue Type: Test >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > Fix For: 3.8.0 > > > Ref: [https://github.com/apache/kafka/pull/15568#pullrequestreview-1950676267] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16413: add FileLockTest [kafka]
chia7712 merged PR #15624: URL: https://github.com/apache/kafka/pull/15624 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16413: add FileLockTest [kafka]
chia7712 commented on PR #15624: URL: https://github.com/apache/kafka/pull/15624#issuecomment-2034576975 ``` ./gradlew cleanTest :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful --tests JmxToolTest.initializationError :connect:runtime:test --tests org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated :connect:mirror:test --tests IdentityReplicationIntegrationTest.testReplicateFromLatest --tests MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs --tests MirrorConnectorsIntegrationSSLTest.testReplicateSourceDefault --tests MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigs :core:test --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests ZkMigrationIntegrationTest.testMigrateTopicDeletions --tests UserQuotaTest.testQuotaOverrideDelete --tests SocketServerTest.testControlPlaneTakePrecedenceOverInterBrokerListenerAsPrivilegedListener --tests SocketServerTest.testZeroMaxConnectionsPerIp --tests SocketServerTest.testStagedListenerShutdownWhenConnectionQueueIsFull --tests SocketServerTest.testStagedListenerStartup --tests SocketServerTest.testControlPlaneAsPrivilegedListener --tests SocketServerTest.testInterBrokerListenerAsPrivilegedListener --tests LogDirFailureTest.testIOExceptionDuringLogRoll --tests LogDirFailureTest.testIOExceptionDuringCheckpoint :clients:test --tests SelectorTest.testConnectionsByClientMetric --tests Tls12SelectorTest.testConnectionsByClientMetric --tests SelectorTest.testInboundConnectionsCountInConnectionCreationMetric --tests Tls12SelectorTest.testInboundConnectionsCountInConnectionCreationMetric --tests SelectorTest.testMuteOnOOM --tests Tls12SelectorTest.testMuteOnOOM --tests Tls13SelectorTest.testConnectionsByClientMetric --tests Tls13SelectorTest.testInboundConnectionsCountInConnectionCreationMetric --tests Tls13SelectorTest.testMuteOnOOM ``` They pass locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
soarez commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1549709937 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { +val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) +abandonedFutureLogs.foreach { log => + val tp = log.topicPartition + + log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true) + log.removeLogMetrics() + futureLogs.remove(tp) + + currentLogs.put(tp, log) + log.newMetrics() + + info(s"Successfully renamed abandoned future log for $tp") +} + } + + private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = { +futureLogs.values.flatMap { log => + val topicId = log.topicId.getOrElse { +throw new RuntimeException(s"The log dir $log does not have a topic ID, " + + "which is not allowed when running in KRaft mode.") + } + val partitionId = log.topicPartition.partition() + Option(newTopicsImage.getPartition(topicId, partitionId)) +.filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId))) +.map(_ => log) Review Comment: If we didn't know if the future log was caught up or not, then I'd prefer (a), but at this point I can't conceive of a different scenario – other than a failure during replica promotion – that would cause the future log to be in the directory assigned in the metadata. So I agree that the two logs likely will be either caught up or very close. So I agree it makes more sense to do (b) - promote the future log and delete the main one. We can still run into trouble if the directory with the main replica is offline. At some point that will cause a crash if the directory ever comes back online. But there's nothing we can do about that here. Maybe future work could improve how the broker handles loading conflicting logs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1549695928 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File], } } + def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = { +val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage) +abandonedFutureLogs.foreach { log => + val tp = log.topicPartition + + log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true) + log.removeLogMetrics() + futureLogs.remove(tp) + + currentLogs.put(tp, log) + log.newMetrics() + + info(s"Successfully renamed abandoned future log for $tp") +} + } + + private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = { +futureLogs.values.flatMap { log => + val topicId = log.topicId.getOrElse { +throw new RuntimeException(s"The log dir $log does not have a topic ID, " + + "which is not allowed when running in KRaft mode.") + } + val partitionId = log.topicPartition.partition() + Option(newTopicsImage.getPartition(topicId, partitionId)) +.filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId))) +.map(_ => log) Review Comment: Thanks for the feedback. For (2), we've couple of options. We can either: (a) ignore the future replica (say in dir2) if the main replica exists in an online log dir (say dir1) or, (b) promote the future replica (in dir2) and remove the main replica (in dir1). (a) would result in ReplicaManager spawning a replicaAlterLogDir thread for the future replica and correcting the assignment to dir1, only for it to be changed back again to dir2 when the replicaAlterLogDir thread finishes its job. Refer https://github.com/apache/kafka/blob/acecd370cc3b25f12926e7a4664a2648f08c6c9a/core/src/main/scala/kafka/server/ReplicaManager.scala#L2734 and https://github.com/apache/kafka/blob/acecd370cc3b25f12926e7a4664a2648f08c6c9a/core/src/main/scala/kafka/server/ReplicaManager.scala#L2745 Since in these scenarios, the future replica is almost caught up with the main replica, I'm leaning towards option (b) to avoid more reassignments. Please let me know if you feel otherwise. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
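As a rough illustration of option (b), with simplified types rather than the real LogManager (the ".future" suffix convention mirrors the diffs earlier in this thread):

```scala
import scala.collection.mutable

// Toy model of option (b): the caught-up future log replaces the main log for the
// partition, and the abandoned main log is handed off for deletion.
final case class LogHandle(dir: String)

def promoteFutureLog(
  tp: String,
  currentLogs: mutable.Map[String, LogHandle],
  futureLogs: mutable.Map[String, LogHandle],
  scheduleDeletion: LogHandle => Unit
): Unit = {
  futureLogs.remove(tp).foreach { future =>
    // Drop the stale main replica, if one exists, instead of resuming it.
    currentLogs.get(tp).foreach(scheduleDeletion)
    // Strip the ".future" suffix so the promoted log lives under the normal partition dir name.
    currentLogs.put(tp, LogHandle(future.dir.stripSuffix(".future")))
  }
}
```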
Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]
chia7712 commented on PR #15639: URL: https://github.com/apache/kafka/pull/15639#issuecomment-2034397405 ``` ./gradlew cleanTest :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful :connect:runtime:test --tests org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated :server:test --tests AssignmentsManagerTest.testRequeuesFailedAssignmentPropagations :connect:mirror:test --tests MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow --tests MirrorConnectorsIntegrationSSLTest.testReplicateFromLatest --tests IdentityReplicationIntegrationTest.testReplicateSourceDefault --tests MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault :core:test --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests LogDirFailureTest.testIOExceptionDuringLogRoll --tests LogDirFailureTest.testIOExceptionDuringCheckpoint ``` All pass locally; will merge it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: AbstractConfig cleanup Part 2 [kafka]
chia7712 merged PR #15639: URL: https://github.com/apache/kafka/pull/15639 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
OmniaGM commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1549552216 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -289,13 +289,17 @@ class BrokerMetadataPublisher( try { // Start log manager, which will perform (potentially lengthy) // recovery-from-unclean-shutdown if required. - logManager.startup(metadataCache.getAllTopics()) - - // Delete partition directories which we're not supposed to have. We have - // to do this before starting ReplicaManager, so that the stray replicas - // don't block creation of new ones with different IDs but the same names. - // See KAFKA-14616 for details. - logManager.deleteStrayKRaftReplicas(brokerId, newImage.topics()) + logManager.startup( +metadataCache.getAllTopics(), +isStray = (topicId, partition) => { + val tid = topicId.getOrElse { +throw new RuntimeException(s"Partition $partition does not have a topic ID, " + + "which is not allowed when running in KRaft mode.") + } + Option(newImage.topics().getPartition(tid, partition.partition())) +.exists(_.replicas.contains(brokerId)) Review Comment: You are right, the new `isStrayKraftReplica` is finding both cases but I guess I took Igor suggestion without giving it a second thought https://github.com/apache/kafka/pull/15335#discussion_r1512748010 will fix this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
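For context on "finding both cases", the stray-replica rule under discussion reduces to the following, shown here with simplified stand-in types rather than Kafka's metadata image classes:

```scala
// A local log is stray when the metadata image no longer contains the partition at all,
// or still contains it but no longer lists this broker as one of its replicas.
final case class PartitionImage(replicas: Set[Int])

def isStrayReplica(brokerId: Int, partitionInImage: Option[PartitionImage]): Boolean =
  !partitionInImage.exists(_.replicas.contains(brokerId))
```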
Re: [PR] KAFKA-16457 Useless import class [kafka]
chia7712 merged PR #15646: URL: https://github.com/apache/kafka/pull/15646 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16467) Add README to docs folder
[ https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833498#comment-17833498 ] Chia-Ping Tsai commented on KAFKA-16467: Have you checked option 1? I'm not sure whether it is still valid. Option 2 seems simpler than option 1 to me. > Add README to docs folder > - > > Key: KAFKA-16467 > URL: https://issues.apache.org/jira/browse/KAFKA-16467 > Project: Kafka > Issue Type: Improvement >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > > We don't have a guide in project root folder or docs folder to show how to > run local website. It's good to provide a way to run document with kafka-site > repository. > > Option 1: Add links to wiki page > [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes] > and > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. > Option 2: Show how to run the document within container. For example: moving > `site-docs` from kafka to kafka-site repository and run `./start-preview.sh`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16430) The group-metadata-manager thread is always in a loading state and occupies one CPU, unable to end.
[ https://issues.apache.org/jira/browse/KAFKA-16430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833497#comment-17833497 ] Chia-Ping Tsai commented on KAFKA-16430: {quote} what you mean? Is the newer kafka script referring to the use of the new version of the kafka-consumer-group.sh client script? But now there is a problem with the kafka broker server side. {quote} Please ignore my previous comment :( {quote} At the same time, I found through the top command that the "group-metadata-manager-0" thread was constantly consuming 100% of the CPU resources. This loop could not be broken, resulting in the inability to consume topic partition data on that node. At this point, I suspected that the issue may be related to the __consumer_offsets partition data file loaded by this thread. {quote} Could you share more details? for example, the thread dump or hot path you observed {quote} We encountered this issue in our production environment using Kafka versions 2.2.1 and 2.4.0, and I believe it may also affect other versions. {quote} As kafka 2.x is EOL, is it possible that your team use kafak 3.x to reproduce the issue? > The group-metadata-manager thread is always in a loading state and occupies > one CPU, unable to end. > --- > > Key: KAFKA-16430 > URL: https://issues.apache.org/jira/browse/KAFKA-16430 > Project: Kafka > Issue Type: Bug > Components: group-coordinator >Affects Versions: 2.4.0 >Reporter: Gao Fei >Priority: Blocker > > I deployed three broker instances and suddenly found that the client was > unable to consume data from certain topic partitions. I first tried to log in > to the broker corresponding to the group and used the following command to > view the consumer group: > {code:java} > ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --describe > --group mygroup{code} > and found the following error: > {code:java} > Error: Executing consumer group command failed due to > org.apache.kafka.common.errors.CoodinatorLoadInProgressException: The > coodinator is loading and hence can't process requests.{code} > I then discovered that the broker may be stuck in a loop, which is constantly > in a loading state. At the same time, I found through the top command that > the "group-metadata-manager-0" thread was constantly consuming 100% of the > CPU resources. This loop could not be broken, resulting in the inability to > consume topic partition data on that node. At this point, I suspected that > the issue may be related to the __consumer_offsets partition data file loaded > by this thread. > Finally, after restarting the broker instance, everything was back to normal. > It's very strange that if there was an issue with the __consumer_offsets > partition data file, the broker should have failed to start. Why was it able > to automatically recover after a restart? And why did this continuous loop > loading of the __consumer_offsets partition data occur? > We encountered this issue in our production environment using Kafka versions > 2.2.1 and 2.4.0, and I believe it may also affect other versions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
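To help gather the details requested above, the spinning thread's stack can be captured with jstack on the broker process, or programmatically from inside the JVM; a small sketch of the latter:

```scala
import java.lang.management.ManagementFactory

// Sketch: dump all thread stack traces from inside the JVM, e.g. to capture what the
// group-metadata-manager-0 thread is doing while it pins a CPU at 100%.
def dumpThreads(): String = {
  val threads = ManagementFactory.getThreadMXBean.dumpAllThreads(true, true)
  threads.map(_.toString).mkString(System.lineSeparator())
}
```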
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2034230452 The failed tests pass locally. ``` ./gradlew cleanTest :streams:test --tests SlidingWindowedKStreamIntegrationTest.shouldRestoreAfterJoinRestart :storage:test --tests TransactionsWithTieredStoreTest.testSendOffsetsToTransactionTimeout :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated :connect:mirror:test --tests DedicatedMirrorIntegrationTest.testMultiNodeCluster --tests MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault --tests MirrorConnectorsIntegrationBaseTest.testOneWayReplicationWithFrequentOffsetSyncs --tests MirrorConnectorsIntegrationExactlyOnceTest.testSyncTopicConfigs :core:test --tests ListOffsetsIntegrationTest.testThreeRecordsInSeparateBatchHavingDifferentCompressionTypeWithServer --tests ListOffsetsIntegrationTest.testThreeNonCompressedRecordsInOneBatch --tests ListOffsetsIntegrationTest.testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer --tests ListOffsetsIntegrationTest.testThreeCompressedRecordsInOneBatch --tests ListOffsetsIntegrationTest.testThreeCompressedRecordsInSeparateBatch --tests ListOffsetsIntegrationTest.testThreeNonCompressedRecordsInSeparateBatch --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithoutDescribeAclViaAssign --tests ConsumerBounceTest.testConsumptionWithBrokerFailures --tests ConsumerBounceTest.testCloseDuringRebalance --tests LogDirFailureTest.testIOExceptionDuringLogRoll --tests LogDirFailureTest.testIOExceptionDuringCheckpoint :clients:test --tests KafkaAdminClientTest.testClientSideTimeoutAfterFailureToReceiveResponse ```
Kafka capabilities
Hi Kafka users, does anyone have a document or slide deck that showcases the capabilities of Kafka, including any cost management capabilities? I have a customer who is still using IBM MQ and RabbitMQ, and I want them to consider Kafka for messaging and data streaming. If you have such a document or deck that I could use as an example, could you please share it? Thanks and regards, KrisG
Re: [PR] KAFKA-15062: Adding ppc64le build stage [kafka]
Vaibhav-Nazare commented on PR #13817: URL: https://github.com/apache/kafka/pull/13817#issuecomment-2034128217
@divijvaidya could you also take a look at this when you get a chance: https://lists.apache.org/thread/f3yj7o5nfskz1onr59kmodm73kvtsktk
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12399: Deprecate Log4J Appender [kafka]
mimaison commented on PR #10244: URL: https://github.com/apache/kafka/pull/10244#issuecomment-2034028295
Yes, ideally we need the deprecation to go into 3.8 so we can delete the appender in 4.0.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
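For downstream users who want to check whether they depend on the appender before it is removed, a quick hedged sketch (the artifact coordinates are org.apache.kafka:kafka-log4j-appender; exact build commands depend on your project):
```
# Look for the appender artifact in the dependency tree of a Maven project
mvn dependency:tree | grep kafka-log4j-appender

# Or, for a Gradle project
./gradlew dependencies | grep kafka-log4j-appender
```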
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
soarez commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549319336
## core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -420,6 +420,12 @@ class KafkaServer(
       isZkBroker = true,
       logManager.directoryIdsSet)
+    // For ZK brokers in migration mode, always delete the metadata partition on startup.
+    KafkaRaftManager.maybeDeleteMetadataLogDir(config) match {
+      case Some(err) => logger.error("Could not delete local metadata log dir. This is non-fatal, so continuing with startup.", err)

Review Comment:
Should this really be non-fatal? What's the thinking behind this decision? If there is an IO failure on the metadata log dir, the broker should not continue.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16293: Test log directory failure in Kraft [kafka]
showuon commented on PR #15409: URL: https://github.com/apache/kafka/pull/15409#issuecomment-2034010084
@gaurav-narula, the newly added KRaft tests are failing in our test environment. The logs are [here](https://drive.google.com/file/d/1CbxgH8eswEXX0YDEJpZn7B9Vd4x-gfo4/view?usp=sharing). Could you help check and fix them? Thanks.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]
showuon commented on code in PR #15532: URL: https://github.com/apache/kafka/pull/15532#discussion_r1549312174
## build.gradle:
##
@@ -1435,10 +1454,10 @@ project(':clients') {
     implementation libs.opentelemetryProto
     // libraries which should be added as runtime dependencies in generated pom.xml should be defined here:
-    shadow libs.zstd
-    shadow libs.lz4
-    shadow libs.snappy
-    shadow libs.slf4jApi
+    shadowed libs.zstd
+    shadowed libs.lz4
+    shadowed libs.snappy
+    shadowed libs.slf4jApi

Review Comment:
@apoorvmittal10, is there any update on the suggestion from @mimaison?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
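As a side note, one way to sanity-check that these libraries end up as runtime-scoped dependencies in the generated pom.xml is sketched below. It assumes the publishToMavenLocal flow described in the project README and a default ~/.m2 location; the grep filter is illustrative.
```
# Publish kafka-clients to the local Maven repository without signing
./gradlew -PskipSigning=true :clients:publishToMavenLocal

# Inspect the generated pom for runtime-scoped dependencies (zstd, lz4, snappy, slf4j)
find ~/.m2/repository/org/apache/kafka/kafka-clients -name '*.pom' \
  -exec grep -B3 '<scope>runtime</scope>' {} +
```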