Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-04-13 Thread via GitHub


Owen-CH-Leung commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1564488318


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -48,7 +53,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ZK)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @Tag("integration")

Review Comment:
   Thanks! I've pushed a commit to address all your comments.






Re: [PR] MINOR: Fix flaky test testRackAwareRangeAssignor [kafka]

2024-04-13 Thread via GitHub


github-actions[bot] commented on PR #14829:
URL: https://github.com/apache/kafka/pull/14829#issuecomment-2053887957

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-04-13 Thread via GitHub


chia7712 commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1564305286


##
core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java:
##
@@ -99,6 +99,10 @@ public List getAdditionalExtensions() {
 clusterConfig.brokerServerProperties(brokerId).forEach(
 (key, value) -> brokerNode.propertyOverrides().put(key.toString(), value.toString()));
 });
+nodes.controllerNodes().forEach((controllerId, controllerNode) -> {

Review Comment:
   `{` is redundant.



##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -203,6 +205,16 @@ private ClusterTestDefaults 
getClusterTestDefaults(Class testClass) {
 .orElseGet(() -> 
EmptyClass.class.getDeclaredAnnotation(ClusterTestDefaults.class));
 }
 
+private static void applyConfig(ClusterConfig config, ClusterConfigProperty property) {
+if (property.id() == -1) {
+config.serverProperties().put(property.key(), property.value());
+} else if (property.id() >= CONTROLLER_ID_OFFSET) {

Review Comment:
   We need to document this to make sure developers are aware of the 
broker/controller id when they try to define per-broker/controller configs.



##
core/src/test/java/kafka/test/junit/ClusterTestExtensions.java:
##
@@ -190,10 +192,10 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu
 
 ClusterConfig config = builder.build();

Review Comment:
   Could you please refactor the code to make `ClusterConfig` immutable? It 
seems to be chaotic that we pass mutable objects everywhere ...
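   (For illustration, a minimal sketch of the immutable shape such a refactor could take — the class and method names below are assumptions for the sketch, not the actual `ClusterConfig` API:)
   
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   
   public final class ImmutableClusterConfig {
       private final Map<String, String> serverProperties;
   
       private ImmutableClusterConfig(Map<String, String> serverProperties) {
           // Defensive copy wrapped in an unmodifiable view: readers can never mutate shared state.
           this.serverProperties = Collections.unmodifiableMap(new HashMap<>(serverProperties));
       }
   
       public Map<String, String> serverProperties() {
           return serverProperties;
       }
   
       public static Builder builder() {
           return new Builder();
       }
   
       public static final class Builder {
           private final Map<String, String> serverProperties = new HashMap<>();
   
           public Builder putServerProperty(String key, String value) {
               serverProperties.put(key, value);
               return this;
           }
   
           public ImmutableClusterConfig build() {
               return new ImmutableClusterConfig(serverProperties);
           }
       }
   }
   ```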






[jira] [Assigned] (KAFKA-16547) add test for DescribeConfigsOptions#includeDocumentation

2024-04-13 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16547:
--

Assignee: Chia-Ping Tsai

> add test for DescribeConfigsOptions#includeDocumentation
> 
>
> Key: KAFKA-16547
> URL: https://issues.apache.org/jira/browse/KAFKA-16547
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> As the title says, we have no tests for this query option.
> If the option is configured to false, `ConfigEntry#documentation` should be 
> null. Otherwise, it should return the config documentation.





[jira] [Created] (KAFKA-16547) add test for DescribeConfigsOptions#includeDocumentation

2024-04-13 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16547:
--

 Summary: add test for DescribeConfigsOptions#includeDocumentation
 Key: KAFKA-16547
 URL: https://issues.apache.org/jira/browse/KAFKA-16547
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


As the title says, we have no tests for this query option.

If the option is configured to false, `ConfigEntry#documentation` should be 
null. Otherwise, it should return the config documentation.
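
A rough sketch of the kind of test this asks for (admin client and topic setup are omitted; the Admin/DescribeConfigsOptions calls below are the existing API, the harness around them is an assumption):

{code:java}
ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic");

// includeDocumentation(false): every entry's documentation is expected to be null
Config config = admin.describeConfigs(
        Collections.singleton(topic),
        new DescribeConfigsOptions().includeDocumentation(false)
).all().get().get(topic);
config.entries().forEach(entry -> assertNull(entry.documentation()));
{code}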





[jira] [Updated] (KAFKA-16546) add docs to explain how to update cluster-wide default by Admin#incrementalAlterConfigs

2024-04-13 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16546:
---
Priority: Minor  (was: Major)

> add docs to explain how to update cluster-wide default by 
> Admin#incrementalAlterConfigs
> ---
>
> Key: KAFKA-16546
> URL: https://issues.apache.org/jira/browse/KAFKA-16546
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> We have good docs about updating cluster-wide configs with the command-line tool 
> (https://kafka.apache.org/documentation/#dynamicbrokerconfigs), and it would 
> be great if Admin#incrementalAlterConfigs had such good docs as well.





[jira] [Created] (KAFKA-16546) add docs to explain how to update cluster-wide default by Admin#incrementalAlterConfigs

2024-04-13 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16546:
--

 Summary: add docs to explain how to update cluster-wide default by 
Admin#incrementalAlterConfigs
 Key: KAFKA-16546
 URL: https://issues.apache.org/jira/browse/KAFKA-16546
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


We have good docs about updating cluster-wide configs with the command-line tool 
(https://kafka.apache.org/documentation/#dynamicbrokerconfigs), and it would be 
great if Admin#incrementalAlterConfigs had such good docs as well.
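
For illustration, a minimal sketch of what such docs could show — updating a cluster-wide default via the Admin client by addressing a BROKER ConfigResource with an empty name (the config key is just an example):

{code:java}
try (Admin admin = Admin.create(props)) {
    // An empty resource name targets the cluster-wide default rather than a single broker.
    ConfigResource clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
    AlterConfigOp op = new AlterConfigOp(
            new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(
            Collections.singletonMap(clusterDefault, Collections.singletonList(op))
    ).all().get();
}
{code}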





Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-13 Thread via GitHub


brandboat commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1564160360


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.utils.Utils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class PartitionMetadataFileTest {
+private final File dir = assertDoesNotThrow(() -> Files.createTempDirectory("tmp")).toFile();

Review Comment:
   looks like we have to add this `` to 
   https://github.com/apache/kafka/blob/c034cf2953e691ce4ecd94bf00ac5810167354bc/checkstyle/import-control-storage.xml#L73-L80
   
   otherwise checkstyle raises a `disallow import` error






Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-04-13 Thread via GitHub


chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1564147419


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -48,7 +53,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ZK)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @Tag("integration")

Review Comment:
   > ClusterTestDefaults didn't have the API serverProperties so I think it wouldn't compile
   
   It is supported now. See #15687






Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-04-13 Thread via GitHub


Owen-CH-Leung commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1564142762


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -48,7 +53,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ZK)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @Tag("integration")

Review Comment:
   Those `serverProperties` are set in a method annotated with `@BeforeEach`. Do you mean you want to put them like below?
   
   ```
   @ClusterTest(clusterType = Type.ALL, serverProperties = {
       @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
       @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
       @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "4")
   })
   ```
   
   If so, we'd have to modify each test. `ClusterTestDefaults` didn't have the API `serverProperties`, so I think it wouldn't compile.








[PR] KAFKA-16484: Support to define per broker/controller property by ClusterConfigProperty [kafka]

2024-04-13 Thread via GitHub


brandboat opened a new pull request, #15715:
URL: https://github.com/apache/kafka/pull/15715

   Related to KAFKA-16484.
   
   Introduce a new field `id` in the annotation `ClusterConfigProperty`. The main 
purpose of the new field is to define a property for a specific broker/controller 
(KRaft). The default value is `-1`, which means the `ClusterConfigProperty` will 
apply to all brokers and controllers.
   
   Note that under 
[Type.KRAFT](https://github.com/apache/kafka/blob/c034cf2953e691ce4ecd94bf00ac5810167354bc/core/src/test/java/kafka/test/annotation/Type.java#L31)
 mode, the controller id [starts from 
3000](https://github.com/apache/kafka/blob/c034cf2953e691ce4ecd94bf00ac5810167354bc/core/src/test/java/kafka/testkit/TestKitNodes.java#L141-L146)
 and then increments by one each time. In other modes, the broker/controller id 
starts from 0 and then increments by one.
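   
   For illustration, a usage sketch of the new field (the annotation attributes other than `id` mirror existing usages in this thread; the config keys are arbitrary examples):
   
   ```java
   @ClusterTest(clusterType = Type.KRAFT, serverProperties = {
       // id defaults to -1: applied to every broker and controller
       @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
       // id = 0: applied only to broker 0
       @ClusterConfigProperty(id = 0, key = "log.retention.ms", value = "60000"),
       // id = 3000: applied only to the first KRaft controller (controller ids start at 3000)
       @ClusterConfigProperty(id = 3000, key = "metadata.log.segment.bytes", value = "1048576")
   })
   public void testPerNodeConfig() { }
   ```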
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-15480) Add RemoteStorageInterruptedException

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15480:
-
Labels: kip  (was: )

> Add RemoteStorageInterruptedException
> -
>
> Key: KAFKA-15480
> URL: https://issues.apache.org/jira/browse/KAFKA-15480
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Mital Awachat
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> Introduce `RemoteStorageInterruptedException` to propagate interruptions from 
> the plugin to Kafka without generating (false) errors. 
> It allows the plugin to notify Kafka that an API operation in progress was 
> interrupted as a result of task cancellation, which can happen under changes 
> such as leadership migration or topic deletion.
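
A rough sketch of the intended plugin-side usage (the exception type is the proposal itself and `remoteClient.download` is a hypothetical plugin call; only the catch-and-rethrow pattern is illustrated):

{code:java}
public InputStream fetchLogSegment(RemoteLogSegmentMetadata metadata, int startPosition)
        throws RemoteStorageException {
    try {
        return remoteClient.download(metadata, startPosition); // hypothetical plugin internals
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        // Signals cancellation (e.g. leadership migration, topic deletion) rather than a storage failure.
        throw new RemoteStorageInterruptedException("Remote fetch was interrupted", e);
    }
}
{code}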





[jira] [Updated] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15776:
-
Labels: kip  (was: )

> Update delay timeout for DelayedRemoteFetch request
> ---
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait for 
> the given amount of time when there is no data available to serve the FETCH 
> request:
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user about how to configure an optimal value for each purpose. Moreover, the 
> config is of *LOW* importance, and most users won't configure it but will use 
> the default value of 500 ms.
> Having a delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to a 
> higher number of expired delayed remote fetch requests when the remote 
> storage has any degradation.
> We should introduce a {{fetch.remote.max.wait.ms}} config (preferably a 
> server config) to define the delay timeout for DelayedRemoteFetch requests, 
> or take it from the client similar to {{request.timeout.ms}}.





[jira] [Updated] (KAFKA-15776) Update delay timeout for DelayedRemoteFetch request

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-15776:
-
Labels: kip-1018  (was: kip)

> Update delay timeout for DelayedRemoteFetch request
> ---
>
> Key: KAFKA-15776
> URL: https://issues.apache.org/jira/browse/KAFKA-15776
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: kip-1018
>
> We are reusing the {{fetch.max.wait.ms}} config as a delay timeout for 
> DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait for 
> the given amount of time when there is no data available to serve the FETCH 
> request:
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse the 
> user about how to configure an optimal value for each purpose. Moreover, the 
> config is of *LOW* importance, and most users won't configure it but will use 
> the default value of 500 ms.
> Having a delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to a 
> higher number of expired delayed remote fetch requests when the remote 
> storage has any degradation.
> We should introduce a {{fetch.remote.max.wait.ms}} config (preferably a 
> server config) to define the delay timeout for DelayedRemoteFetch requests, 
> or take it from the client similar to {{request.timeout.ms}}.





[jira] [Assigned] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15341:


Assignee: Kamal Chandraprakash

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.8.0
>
>
> When we are in a rolling restart to enable TS at the system level, some brokers 
> have TS enabled on them and some don't. If we send an alter-config call to 
> enable TS for a topic and it hits a broker which has TS enabled, that broker 
> forwards it to the controller and the controller sends the config update to 
> all brokers. When another broker which doesn't have TS enabled (because it 
> hasn't undergone the restart yet) gets this config change, it "should" fail 
> to apply it. But failing now is too late, since alterConfig has already 
> succeeded and controller->broker config propagation is done asynchronously.
> With this JIRA, we want the controller to check whether TS is enabled on all 
> brokers before applying the alter config that turns on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129





[jira] [Assigned] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15341:


Assignee: (was: Kamal Chandraprakash)

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.8.0
>
>
> When we are in a rolling restart to enable TS at the system level, some brokers 
> have TS enabled on them and some don't. If we send an alter-config call to 
> enable TS for a topic and it hits a broker which has TS enabled, that broker 
> forwards it to the controller and the controller sends the config update to 
> all brokers. When another broker which doesn't have TS enabled (because it 
> hasn't undergone the restart yet) gets this config change, it "should" fail 
> to apply it. But failing now is too late, since alterConfig has already 
> succeeded and controller->broker config propagation is done asynchronously.
> With this JIRA, we want the controller to check whether TS is enabled on all 
> brokers before applying the alter config that turns on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129





[jira] [Assigned] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15682:


Assignee: Kamal Chandraprakash

> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> One of the implementations of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic, 
> {{__remote_log_metadata}}, to store the metadata about the remote log 
> segments. Unlike other internal topics, which are compaction enabled, this 
> topic is not compacted and its retention is set to unlimited. 
> Keeping this internal topic's retention unlimited is not practical in 
> real-world use cases, where the topic's local disk usage footprint grows huge 
> over a period of time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on that assumption; we need an assertion to ensure that the internal 
> {{__remote_log_metadata}} segments are not eligible for deletion before the 
> expiry of all the relevant uploaded user-topic remote-log-segments, 
> otherwise there will be dangling remote-log-segments which won't be cleared 
> once all the brokers are restarted after the internal topic retention cleanup.
> See the discussion thread: 
> https://github.com/apache/kafka/pull/14576#discussion_r1368576126





[jira] [Assigned] (KAFKA-15682) Ensure internal remote log metadata topic does not expire its segments before deleting user-topic segments

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-15682:


Assignee: (was: Kamal Chandraprakash)

> Ensure internal remote log metadata topic does not expire its segments before 
> deleting user-topic segments
> --
>
> Key: KAFKA-15682
> URL: https://issues.apache.org/jira/browse/KAFKA-15682
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Priority: Major
>
> One of the implementations of RemoteLogMetadataManager is 
> TopicBasedRemoteLogMetadataManager, which uses an internal Kafka topic, 
> {{__remote_log_metadata}}, to store the metadata about the remote log 
> segments. Unlike other internal topics, which are compaction enabled, this 
> topic is not compacted and its retention is set to unlimited. 
> Keeping this internal topic's retention unlimited is not practical in 
> real-world use cases, where the topic's local disk usage footprint grows huge 
> over a period of time. 
> It is assumed that the user will set the retention to a reasonable time such 
> that it is the max of all the user-created topics (max + X). We can't just 
> rely on that assumption; we need an assertion to ensure that the internal 
> {{__remote_log_metadata}} segments are not eligible for deletion before the 
> expiry of all the relevant uploaded user-topic remote-log-segments, 
> otherwise there will be dangling remote-log-segments which won't be cleared 
> once all the brokers are restarted after the internal topic retention cleanup.
> See the discussion thread: 
> https://github.com/apache/kafka/pull/14576#discussion_r1368576126





[jira] [Assigned] (KAFKA-9578) Kafka Tiered Storage - System Tests

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-9578:
---

Assignee: (was: Kamal Chandraprakash)

> Kafka Tiered Storage - System  Tests
> 
>
> Key: KAFKA-9578
> URL: https://issues.apache.org/jira/browse/KAFKA-9578
> Project: Kafka
>  Issue Type: Test
>Reporter: Harsha
>Priority: Major
> Fix For: 3.8.0
>
>
> Initial test cases set up by [~Ying Zheng] 
>  
> [https://docs.google.com/spreadsheets/d/1gS0s1FOmcjpKYXBddejXAoJAjEZ7AdEzMU9wZc-JgY8/edit#gid=0]





[jira] [Assigned] (KAFKA-13560) Load indexes and data in async manner in the critical path of replica fetcher threads.

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-13560:


Assignee: (was: Kamal Chandraprakash)

> Load indexes and data in async manner in the critical path of replica fetcher 
> threads. 
> ---
>
> Key: KAFKA-13560
> URL: https://issues.apache.org/jira/browse/KAFKA-13560
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.8.0
>
>
> https://github.com/apache/kafka/pull/11390#discussion_r762366976
> https://github.com/apache/kafka/pull/11390#discussion_r1033141283





[jira] [Assigned] (KAFKA-16511) Leaking tiered segments

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-16511:


Assignee: Kamal Chandraprakash

> Leaking tiered segments
> ---
>
> Key: KAFKA-16511
> URL: https://issues.apache.org/jira/browse/KAFKA-16511
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Francois Visconte
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: tiered-storage
>
> I have some topics that have not been written to for a few days (having 12h 
> retention) where some data remains in tiered storage (in our case S3) and 
> is never deleted.
>  
> Looking at the log history, it appears that we never even tried to delete 
> these segments.
> When looking at one of the non-leaking segments, I get the following 
> interesting messages:
>  
> {code:java}
> "2024-04-02T10:30:45.265Z","""kafka""","""10039""","[RemoteLogManager=10039 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Deleted remote log segment 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ} due to leader-epoch-cache truncation. Current 
> earliest-epoch-entry: EpochEntry(epoch=8, startOffset=2980106), 
> segment-end-offset: 2976819 and segment-epochs: [5]"
> "2024-04-02T10:30:45.242Z","""kafka""","""10039""","Deleting log segment data 
> for completed successfully 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-02T10:30:45.144Z","""kafka""","""10039""","Deleting log segment data 
> for RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013411147, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_FINISHED}"
> "2024-04-01T23:16:51.157Z","""kafka""","""10029""","[RemoteLogManager=10029 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764] Copied 
> 02968418.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}"
> "2024-04-01T23:16:51.147Z","""kafka""","""10029""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> "2024-04-01T23:16:37.328Z","""kafka""","""10029""","Copying log segment data, 
> metadata: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-764, 
> id=fqGng3UURCG3-v4lETeLKQ}
> , startOffset=2968418, endOffset=2976819, brokerId=10029, 
> maxTimestampMs=1712009754536, eventTimestampMs=1712013397319, 
> segmentLeaderEpochs={5=2968418}, segmentSizeInBytes=536351075, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> {code}
>  
> This looks right, because we can see logs from both the plugin and the remote 
> log manager indicating that the remote log segment was removed.
> Now if I look at one of the leaked segments, here is what I see:
>  
> {code:java}
> "2024-04-02T00:43:33.834Z","""kafka""","""10001""","[RemoteLogManager=10001 
> partition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765] Copied 
> 02971163.log to remote storage with segment-id: 
> RemoteLogSegmentId{topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}"
> "2024-04-02T00:43:33.822Z","""kafka""","""10001""","Copying log segment data 
> completed successfully, metadata: 
> RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId
> {topicIdPartition=5G8Ai8kBSwmQ3Ln4QRY5rA:topic1_3543-765, 
> id=8dP13VDYSaiFlubl9SNBTQ}
> , startOffset=2971163, endOffset=2978396, brokerId=10001, 
> maxTimestampMs=1712010648756, eventTimestampMs=1712018599981, 
> segmentLeaderEpochs={7=2971163}, segmentSizeInBytes=459778940, 
> customMetadata=Optional.empty, state=COPY_SEGMENT_STARTED}"
> 

[jira] [Assigned] (KAFKA-14915) Option to consume multiple partitions that have their data in remote storage for the target offsets.

2024-04-13 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash reassigned KAFKA-14915:


Assignee: (was: Kamal Chandraprakash)

> Option to consume multiple partitions that have their data in remote storage 
> for the target offsets.
> 
>
> Key: KAFKA-14915
> URL: https://issues.apache.org/jira/browse/KAFKA-14915
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Satish Duggana
>Priority: Major
>  Labels: tiered-storage
> Fix For: 3.8.0
>
>
> Context: https://github.com/apache/kafka/pull/13535#discussion_r1171250580





Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-13 Thread via GitHub


kamalcph commented on PR #15631:
URL: https://github.com/apache/kafka/pull/15631#issuecomment-2053698624

   Re-triggered the tests. The test failures are unrelated.





Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-13 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1564110479


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
 log.info("Replayed ConfigRecord for {} which set configuration {} 
to {}",
 configResource, record.name(), record.value());
 }
+if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {

Review Comment:
   Sorry. This is bad, really bad :(
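   
   (The diff above compares the config name with `==`, i.e. by reference; a minimal sketch of the value-equality form it presumably needs:)
   
   ```java
   // String reference comparison: only true when both sides are the same interned object.
   if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) { /* fragile */ }
   
   // Value comparison: matches any record whose name equals "min.insync.replicas".
   if (TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG.equals(record.name())) { /* ... */ }
   ```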






Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-04-13 Thread via GitHub


chia7712 commented on code in PR #15489:
URL: https://github.com/apache/kafka/pull/15489#discussion_r1564099443


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -48,7 +53,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ZK)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @Tag("integration")
 public class GetOffsetShellTest {
 private final int topicCount = 4;

Review Comment:
   `topicName` could be a local variable



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -81,28 +86,57 @@ private void setUp() {
 }
 
 Properties props = new Properties();
-props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.config().producerProperties().get("bootstrap.servers"));
+props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
 try (KafkaProducer producer = new KafkaProducer<>(props)) {
 IntStream.range(0, topicCount + 1)
 .forEach(i -> IntStream.range(0, i * i)
-.forEach(msgCount -> producer.send(
-new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)))
+.forEach(msgCount -> {
+try {

Review Comment:
   we can leverage `assertDoesNotThrow` to simplify the code
   ```java
   assertDoesNotThrow(() -> producer.send(
       new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get());
   ```



##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -48,7 +53,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ZK)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @Tag("integration")

Review Comment:
   Could you use `serverProperties` to define default server configs?
   ```java
   cluster.config().serverProperties().put("auto.create.topics.enable", false);
   cluster.config().serverProperties().put("offsets.topic.replication.factor", "1");
   cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount));
   ```
   could be replaced by 
   ```java
   @ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
       @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
       @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
       @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "4")
   })
   ```






Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-13 Thread via GitHub


chia7712 commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1564094159


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+import org.apache.kafka.common.utils.Utils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class PartitionMetadataFileTest {
+private final File dir = assertDoesNotThrow(() -> Files.createTempDirectory("tmp")).toFile();

Review Comment:
   we can replace it with `TestUtils.tempDirectory()`, which handles the cleanup 
of the temporary folder we create.
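   
   (A one-line sketch of the suggested change, assuming `org.apache.kafka.test.TestUtils` is accessible from this module:)
   
   ```java
   private final File dir = TestUtils.tempDirectory(); // scheduled for deletion when the JVM exits
   ```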






[PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-13 Thread via GitHub


KevinZTW opened a new pull request, #15714:
URL: https://github.com/apache/kafka/pull/15714

   In PartitionMetadataFile.java, there is a defensive early-fail design when 
setting a different topicId, but there are no corresponding test cases. Hence, 
this PR adds two tests: one setting the same topicId and one setting a different topicId.
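   
   For illustration, a sketch of the kind of test described (the constructor arguments and exact assertion are assumptions based on the class under test):
   
   ```java
   @Test
   public void testRecordDifferentTopicIdThrows() {
       File file = new File(dir, "partition.metadata");
       PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null);
       partitionMetadataFile.record(Uuid.randomUuid());
       // Recording a different topicId should fail fast with InconsistentTopicIdException.
       assertThrows(InconsistentTopicIdException.class,
               () -> partitionMetadataFile.record(Uuid.randomUuid()));
   }
   ```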
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-16545) Auto adjust the replica factor according to number of broker when using ClusterTestExtensions

2024-04-13 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16545:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> Auto adjust the replica factor according to number of broker when using 
> ClusterTestExtensions
> -
>
> Key: KAFKA-16545
> URL: https://issues.apache.org/jira/browse/KAFKA-16545
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Major
>
> In most test cases, we start a single broker so as to save resources. However, 
> this can cause errors when creating internal topics, since they require 3 
> replicas by default. In order to reduce the duplicated configs across all 
> tests, we can add a bit of sugar to auto-adjust the replication factor (if it 
> is not defined by the test) when the number of brokers started by the test is 
> less than the default value.
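
A rough sketch of what such sugar might look like inside the test framework (the hook point and accessor names are assumptions):

{code:java}
// Only fill in replication-factor-sensitive configs that the test did not set explicitly.
Map<String, String> props = clusterConfig.serverProperties();
String rf = String.valueOf(Math.min(3, clusterConfig.numBrokers()));
props.putIfAbsent("offsets.topic.replication.factor", rf);
props.putIfAbsent("transaction.state.log.replication.factor", rf);
{code}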





[jira] [Commented] (KAFKA-16545) Auto adjust the replica factor according to number of broker when using ClusterTestExtensions

2024-04-13 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836822#comment-17836822
 ] 

PoAn Yang commented on KAFKA-16545:
---

Hi [~chia7712], I'm interested in this issue. May I assign it to myself? Thank you.

> Auto adjust the replica factor according to number of broker when using 
> ClusterTestExtensions
> -
>
> Key: KAFKA-16545
> URL: https://issues.apache.org/jira/browse/KAFKA-16545
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> In most test cases, we start a single broker so as to save resources. However, 
> this can cause errors when creating internal topics, since they require 3 
> replicas by default. In order to reduce the duplicated configs across all 
> tests, we can add a bit of sugar to auto-adjust the replication factor (if it 
> is not defined by the test) when the number of brokers started by the test is 
> less than the default value.





Re: [PR] Fix incorrect Java equals comparison of Uuid by reference [kafka]

2024-04-13 Thread via GitHub


chia7712 merged PR #15707:
URL: https://github.com/apache/kafka/pull/15707





[jira] [Assigned] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE

2024-04-13 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16544:
--

Assignee: Kuan Po Tseng  (was: Chia-Ping Tsai)

> DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames 
> should return null instead of throwing NPE
> --
>
> Key: KAFKA-16544
> URL: https://issues.apache.org/jira/browse/KAFKA-16544
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Major
>
> {code:java}
>  * @return A future map from topic names to descriptions which can be 
> used to check
>  * the status of individual description if the describe topic 
> request used
>  * topic names, otherwise return null, this request succeeds only 
> if all the
>  * topic descriptions succeed
> {code}
> According to the docs, it should return null if we try to get a result that 
> does not match the request; for example, we call `allTopicNames` when passing 
> a `TopicIdCollection`. However, the current implementation throws an NPE 
> directly.
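
A minimal illustration of the mismatch being described (the Admin calls are the existing API; the expected behaviour is what the javadoc promises, not what the current code does):

{code:java}
DescribeTopicsResult result = admin.describeTopics(
        TopicCollection.ofTopicIds(Collections.singleton(topicId)));

// The request used topic IDs, so per the javadoc this should return null...
// ...but the current implementation throws a NullPointerException instead.
KafkaFuture<Map<String, TopicDescription>> byName = result.allTopicNames();
{code}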





Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-04-13 Thread via GitHub


Owen-CH-Leung commented on PR #15489:
URL: https://github.com/apache/kafka/pull/15489#issuecomment-2053607875

   > @Owen-CH-Leung
   > 
   > The root cause is that not all produced records are successfully sent, and we 
   > don't check all sends before closing the producer. As our CI is very busy, it 
   > could cause a timeout if we send many records at once. Hence, could you call 
   > `get` on the `Producer#send` result to make sure those records get sent.
   
   Thanks a lot for helping to identify the root cause! I've just pushed a 
commit to address all your comments. Let's see what we get from the build =)
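   
   (For reference, a one-line sketch of the suggested change — blocking on each send so failures surface before the producer is closed:)
   
   ```java
   producer.send(new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get();
   ```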





Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-04-13 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1563938474


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##


Review Comment:
   No more unsafe type casting to be found in this class. Not that much has 
changed in this abstract class; I have moved some code into methods. 
   
   Side note: I don't have the correct formatter configured :/ I couldn't find 
any contribution notes on how to set the correct one.






Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]

2024-04-13 Thread via GitHub


raminqaf commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1563929782


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##
@@ -63,21 +63,6 @@ public static  LeftOrRightValue makeRightValue(final V2 rightVal
 return new LeftOrRightValue<>(null, rightValue);
 }
 
-/**
- * Create a new {@link LeftOrRightValue} instance with the V value as {@code leftValue} if
- * {@code isLeftSide} is True; otherwise {@code rightValue} if {@code isLeftSide} is False.
- *
- * @param value the V value (either V1 or V2 type)
- * @param <V> the type of the value
- * @return a new {@link LeftOrRightValue} instance
- */
-public static <V> LeftOrRightValue make(final boolean isLeftSide, final V value) {
-Objects.requireNonNull(value, "value is null");
-return isLeftSide
-? LeftOrRightValue.makeLeftValue(value)
-: LeftOrRightValue.makeRightValue(value);
-}
-

Review Comment:
   Removed as promised  



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##


Review Comment:
   Not that much has changed in this abstract class. I have moved some code into methods. 



##
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java:
##
@@ -52,7 +52,26 @@ private TimestampedKeyAndJoinSide(final boolean leftSide, final K key, final lon
 public static <K> TimestampedKeyAndJoinSide<K> make(final boolean leftSide, final K key, final long timestamp) {
 return new TimestampedKeyAndJoinSide<>(leftSide, key, timestamp);
 }
-
+/**
+ * Create a new {@link TimestampedKeyAndJoinSide} instance for the left join side if the provided {@code key} is not {@code null}.
+ *
+ * @param key  the key
+ * @param <K>  the type of the key
+ * @return a new {@link TimestampedKeyAndJoinSide} instance for the left join side if the provided {@code key} is not {@code null}
+ */
+public static <K> TimestampedKeyAndJoinSide<K> makeLeft(final K key, final long timestamp) {
+return new TimestampedKeyAndJoinSide<>(true, key, timestamp);
+}
+/**
+ * Create a new {@link TimestampedKeyAndJoinSide} instance for the right join side if the provided {@code key} is not {@code null}.
+ *
+ * @param key  the key
+ * @param <K>  the type of the key
+ * @return a new {@link TimestampedKeyAndJoinSide} instance for the right join side if the provided {@code key} is not {@code null}
+ */
+public static <K> TimestampedKeyAndJoinSide<K> makeRight(final K key, final long timestamp) {
+return new TimestampedKeyAndJoinSide<>(false, key, timestamp);
+}

Review Comment:
   @gharris1727 I have added these static factory methods for a cleaner 
definition. If you prefer, I can have them removed and use the `make` method.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java:
##


Review Comment:
   A clean abstraction. Each join side defines its "this side value" and "other 
side value". 






[jira] [Commented] (KAFKA-16544) DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames should return null instead of throwing NPE

2024-04-13 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836797#comment-17836797
 ] 

Kuan Po Tseng commented on KAFKA-16544:
---

gentle ping [~chia7712], are you working on this one ? Maybe I can give it a 
try ! :) 

> DescribeTopicsResult#allTopicIds and DescribeTopicsResult#allTopicNames 
> should return null instead of throwing NPE
> --
>
> Key: KAFKA-16544
> URL: https://issues.apache.org/jira/browse/KAFKA-16544
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {code:java}
>  * @return A future map from topic names to descriptions which can be 
> used to check
>  * the status of individual description if the describe topic 
> request used
>  * topic names, otherwise return null, this request succeeds only 
> if all the
>  * topic descriptions succeed
> {code}
> According the docs, it should return null if we try to get the result 
> unmatched to the request. For example, we call `allTopicNames` in passing 
> `TopicIdCollection`. However, the current implementation will throw NPE 
> directly





Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-13 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will 
be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the 
log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset 
metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < 
_localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before 
caught up through unclean election. Is this the case that you want to address?
   
   Yes, we want to address this case too. The issue can also happen during a 
clean preferred-leader-election:
   
   ```
   Call stack: The replica (1002) has full data but HW is invalid, then the 
fetch-offset will be equal to LeaderLog(1001).highWatermark
   
   Leader (1001):
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLocalLog
   Partition.fetchRecords
   Partition.updateFollowerFetchState
   Partition.maybeExpandIsr
   Partition.submitAlterPartition
   ...
   ...
   ...
   # If there is not enough data to respond and there is no remote 
data, we will let the fetch request wait for new data.
   # parks the request in the DelayedFetchPurgatory
   
   
   Another thread, runs Preferred-Leader-Election in controller (1003), since 
the replica 1002 joined the ISR list, it can be elected as the preferred 
leader. The controller sends LeaderAndIsr requests to all the brokers.
   
   KafkaController.processReplicaLeaderElection
   KafkaController.onReplicaElection
   PartitionStateMachine.handleStateChanges
   PartitionStateMachine.doHandleStateChanges
   PartitionStateMachine.electLeaderForPartitions
   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 got elected as Leader and have invalid highWatermark since it 
didn't process the fetch-response from the previous leader 1001, throws 
OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that 
in LeaderAndIsr request even if one partition fails, then the remaining 
partitions in that request won't be processed.
   
   KafkaApis.handleLeaderAndIsrRequest
   ReplicaManager.becomeLeaderOrFollower
   ReplicaManager.makeLeaders
   Partition.makeLeader
   Partition.maybeIncrementLeaderHW
   UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
   UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current-leader for the tp0 is 1002, but the 
broker 1002 couldn't process the LISR. The controller retries the LISR until 
the broker 1002 becomes leader for tp0. During this time, the producers won't 
be able to send messages, as the node 1002, sends NOT_LEADER_FOR_PARTITION 
error-code to the producer.
   
   During this time, if a follower sends the FETCH request to read from the 
current-leader 1002, then OFFSET_OUT_OF_RANGE error will be returned:
   
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLog
   Partition.fetchRecords
   Partition.readRecords
   UnifiedLog.read 
   UnifiedLog.fetchHighWatermarkMetadata
   UnifiedLog.convertToOffsetMetadataOrThrow
   LocalLog.convertToOffsetMetadataOrThrow
   LocalLog.read
   
   ```






Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-13 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before catching up, through unclean election. Is this the case that you want to address?
   
   Yes, we want to address this case too. The issue can also happen during a clean preferred-leader-election:
   
   ```
   Call stack: the replica (1002) has full data, but its HW is invalid, so the fetch-offset will be equal to LeaderLog(1001).highWatermark
   
   Leader (1001):
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLocalLog
   Partition.fetchRecords
   Partition.updateFollowerFetchState
   Partition.maybeExpandIsr
   Partition.submitAlterPartition
   ...
   ...
   ...
   # If there is not enough data to respond and there is no remote data, we will let the fetch request wait for new data.
   # parks the request in the DelayedFetchPurgatory
   
   
   Another thread runs Preferred-Leader-Election in the controller (1003); since replica 1002 joined the ISR list, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.
   
   KafkaController.processReplicaLeaderElection
   KafkaController.onReplicaElection
   PartitionStateMachine.handleStateChanges
   PartitionStateMachine.doHandleStateChanges
   PartitionStateMachine.electLeaderForPartitions
   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 gets elected as leader but has an invalid highWatermark, since it didn't process the fetch-response from the previous leader 1001; it then throws an OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that if even one partition in a LeaderAndIsr request fails, the remaining partitions in that request won't be processed.
   
   KafkaApis.handleLeaderAndIsrRequest
   ReplicaManager.becomeLeaderOrFollower
   ReplicaManager.makeLeaders
   Partition.makeLeader
   Partition.maybeIncrementLeaderHW
   UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
   UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the LISR. The controller retries the LISR until broker 1002 becomes leader for tp0. During this time, the producers won't be able to send messages, as node 1002 sends a NOT_LEADER_FOR_PARTITION error code to the producer.
   
   
   ```
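   
   As a rough sketch of the bounding rule in the patch above, the clamp can be expressed with plain Longs standing in for LogOffsetMetadata; the parameter names localLogStartOffset and logEndOffset are simplified stand-ins for _localLogStartOffset and localLog.logEndOffsetMetadata, not the real fields.
   
   ```scala
   // Rough sketch of the bounding rule, with plain Longs in place of LogOffsetMetadata;
   // names are simplified stand-ins, not the real UnifiedLog fields.
   object BoundedHighWatermarkSketch {
   
     // Lower-bound the proposed HW by the local-log-start-offset (previously the
     // log-start-offset) and upper-bound it by the log-end-offset.
     def boundHighWatermark(proposedHw: Long,
                            localLogStartOffset: Long,
                            logEndOffset: Long): Long =
       math.min(math.max(proposedHw, localLogStartOffset), logEndOffset)
   
     def main(args: Array[String]): Unit = {
       // A stale HW of 100 on a replica whose local log starts at 800 is lifted to 800.
       println(boundHighWatermark(proposedHw = 100L, localLogStartOffset = 800L, logEndOffset = 1000L))
     }
   }
   ```
   
   The idea is that the new leader then never tries to resolve an HW that points below its local segments on the LeaderAndIsr and FETCH paths above.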



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-13 Thread via GitHub


kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
* Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
*
* @param highWatermarkMetadata the suggested high watermark with offset metadata
* @return the updated high watermark offset
*/
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
 val endOffsetMetadata = localLog.logEndOffsetMetadata
-val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-  new LogOffsetMetadata(logStartOffset)
+val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before catching up, through unclean election. Is this the case that you want to address?
   
   Yes, we want to address this case too. The issue can also happen during a clean preferred-leader-election:
   
   ```
   Call stack: the replica (1002) has full data, but its HW is invalid, so the fetch-offset will be equal to LeaderLog(1001).highWatermark
   
   Leader (1001):
   KafkaApis.handleFetchRequest
   ReplicaManager.fetchMessages
   ReplicaManager.readFromLocalLog
   Partition.fetchRecords
   Partition.updateFollowerFetchState
   Partition.maybeExpandIsr
   Partition.submitAlterPartition
   ...
   ...
   ...
   # If there is not enough data to respond and there is no remote data, we will let the fetch request wait for new data.
   # parks the request in the DelayedFetchPurgatory
   
   
   Another thread runs Preferred-Leader-Election in the controller (1003); since replica 1002 joined the ISR list, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.
   
   KafkaController.processReplicaLeaderElection
   KafkaController.onReplicaElection
   PartitionStateMachine.handleStateChanges
   PartitionStateMachine.doHandleStateChanges
   PartitionStateMachine.electLeaderForPartitions
   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 gets elected as leader but has an invalid highWatermark, since it didn't process the fetch-response from the previous leader 1001; it then throws an OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that if the LeaderAndIsr request fails to process for one partition, the remaining partitions in that request won't be processed.
   
   KafkaApis.handleLeaderAndIsrRequest
   ReplicaManager.becomeLeaderOrFollower
   ReplicaManager.makeLeaders
   Partition.makeLeader
   Partition.maybeIncrementLeaderHW
   UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
   UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the LISR. The controller retries the LISR until the broker becomes leader for tp0. During this time, the producers won't be able to send messages, as node 1002 sends a NOT_LEADER_FOR_PARTITION error code to the producer.
   
   
   ```
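   
   A toy before/after walk-through under assumed offsets (local log starting at 800 after tiered deletion, log end at 1000, stale HW of 100; none of these numbers come from the PR) shows what the changed lower bound does:
   
   ```scala
   // Before/after comparison of the lower bound, using assumed offsets only.
   object HighWatermarkBeforeAfterSketch {
     def main(args: Array[String]): Unit = {
       val staleHw             = 100L
       val logStartOffset      = 0L    // earliest offset, including remote segments
       val localLogStartOffset = 800L  // earliest offset still present on local disk
       val logEndOffset        = 1000L
   
       // Old rule: lower-bounded only by the log-start-offset, so the stale HW is kept
       // and later fails to resolve against local segments (everything below 800 is remote).
       val oldHw = math.min(math.max(staleHw, logStartOffset), logEndOffset)
   
       // New rule in this PR: lower-bounded by the local-log-start-offset.
       val newHw = math.min(math.max(staleHw, localLogStartOffset), logEndOffset)
   
       println(s"old HW = $oldHw, new HW = $newHw") // old HW = 100, new HW = 800
     }
   }
   ```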



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org