Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1564488318 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -48,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ZK) +@ClusterTestDefaults(clusterType = Type.ALL) @Tag("integration") Review Comment: Thanks! I've pushed a commit to address all your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix flaky test testRackAwareRangeAssignor [kafka]
github-actions[bot] commented on PR #14829: URL: https://github.com/apache/kafka/pull/14829#issuecomment-2053887957 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]
chia7712 commented on code in PR #15715: URL: https://github.com/apache/kafka/pull/15715#discussion_r1564305286 ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -99,6 +99,10 @@ public List getAdditionalExtensions() { clusterConfig.brokerServerProperties(brokerId).forEach( (key, value) -> brokerNode.propertyOverrides().put(key.toString(), value.toString())); }); +nodes.controllerNodes().forEach((controllerId, controllerNode) -> { Review Comment: `{` is redundant. ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -203,6 +205,16 @@ private ClusterTestDefaults getClusterTestDefaults(Class testClass) { .orElseGet(() -> EmptyClass.class.getDeclaredAnnotation(ClusterTestDefaults.class)); } +private static void applyConfig(ClusterConfig config, ClusterConfigProperty property) { +if (property.id() == -1) { +config.serverProperties().put(property.key(), property.value()); +} else if (property.id() >= CONTROLLER_ID_OFFSET) { Review Comment: We need to document this to make sure developers are aware of the broker/controller id when they try to define per-broker/controller configs. ## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ## @@ -190,10 +192,10 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu ClusterConfig config = builder.build(); Review Comment: Could you please refactor the code to make `ClusterConfig` immutable? It seems chaotic that we pass mutable objects everywhere ...
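A minimal sketch of the immutability refactor being suggested above, in plain Java. The class and method names are illustrative, not the actual `kafka.test.ClusterConfig` API: all state is final, the map is defensively copied into an unmodifiable view, and construction goes through a builder so callers can no longer mutate a shared config instance.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class ImmutableClusterConfig {
    private final Map<String, String> serverProperties;

    private ImmutableClusterConfig(Map<String, String> serverProperties) {
        // Defensive copy wrapped as unmodifiable: callers can read but
        // never mutate a shared config instance after construction.
        this.serverProperties =
                Collections.unmodifiableMap(new HashMap<>(serverProperties));
    }

    public Map<String, String> serverProperties() {
        return serverProperties;
    }

    public static Builder builder() {
        return new Builder();
    }

    // All mutation happens on the builder, before the config exists.
    public static final class Builder {
        private final Map<String, String> props = new HashMap<>();

        public Builder putServerProperty(String key, String value) {
            props.put(key, value);
            return this;
        }

        public ImmutableClusterConfig build() {
            return new ImmutableClusterConfig(props);
        }
    }
}
```

With this shape, any attempt to modify `serverProperties()` after `build()` throws `UnsupportedOperationException`, which is exactly the "no mutable objects passed everywhere" property the review asks for.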
[jira] [Assigned] (KAFKA-16547) add test for DescribeConfigsOptions#includeDocumentation
[ https://issues.apache.org/jira/browse/KAFKA-16547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16547: -- Assignee: Chia-Ping Tsai > add test for DescribeConfigsOptions#includeDocumentation > > > Key: KAFKA-16547 > URL: https://issues.apache.org/jira/browse/KAFKA-16547 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > As the title says, we have no tests for the query option. > If the option is configured to false, `ConfigEntry#documentation` should be > null. Otherwise, it should return the config documentation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16547) add test for DescribeConfigsOptions#includeDocumentation
Chia-Ping Tsai created KAFKA-16547: -- Summary: add test for DescribeConfigsOptions#includeDocumentation Key: KAFKA-16547 URL: https://issues.apache.org/jira/browse/KAFKA-16547 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai As the title says, we have no tests for the query option. If the option is configured to false, `ConfigEntry#documentation` should be null. Otherwise, it should return the config documentation.
[jira] [Updated] (KAFKA-16546) add docs to explain how to update cluster-wide default by Admin#incrementalAlterConfigs
[ https://issues.apache.org/jira/browse/KAFKA-16546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16546: --- Priority: Minor (was: Major) > add docs to explain how to update cluster-wide default by > Admin#incrementalAlterConfigs > --- > > Key: KAFKA-16546 > URL: https://issues.apache.org/jira/browse/KAFKA-16546 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > We have good docs about updating cluster-wide configs with the command-line tool > (https://kafka.apache.org/documentation/#dynamicbrokerconfigs), and it would > be great if Admin#incrementalAlterConfigs had such docs as well.
[jira] [Created] (KAFKA-16546) add docs to explain how to update cluster-wide default by Admin#incrementalAlterConfigs
Chia-Ping Tsai created KAFKA-16546: -- Summary: add docs to explain how to update cluster-wide default by Admin#incrementalAlterConfigs Key: KAFKA-16546 URL: https://issues.apache.org/jira/browse/KAFKA-16546 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai We have good docs about updating cluster-wide configs with the command-line tool (https://kafka.apache.org/documentation/#dynamicbrokerconfigs), and it would be great if Admin#incrementalAlterConfigs had such docs as well.
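For context, the dynamic broker config docs linked above describe a lookup precedence that documentation for Admin#incrementalAlterConfigs would also need to spell out: a dynamic per-broker value overrides the dynamic cluster-wide default, which overrides the static `server.properties` value. A plain-Java sketch of that lookup order (illustrative stand-in maps, not actual broker code):

```java
import java.util.Map;

public class DynamicConfigPrecedence {
    // Lookup order from the dynamic broker config docs:
    // dynamic per-broker value > dynamic cluster-wide default > static value.
    static String resolve(String key,
                          Map<String, String> perBrokerDynamic,
                          Map<String, String> clusterWideDefault,
                          Map<String, String> staticProps) {
        if (perBrokerDynamic.containsKey(key)) {
            return perBrokerDynamic.get(key);
        }
        if (clusterWideDefault.containsKey(key)) {
            return clusterWideDefault.get(key);
        }
        return staticProps.get(key);
    }

    public static void main(String[] args) {
        // A cluster-wide default (the thing set via incrementalAlterConfigs)
        // overrides the static value, but not a per-broker override.
        Map<String, String> perBroker = Map.of();
        Map<String, String> clusterDefault = Map.of("log.cleaner.threads", "2");
        Map<String, String> staticProps = Map.of("log.cleaner.threads", "1");
        System.out.println(resolve("log.cleaner.threads",
                perBroker, clusterDefault, staticProps)); // prints 2
    }
}
```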
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
brandboat commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1564160360 ## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentTopicIdException; +import org.apache.kafka.common.utils.Utils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.nio.file.Files; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class PartitionMetadataFileTest { +private final File dir = assertDoesNotThrow(() -> Files.createTempDirectory("tmp")).toFile(); Review Comment: Looks like we have to add this `` to https://github.com/apache/kafka/blob/c034cf2953e691ce4ecd94bf00ac5810167354bc/checkstyle/import-control-storage.xml#L73-L80; otherwise checkstyle raises a `disallow import` error.
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1564147419 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -48,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ZK) +@ClusterTestDefaults(clusterType = Type.ALL) @Tag("integration") Review Comment: > ClusterTestDefaults didn't have the API serverProperties so I think it wouldn't compile It is supported now; see #15687.
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1564142762 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -48,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ZK) +@ClusterTestDefaults(clusterType = Type.ALL) @Tag("integration") Review Comment: those `serverProperties` are set in a `@BeforeEach` method. Do you mean you want to put them like below? ``` @ClusterTest(clusterType = Type.ALL, serverProperties = { @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "4") }) ``` If so, we'd have to modify each test. `ClusterTestDefaults` doesn't have a `serverProperties` API, so I think it wouldn't compile.
[PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]
brandboat opened a new pull request, #15715: URL: https://github.com/apache/kafka/pull/15715 Related to KAFKA-16484. Introduce a new field `id` in the annotation `ClusterConfigProperty`. The main purpose of the new field is to define a property for a specific broker/controller (KRaft). The default value is `-1`, which means the ClusterConfigProperty will apply to all brokers/controllers. Note that under [Type.KRAFT](https://github.com/apache/kafka/blob/c034cf2953e691ce4ecd94bf00ac5810167354bc/core/src/test/java/kafka/test/annotation/Type.java#L31) mode, the controller id [starts from 3000](https://github.com/apache/kafka/blob/c034cf2953e691ce4ecd94bf00ac5810167354bc/core/src/test/java/kafka/testkit/TestKitNodes.java#L141-L146) and increments by one each time. In other modes, the broker/controller id starts from 0 and increments by one. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
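The routing rule the PR describes can be sketched in a few lines of plain Java. The offset 3000 matches the TestKitNodes link above; the class, enum, and method names are illustrative, not the actual test-kit code: `id == -1` applies the property to every node, ids at or above the controller offset target one KRaft controller, and other non-negative ids target one broker.

```java
public class ClusterConfigPropertyRouting {
    // KRaft controller ids start at this offset in the test kit;
    // broker ids count up from 0.
    static final int CONTROLLER_ID_OFFSET = 3000;

    enum Target { ALL_NODES, CONTROLLER, BROKER }

    // Routing rule described in the PR: -1 means "apply to every broker and
    // controller"; ids >= CONTROLLER_ID_OFFSET select a single controller;
    // other non-negative ids select a single broker.
    static Target targetOf(int id) {
        if (id == -1) {
            return Target.ALL_NODES;
        } else if (id >= CONTROLLER_ID_OFFSET) {
            return Target.CONTROLLER;
        }
        return Target.BROKER;
    }

    public static void main(String[] args) {
        System.out.println(targetOf(-1));   // ALL_NODES
        System.out.println(targetOf(3000)); // CONTROLLER (first KRaft controller)
        System.out.println(targetOf(0));    // BROKER (first broker)
    }
}
```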
[jira] [Updated] (KAFKA-15480) Add RemoteStorageInterruptedException
[ https://issues.apache.org/jira/browse/KAFKA-15480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15480: - Labels: kip (was: ) > Add RemoteStorageInterruptedException > - > > Key: KAFKA-15480 > URL: https://issues.apache.org/jira/browse/KAFKA-15480 > Project: Kafka > Issue Type: Task > Components: core >Affects Versions: 3.6.0 >Reporter: Mital Awachat >Priority: Major > Labels: kip > Fix For: 3.8.0 > > > Introduce `RemoteStorageInterruptedException` to propagate interruptions from > the plugin to Kafka without generating (false) errors. > It allows the plugin to notify Kafka that an API operation in progress was > interrupted as a result of task cancellation, which can happen under changes > such as leadership migration or topic deletion.
[jira] [Updated] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request
[ https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15776: - Labels: kip (was: ) > Update delay timeout for DelayedRemoteFetch request > --- > > Key: KAFKA-15776 > URL: https://issues.apache.org/jira/browse/KAFKA-15776 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Labels: kip > > We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for > DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait for the > given amount of time when there is no data available to serve the FETCH > request. > {code:java} > The maximum amount of time the server will block before answering the fetch > request if there isn't sufficient data to immediately satisfy the requirement > given by fetch.min.bytes. > {code} > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41] > Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the > user on how to configure an optimal value for each purpose. Moreover, the config > is of *LOW* importance and most users won't configure it and will use the > default value of 500 ms. > Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to > a higher number of expired delayed remote fetch requests when the remote > storage has any degradation. > We should introduce a {{fetch.remote.max.wait.ms}} config (preferably a > server config) to define the delay timeout for DelayedRemoteFetch requests, > or take it from the client similar to {{request.timeout.ms}}.
[jira] [Updated] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request
[ https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-15776: - Labels: kip-1018 (was: kip) > Update delay timeout for DelayedRemoteFetch request > --- > > Key: KAFKA-15776 > URL: https://issues.apache.org/jira/browse/KAFKA-15776 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > Labels: kip-1018 > > We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for > DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait for the > given amount of time when there is no data available to serve the FETCH > request. > {code:java} > The maximum amount of time the server will block before answering the fetch > request if there isn't sufficient data to immediately satisfy the requirement > given by fetch.min.bytes. > {code} > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41] > Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the > user on how to configure an optimal value for each purpose. Moreover, the config > is of *LOW* importance and most users won't configure it and will use the > default value of 500 ms. > Having the delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to > a higher number of expired delayed remote fetch requests when the remote > storage has any degradation. > We should introduce a {{fetch.remote.max.wait.ms}} config (preferably a > server config) to define the delay timeout for DelayedRemoteFetch requests, > or take it from the client similar to {{request.timeout.ms}}.
[jira] [Assigned] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems
[ https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash reassigned KAFKA-15341: Assignee: Kamal Chandraprakash > Enabling TS for a topic during rolling restart causes problems > -- > > Key: KAFKA-15341 > URL: https://issues.apache.org/jira/browse/KAFKA-15341 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Assignee: Kamal Chandraprakash >Priority: Major > Labels: KIP-405 > Fix For: 3.8.0 > > > When we are in a rolling restart to enable TS at the system level, some brokers > have TS enabled on them and some don't. We send an alter config call to > enable TS for a topic; it hits a broker which has TS enabled, this broker > forwards it to the controller, and the controller sends the config update to > all brokers. When another broker which doesn't have TS enabled (because it > hasn't undergone the restart yet) gets this config change, it "should" fail > to apply it. But failing at that point is too late, since alterConfig has already > succeeded and controller->broker config propagation is done asynchronously. > With this JIRA, we want to have the controller check whether TS is enabled on all > brokers before applying the alter config to turn on TS for a topic. > Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129
[jira] [Assigned] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems
[ https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash reassigned KAFKA-15341: Assignee: (was: Kamal Chandraprakash) > Enabling TS for a topic during rolling restart causes problems > -- > > Key: KAFKA-15341 > URL: https://issues.apache.org/jira/browse/KAFKA-15341 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Priority: Major > Labels: KIP-405 > Fix For: 3.8.0 > > > When we are in a rolling restart to enable TS at the system level, some brokers > have TS enabled on them and some don't. We send an alter config call to > enable TS for a topic; it hits a broker which has TS enabled, this broker > forwards it to the controller, and the controller sends the config update to > all brokers. When another broker which doesn't have TS enabled (because it > hasn't undergone the restart yet) gets this config change, it "should" fail > to apply it. But failing at that point is too late, since alterConfig has already > succeeded and controller->broker config propagation is done asynchronously. > With this JIRA, we want to have the controller check whether TS is enabled on all > brokers before applying the alter config to turn on TS for a topic. > Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129
[jira] [Assigned] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments
[ https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash reassigned KAFKA-15682: Assignee: Kamal Chandraprakash > Ensure internal remote log metadata topic does not expire its segments before > deleting user-topic segments > -- > > Key: KAFKA-15682 > URL: https://issues.apache.org/jira/browse/KAFKA-15682 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > > One of the implementations of RemoteLogMetadataManager is > TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic > {{__remote_log_metadata}} to store the metadata about the remote log > segments. Unlike other internal topics, which are compaction enabled, this > topic is not enabled with compaction and retention is set to unlimited. > Keeping this internal topic retention unlimited is not practical in real-world > use cases, where the topic's local disk usage footprint grows huge over a > period of time. > It is assumed that the user will set the retention to a reasonable time such > that it is the max of all the user-created topics (max + X). We can't just > rely on that assumption; we need an assertion to ensure that the internal > {{__remote_log_metadata}} segments are not eligible for deletion before the > expiry of all the relevant user-topic uploaded remote-log-segments, > otherwise there will be dangling remote-log-segments which won't be cleared > once all the brokers are restarted post the internal topic retention cleanup. > See the discussion thread: > https://github.com/apache/kafka/pull/14576#discussion_r1368576126
[jira] [Assigned] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments
[ https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash reassigned KAFKA-15682: Assignee: (was: Kamal Chandraprakash) > Ensure internal remote log metadata topic does not expire its segments before > deleting user-topic segments > -- > > Key: KAFKA-15682 > URL: https://issues.apache.org/jira/browse/KAFKA-15682 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Priority: Major > > One of the implementations of RemoteLogMetadataManager is > TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic > {{__remote_log_metadata}} to store the metadata about the remote log > segments. Unlike other internal topics, which are compaction enabled, this > topic is not enabled with compaction and retention is set to unlimited. > Keeping this internal topic retention unlimited is not practical in real-world > use cases, where the topic's local disk usage footprint grows huge over a > period of time. > It is assumed that the user will set the retention to a reasonable time such > that it is the max of all the user-created topics (max + X). We can't just > rely on that assumption; we need an assertion to ensure that the internal > {{__remote_log_metadata}} segments are not eligible for deletion before the > expiry of all the relevant user-topic uploaded remote-log-segments, > otherwise there will be dangling remote-log-segments which won't be cleared > once all the brokers are restarted post the internal topic retention cleanup. > See the discussion thread: > https://github.com/apache/kafka/pull/14576#discussion_r1368576126
[jira] [Assigned] (KAFKA-9578) Kafka Tiered Storage - System Tests
[ https://issues.apache.org/jira/browse/KAFKA-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash reassigned KAFKA-9578: --- Assignee: (was: Kamal Chandraprakash) > Kafka Tiered Storage - System Tests > > > Key: KAFKA-9578 > URL: https://issues.apache.org/jira/browse/KAFKA-9578 > Project: Kafka > Issue Type: Test >Reporter: Harsha >Priority: Major > Fix For: 3.8.0 > > > Initial test cases set up by [~Ying Zheng] > > [https://docs.google.com/spreadsheets/d/1gS0s1FOmcjpKYXBddejXAoJAjEZ7AdEzMU9wZc-JgY8/edit#gid=0]
[jira] [Assigned] (KAFKA-13560) Load indexes and data in async manner in the critical path of replica fetcher threads.
[ https://issues.apache.org/jira/browse/KAFKA-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash reassigned KAFKA-13560: Assignee: (was: Kamal Chandraprakash) > Load indexes and data in async manner in the critical path of replica fetcher > threads. > --- > > Key: KAFKA-13560 > URL: https://issues.apache.org/jira/browse/KAFKA-13560 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Satish Duggana >Priority: Major > Fix For: 3.8.0 > > > https://github.com/apache/kafka/pull/11390#discussion_r762366976 > https://github.com/apache/kafka/pull/11390#discussion_r1033141283
[jira] [Assigned] (KAFKA-16511) Leaking tiered segments
[ https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash reassigned KAFKA-16511: Assignee: Kamal Chandraprakash > Leaking tiered segments > --- > > Key: KAFKA-16511 > URL: https://issues.apache.org/jira/browse/KAFKA-16511 > Project: Kafka > Issue Type: Bug > Components: Tiered-Storage >Affects Versions: 3.7.0 >Reporter: Francois Visconte >Assignee: Kamal Chandraprakash >Priority: Major > Labels: tiered-storage > > I have some topics that have not been written to for a few days (having 12h > retention) where some data remains in tiered storage (in our case S3) and it is > never deleted. > > Looking at the log history, it appears that we never even tried to delete > these segments: > When looking at one of the non-leaking segments, I get the following > interesting messages: > > {code:java} > "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. 
Current > earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), > segment-end-offset: 2976819 and segment-epochs: [5]" > "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data > for completed successfully > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}" > "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data > for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}" > "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied > 02968418.log to remote storage with segment-id: > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ}" > "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data > completed successfully, metadata: > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}" > 
"2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, > metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, > id=fqGng3UURCG3-v4lETeLKQ} > , startOffset=2968418, endOffset=2976819, brokerId=10029, > maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, > segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}" > {code} > > Which looks right because we can see logs from both the plugin and remote log > manager indicating that the remote log segment was removed. > Now if I look on one of the leaked segment, here is what I see > > {code:java} > "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 > partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied > 02971163.log to remote storage with segment-id: > RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, > id=8dP13VDYSaiFlubl9SNBTQ}" > "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data > completed successfully, metadata: > RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId > {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, > id=8dP13VDYSaiFlubl9SNBTQ} > , startOffset=2971163, endOffset=2978396, brokerId=10001, > maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, > segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, > customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}" >
[jira] [Assigned] (KAFKA-14915) Option to consume multiple partitions that have their data in remote storage for the target offsets.
[ https://issues.apache.org/jira/browse/KAFKA-14915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash reassigned KAFKA-14915: Assignee: (was: Kamal Chandraprakash) > Option to consume multiple partitions that have their data in remote storage > for the target offsets. > > > Key: KAFKA-14915 > URL: https://issues.apache.org/jira/browse/KAFKA-14915 > Project: Kafka > Issue Type: Improvement >Reporter: Satish Duggana >Priority: Major > Labels: tiered-storage > Fix For: 3.8.0 > > > Context: https://github.com/apache/kafka/pull/13535#discussion_r1171250580
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
kamalcph commented on PR #15631: URL: https://github.com/apache/kafka/pull/15631#issuecomment-2053698624 Re-triggered the tests. Test failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]
CalvinConfluent commented on code in PR #15702: URL: https://github.com/apache/kafka/pull/15702#discussion_r1564110479 ## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ## @@ -423,6 +441,17 @@ public void replay(ConfigRecord record) { log.info("Replayed ConfigRecord for {} which set configuration {} to {}", configResource, record.name(), record.value()); } +if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) { Review Comment: Sorry. This is bad, really bad :(
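The flagged line compares a config name with `==`, which in Java tests reference identity rather than string contents; `String#equals` is what a check like this needs. A minimal, self-contained sketch of the difference (the class and constant below are illustrative stand-ins, not the PR's actual code):

```java
public class StringComparisonDemo {
    // Hypothetical stand-in for TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG
    static final String MIN_ISR = "min.insync.replicas";

    static boolean badCheck(String recordName) {
        return recordName == MIN_ISR;       // compares object references, not contents
    }

    static boolean goodCheck(String recordName) {
        return MIN_ISR.equals(recordName);  // compares character contents, null-safe
    }

    public static void main(String[] args) {
        // A string arriving from deserialization is a distinct object with equal contents
        String fromWire = new String("min.insync.replicas");
        System.out.println(badCheck(fromWire));   // false
        System.out.println(goodCheck(fromWire));  // true
    }
}
```

The reference comparison only happens to work for interned literals, which is why bugs like this pass casual testing and then fail on values read from records.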
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on code in PR #15489: URL: https://github.com/apache/kafka/pull/15489#discussion_r1564099443 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -48,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ZK) +@ClusterTestDefaults(clusterType = Type.ALL) @Tag("integration") public class GetOffsetShellTest { private final int topicCount = 4; Review Comment: `topicName` could be a local variable ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -81,28 +86,57 @@ private void setUp() { } Properties props = new Properties(); -props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.config().producerProperties().get("bootstrap.servers")); +props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); try (KafkaProducer producer = new KafkaProducer<>(props)) { IntStream.range(0, topicCount + 1) .forEach(i -> IntStream.range(0, i * i) -.forEach(msgCount -> producer.send( -new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount))) +.forEach(msgCount -> { +try { Review Comment: we can leverage `assertDoesNotThrow` to simplify the code ```java assertDoesNotThrow(() -> producer.send( new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get()); ``` ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -48,7 +53,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ZK) +@ClusterTestDefaults(clusterType = Type.ALL) @Tag("integration") Review Comment: Could you use `serverProperties` to define default server configs? 
```java cluster.config().serverProperties().put("auto.create.topics.enable", false); cluster.config().serverProperties().put("offsets.topic.replication.factor", "1"); cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount)); ``` could be replaced by ```java @ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "4") })
```
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
chia7712 commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1564094159 ## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentTopicIdException; +import org.apache.kafka.common.utils.Utils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.nio.file.Files; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class PartitionMetadataFileTest { +private final File dir = assertDoesNotThrow(() -> Files.createTempDirectory("tmp")).toFile(); Review Comment: we can replace it by `TestUtils.tempDirectory()`, and it can handle the cleanup of temporary folder we created. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[PR] MINOR: Add test for PartitionMetadataFile [kafka]
KevinZTW opened a new pull request, #15714: URL: https://github.com/apache/kafka/pull/15714 In PartitionMetadataFile.java, there is a defensive early-fail design when setting a different topicId, but there are no corresponding test cases. Hence, this PR adds two tests for setting the same and different topicId. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-16545) Auto adjust the replica factor according to number of broker when using ClusterTestExtensions
[ https://issues.apache.org/jira/browse/KAFKA-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16545: -- Assignee: PoAn Yang (was: Chia-Ping Tsai) > Auto adjust the replica factor according to number of broker when using > ClusterTestExtensions > - > > Key: KAFKA-16545 > URL: https://issues.apache.org/jira/browse/KAFKA-16545 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Major > > In most test cases, we start a single broker to save resources. However, > this can cause errors when creating internal topics, since those require 3 > replicas by default. To reduce the duplicated configs across all > tests, we can add a bit of sugar to auto-adjust the replica factor (if it is not > defined by the test) when the number of brokers started by the test is less than the > default value.
[jira] [Commented] (KAFKA-16545) Auto adjust the replica factor according to number of broker when using ClusterTestExtensions
[ https://issues.apache.org/jira/browse/KAFKA-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836822#comment-17836822 ] PoAn Yang commented on KAFKA-16545: --- Hi [~chia7712], I'm interested in this issue. May I assign it to myself? Thank you. > Auto adjust the replica factor according to number of broker when using > ClusterTestExtensions > - > > Key: KAFKA-16545 > URL: https://issues.apache.org/jira/browse/KAFKA-16545 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > In most test cases, we start a single broker to save resources. However, > this can cause errors when creating internal topics, since those require 3 > replicas by default. To reduce the duplicated configs across all > tests, we can add a bit of sugar to auto-adjust the replica factor (if it is not > defined by the test) when the number of brokers started by the test is less than the > default value.
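The improvement described above amounts to capping the default replication factor at the number of started brokers whenever the test does not set one explicitly. A minimal sketch of that adjustment (method and parameter names are my own, not the Kafka test framework's):

```java
public class ReplicaFactorAdjuster {
    /**
     * Returns the replication factor a test cluster should use: an explicitly
     * configured value always wins; otherwise the default is capped at the
     * number of live brokers so internal topics can still be created.
     */
    static short effectiveReplicationFactor(Short configured, short defaultFactor, int brokerCount) {
        if (configured != null) {
            return configured;                        // test-defined value is respected as-is
        }
        return (short) Math.min(defaultFactor, brokerCount);
    }

    public static void main(String[] args) {
        // single-broker cluster, default factor 3 -> adjusted down to 1
        System.out.println(effectiveReplicationFactor(null, (short) 3, 1));  // 1
        // explicit config always wins, even on a single broker
        System.out.println(effectiveReplicationFactor((short) 2, (short) 3, 1));  // 2
        // enough brokers -> default is kept
        System.out.println(effectiveReplicationFactor(null, (short) 3, 5));  // 3
    }
}
```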
Re: [PR] Fix incorrect Java equals comparison of Uuid by reference [kafka]
chia7712 merged PR #15707: URL: https://github.com/apache/kafka/pull/15707
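The merged fix addresses the same reference-vs-value pitfall as the `ConfigRecord` review above: comparing `Uuid` objects with `==` checks object identity, not contents. The behavior can be shown self-contained with the JDK's `java.util.UUID` (Kafka's `org.apache.kafka.common.Uuid` behaves analogously with respect to `==` versus `equals`):

```java
import java.util.UUID;

public class UuidCompareDemo {
    public static void main(String[] args) {
        UUID a = new UUID(1L, 2L);
        UUID b = new UUID(1L, 2L);       // equal contents, distinct object
        System.out.println(a == b);      // false: reference comparison
        System.out.println(a.equals(b)); // true: value comparison
    }
}
```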
[jira] [Assigned] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE
[ https://issues.apache.org/jira/browse/KAFKA-16544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16544: -- Assignee: Kuan Po Tseng (was: Chia-Ping Tsai) > DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames > should return null instead of throwing NPE > -- > > Key: KAFKA-16544 > URL: https://issues.apache.org/jira/browse/KAFKA-16544 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Kuan Po Tseng >Priority: Major > > {code:java} > * @return A future map from topic names to descriptions which can be > used to check > * the status of individual description if the describe topic > request used > * topic names, otherwise return null, this request succeeds only > if all the > * topic descriptions succeed > {code} > According to the docs, it should return null when the requested result > type does not match the request. For example, we call `allTopicNames` after passing a > `TopicIdCollection`. However, the current implementation throws an NPE > directly
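The fix described amounts to returning `null` when the caller uses the accessor keyed by a type the request did not use, rather than dereferencing an absent map. A minimal, self-contained sketch of that guard (class and method names are illustrative, not Kafka's actual `DescribeTopicsResult` implementation):

```java
import java.util.Collections;
import java.util.Map;

public class DescribeResultSketch {
    private final Map<String, String> byName; // null when the request was keyed by topic IDs

    DescribeResultSketch(Map<String, String> byName) {
        this.byName = byName;
    }

    // Buggy shape: dereferences the map unconditionally, so a mismatched
    // accessor throws NullPointerException instead of honoring the javadoc.
    Map<String, String> allTopicNamesBuggy() {
        return Collections.unmodifiableMap(byName); // NPE when byName == null
    }

    // Fixed shape: return null on a mismatched accessor, as documented.
    Map<String, String> allTopicNamesFixed() {
        return byName == null ? null : Collections.unmodifiableMap(byName);
    }

    public static void main(String[] args) {
        DescribeResultSketch idKeyed = new DescribeResultSketch(null);
        System.out.println(idKeyed.allTopicNamesFixed()); // null, no exception
        try {
            idKeyed.allTopicNamesBuggy();
        } catch (NullPointerException e) {
            System.out.println("NPE from the buggy accessor");
        }
    }
}
```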
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
Owen-CH-Leung commented on PR #15489: URL: https://github.com/apache/kafka/pull/15489#issuecomment-2053607875 > @Owen-CH-Leung > > The root cause is that not all produced records are successfully sent, and we don't check all sends before closing the producer. As our CI is very busy, this could cause timeouts if we send many records at once. Hence, could you call `get` on `Producer#send` to make sure those records get sent. Thanks a lot for helping to identify the root cause! I've just pushed a commit to address all your comments. Let's see what we get from the build =)
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
raminqaf commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1563938474 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## Review Comment: No more unsafe type casts remain in this class. Not that much has changed in this abstract class; I have moved some code into methods. Side note: I don't have the correct formatter configured :/ I couldn't find any contribution notes on setting it up.
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
raminqaf commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1563929782 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java: ## @@ -63,21 +63,6 @@ public static LeftOrRightValue makeRightValue(final V2 rightVal return new LeftOrRightValue<>(null, rightValue); } -/** - * Create a new {@link LeftOrRightValue} instance with the V value as {@code leftValue} if - * {@code isLeftSide} is True; otherwise {@code rightValue} if {@code isLeftSide} is False. - * - * @param value the V value (either V1 or V2 type) - * @paramthe type of the value - * @return a new {@link LeftOrRightValue} instance - */ -public static LeftOrRightValue make(final boolean isLeftSide, final V value) { -Objects.requireNonNull(value, "value is null"); -return isLeftSide -? LeftOrRightValue.makeLeftValue(value) -: LeftOrRightValue.makeRightValue(value); -} - Review Comment: Removed as promised ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## Review Comment: Not that much has changed in this abstract class. I have moved some code in methods. ## streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java: ## @@ -52,7 +52,26 @@ private TimestampedKeyAndJoinSide(final boolean leftSide, final K key, final lon public static TimestampedKeyAndJoinSide make(final boolean leftSide, final K key, final long timestamp) { return new TimestampedKeyAndJoinSide<>(leftSide, key, timestamp); } - +/** + * Create a new {@link TimestampedKeyAndJoinSide} instance for the left join side if the provide {@code key} is not {@code null}. 
+ * + * @param key the key + * @param the type of the key + * @return a new {@link TimestampedKeyAndJoinSide} instance for the left join side if the provide {@code key} is not {@code null} + */ +public static TimestampedKeyAndJoinSide makeLeft(final K key, final long timestamp) { +return new TimestampedKeyAndJoinSide<>(true, key, timestamp); +} +/** + * Create a new {@link TimestampedKeyAndJoinSide} instance for the right join side if the provide {@code key} is not {@code null}. + * + * @param key the key + * @param the type of the key + * @return a new {@link TimestampedKeyAndJoinSide} instance for the right join side if the provide {@code key} is not {@code null} + */ +public static TimestampedKeyAndJoinSide makeRight(final K key, final long timestamp) { +return new TimestampedKeyAndJoinSide<>(false, key, timestamp); +} Review Comment: @gharris1727 I have added these static factory methods for a cleaner definition. If you prefer, I can have them removed and use the `make` method. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java: ## Review Comment: A clean abstraction. Each join side defines its "this side value" and "other side value".
[jira] [Commented] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE
[ https://issues.apache.org/jira/browse/KAFKA-16544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836797#comment-17836797 ] Kuan Po Tseng commented on KAFKA-16544: --- gentle ping [~chia7712], are you working on this one? Maybe I can give it a try! :) > DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames > should return null instead of throwing NPE > -- > > Key: KAFKA-16544 > URL: https://issues.apache.org/jira/browse/KAFKA-16544 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > {code:java} > * @return A future map from topic names to descriptions which can be > used to check > * the status of individual description if the describe topic > request used > * topic names, otherwise return null, this request succeeds only > if all the > * topic descriptions succeed > {code} > According to the docs, it should return null when the requested result > type does not match the request. For example, we call `allTopicNames` after passing a > `TopicIdCollection`. However, the current implementation throws an NPE > directly
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
kamalcph commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Update high watermark with offset metadata. The new high watermark will be lower - * bounded by the log start offset and upper bounded by the log end offset. + * bounded by the local-log-start-offset and upper bounded by the log-end-offset. * * @param highWatermarkMetadata the suggested high watermark with offset metadata * @return the updated high watermark offset */ def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = { val endOffsetMetadata = localLog.logEndOffsetMetadata -val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) { - new LogOffsetMetadata(logStartOffset) +val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) { Review Comment: > In the rare case, the restarted broker is elected as the leader before caught up through unclean election. Is this the case that you want to address? yes, we want to address this case too. And, the issue can also happen during clean preferred-leader-election: ``` Call stack: The replica (1002) has full data but HW is invalid, then the fetch-offset will be equal to LeaderLog(1001).highWatermark Leader (1001): KafkaApis.handleFetchRequest ReplicaManager.fetchMessages ReplicaManager.readFromLocalLog Partition.fetchRecords Partition.updateFollowerFetchState Partition.maybeExpandIsr Partition.submitAlterPartition ... ... ... # If there is not enough data to respond and there is no remote data, we will let the fetch request wait for new data. # parks the request in the DelayedFetchPurgatory Another thread, runs Preferred-Leader-Election in controller (1003), since the replica 1002 joined the ISR list, it can be elected as the preferred leader. 
The controller sends LeaderAndIsr requests to all the brokers. KafkaController.processReplicaLeaderElection KafkaController.onReplicaElection PartitionStateMachine.handleStateChanges PartitionStateMachine.doHandleStateChanges PartitionStateMachine.electLeaderForPartitions ControllerChannelManager.sendRequestsToBrokers Replica 1002 got elected as Leader and have invalid highWatermark since it didn't process the fetch-response from the previous leader 1001, throws OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that in LeaderAndIsr request even if one partition fails, then the remaining partitions in that request won't be processed. KafkaApis.handleLeaderAndIsrRequest ReplicaManager.becomeLeaderOrFollower ReplicaManager.makeLeaders Partition.makeLeader Partition.maybeIncrementLeaderHW UnifiedLog.maybeIncrementHighWatermark (LeaderLog) UnifiedLog.fetchHighWatermarkMetadata The controller assumes that the current-leader for the tp0 is 1002, but the broker 1002 couldn't process the LISR. The controller retries the LISR until the broker 1002 becomes leader for tp0. During this time, the producers won't be able to send messages, as the node 1002, sends NOT_LEADER_FOR_PARTITION error-code to the producer. During this time, if a follower sends the FETCH request to read from the current-leader 1002, then OFFSET_OUT_OF_RANGE error will be returned: KafkaApis.handleFetchRequest ReplicaManager.fetchMessages ReplicaManager.readFromLog Partition.fetchRecords Partition.readRecords UnifiedLog.read UnifiedLog.fetchHighWatermarkMetadata UnifiedLog.convertToOffsetMetadataOrThrow LocalLog.convertToOffsetMetadataOrThrow LocalLog.read ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
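The change under review tightens the lower bound of the high-watermark update from the log-start-offset to the local-log-start-offset, keeping the log-end-offset as the upper bound. The clamping itself reduces to a max/min over the three offsets; a minimal sketch using plain `long` offsets in place of Kafka's `LogOffsetMetadata` (names below are mine, not the patch's):

```java
public class HighWatermarkBound {
    /**
     * Clamp a proposed high watermark into [localLogStartOffset, logEndOffset],
     * mirroring the bounds described in the patched updateHighWatermark.
     */
    static long boundHighWatermark(long proposed, long localLogStartOffset, long logEndOffset) {
        return Math.max(localLogStartOffset, Math.min(proposed, logEndOffset));
    }

    public static void main(String[] args) {
        // proposed HW below the local log start (e.g. that data lives only in remote storage)
        System.out.println(boundHighWatermark(100L, 500L, 900L));  // 500
        // proposed HW beyond the log end
        System.out.println(boundHighWatermark(1200L, 500L, 900L)); // 900
        // proposed HW already in range is kept as-is
        System.out.println(boundHighWatermark(700L, 500L, 900L));  // 700
    }
}
```

Bounding below by the local-log-start-offset rather than the log-start-offset matters for tiered storage: offsets between the two exist only remotely, and a high watermark pointing there cannot be resolved to local segment metadata.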