Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]

2024-06-02 Thread via GitHub


ocadaruma commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1623844071


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -575,9 +575,9 @@ InMemoryLeaderEpochCheckpoint 
getLeaderEpochCheckpoint(UnifiedLog log, long star
 if (log.leaderEpochCache().isDefined()) {
 LeaderEpochFileCache cache = 
log.leaderEpochCache().get().writeTo(checkpoint);
 if (startOffset >= 0) {
-cache.truncateFromStart(startOffset);
+cache.truncateFromStart(startOffset, true);

Review Comment:
   @junrao Thank you for pointing that out. I removed InMemoryLeaderEpochCheckpoint 
(and the LeaderEpochCheckpoint interface as well) and refactored the PR 
accordingly.
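For context on the one-line diff above, here is a self-contained sketch of what truncating a leader-epoch cache from the start involves. The class name, entry shape, and the meaning of the boolean flag are assumptions for illustration, not Kafka's actual implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch only: a leader-epoch cache keeps (epoch, startOffset) entries in
// ascending order. Truncating from the start drops entries fully below the new
// start offset and clamps the first surviving epoch's start offset.
class EpochCacheSketch {
    record EpochEntry(int epoch, long startOffset) {}

    private final Deque<EpochEntry> entries = new ArrayDeque<>();

    void assign(int epoch, long startOffset) {
        entries.addLast(new EpochEntry(epoch, startOffset));
    }

    // The boolean mirrors the flag added in the diff above: whether the
    // in-memory truncation should also be persisted to the checkpoint file.
    void truncateFromStart(long newStartOffset, boolean flushToFile) {
        EpochEntry clamped = null;
        while (!entries.isEmpty() && entries.peekFirst().startOffset() <= newStartOffset) {
            clamped = entries.removeFirst();
        }
        if (clamped != null) {
            // The epoch covering newStartOffset survives, clamped to it.
            entries.addFirst(new EpochEntry(clamped.epoch(), newStartOffset));
        }
        if (flushToFile) {
            // a real implementation would write the entries to the checkpoint file here
        }
    }

    Deque<EpochEntry> entries() { return entries; }
}
```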



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16713: Define initial set of RPCs for KIP-932 [kafka]

2024-06-02 Thread via GitHub


omkreddy merged PR #16022:
URL: https://github.com/apache/kafka/pull/16022





[jira] [Assigned] (KAFKA-16879) SystemTime should use singleton mode

2024-06-02 Thread jiandu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiandu reassigned KAFKA-16879:
--

Assignee: jiandu

> SystemTime should use singleton mode
> 
>
> Key: KAFKA-16879
> URL: https://issues.apache.org/jira/browse/KAFKA-16879
> Project: Kafka
>  Issue Type: Improvement
>Reporter: jiandu
>Assignee: jiandu
>Priority: Minor
>
> Currently, the {{SystemTime}} class, which provides system time-related 
> functionality such as getting the current timestamp, sleeping, and awaiting, 
> can be instantiated multiple times.
> However, system time is unique: within an application, the time obtained in 
> different places should be consistent. Since every instance delegates to the 
> Java System class, the time returned is the same regardless of which instance 
> is used.
> So I suggest changing it to a singleton to reflect the uniqueness of system 
> time in the design.
>  
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java]
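A minimal sketch of the enum-based singleton the issue proposes. The method names follow Kafka's Time interface, but the class name here is hypothetical, not Kafka's actual code:

```java
// Hypothetical sketch of the proposal: an enum-based singleton exposing the
// same operations (current timestamp, sleep) that SystemTime provides.
public enum SystemTimeSingleton {
    INSTANCE;

    public long milliseconds() {
        return System.currentTimeMillis();
    }

    public long nanoseconds() {
        return System.nanoTime();
    }

    public void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
    }
}
```

Every caller would then use SystemTimeSingleton.INSTANCE, so all call sites share one instance by construction.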



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-8735: Check properties file existence first [kafka]

2024-06-02 Thread via GitHub


qinghui-xu commented on PR #7139:
URL: https://github.com/apache/kafka/pull/7139#issuecomment-2144362990

   It turns out the patch is no longer relevant after the refactorings in recent 
versions.





Re: [PR] KAFKA-8735: Check properties file existence first [kafka]

2024-06-02 Thread via GitHub


qinghui-xu closed pull request #7139: KAFKA-8735: Check properties file 
existence first
URL: https://github.com/apache/kafka/pull/7139





Re: [PR] KAFKA-16713: Define initial set of RPCs for KIP-932 [kafka]

2024-06-02 Thread via GitHub


omkreddy commented on PR #16022:
URL: https://github.com/apache/kafka/pull/16022#issuecomment-2144356482

   Test failures are not related to this PR: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16022/6/tests





[jira] [Updated] (KAFKA-16879) SystemTime should use singleton mode

2024-06-02 Thread jiandu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiandu updated KAFKA-16879:
---
Description: 
Currently, the {{SystemTime}} class, which provides system time-related 
functionality such as getting the current timestamp, sleeping, and awaiting, 
can be instantiated multiple times.

However, system time is unique: within an application, the time obtained in 
different places should be consistent. Since every instance delegates to the 
Java System class, the time returned is the same regardless of which instance 
is used.

So I suggest changing it to a singleton to reflect the uniqueness of system 
time in the design.

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java]

  was:
Currently, the {{SystemTime}} class, which provides system time-related 
functionality such as getting the current timestamp, sleeping, and awaiting, 
can be instantiated multiple times.

However, system time is unique: within an application, the time obtained in 
different places should be consistent. Since every instance delegates to the 
Java System class, the time returned is the same regardless of which instance 
is used.

So I suggest changing it to a singleton to reflect the uniqueness of system 
time in the design.

 


> SystemTime should use singleton mode
> 
>
> Key: KAFKA-16879
> URL: https://issues.apache.org/jira/browse/KAFKA-16879
> Project: Kafka
>  Issue Type: Improvement
>Reporter: jiandu
>Priority: Minor
>
> Currently, the {{SystemTime}} class, which provides system time-related 
> functionality such as getting the current timestamp, sleeping, and awaiting, 
> can be instantiated multiple times.
> However, system time is unique: within an application, the time obtained in 
> different places should be consistent. Since every instance delegates to the 
> Java System class, the time returned is the same regardless of which instance 
> is used.
> So I suggest changing it to a singleton to reflect the uniqueness of system 
> time in the design.
>  
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java]





[jira] [Created] (KAFKA-16879) SystemTime should use singleton mode

2024-06-02 Thread jiandu (Jira)
jiandu created KAFKA-16879:
--

 Summary: SystemTime should use singleton mode
 Key: KAFKA-16879
 URL: https://issues.apache.org/jira/browse/KAFKA-16879
 Project: Kafka
  Issue Type: Improvement
Reporter: jiandu


Currently, the {{SystemTime}} class, which provides system time-related 
functionality such as getting the current timestamp, sleeping, and awaiting, 
can be instantiated multiple times.

However, system time is unique: within an application, the time obtained in 
different places should be consistent. Since every instance delegates to the 
Java System class, the time returned is the same regardless of which instance 
is used.

So I suggest changing it to a singleton to reflect the uniqueness of system 
time in the design.

 





Re: [PR] KAFKA-16851: Add remote.log.disable.policy [kafka]

2024-06-02 Thread via GitHub


satishd commented on code in PR #16132:
URL: https://github.com/apache/kafka/pull/16132#discussion_r1623787269


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##
@@ -179,6 +182,7 @@ public Optional serverConfigName(String configName) 
{
 public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false;
 public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It 
indicates the value to be derived from RetentionBytes
 public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates 
the value to be derived from RetentionMs
+public static final String DEFAULT_REMOTE_LOG_DISABLE_POLICY = "retain";

Review Comment:
   Can you declare the respective fields for valid values like "retain" and 
"delete"?



##
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##
@@ -93,6 +93,11 @@ public class TopicConfig {
 "deletes the old segments. Default value is -2, it represents 
`retention.bytes` value to be used. The effective value should always be " +
 "less than or equal to `retention.bytes` value.";
 
+public static final String REMOTE_LOG_DISABLE_POLICY_CONFIG = 
"remote.log.disable.policy";
+
+public static final String REMOTE_LOG_DISABLE_POLICY_DOC = "Determines 
whether tiered data for a topic should be retained or " +

Review Comment:
   Can you add a note about the valid values in the doc and briefly talk about 
the respective behavior of both values?
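Putting the two review requests together, here is a hypothetical sketch of named constants for the valid values plus a doc string covering both behaviors. The class and constant names are illustrative, not the code that was eventually merged:

```java
// Sketch of the reviewer's requests: declare constants for the valid policy
// values ("retain" and "delete") and document the behavior of each.
public final class RemoteLogDisablePolicySketch {
    public static final String REMOTE_LOG_DISABLE_POLICY_RETAIN = "retain";
    public static final String REMOTE_LOG_DISABLE_POLICY_DELETE = "delete";

    public static final String REMOTE_LOG_DISABLE_POLICY_DOC =
        "Determines what happens to tiered data when remote storage is disabled for a topic. " +
        "Valid values are \"retain\" (already-tiered data remains readable until it is removed " +
        "by retention) and \"delete\" (tiered data is deleted once remote storage is disabled).";

    private RemoteLogDisablePolicySketch() {}
}
```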






Re: [PR] KAFKA-8206: Allow client to rebootstrap [kafka]

2024-06-02 Thread via GitHub


ivanyu commented on code in PR #13277:
URL: https://github.com/apache/kafka/pull/13277#discussion_r1623777938


##
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##
@@ -1123,6 +1133,17 @@ public long maybeUpdate(long now) {
 // Beware that the behavior of this method and the computation of 
timeouts for poll() are
 // highly dependent on the behavior of leastLoadedNode.
 Node node = leastLoadedNode(now);
+
+// Rebootstrap if needed and configured.
+if (node == null && metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP) {

Review Comment:
   I have an idea for how to avoid rebootstrapping in this case: pass some 
information along with the `Node` out of `leastLoadedNode`. See this commit 
https://github.com/apache/kafka/pull/13277/commits/3a303b7de650a6d6a94d5652a476ca9a98610380.
 Does it make sense? It changes the interface a little, but the overall change 
turned out to be smaller than I expected.
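A self-contained sketch of the idea in the comment: return extra state alongside the node so the caller can distinguish "no node known at all" from "nodes exist but none is usable right now", and only rebootstrap in the former case. All names here are illustrative (`Node` stands in for `org.apache.kafka.common.Node`); the linked commit may differ:

```java
// Sketch only: leastLoadedNode returns a small result object instead of a
// bare Node, carrying whether any connection candidate exists.
class LeastLoadedNodeSketch {
    record Node(int id, String host, int port) {}

    record LeastLoadedNode(Node node, boolean hasNodeAvailableOrConnecting) {}

    static boolean shouldRebootstrap(LeastLoadedNode result, boolean rebootstrapConfigured) {
        // Rebootstrap only when configured and there is truly nothing to try:
        // no usable node and no candidate connection in progress.
        return rebootstrapConfigured
                && result.node() == null
                && !result.hasNodeAvailableOrConnecting();
    }
}
```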






Re: [PR] KAFKA-16859: Cleanup check if tiered storage is enabled [kafka]

2024-06-02 Thread via GitHub


kamalcph commented on PR #16153:
URL: https://github.com/apache/kafka/pull/16153#issuecomment-2144269279

   I would prefer the other way around:
   
   1. `KafkaConfig.remoteLogManagerConfig().enableRemoteStorageSystem()` reads 
like a setter, which reduces code readability.
   2. `isRemoteLogStorageSystemEnabled` reads like a getter, which makes the 
code clearer to read.





Re: [PR] KAFKA-16851: Add remote.log.disable.policy [kafka]

2024-06-02 Thread via GitHub


satishd commented on PR #16132:
URL: https://github.com/apache/kafka/pull/16132#issuecomment-2144267407

   @clolov Is it open for review, or are you planning to add more changes?





Re: [PR] KAFKA-16027: MINOR Refactor MetadataTest#testUpdatePartitionLeadership (2) [kafka]

2024-06-02 Thread via GitHub


Alexander-Aghili commented on PR #15857:
URL: https://github.com/apache/kafka/pull/15857#issuecomment-2144259480

   Apologies for the delay. I changed the code so that the tests pass locally. 
Let me know if that resolves the issue.





Re: [PR] KAFKA-16854: Add StopReplica v5 [kafka]

2024-06-02 Thread via GitHub


satishd commented on PR #16131:
URL: https://github.com/apache/kafka/pull/16131#issuecomment-2144254775

   Retriggered the build to start the build/tests again.





Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]

2024-06-02 Thread via GitHub


satishd commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1623765428


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -239,6 +244,13 @@ private void removeMetrics() {
 remoteStorageReaderThreadPool.removeMetrics();
 }
 
+/**
+ * Returns the timeout for the RLM Tasks to wait for the quota to be 
available
+ */
+Duration quotaTimeout() {
+return Duration.ofSeconds(1);

Review Comment:
   Can you use the declared constant value?
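What the reviewer asks for, sketched out: hoist the literal into a named constant instead of constructing `Duration.ofSeconds(1)` inline on every call. The constant and class names here are illustrative, not the actual Kafka code:

```java
import java.time.Duration;

// Sketch: a single named constant replaces the inline Duration literal, so the
// timeout is declared once and reused.
class RlmQuotaSketch {
    static final Duration QUOTA_AVAILABILITY_TIMEOUT = Duration.ofSeconds(1);

    /** Timeout for RLM tasks to wait for the quota to become available. */
    Duration quotaTimeout() {
        return QUOTA_AVAILABILITY_TIMEOUT;
    }
}
```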






[jira] [Comment Edited] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit

2024-06-02 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851514#comment-17851514
 ] 

Ganesh Sadanala edited comment on KAFKA-16876 at 6/3/24 4:21 AM:
-

[~rohanpd] Thank you for confirming it! I see the flow is:

`TaskManager#handleRevocation` -> `TaskManager#prepareCommitAndAddOffsetsToMap` 
-> `StreamTask#prepareCommit` -> `StreamTask#flush` -> 
`ProcessorStateManager#flushCache`

and the registered state stores are iterated inside it. I see that in your case 
a TaskManagerException is thrown and is caught here:
{code:java}
catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
if (exception instanceof StreamsException) {
firstException = exception;
} {code}

It is finally rethrown to the `StreamTask#flush` method, where I see it is not 
caught/handled. Hence the entire flow ends in runtime errors and the active 
tasks are not revoked. Please correct me if I am wrong.

So you would want it to be handled appropriately inside the 
`StreamTask#flush` method?

Also, could you guide me on how you produced those exceptions? I want to 
reproduce them locally to get a better picture.

Anything else you can share would be helpful.

Thank you!


was (Author: JIRAUSER305566):
[~rohanpd] Thank you for confirming it! I see the flow is:

`TaskManager#handleRevocation` -> `TaskManager#prepareCommitAndAddOffsetsToMap` 
-> `StreamTask#prepareCommit` -> `StreamTask#flush` -> 
`ProcessorStateManager#flushCache`

and the registered state stores are iterated inside it. I see that in your case 
a TaskManagerException is thrown and is caught here:

```
catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
if (exception instanceof StreamsException) {
firstException = exception;
}
```

It is finally rethrown to the `StreamTask#flush` method, where I see it is not 
caught/handled. Hence the entire flow ends in runtime errors and the active 
tasks are not revoked. Please correct me if I am wrong.

So you would want it to be handled appropriately inside the 
`StreamTask#flush` method?

Also, could you guide me on how you produced those exceptions? I want to 
reproduce them locally to get a better picture.

Anything else you can share would be helpful.

Thank you!

> TaskManager.handleRevocation doesn't handle errors thrown from 
> task.prepareCommit
> -
>
> Key: KAFKA-16876
> URL: https://issues.apache.org/jira/browse/KAFKA-16876
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Rohan Desai
>Assignee: Ganesh Sadanala
>Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by 
> `task.prepareCommit`. In the particular instance I observed, `pepareCommit` 
> flushed caches which led to downstream `producer.send` calls that threw a 
> `TaskMigratedException`. This means that the tasks that need to be revoked 
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the 
> thrown exception and then moves on to the other task assignment callbacks. 
> One of these - `StreamsPartitionAssigner.onCommit` tries to close the tasks 
> and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks 
> if close fails so we don't leak any tasks. I think there's maybe two bugs 
> here:
>  # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. 
> It should try not to leave any revoked tasks in an unsuspended state.
>  # The `ConsumerCoordinator` just throws the first exception that it sees. 
> But it seems bad to throw the `TaskMigratedException` and drop the 
> `IllegalStateException` (though in this case I think its relatively benign). 
> I think on `IllegalStateException` we really want the streams thread to exit. 
> One idea here is to have `ConsumerCoordinator` throw an exception type that 
> includes the other exceptions that it has seen in another field. But this 
> breaks the contract for clients that catch specific exceptions. I'm not sure 
> of a clean solution, but I think its at least worth recording that it would 
> be preferable to have the caller of `poll` handle all the thrown exceptions 
> rather than just the first one.
>  
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 
> [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - 
> stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce

[jira] [Comment Edited] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit

2024-06-02 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851514#comment-17851514
 ] 

Ganesh Sadanala edited comment on KAFKA-16876 at 6/3/24 4:21 AM:
-

[~rohanpd] Thank you for confirming it! I see the flow is:

`TaskManager#handleRevocation` -> `TaskManager#prepareCommitAndAddOffsetsToMap` 
-> `StreamTask#prepareCommit` -> `StreamTask#flush` -> 
`ProcessorStateManager#flushCache`

and the registered state stores are iterated inside it. I see that in your case 
a TaskMigratedException is thrown and is caught here:
{code:java}
catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
if (exception instanceof StreamsException) {
firstException = exception;
} {code}

It is finally rethrown to the `StreamTask#flush` method, where I see it is not 
caught/handled. Hence the entire flow ends in runtime errors and the active 
tasks are not revoked. Please correct me if I am wrong.

So you would want it to be handled appropriately inside the 
`StreamTask#flush` method?

Also, could you guide me on how you produced those exceptions? I want to 
reproduce them locally to get a better picture.

Anything else you can share would be helpful.

Thank you!


was (Author: JIRAUSER305566):
[~rohanpd] Thank you for confirming it! I see the flow is:

`TaskManager#handleRevocation` -> `TaskManager#prepareCommitAndAddOffsetsToMap` 
-> `StreamTask#prepareCommit` -> `StreamTask#flush` -> 
`ProcessorStateManager#flushCache`

and the registered state stores are iterated inside it. I see that in your case 
a TaskManagerException is thrown and is caught here:
{code:java}
catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
if (exception instanceof StreamsException) {
firstException = exception;
} {code}

It is finally rethrown to the `StreamTask#flush` method, where I see it is not 
caught/handled. Hence the entire flow ends in runtime errors and the active 
tasks are not revoked. Please correct me if I am wrong.

So you would want it to be handled appropriately inside the 
`StreamTask#flush` method?

Also, could you guide me on how you produced those exceptions? I want to 
reproduce them locally to get a better picture.

Anything else you can share would be helpful.

Thank you!

> TaskManager.handleRevocation doesn't handle errors thrown from 
> task.prepareCommit
> -
>
> Key: KAFKA-16876
> URL: https://issues.apache.org/jira/browse/KAFKA-16876
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Rohan Desai
>Assignee: Ganesh Sadanala
>Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by 
> `task.prepareCommit`. In the particular instance I observed, `pepareCommit` 
> flushed caches which led to downstream `producer.send` calls that threw a 
> `TaskMigratedException`. This means that the tasks that need to be revoked 
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the 
> thrown exception and then moves on to the other task assignment callbacks. 
> One of these - `StreamsPartitionAssigner.onCommit` tries to close the tasks 
> and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks 
> if close fails so we don't leak any tasks. I think there's maybe two bugs 
> here:
>  # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. 
> It should try not to leave any revoked tasks in an unsuspended state.
>  # The `ConsumerCoordinator` just throws the first exception that it sees. 
> But it seems bad to throw the `TaskMigratedException` and drop the 
> `IllegalStateException` (though in this case I think its relatively benign). 
> I think on `IllegalStateException` we really want the streams thread to exit. 
> One idea here is to have `ConsumerCoordinator` throw an exception type that 
> includes the other exceptions that it has seen in another field. But this 
> breaks the contract for clients that catch specific exceptions. I'm not sure 
> of a clean solution, but I think its at least worth recording that it would 
> be preferable to have the caller of `poll` handle all the thrown exceptions 
> rather than just the first one.
>  
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 
> [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - 
> 

[jira] [Commented] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit

2024-06-02 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851514#comment-17851514
 ] 

Ganesh Sadanala commented on KAFKA-16876:
-

[~rohanpd] Thank you for confirming it! I see the flow is:

`TaskManager#handleRevocation` -> `TaskManager#prepareCommitAndAddOffsetsToMap` 
-> `StreamTask#prepareCommit` -> `StreamTask#flush` -> 
`ProcessorStateManager#flushCache`

and the registered state stores are iterated inside it. I see that in your case 
a TaskManagerException is thrown and is caught here:

```
catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by Streams itself
if (exception instanceof StreamsException) {
firstException = exception;
}
```

It is finally rethrown to the `StreamTask#flush` method, where I see it is not 
caught/handled. Hence the entire flow ends in runtime errors and the active 
tasks are not revoked. Please correct me if I am wrong.

So you would want it to be handled appropriately inside the 
`StreamTask#flush` method?

Also, could you guide me on how you produced those exceptions? I want to 
reproduce them locally to get a better picture.

Anything else you can share would be helpful.

Thank you!

> TaskManager.handleRevocation doesn't handle errors thrown from 
> task.prepareCommit
> -
>
> Key: KAFKA-16876
> URL: https://issues.apache.org/jira/browse/KAFKA-16876
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Rohan Desai
>Assignee: Ganesh Sadanala
>Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by 
> `task.prepareCommit`. In the particular instance I observed, `pepareCommit` 
> flushed caches which led to downstream `producer.send` calls that threw a 
> `TaskMigratedException`. This means that the tasks that need to be revoked 
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the 
> thrown exception and then moves on to the other task assignment callbacks. 
> One of these - `StreamsPartitionAssigner.onCommit` tries to close the tasks 
> and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks 
> if close fails so we don't leak any tasks. I think there's maybe two bugs 
> here:
>  # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. 
> It should try not to leave any revoked tasks in an unsuspended state.
>  # The `ConsumerCoordinator` just throws the first exception that it sees. 
> But it seems bad to throw the `TaskMigratedException` and drop the 
> `IllegalStateException` (though in this case I think its relatively benign). 
> I think on `IllegalStateException` we really want the streams thread to exit. 
> One idea here is to have `ConsumerCoordinator` throw an exception type that 
> includes the other exceptions that it has seen in another field. But this 
> breaks the contract for clients that catch specific exceptions. I'm not sure 
> of a clean solution, but I think its at least worth recording that it would 
> be preferable to have the caller of `poll` handle all the thrown exceptions 
> rather than just the first one.
>  
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 
> [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - 
> stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-St
> reamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining 
> tasks before re-throwing:
> [       508.535] [service_application2] [inf] 
> java.lang.IllegalStateException: Illegal state RUNNING while closing active 
> task 0_3
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.inter

[jira] [Commented] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit

2024-06-02 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851499#comment-17851499
 ] 

Rohan Desai commented on KAFKA-16876:
-

[~ganesh_6] I don't have that stack trace unfortunately, but I'm pretty certain 
that the error is being thrown by `ProcessorStateManager#flushCache`. The 
problem is that `ProcessorStateManager#flushCache` throws the exception that 
was thrown inside the Producer's io thread and returned in the future, instead 
of re-wrapping it, so the stack trace is the Producer thread stack trace 
instead of the stream thread stack trace (which is where `flushCache` is called 
from).
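A sketch of the re-wrapping described as missing here. The names are hypothetical (not the actual Streams code, and `RuntimeException` stands in for `StreamsException`): constructing a new exception on the calling thread preserves the stream thread's stack trace, while keeping the producer io-thread failure as the cause.

```java
import java.util.concurrent.CompletableFuture;

// Sketch: rethrow an exception that was completed on another thread, wrapped
// so the *calling* thread's stack trace is visible in the trace.
class FlushSketch {
    static void flushAndRethrow(CompletableFuture<Void> sendResult) {
        try {
            sendResult.join();
        } catch (Exception e) {
            // join wraps the io-thread failure in a CompletionException;
            // unwrap it and re-wrap on this thread.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            throw new RuntimeException("error flushing cache", cause);
        }
    }
}
```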

> TaskManager.handleRevocation doesn't handle errors thrown from 
> task.prepareCommit
> -
>
> Key: KAFKA-16876
> URL: https://issues.apache.org/jira/browse/KAFKA-16876
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Rohan Desai
>Assignee: Ganesh Sadanala
>Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by 
> `task.prepareCommit`. In the particular instance I observed, `pepareCommit` 
> flushed caches which led to downstream `producer.send` calls that threw a 
> `TaskMigratedException`. This means that the tasks that need to be revoked 
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the 
> thrown exception and then moves on to the other task assignment callbacks. 
> One of these - `StreamsPartitionAssigner.onCommit` tries to close the tasks 
> and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks 
> if close fails so we don't leak any tasks. I think there's maybe two bugs 
> here:
>  # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. 
> It should try not to leave any revoked tasks in an unsuspended state.
>  # The `ConsumerCoordinator` just throws the first exception that it sees. 
> But it seems bad to throw the `TaskMigratedException` and drop the 
> `IllegalStateException` (though in this case I think its relatively benign). 
> I think on `IllegalStateException` we really want the streams thread to exit. 
> One idea here is to have `ConsumerCoordinator` throw an exception type that 
> includes the other exceptions that it has seen in another field. But this 
> breaks the contract for clients that catch specific exceptions. I'm not sure 
> of a clean solution, but I think its at least worth recording that it would 
> be preferable to have the caller of `poll` handle all the thrown exceptions 
> rather than just the first one.
>  
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 
> [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - 
> stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-St
> reamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining 
> tasks before re-throwing:
> [       508.535] [service_application2] [inf] 
> java.lang.IllegalStateException: Illegal state RUNNING while closing active 
> task 0_3
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(Abstrac
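The second bug described above - `ConsumerCoordinator` surfacing only the first callback exception - could, as a rough standalone sketch (illustrative names, not Kafka internals), be addressed with Java's suppressed-exception mechanism, so no failure is silently dropped:

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackErrors {
    // Runs every callback, collecting failures; rethrows the first with the
    // rest attached as suppressed exceptions so none are lost.
    static void runAll(List<Runnable> callbacks) {
        RuntimeException first = null;
        for (Runnable cb : callbacks) {
            try {
                cb.run();
            } catch (RuntimeException e) {
                if (first == null) first = e;
                else first.addSuppressed(e);
            }
        }
        if (first != null) throw first;
    }

    public static void main(String[] args) {
        List<Runnable> cbs = new ArrayList<>();
        cbs.add(() -> { throw new IllegalStateException("from onAssignment"); });
        cbs.add(() -> { throw new RuntimeException("task migrated"); });
        try {
            runAll(cbs);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " / suppressed=" + e.getSuppressed().length);
            // prints: from onAssignment / suppressed=1
        }
    }
}
```

As the issue notes, this changes what callers catch, so it would not be a drop-in fix; it only illustrates the mechanism.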

[jira] [Comment Edited] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit

2024-06-02 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851490#comment-17851490
 ] 

Ganesh Sadanala edited comment on KAFKA-16876 at 6/3/24 3:56 AM:
-

[~rohanpd] Can you share the `TaskMigratedException` stack trace as well? I 
assume that it is thrown from the `ProcessorStateManager#flushCache` method. 


was (Author: JIRAUSER305566):
[~rohanpd] Can you share the `TaskMigratedException` stack trace as well?

> TaskManager.handleRevocation doesn't handle errors thrown from 
> task.prepareCommit
> -
>
> Key: KAFKA-16876
> URL: https://issues.apache.org/jira/browse/KAFKA-16876
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Rohan Desai
>Assignee: Ganesh Sadanala
>Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by 
> `task.prepareCommit`. In the particular instance I observed, `prepareCommit` 
> flushed caches which led to downstream `producer.send` calls that threw a 
> `TaskMigratedException`. This means that the tasks that need to be revoked 
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the 
> thrown exception and then moves on to the other task assignment callbacks. 
> One of these - `StreamsPartitionAssignor.onAssignment` - tries to close the tasks 
> and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks 
> if close fails so we don't leak any tasks. I think there's maybe two bugs 
> here:
>  # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. 
> It should try not to leave any revoked tasks in an unsuspended state.
>  # The `ConsumerCoordinator` just throws the first exception that it sees. 
> But it seems bad to throw the `TaskMigratedException` and drop the 
> `IllegalStateException` (though in this case I think it's relatively benign). 
> I think on `IllegalStateException` we really want the streams thread to exit. 
> One idea here is to have `ConsumerCoordinator` throw an exception type that 
> includes the other exceptions that it has seen in another field. But this 
> breaks the contract for clients that catch specific exceptions. I'm not sure 
> of a clean solution, but I think its at least worth recording that it would 
> be preferable to have the caller of `poll` handle all the thrown exceptions 
> rather than just the first one.
>  
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 
> [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - 
> stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-St
> reamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining 
> tasks before re-throwing:
> [       508.535] [service_application2] [inf] 
> java.lang.IllegalStateException: Illegal state RUNNING while closing active 
> task 0_3
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.Abstract

[jira] [Comment Edited] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit

2024-06-02 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851490#comment-17851490
 ] 

Ganesh Sadanala edited comment on KAFKA-16876 at 6/3/24 3:34 AM:
-

[~rohanpd] Can you share the `TaskMigratedException` stack trace as well?


was (Author: JIRAUSER305566):
[~rohanpd] Can you share the `TaskManagerException` stack trace as well?

> TaskManager.handleRevocation doesn't handle errors thrown from 
> task.prepareCommit
> -
>
> Key: KAFKA-16876
> URL: https://issues.apache.org/jira/browse/KAFKA-16876
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Rohan Desai
>Assignee: Ganesh Sadanala
>Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by 
> `task.prepareCommit`. In the particular instance I observed, `prepareCommit` 
> flushed caches which led to downstream `producer.send` calls that threw a 
> `TaskMigratedException`. This means that the tasks that need to be revoked 
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the 
> thrown exception and then moves on to the other task assignment callbacks. 
> One of these - `StreamsPartitionAssignor.onAssignment` - tries to close the tasks 
> and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks 
> if close fails so we don't leak any tasks. I think there's maybe two bugs 
> here:
>  # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. 
> It should try not to leave any revoked tasks in an unsuspended state.
>  # The `ConsumerCoordinator` just throws the first exception that it sees. 
> But it seems bad to throw the `TaskMigratedException` and drop the 
> `IllegalStateException` (though in this case I think it's relatively benign). 
> I think on `IllegalStateException` we really want the streams thread to exit. 
> One idea here is to have `ConsumerCoordinator` throw an exception type that 
> includes the other exceptions that it has seen in another field. But this 
> breaks the contract for clients that catch specific exceptions. I'm not sure 
> of a clean solution, but I think its at least worth recording that it would 
> be preferable to have the caller of `poll` handle all the thrown exceptions 
> rather than just the first one.
>  
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 
> [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - 
> stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-St
> reamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining 
> tasks before re-throwing:
> [       508.535] [service_application2] [inf] 
> java.lang.IllegalStateException: Illegal state RUNNING while closing active 
> task 0_3
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>  [kafka-clients-3.6.

[jira] [Commented] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit

2024-06-02 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851490#comment-17851490
 ] 

Ganesh Sadanala commented on KAFKA-16876:
-

[~rohanpd] Can you share the `TaskManagerException` stack trace as well?

> TaskManager.handleRevocation doesn't handle errors thrown from 
> task.prepareCommit
> -
>
> Key: KAFKA-16876
> URL: https://issues.apache.org/jira/browse/KAFKA-16876
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Rohan Desai
>Assignee: Ganesh Sadanala
>Priority: Minor
>
> `TaskManager.handleRevocation` does not handle exceptions thrown by 
> `task.prepareCommit`. In the particular instance I observed, `prepareCommit` 
> flushed caches which led to downstream `producer.send` calls that threw a 
> `TaskMigratedException`. This means that the tasks that need to be revoked 
> are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the 
> thrown exception and then moves on to the other task assignment callbacks. 
> One of these - `StreamsPartitionAssignor.onAssignment` - tries to close the tasks 
> and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks 
> if close fails so we don't leak any tasks. I think there's maybe two bugs 
> here:
>  # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. 
> It should try not to leave any revoked tasks in an unsuspended state.
>  # The `ConsumerCoordinator` just throws the first exception that it sees. 
> But it seems bad to throw the `TaskMigratedException` and drop the 
> `IllegalStateException` (though in this case I think it's relatively benign). 
> I think on `IllegalStateException` we really want the streams thread to exit. 
> One idea here is to have `ConsumerCoordinator` throw an exception type that 
> includes the other exceptions that it has seen in another field. But this 
> breaks the contract for clients that catch specific exceptions. I'm not sure 
> of a clean solution, but I think its at least worth recording that it would 
> be preferable to have the caller of `poll` handle all the thrown exceptions 
> rather than just the first one.
>  
> Here is the IllegalStateException stack trace I observed:
> {code:java}
> [       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 
> [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - 
> stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-St
> reamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining 
> tasks before re-throwing:
> [       508.535] [service_application2] [inf] 
> java.lang.IllegalStateException: Illegal state RUNNING while closing active 
> task 0_3
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295)
>  ~[kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>  [kafka-streams-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>  [kafka-clients-3.6.0.jar:?]
> [       508.535] [service_application2] [inf] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.j

Re: [PR] KAFKA-16214: add client info in authentication error log [kafka]

2024-06-02 Thread via GitHub


github-actions[bot] commented on PR #15280:
URL: https://github.com/apache/kafka/pull/15280#issuecomment-2144213214

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16102: fix the dynamic modification of listeners' IP or port no… [kafka]

2024-06-02 Thread via GitHub


github-actions[bot] commented on PR #15321:
URL: https://github.com/apache/kafka/pull/15321#issuecomment-2144213180

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16859) Cleanup check if tiered storage is enabled

2024-06-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16859.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Cleanup check if tiered storage is enabled
> --
>
> Key: KAFKA-16859
> URL: https://issues.apache.org/jira/browse/KAFKA-16859
> Project: Kafka
>  Issue Type: Task
>Reporter: Mickael Maison
>Assignee: 黃竣陽
>Priority: Minor
> Fix For: 3.9.0
>
>
> We have 2 ways to detect whether tiered storage is enabled:
> - KafkaConfig.isRemoteLogStorageSystemEnabled
> - KafkaConfig.remoteLogManagerConfig().enableRemoteStorageSystem()
> We use both in various files. We should stick with one way to do it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16859: Cleanup check if tiered storage is enabled [kafka]

2024-06-02 Thread via GitHub


chia7712 merged PR #16153:
URL: https://github.com/apache/kafka/pull/16153


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10787: Apply spotless to transaction-coordinator and server-common [kafka]

2024-06-02 Thread via GitHub


gongxuanzhang commented on PR #16172:
URL: https://github.com/apache/kafka/pull/16172#issuecomment-2144174063

   @chia7712 I changed it, please take a look.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10787: Apply spotless to transaction-coordinator and server-common [kafka]

2024-06-02 Thread via GitHub


chia7712 commented on PR #16172:
URL: https://github.com/apache/kafka/pull/16172#issuecomment-2144167371

   @gongxuanzhang Could you please update the description to explain how to 
test this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-06-02 Thread via GitHub


gongxuanzhang opened a new pull request, #16172:
URL: https://github.com/apache/kafka/pull/16172

   This PR is a sub-PR of https://github.com/apache/kafka/pull/16097.
   To avoid modifying a large amount of code at once, we plan to add the rule 
step by step, module by module.
   The first modules are 'transaction-coordinator' and 'server-common'.
   The 'transaction-coordinator' module already meets the requirements (it only 
has 2 Java classes), so the rule is added there directly.
   This PR also updates the import order in the 'server-common' module.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16878) Remove powermock from code base

2024-06-02 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16878:
--

 Summary: Remove powermock from code base
 Key: KAFKA-16878
 URL: https://issues.apache.org/jira/browse/KAFKA-16878
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


This is a follow-up to KAFKA-16223. It should include the following changes:

1. rename KafkaConfigBackingStoreMockitoTest back to KafkaConfigBackingStoreTest
2. remove all powermock-related code from code base



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16583: Handle PartitionChangeRecord without directory IDs [kafka]

2024-06-02 Thread via GitHub


soarez commented on PR #16118:
URL: https://github.com/apache/kafka/pull/16118#issuecomment-2144080705

   @showuon thanks for testing and sharing this.
   
   In those logs the controller is rejecting the assignment with a 
`NOT_LEADER_OR_FOLLOWER` because the partition has been moved away from the 
broker. Here the controller is comparing broker IDs, not directory IDs. A 
failed assignment is re-queued, so this error will persist until the broker 
dies or the replica is assigned back to it.
   
   I'm thinking of two options:
   
   1. Cancel any pending assignment for a replica when a metadata update shows 
the broker is no longer a replica for that partition.
   2. Accept `NOT_LEADER_OR_FOLLOWER` as indication that a reassignment has 
taken place, and do not retry.
   
   I'm leaning towards option 2. since it's much simpler and there's no other 
case when `handleAssignReplicasToDirs` returns `NOT_LEADER_OR_FOLLOWER`
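   As a rough standalone sketch of option 2 (illustrative names mirroring 
Kafka's Errors enum, not the actual AssignmentsManager code), treating 
`NOT_LEADER_OR_FOLLOWER` as terminal could look like:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AssignmentRetrySketch {
    enum Error { NONE, NOT_LEADER_OR_FOLLOWER, TIMEOUT }

    // Returns true if the directory assignment should be re-queued.
    static boolean shouldRetry(Error e) {
        switch (e) {
            case NONE:
                return false;                  // success: nothing left to do
            case NOT_LEADER_OR_FOLLOWER:
                return false;                  // replica moved away: give up
            default:
                return true;                   // transient failure: retry
        }
    }

    public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>();
        if (shouldRetry(Error.TIMEOUT)) pending.add("t0-p1");
        if (shouldRetry(Error.NOT_LEADER_OR_FOLLOWER)) pending.add("t0-p2");
        System.out.println(pending);  // [t0-p1]
    }
}
```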


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-02 Thread via GitHub


philipnee commented on PR #16115:
URL: https://github.com/apache/kafka/pull/16115#issuecomment-2144015771

   Hi @brenden20 - I left some comments. Let me know what you think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-06-02 Thread via GitHub


philipnee commented on PR #16124:
URL: https://github.com/apache/kafka/pull/16124#issuecomment-2144015658

   Hi @brenden20 - I left some comments. Let me know what you think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623649872


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -339,6 +358,15 @@ void testRunOnceInvokesReaper() {
 verify(applicationEventReaper).reap(any(Long.class));
 }
 
+private HashMap 
mockTopicPartitionOffset() {
+final TopicPartition t0 = new TopicPartition("t0", 2);
+final TopicPartition t1 = new TopicPartition("t0", 3);
+HashMap topicPartitionOffsets = new 
HashMap<>();

Review Comment:
   1. this can be final as well
   2. let's use Map on the left side.
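
   For illustration, the two suggestions applied to a helper like 
`mockTopicPartitionOffset()` might look like this (plain String/Long stand in 
for TopicPartition/OffsetAndMetadata to keep the sketch self-contained):

```java
import java.util.HashMap;
import java.util.Map;

public class MockOffsets {
    static Map<String, Long> mockTopicPartitionOffset() {
        final String t0 = "t0-2";
        final String t1 = "t0-3";
        // Interface on the left, implementation on the right, and final.
        final Map<String, Long> offsets = new HashMap<>();
        offsets.put(t0, 5L);
        offsets.put(t1, 10L);
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(mockTopicPartitionOffset().size());  // 2
    }
}
```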



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623649732


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -339,6 +358,15 @@ void testRunOnceInvokesReaper() {
 verify(applicationEventReaper).reap(any(Long.class));
 }
 
+private HashMap 
mockTopicPartitionOffset() {

Review Comment:
   try to use Map instead of HashMap



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648674


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -231,10 +254,7 @@ public void testAssignmentChangeEvent() {
 
 consumerNetworkThread.runOnce();
 
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
-verify(networkClient, times(1)).poll(anyLong(), anyLong());
-verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
-// Assignment change should generate an async commit (not retried).
-verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
+verify(networkClientDelegate, times(1)).poll(anyLong(), anyLong());

Review Comment:
   Can we be consistent in using times(1) here? Above you removed times(1), but 
it seems to be inconsistently applied elsewhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648578


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -207,7 +230,7 @@ public void testResetPositionsProcessFailureIsIgnored() {
 
 ResetPositionsEvent event = new 
ResetPositionsEvent(calculateDeadlineMs(time, 100));
 applicationEventsQueue.add(event);
-assertDoesNotThrow(() -> consumerNetworkThread.runOnce());
+assertDoesNotThrow(consumerNetworkThread::runOnce);

Review Comment:
   Can we revert the unrelated change? Also, I think we can remove this test 
and see if it is covered elsewhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648474


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -149,20 +164,28 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 "The consumer network thread did not stop within " + 
DEFAULT_MAX_WAIT_MS + " ms");
 }
 
+@Test
+void testRequestManagersArePolledOnce() {
+consumerNetworkThread.runOnce();
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).poll(anyLong())));
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).maximumTimeToWait(anyLong())));
+verify(networkClientDelegate).poll(anyLong(), anyLong());
+}
+
 @Test
 public void testApplicationEvent() {
 ApplicationEvent e = new PollEvent(100);
 applicationEventsQueue.add(e);
 consumerNetworkThread.runOnce();
-verify(applicationEventProcessor, times(1)).process(e);
+verify(applicationEventProcessor).process(e);
 }
 
 @Test
 public void testMetadataUpdateEvent() {

Review Comment:
   I think we can probably remove this test, as it does exactly the same thing 
as `testApplicationEvent`. We should probably try to find a similar test to 
ensure the code path is covered.






Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623648058


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -149,20 +164,28 @@ public void testStartupAndTearDown() throws 
InterruptedException {
 "The consumer network thread did not stop within " + 
DEFAULT_MAX_WAIT_MS + " ms");
 }
 
+@Test
+void testRequestManagersArePolledOnce() {
+consumerNetworkThread.runOnce();
+requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> 
verify(rm, times(1)).poll(anyLong(;

Review Comment:
   I believe `times(1)` is the default; could you verify this?






Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623647935


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +52,105 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+LogContext logContext = new LogContext();
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+
+this.networkClient = new NetworkClientDelegate(
+time,
+config,
+logContext,
+client
+);
 
-private ConsumerTestBuilder testBuilder;
-private Time time;
-private ConsumerMetadata metadata;
-private NetworkClientDelegate networkClient;
-private BlockingQueue applicationEventsQueue;
-private ApplicationEventProcessor applicationEventProcessor;
-private OffsetsRequestManager offsetsRequestManager;
-private CommitRequestManager commitRequestManager;
-private CoordinatorRequestManager coordinatorRequestManager;
-private ConsumerNetworkThread consumerNetworkThread;
-private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-private MockClient client;
-
-@BeforeEach
-public void setup() {
-testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-time = testBuilder.time;
-metadata = testBuilder.metadata;
-networkClient = testBuilder.networkClientDelegate;
-client = testBuilder.client;
-applicationEventsQueue = testBuilder.applicationEventQueue;
-applicationEventProcessor = testBuilder.applicationEventProcessor;
-commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-offsetsRequestManager = testBuilder.offsetsRequestManager;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-consumerNetworkThread = new ConsumerNetworkThread(
-testBuilder.logContext,
+this.consumerNetworkThread = new ConsumerNetworkThread(
+logContext,
 time,
-testBuilder.applicationEventQueue,
+applicationEventsQueue,
 applicationEventReaper,
 () -> applicationEventProcessor,
-() -> testBuilder.networkClientDelegate,
-() -> testBuilder.requestManagers
+() -> networkClientDelegate,
+() -> requestManagers
 );
+}
+
+@BeforeEach
+public void setup() {
 consumerNetworkThread.initializeResources();
 }
 
 @AfterEach
 public void tearDown() {
-if (testBuilder != null) {
-testBuilder.close();
-consumerNetworkThread.close(Durat

Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623647643


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +52,105 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+LogContext logContext = new LogContext();
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();
+this.metadata = mock(ConsumerMetadata.class);
+this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+this.applicationEventReaper = mock(CompletableEventReaper.class);
+this.client = new MockClient(time);
+
+this.networkClient = new NetworkClientDelegate(

Review Comment:
   I think I might have asked you to use MockClient here. Since we aren't 
necessarily testing request sending etc., can we just mock the 
networkClientDelegate?






Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623647362


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -74,68 +52,105 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+private final Time time;
+private final ConsumerMetadata metadata;
+private final BlockingQueue applicationEventsQueue;
+private final ApplicationEventProcessor applicationEventProcessor;
+private final OffsetsRequestManager offsetsRequestManager;
+private final HeartbeatRequestManager heartbeatRequestManager;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final ConsumerNetworkThread consumerNetworkThread;
+private final MockClient client;
+private final NetworkClientDelegate networkClientDelegate;
+private final NetworkClientDelegate networkClient;
+private final RequestManagers requestManagers;
+private final CompletableEventReaper applicationEventReaper;
+
+ConsumerNetworkThreadTest() {
+LogContext logContext = new LogContext();
+ConsumerConfig config = mock(ConsumerConfig.class);
+this.time = new MockTime();
+this.networkClientDelegate = mock(NetworkClientDelegate.class);
+this.requestManagers = mock(RequestManagers.class);
+this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.applicationEventsQueue = new LinkedBlockingQueue<>();

Review Comment:
   Can we mock this as well?






Re: [PR] KAFKA-16001 Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1623647091


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -16,33 +16,19 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
-import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.*;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
-import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
-import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
-import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
-import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
-import org.apache.kafka.clients.consumer.internals.events.PollEvent;
-import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
-import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
-import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
+import org.apache.kafka.clients.consumer.internals.events.*;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FindCoordinatorRequest;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.requests.*;

Review Comment:
   In Kafka we don't use wildcard imports; could you adjust the setting in the 
IDE? I think we also want to avoid rearranging imports since these changes 
aren't necessarily relevant to this PR.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -53,19 +39,11 @@
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;

Review Comment:
   Same here, we don't use wildcard imports.






Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16124:
URL: https://github.com/apache/kafka/pull/16124#discussion_r1623646860


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -152,6 +152,34 @@ public void cleanup() {
 }
 }
 
+@Test
+public void testHeartBeatRequestStateToStringBase() {
+final long retryBackoffMs = 100;

Review Comment:
   I think we should try to consistently apply `final` to all variables if we 
want to use `final`.






Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16124:
URL: https://github.com/apache/kafka/pull/16124#discussion_r1623646783


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -483,6 +483,23 @@ public void resetTimer() {
 this.heartbeatTimer.reset(heartbeatIntervalMs);
 }
 
+@Override
+public String toStringBase() {
+return super.toStringBase() +
+", heartbeatTimer=" + heartbeatTimer +
+", heartbeatIntervalMs=" + heartbeatIntervalMs;
+}
+
+// Visible for testing

Review Comment:
   I don't think we need the `heartbeatTimer()` and `heartbeatIntervalMs()` 
accessors here; see the comment below.
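   For context, the `toStringBase()` override under discussion follows a common chaining pattern: the subclass appends its own fields to whatever the superclass already reports. A minimal stdlib-only sketch of that pattern (the class and field names mirror the diff above, but this is an illustrative standalone version, not the actual Kafka types):

```java
// Illustrative sketch of the toStringBase() chaining pattern.
class RequestState {
    final long retryBackoffMs;

    RequestState(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    String toStringBase() {
        return "retryBackoffMs=" + retryBackoffMs;
    }
}

class HeartbeatRequestState extends RequestState {
    private final long heartbeatIntervalMs;

    HeartbeatRequestState(long retryBackoffMs, long heartbeatIntervalMs) {
        super(retryBackoffMs);
        this.heartbeatIntervalMs = heartbeatIntervalMs;
    }

    @Override
    String toStringBase() {
        // Chain onto the superclass output, then append subclass fields.
        return super.toStringBase() + ", heartbeatIntervalMs=" + heartbeatIntervalMs;
    }
}
```

   A test can then build the expected string from the superclass's `toStringBase()` plus the subclass fields, which is what the PR's test does.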






Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16124:
URL: https://github.com/apache/kafka/pull/16124#discussion_r1623646688


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -152,6 +152,34 @@ public void cleanup() {
 }
 }
 
+@Test
+public void testHeartBeatRequestStateToStringBase() {
+final long retryBackoffMs = 100;
+final long retryBackoffMaxMs = 1000;
+LogContext logContext = new LogContext();
+HeartbeatRequestState heartbeatRequestState = new 
HeartbeatRequestState(
+logContext,
+time,
+10,
+retryBackoffMs,
+retryBackoffMaxMs,
+.2
+);
+
+RequestState requestState = new RequestState(
+logContext,
+HeartbeatRequestManager.HeartbeatRequestState.class.getName(),
+retryBackoffMs,
+retryBackoffMaxMs
+);
+
+String target = requestState.toStringBase() +
+", heartbeatTimer=" + heartbeatRequestState.heartbeatTimer() +
+", heartbeatIntervalMs=" + 
heartbeatRequestState.heartbeatIntervalMs();

Review Comment:
   Just test against `heartbeatIntervalMs`






Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16124:
URL: https://github.com/apache/kafka/pull/16124#discussion_r1623646632


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -152,6 +152,34 @@ public void cleanup() {
 }
 }
 
+@Test
+public void testHeartBeatRequestStateToStringBase() {
+final long retryBackoffMs = 100;
+final long retryBackoffMaxMs = 1000;
+LogContext logContext = new LogContext();
+HeartbeatRequestState heartbeatRequestState = new 
HeartbeatRequestState(
+logContext,
+time,
+10,

Review Comment:
   Extract this out as a variable for testing later.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -152,6 +152,34 @@ public void cleanup() {
 }
 }
 
+@Test
+public void testHeartBeatRequestStateToStringBase() {
+final long retryBackoffMs = 100;
+final long retryBackoffMaxMs = 1000;
+LogContext logContext = new LogContext();
+HeartbeatRequestState heartbeatRequestState = new 
HeartbeatRequestState(
+logContext,
+time,
+10,
+retryBackoffMs,
+retryBackoffMaxMs,
+.2
+);
+
+RequestState requestState = new RequestState(
+logContext,
+HeartbeatRequestManager.HeartbeatRequestState.class.getName(),
+retryBackoffMs,
+retryBackoffMaxMs
+);
+
+String target = requestState.toStringBase() +
+", heartbeatTimer=" + heartbeatRequestState.heartbeatTimer() +

Review Comment:
   Can we just apply `time.timer(heartbeatIntervalMs)`?






Re: [PR] KAFKA-16558 Implemented HeartbeatRequestState toStringBase() and added a test for it [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16124:
URL: https://github.com/apache/kafka/pull/16124#discussion_r1623645864


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -452,7 +452,7 @@ private void handleFatalFailure(Throwable error) {
  * object extends {@link RequestState} to enable exponential backoff and 
duplicated request handling. The two fields
  * that it holds are:
  */
-static class HeartbeatRequestState extends RequestState {
+protected static class HeartbeatRequestState extends RequestState {

Review Comment:
   This is for testing purposes, right? Add a `// Visible for testing` comment.






Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16115:
URL: https://github.com/apache/kafka/pull/16115#discussion_r1623645517


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -123,6 +126,47 @@ public void setup() {
 this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 }
 
+@Test
+public void testOffsetFetchRequestStateToStringBase() {
+final long retryBackoffMs = 10;
+final long retryBackoffMaxMs = 100;
+final long expirationTimeMs = 1000;
+ConsumerConfig config = mock(ConsumerConfig.class);
+CommitRequestManager.MemberInfo memberInfo = new 
CommitRequestManager.MemberInfo();
+
+this.commitRequestManager = new CommitRequestManager(
+time,
+logContext,
+subscriptionState,
+config,
+coordinatorRequestManager,
+offsetCommitCallbackInvoker,
+"groupId",
+Optional.of("groupInstanceId"),
+metrics);
+
+this.offsetFetchRequestState = commitRequestManager.new 
OffsetFetchRequestState(
+mock(Set.class),
+retryBackoffMs, retryBackoffMaxMs, expirationTimeMs,
+memberInfo
+);
+
+this.requestState = new RequestState(

Review Comment:
   This should be `new RetriableRequestState`, since offsetFetchRequestState 
implements the retriable one?






Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


ganesh-sadanala commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623645445


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   Done. Please review it






Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16115:
URL: https://github.com/apache/kafka/pull/16115#discussion_r1623644884


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -106,6 +106,9 @@ public class CommitRequestManagerTest {
 private OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
 private final Metrics metrics = new Metrics();
 private Properties props;
+private CommitRequestManager.OffsetFetchRequestState 
offsetFetchRequestState;

Review Comment:
   the `commitRequestManager`, `offsetFetchRequestState` and `requestState` can 
all stay in the scope of `testOffsetFetchRequestStateToStringBase` no?






Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16115:
URL: https://github.com/apache/kafka/pull/16115#discussion_r1623644884


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -106,6 +106,9 @@ public class CommitRequestManagerTest {
 private OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
 private final Metrics metrics = new Metrics();
 private Properties props;
+private CommitRequestManager.OffsetFetchRequestState 
offsetFetchRequestState;

Review Comment:
   the offsetFetchRequestState and requestState can both stay in the scope of 
`testOffsetFetchRequestStateToStringBase` no?






Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


chia7712 commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623644649


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   > Shall i remove the exception? 
   
   Yep, I prefer this way.






Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16115:
URL: https://github.com/apache/kafka/pull/16115#discussion_r1623644470


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1289,5 +1293,11 @@ static class MemberInfo {
 this.memberId = Optional.empty();
 this.memberEpoch = Optional.empty();
 }
+
+@Override
+public String toString() {

Review Comment:
   Add the class name and reformat it to:
   ```
   MemberInfo(memberId=..., memberEpoch=...)
   ```
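   The suggested format could be sketched as follows. This is a hypothetical standalone version for illustration; the real `MemberInfo` is a nested class inside `CommitRequestManager` and its field types may differ:

```java
import java.util.Optional;

// Hypothetical standalone sketch of the suggested toString() format:
// MemberInfo(memberId=..., memberEpoch=...)
class MemberInfo {
    Optional<String> memberId = Optional.empty();
    Optional<Integer> memberEpoch = Optional.empty();

    @Override
    public String toString() {
        // Optional's own toString() renders as Optional.empty / Optional[value].
        return "MemberInfo(memberId=" + memberId +
            ", memberEpoch=" + memberEpoch + ')';
    }
}
```

   Leading with the class name makes the string self-describing when it appears nested inside a larger request-state dump.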






[jira] [Resolved] (KAFKA-12572) Add import ordering checkstyle rule and configure an automatic formatter

2024-06-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12572.

Fix Version/s: 3.9.0
   Resolution: Fixed

fixed by 
https://github.com/apache/kafka/commit/342e69192f62b89b7d1ea824aeecc09c38899d72

> Add import ordering checkstyle rule and configure an automatic formatter
> 
>
> Key: KAFKA-12572
> URL: https://issues.apache.org/jira/browse/KAFKA-12572
> Project: Kafka
>  Issue Type: Sub-task
>  Components: build
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
> Fix For: 3.9.0
>
>
> # Add import ordering checkstyle rules.
> # Configure an automatic formatter which satisfies 1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


ganesh-sadanala commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623643756


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   Shall I remove the exception? Or shall I update the code to this for now? 
   
   ```
   if (cipherInformation != null) this.cipherInformation = cipherInformation; 
   ```
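   The non-throwing option, keeping the registry's overwrite-only-when-non-null behavior, could look like the sketch below. This is a minimal illustration under stated assumptions: `CipherInformation` is stubbed here, and the accessor name `cipherInformation()` is hypothetical:

```java
// Stub standing in for the real CipherInformation type.
class CipherInformation {
    final String cipher;
    CipherInformation(String cipher) { this.cipher = cipher; }
}

// Minimal sketch of the non-throwing guard: silently ignore a null
// registration instead of raising IllegalArgumentException.
class DefaultChannelMetadataRegistry {
    private CipherInformation cipherInformation;

    public void registerCipherInformation(CipherInformation cipherInformation) {
        if (cipherInformation != null) {
            this.cipherInformation = cipherInformation;
        }
    }

    public CipherInformation cipherInformation() {
        return cipherInformation;
    }
}
```

   With this guard, a null call neither throws nor clobbers a previously registered value.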






Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-06-02 Thread via GitHub


chia7712 merged PR #16097:
URL: https://github.com/apache/kafka/pull/16097





Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


ganesh-sadanala commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623643756


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   Shall I remove the exception?






Re: [PR] MINOR: Fix missing wait topic finished in TopicBasedRemoteLogMetadataManagerRestartTest [kafka]

2024-06-02 Thread via GitHub


chia7712 merged PR #16171:
URL: https://github.com/apache/kafka/pull/16171





Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


chia7712 commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623643481


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   Agree on consistency. We can throw an exception in the future if extra checks 
are added to the production code.
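
For context, a minimal hypothetical sketch of the no-exception, consistent behavior being agreed on here; `CipherInformation` is reduced to a stub, and these are not the actual Kafka classes:

```java
// Hypothetical sketch only: a stub CipherInformation, not Kafka's class.
class CipherInformation {
    private final String cipher;
    CipherInformation(String cipher) { this.cipher = cipher; }
    String cipher() { return cipher; }
}

// Test-side registry mirroring the production SelectorChannelMetadataRegistry:
// a null argument is silently ignored rather than rejected with an exception.
class DefaultChannelMetadataRegistry {
    private CipherInformation cipherInformation;

    public void registerCipherInformation(final CipherInformation cipherInformation) {
        // No-op on null, consistent with the production registry's behavior.
        if (cipherInformation != null) {
            this.cipherInformation = cipherInformation;
        }
    }

    public CipherInformation cipherInformation() {
        return cipherInformation;
    }
}
```

If extra validation is added to the production registry later, both classes can throw the same exception type to stay consistent.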






Re: [PR] KAFKA-16698 : Add timeout to alterClientQuotas [kafka]

2024-06-02 Thread via GitHub


muralibasani commented on PR #16168:
URL: https://github.com/apache/kafka/pull/16168#issuecomment-2144003438

   @chia7712 would you like to take a look?





Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


ganesh-sadanala commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623639224


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   Since there are no invocations of the above method, no tests have failed 
locally.






Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


ganesh-sadanala commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623637352


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   I see that `SelectorChannelMetadataRegistry#registerCipherInformation` does 
not throw any such exception.  The proposed change for 
`DefaultChannelMetadataRegistry` is because it is only used in test classes, 
not for production. However, I see that there are no invocations of 
`DefaultChannelMetadataRegistry#registerCipherInformation` so far. In my 
opinion to keep it consistent, either both the classes should have exception 
thrown and handled properly, otherwise since 
`SelectorChannelMetadataRegistry#registerCipherInformation` is currently used 
in production and it does not have the exception thrown, 
`DefaultChannelMetadataRegistry#registerCipherInformation` does not require any 
exception. @divijvaidya @dajac Do you want to add anything? I guess the author 
suggested throwing the exception for future use.






Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


ganesh-sadanala commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623637352


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   I see that `SelectorChannelMetadataRegistry#registerCipherInformation` does 
not throw any such exception.  The proposed change for 
`DefaultChannelMetadataRegistry` is because it is only used in test classes, 
not for production. However, I see that there are no invocations of 
`DefaultChannelMetadataRegistry#registerCipherInformation` so far. In my 
opinion to keep it consistent, either both the classes should have exception 
thrown and handled properly, otherwise since 
`SelectorChannelMetadataRegistry#registerCipherInformation` is currently used 
in production and it does not have the exception thrown, 
`DefaultChannelMetadataRegistry#registerCipherInformation` does not require any 
exception. @divijvaidya @dajac Do you want to add anything? I guess the author 
suggested throwing the exception for future use.






Re: [PR] KAFKA-8977: Remove MockStreamsMetrics since it is not a mock [kafka]

2024-06-02 Thread via GitHub


chia7712 commented on PR #13931:
URL: https://github.com/apache/kafka/pull/13931#issuecomment-2143994107

   @joobisb any updates?





Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


ganesh-sadanala commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623637352


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   I see that `SelectorChannelMetadataRegistry#registerCipherInformation` does 
not throw any such exception. The proposed change for 
`DefaultChannelMetadataRegistry` is because it is only used in test classes, 
not for production. @divijvaidya @dajac You want to add anything?






Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


ganesh-sadanala commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623637352


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   I see that `SelectorChannelMetadataRegistry#registerCipherInformation` does 
not throw any such exception. The proposed change for 
`DefaultChannelMetadataRegistry` is because it is only test classes, not for 
production. @divijvaidya @dajac You want to add anything?






Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


ganesh-sadanala commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623637352


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   I see that `SelectorChannelMetadataRegistry#registerCipherInformation` does 
not throw any such exception. The proposed change for 
`DefaultChannelMetadataRegistry` is because it is only used in running test 
cases, not for production. @divijvaidya @dajac You want to add anything?






Re: [PR] KAFKA-15156 Update cipherInformation correctly in DefaultChannelMetadataRegistry [kafka]

2024-06-02 Thread via GitHub


chia7712 commented on code in PR #16169:
URL: https://github.com/apache/kafka/pull/16169#discussion_r1623634685


##
clients/src/test/java/org/apache/kafka/common/network/DefaultChannelMetadataRegistry.java:
##
@@ -22,9 +22,10 @@ public class DefaultChannelMetadataRegistry implements 
ChannelMetadataRegistry {
 
 @Override
 public void registerCipherInformation(final CipherInformation 
cipherInformation) {
-if (this.cipherInformation != null) {
-this.cipherInformation = cipherInformation;
+if (cipherInformation == null) {
+throw new IllegalArgumentException("cipherInformation cannot be 
null");

Review Comment:
   It would be nice that the thrown exception is same to 
`SelectorChannelMetadataRegistry#registerCipherInformation`. Could you check 
that?






Re: [PR] KAFKA-16252: Dynamic KRaft network manager and channel [kafka]

2024-06-02 Thread via GitHub


cmccabe commented on PR #16160:
URL: https://github.com/apache/kafka/pull/16160#issuecomment-2143980581

   There are some test failures in `KafkaRaftClientTest` and `StorageToolTest`.





Re: [PR] KAFKA-15305: The background thread should try to process the remaining task until the shutdown timer is expired. [kafka]

2024-06-02 Thread via GitHub


frankvicky commented on code in PR #16156:
URL: https://github.com/apache/kafka/pull/16156#discussion_r1623486299


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -140,6 +141,35 @@ public void testEnsureTimerSetOnAdd() {
 assertEquals(REQUEST_TIMEOUT_MS, 
ncd.unsentRequests().poll().timer().timeoutMs());
 }
 
+@Test
+public void testHasAnyPendingRequests() throws Exception {
+try (NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate()) {
+NetworkClientDelegate.UnsentRequest unsentRequest = 
newUnsentFindCoordinatorRequest();
+networkClientDelegate.add(unsentRequest);
+
+// unsent
+assertTrue(networkClientDelegate.hasAnyPendingRequests());
+assertFalse(networkClientDelegate.unsentRequests().isEmpty());
+assertFalse(client.hasInFlightRequests());
+
+networkClientDelegate.poll(0, time.milliseconds());
+
+// in-flight
+assertTrue(networkClientDelegate.hasAnyPendingRequests());
+assertTrue(networkClientDelegate.unsentRequests().isEmpty());
+assertTrue(client.hasInFlightRequests());
+time.sleep(REQUEST_TIMEOUT_MS);

Review Comment:
   Cool, this works like a charm and is more reasonable. 🥳






Re: [PR] MINOR: Fix missing wait topic finished in TopicBasedRemoteLogMetadataManagerRestartTest [kafka]

2024-06-02 Thread via GitHub


brandboat commented on PR #16171:
URL: https://github.com/apache/kafka/pull/16171#issuecomment-2143900050

   Gentle ping @chia7712, sorry about this one. Could you take another look 
when you are available?





[PR] MINOR: Fix missing wait topic finished in TopicBasedRemoteLogMetadataManagerRestartTest [kafka]

2024-06-02 Thread via GitHub


brandboat opened a new pull request, #16171:
URL: https://github.com/apache/kafka/pull/16171

   This is something left behind in 
https://github.com/apache/kafka/pull/16170
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15305: The background thread should try to process the remaining task until the shutdown timer is expired. [kafka]

2024-06-02 Thread via GitHub


chia7712 commented on code in PR #16156:
URL: https://github.com/apache/kafka/pull/16156#discussion_r1623465952


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -140,6 +141,35 @@ public void testEnsureTimerSetOnAdd() {
 assertEquals(REQUEST_TIMEOUT_MS, 
ncd.unsentRequests().poll().timer().timeoutMs());
 }
 
+@Test
+public void testHasAnyPendingRequests() throws Exception {
+try (NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate()) {
+NetworkClientDelegate.UnsentRequest unsentRequest = 
newUnsentFindCoordinatorRequest();
+networkClientDelegate.add(unsentRequest);
+
+// unsent
+assertTrue(networkClientDelegate.hasAnyPendingRequests());
+assertFalse(networkClientDelegate.unsentRequests().isEmpty());
+assertFalse(client.hasInFlightRequests());
+
+networkClientDelegate.poll(0, time.milliseconds());
+
+// in-flight
+assertTrue(networkClientDelegate.hasAnyPendingRequests());
+assertTrue(networkClientDelegate.unsentRequests().isEmpty());
+assertTrue(client.hasInFlightRequests());
+time.sleep(REQUEST_TIMEOUT_MS);

Review Comment:
   Could we mock response instead of making timeout? For example:
   ```java
   // in-flight
   assertTrue(networkClientDelegate.hasAnyPendingRequests());
   assertTrue(networkClientDelegate.unsentRequests().isEmpty());
   assertTrue(client.hasInFlightRequests());
   
   // complete request
   
client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, GROUP_ID, 
mockNode()));
   networkClientDelegate.poll(0, time.milliseconds());
   assertFalse(networkClientDelegate.hasAnyPendingRequests());
   assertTrue(networkClientDelegate.unsentRequests().isEmpty());
   assertFalse(client.hasInFlightRequests());
   ```






[jira] [Resolved] (KAFKA-16785) Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra

2024-06-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16785.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra
> ---
>
> Key: KAFKA-16785
> URL: https://issues.apache.org/jira/browse/KAFKA-16785
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
>  Labels: storage_test
> Fix For: 3.9.0
>
>
> as title



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16785: Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra [kafka]

2024-06-02 Thread via GitHub


chia7712 merged PR #16170:
URL: https://github.com/apache/kafka/pull/16170





[jira] [Commented] (KAFKA-16871) Clean up internal AssignmentConfigs class in Streams

2024-06-02 Thread MOHD KAIF KHAN (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851416#comment-17851416
 ] 

MOHD KAIF KHAN commented on KAFKA-16871:


[~ableegoldman] So far, here's what I have done:
1. Removed the old static inner class AssignorConfiguration.AssignmentConfigs.
2. Replaced the usages of this internal class in other classes with the 
corresponding getters of the new AssignmentConfigs class wherever required.
3. Found one issue while doing step 2 above:

The new public AssignmentConfigs class has a few properties, such as 
`rackAwareTrafficCost`, typed as primitive int, whereas the corresponding old 
static class in AssignorConfiguration had this field as Integer.

There are null checks on this field in a few classes; e.g. 
HighAvailabilityTaskAssignor does something like 
`configs.rackAwareAssignmentTrafficCost == null`, which can't be done with 
primitives if replaced.

I checked the KIP: the field is specified as OptionalInt, suggesting that it 
can be null, but in the new AssignmentConfigs implementation it was `int`.

Any suggestion on this would be very helpful.
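
To make the Integer-vs-int mismatch concrete, here is a hedged sketch of the 
OptionalInt approach the KIP wording suggests; the constructor and accessor 
below are invented for illustration and are not the actual KIP-924 API:

```java
import java.util.OptionalInt;

// Illustrative only: models a nullable config as OptionalInt instead of a
// primitive int, so callers can ask "is it set?" without a null check.
class AssignmentConfigs {
    private final OptionalInt rackAwareTrafficCost;

    AssignmentConfigs(final Integer rackAwareTrafficCost) {
        this.rackAwareTrafficCost = rackAwareTrafficCost == null
                ? OptionalInt.empty()
                : OptionalInt.of(rackAwareTrafficCost);
    }

    OptionalInt rackAwareTrafficCost() {
        return rackAwareTrafficCost;
    }
}
```

A check like `configs.rackAwareAssignmentTrafficCost == null` would then become 
`!configs.rackAwareTrafficCost().isPresent()`.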

> Clean up internal AssignmentConfigs class in Streams
> 
>
> Key: KAFKA-16871
> URL: https://issues.apache.org/jira/browse/KAFKA-16871
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, newbie++
>
> In KIP-924 we added a new public AssignmentConfigs class to hold all of the, 
> you guessed it, assignment related configs.
> However, there is an existing config class of the same name and largely the 
> same contents but that's in an internal package, specifically inside the 
> AssignorConfiguration class.
> We should remove the old AssignmentConfigs class that's in 
> AssignorConfiguration and replace any usages of it with the new public 
> AssignmentConfigs that we added in KIP-924





Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-06-02 Thread via GitHub


gongxuanzhang commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1623408115


##
README.md:
##
@@ -227,11 +227,16 @@ There are two code quality analysis tools that we 
regularly run, spotbugs and ch
 Checkstyle enforces a consistent coding style in Kafka.
 You can run checkstyle using:
 
-./gradlew checkstyleMain checkstyleTest
+./gradlew checkstyleMain checkstyleTest spotlessCheck
 
 The checkstyle warnings will be found in 
`reports/checkstyle/reports/main.html` and 
`reports/checkstyle/reports/test.html` files in the
 subproject build directories. They are also printed to the console. The build 
will fail if Checkstyle fails.
 
+ Spotless 
+This plugin can review code by rules and can also help you check the code , it 
is disabled by default. Some of our code reviews use `spotless` instead of 
`checkstyle`:

Review Comment:
   Maybe we will add other check rules via spotless in the future, so I think 
the wording `optimize the imports` may need to be replaced.






Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-06-02 Thread via GitHub


chia7712 commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1623401990


##
README.md:
##
@@ -227,11 +227,16 @@ There are two code quality analysis tools that we 
regularly run, spotbugs and ch
 Checkstyle enforces a consistent coding style in Kafka.
 You can run checkstyle using:
 
-./gradlew checkstyleMain checkstyleTest
+./gradlew checkstyleMain checkstyleTest spotlessCheck
 
 The checkstyle warnings will be found in 
`reports/checkstyle/reports/main.html` and 
`reports/checkstyle/reports/test.html` files in the
 subproject build directories. They are also printed to the console. The build 
will fail if Checkstyle fails.
 
+ Spotless 
+This plugin can review code by rules and can also help you check the code , it 
is disabled by default. Some of our code reviews use `spotless` instead of 
`checkstyle`:

Review Comment:
   `The import order is part of the static check. Please run `spotlessApply` to 
optimize the imports of the Java code before filing a pull request.`






Re: [PR] KAFKA-10787: Add import ordering checkstyle rule and configure an automatic formatter [kafka]

2024-06-02 Thread via GitHub


chia7712 commented on code in PR #16097:
URL: https://github.com/apache/kafka/pull/16097#discussion_r1623401990


##
README.md:
##
@@ -227,11 +227,16 @@ There are two code quality analysis tools that we 
regularly run, spotbugs and ch
 Checkstyle enforces a consistent coding style in Kafka.
 You can run checkstyle using:
 
-./gradlew checkstyleMain checkstyleTest
+./gradlew checkstyleMain checkstyleTest spotlessCheck
 
 The checkstyle warnings will be found in 
`reports/checkstyle/reports/main.html` and 
`reports/checkstyle/reports/test.html` files in the
 subproject build directories. They are also printed to the console. The build 
will fail if Checkstyle fails.
 
+ Spotless 
+This plugin can review code by rules and can also help you check the code , it 
is disabled by default. Some of our code reviews use `spotless` instead of 
`checkstyle`:

Review Comment:
   `The import order is part of the static check. Please run `spotlessApply` to 
optimize the imports of the Java code before filing a pull request.`






Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]

2024-06-02 Thread via GitHub


chia7712 merged PR #16164:
URL: https://github.com/apache/kafka/pull/16164





Re: [PR] KAFKA-16814 KRaft broker cannot startup when `partition.metadata` is missing [kafka]

2024-06-02 Thread via GitHub


gaurav-narula commented on code in PR #16165:
URL: https://github.com/apache/kafka/pull/16165#discussion_r1623392339


##
core/src/test/java/kafka/server/LogManagerIntegrationTest.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.test.junit.RaftClusterInvocationContext;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
+public class LogManagerIntegrationTest {
+private final ClusterInstance cluster;
+
+public LogManagerIntegrationTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest(types = {Type.KRAFT})
+public void testRestartBrokerNoErrorIfMissingPartitionMetadata() throws IOException, ExecutionException, InterruptedException {
+    RaftClusterInvocationContext.RaftClusterInstance raftInstance = (RaftClusterInvocationContext.RaftClusterInstance) cluster;
+
+    try (Admin admin = cluster.createAdminClient()) {
+        admin.createTopics(Collections.singletonList(new NewTopic("foo", 1, (short) 1)));
+    }
+
+    raftInstance.getUnderlying().brokers().get(0).shutdown();
+    // delete the partition.metadata file here to simulate the scenario where partition.metadata has not been flushed to disk yet
+    raftInstance.getUnderlying().brokers().get(0)
+            .logManager().getLog(new TopicPartition("foo", 0), false).get().partitionMetadataFile().get().delete();
+    raftInstance.getUnderlying().brokers().get(0).startup();
+    assertDoesNotThrow(() -> raftInstance.getUnderlying().fatalFaultHandler().maybeRethrowFirstException());
+
+    // make sure the topic still works fine

Review Comment:
   Let's also wait until ISR is expanded again before producing/consuming.
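The suggested ISR wait is usually implemented as a polling loop (Kafka's test utilities provide `TestUtils.waitForCondition` for this). A minimal self-contained sketch of the pattern follows; the names here are hypothetical, not Kafka's actual test code:

```java
import java.util.function.BooleanSupplier;

public class WaitUntil {
    // Poll a condition until it holds or the timeout elapses; stands in for
    // "wait until the ISR is expanded again" before producing/consuming.
    static void waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("condition not met within " + timeoutMs + " ms");
            }
            Thread.sleep(pollMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // The condition becomes true after ~50 ms, standing in for "ISR size == replica count".
        waitUntilTrue(() -> System.currentTimeMillis() - start > 50, 5_000, 10);
        System.out.println("condition met");
    }
}
```

In the real test the condition would query the Admin client's topic description for the partition's ISR size before producing/consuming.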



##
core/src/test/java/kafka/server/LogManagerIntegrationTest.java:
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.Cluster

[jira] [Assigned] (KAFKA-16877) Migrate RemoteLogSegmentLifecycleTest to use new test infra

2024-06-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16877:
--

Assignee: Kuan Po Tseng  (was: Chia-Ping Tsai)

> Migrate RemoteLogSegmentLifecycleTest to use new test infra
> ---
>
> Key: KAFKA-16877
> URL: https://issues.apache.org/jira/browse/KAFKA-16877
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
>
> https://github.com/apache/kafka/blob/2c82ecd67f2f6b412f625e8efc1457e7fb7f74dd/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java#L432



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16877) Migrate RemoteLogSegmentLifecycleTest to use new test infra

2024-06-02 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851405#comment-17851405
 ] 

Kuan Po Tseng commented on KAFKA-16877:
---

I'm willing to take over this. Many thanks.

> Migrate RemoteLogSegmentLifecycleTest to use new test infra
> ---
>
> Key: KAFKA-16877
> URL: https://issues.apache.org/jira/browse/KAFKA-16877
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> https://github.com/apache/kafka/blob/2c82ecd67f2f6b412f625e8efc1457e7fb7f74dd/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java#L432





[PR] KAFKA-16785: Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra [kafka]

2024-06-02 Thread via GitHub


brandboat opened a new pull request, #16170:
URL: https://github.com/apache/kafka/pull/16170

   related to https://issues.apache.org/jira/browse/KAFKA-16785
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16115:
URL: https://github.com/apache/kafka/pull/16115#discussion_r1623382628


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -123,6 +126,47 @@ public void setup() {
 this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 }
 
+@Test
+public void testOffsetFetchRequestStateToStringBase() {
+final long retryBackoffMs = 10;
+final long retryBackoffMaxMs = 100;
+final long expirationTimeMs = 1000;
+ConsumerConfig config = mock(ConsumerConfig.class);
+CommitRequestManager.MemberInfo memberInfo = new 
CommitRequestManager.MemberInfo();
+
+this.commitRequestManager = new CommitRequestManager(
+time,
+logContext,
+subscriptionState,
+config,
+coordinatorRequestManager,
+offsetCommitCallbackInvoker,
+"groupId",
+Optional.of("groupInstanceId"),
+metrics);
+
+this.offsetFetchRequestState = commitRequestManager.new 
OffsetFetchRequestState(
+mock(Set.class),
+retryBackoffMs, retryBackoffMaxMs, expirationTimeMs,

Review Comment:
   Let's break this into separate lines, just to be consistent with L154-158.






Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16115:
URL: https://github.com/apache/kafka/pull/16115#discussion_r1623382520


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -123,6 +126,47 @@ public void setup() {
 this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 }
 
+@Test
+public void testOffsetFetchRequestStateToStringBase() {
+final long retryBackoffMs = 10;
+final long retryBackoffMaxMs = 100;
+final long expirationTimeMs = 1000;
+ConsumerConfig config = mock(ConsumerConfig.class);
+CommitRequestManager.MemberInfo memberInfo = new 
CommitRequestManager.MemberInfo();
+
+this.commitRequestManager = new CommitRequestManager(
+time,
+logContext,
+subscriptionState,
+config,
+coordinatorRequestManager,
+offsetCommitCallbackInvoker,
+"groupId",
+Optional.of("groupInstanceId"),
+metrics);
+
+this.offsetFetchRequestState = commitRequestManager.new 
OffsetFetchRequestState(
+mock(Set.class),
+retryBackoffMs, retryBackoffMaxMs, expirationTimeMs,
+memberInfo
+);
+
+this.requestState = new RequestState(
+logContext,
+"CommitRequestManager",
+retryBackoffMs,
+retryBackoffMaxMs);
+
+String target = requestState.toStringBase() +
+", memberInfo={" + offsetFetchRequestState.memberInfo +

Review Comment:
   We can just test the values against L131-135; I think target should just be:
   memberInfo={memberInfo}, as well as the other fields below.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -123,6 +126,47 @@ public void setup() {
 this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 }
 
+@Test
+public void testOffsetFetchRequestStateToStringBase() {
+final long retryBackoffMs = 10;
+final long retryBackoffMaxMs = 100;
+final long expirationTimeMs = 1000;
+ConsumerConfig config = mock(ConsumerConfig.class);
+CommitRequestManager.MemberInfo memberInfo = new 
CommitRequestManager.MemberInfo();
+
+this.commitRequestManager = new CommitRequestManager(
+time,
+logContext,
+subscriptionState,
+config,
+coordinatorRequestManager,
+offsetCommitCallbackInvoker,
+"groupId",
+Optional.of("groupInstanceId"),
+metrics);
+
+this.offsetFetchRequestState = commitRequestManager.new 
OffsetFetchRequestState(
+mock(Set.class),
+retryBackoffMs, retryBackoffMaxMs, expirationTimeMs,
+memberInfo
+);
+
+this.requestState = new RequestState(
+logContext,
+"CommitRequestManager",
+retryBackoffMs,
+retryBackoffMaxMs);
+
+String target = requestState.toStringBase() +
+", memberInfo={" + offsetFetchRequestState.memberInfo +
+"}, expirationTimeMs=" + 
(offsetFetchRequestState.expirationTimeMs().isPresent() ? 
offsetFetchRequestState.expirationTimeMs() : "undefined") +

Review Comment:
   I think we are expecting an expirationTimeMs, so just use `expirationTimeMs=" 
+ expirationTimeMs`.
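As an aside, rendering an `Optional` field this way can be done with `map`/`orElse` instead of a ternary. A small illustrative sketch; the names are hypothetical, not the actual `OffsetFetchRequestState` code:

```java
import java.util.Optional;

public class ToStringDemo {
    // Render an Optional field as "undefined" when empty, mirroring the
    // expirationTimeMs formatting discussed in the review thread.
    static String render(Optional<Long> expirationTimeMs) {
        return "expirationTimeMs=" + expirationTimeMs.map(String::valueOf).orElse("undefined");
    }

    public static void main(String[] args) {
        System.out.println(render(Optional.of(1000L)));  // expirationTimeMs=1000
        System.out.println(render(Optional.empty()));    // expirationTimeMs=undefined
    }
}
```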






Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16115:
URL: https://github.com/apache/kafka/pull/16115#discussion_r1623382281


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -123,6 +126,47 @@ public void setup() {
 this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 }
 
+@Test
+public void testOffsetFetchRequestStateToStringBase() {
+final long retryBackoffMs = 10;
+final long retryBackoffMaxMs = 100;
+final long expirationTimeMs = 1000;
+ConsumerConfig config = mock(ConsumerConfig.class);
+CommitRequestManager.MemberInfo memberInfo = new 
CommitRequestManager.MemberInfo();
+
+this.commitRequestManager = new CommitRequestManager(
+time,
+logContext,
+subscriptionState,
+config,
+coordinatorRequestManager,
+offsetCommitCallbackInvoker,
+"groupId",
+Optional.of("groupInstanceId"),
+metrics);
+
+this.offsetFetchRequestState = commitRequestManager.new 
OffsetFetchRequestState(
+mock(Set.class),
+retryBackoffMs, retryBackoffMaxMs, expirationTimeMs,
+memberInfo
+);
+
+this.requestState = new RequestState(
+logContext,
+"CommitRequestManager",
+retryBackoffMs,
+retryBackoffMaxMs);
+
+String target = requestState.toStringBase() +
+", memberInfo={" + offsetFetchRequestState.memberInfo +
+"}, expirationTimeMs=" + 
(offsetFetchRequestState.expirationTimeMs().isPresent() ? 
offsetFetchRequestState.expirationTimeMs() : "undefined") +

Review Comment:
   I think we are expecting an expirationTimeMs, so just use `expirationTimeMs=" 
+ expirationTimeMs`.






Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16115:
URL: https://github.com/apache/kafka/pull/16115#discussion_r1623381407


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -123,6 +126,47 @@ public void setup() {
 this.props.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
 }
 
+@Test
+public void testOffsetFetchRequestStateToStringBase() {
+final long retryBackoffMs = 10;
+final long retryBackoffMaxMs = 100;
+final long expirationTimeMs = 1000;
+ConsumerConfig config = mock(ConsumerConfig.class);

Review Comment:
   Try to be consistent with the syntax: either use `final` for the config and 
memberInfo, or just don't use `final` for the longs.






Re: [PR] KAFKA-16557 Implemented OffsetFetchRequestState toStringBase and added a test for it in CommitRequestManagerTest [kafka]

2024-06-02 Thread via GitHub


philipnee commented on code in PR #16115:
URL: https://github.com/apache/kafka/pull/16115#discussion_r1623380991


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -227,8 +275,8 @@ public void testPollEnsureEmptyPendingRequestAfterPoll() {
 CommitRequestManager commitRequestManager = create(true, 100);
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 Map offsets = 
Collections.singletonMap(
-new TopicPartition("topic", 1),
-new OffsetAndMetadata(0));
+new TopicPartition("topic", 1),

Review Comment:
   I wonder if IntelliJ's auto-indentation automatically changes the 
indentation when copying the content over. Double-check the indentation for 
these files; it should be there. It could be easier to use a text editor like 
vim/emacs to edit out these unintended changes.






[jira] [Created] (KAFKA-16877) Migrate RemoteLogSegmentLifecycleTest to use new test infra

2024-06-02 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16877:
--

 Summary: Migrate RemoteLogSegmentLifecycleTest to use new test 
infra
 Key: KAFKA-16877
 URL: https://issues.apache.org/jira/browse/KAFKA-16877
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


https://github.com/apache/kafka/blob/2c82ecd67f2f6b412f625e8efc1457e7fb7f74dd/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java#L432





[jira] [Resolved] (KAFKA-16807) DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions

2024-06-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16807.

Resolution: Fixed

> DescribeLogDirsResponseData#results#topics have unexpected topics having 
> empty partitions
> -
>
> Key: KAFKA-16807
> URL: https://issues.apache.org/jira/browse/KAFKA-16807
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1, 3.9.0
>
>
> ReplicaManager [0] could generate a response containing unexpected topics that 
> have empty partitions. The root cause is that it always generates the topic 
> collection even when it has no matched partitions.
> That is not an issue for Kafka clients, since we loop over the "partitions" to 
> fill all future responses [1]. Hence, those unexpected topics won't be present 
> in the final results.
> However, it could be an issue for users who implement a Kafka client based on 
> the Kafka protocol [2]
> [0] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1252
> [1] 
> https://github.com/apache/kafka/blob/b5a013e4564ad93026b9c61431e4573a39bec766/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L3145
> [2] https://lists.apache.org/thread/lp7ktmm17pbg7nqk7p4s904lcn3mrvhy
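The fix idea described above — emit a topic entry only when it actually has matched partitions — can be sketched in isolation. The names below are illustrative, not Kafka's actual `DescribeLogDirs` response types:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DescribeLogDirsFilter {
    // Keep only topics that have at least one matched partition, so the
    // result never contains a topic entry with an empty partition list.
    static Map<String, List<Integer>> dropEmptyTopics(Map<String, List<Integer>> partitionsByTopic) {
        Map<String, List<Integer>> results = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> entry : partitionsByTopic.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                results.put(entry.getKey(), entry.getValue());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> input = new LinkedHashMap<>();
        input.put("foo", Arrays.asList(0, 1));
        input.put("bar", Collections.emptyList());  // no matched partitions
        System.out.println(dropEmptyTopics(input).keySet());  // prints [foo]
    }
}
```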





[jira] [Updated] (KAFKA-16807) DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions

2024-06-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16807:
---
Fix Version/s: 3.9.0

> DescribeLogDirsResponseData#results#topics have unexpected topics having 
> empty partitions
> -
>
> Key: KAFKA-16807
> URL: https://issues.apache.org/jira/browse/KAFKA-16807
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1, 3.9.0
>
>
> ReplicaManager [0] could generate a response containing unexpected topics that 
> have empty partitions. The root cause is that it always generates the topic 
> collection even when it has no matched partitions.
> That is not an issue for Kafka clients, since we loop over the "partitions" to 
> fill all future responses [1]. Hence, those unexpected topics won't be present 
> in the final results.
> However, it could be an issue for users who implement a Kafka client based on 
> the Kafka protocol [2]
> [0] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1252
> [1] 
> https://github.com/apache/kafka/blob/b5a013e4564ad93026b9c61431e4573a39bec766/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L3145
> [2] https://lists.apache.org/thread/lp7ktmm17pbg7nqk7p4s904lcn3mrvhy





Re: [PR] KAFKA-16807: DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions [kafka]

2024-06-02 Thread via GitHub


chia7712 merged PR #16042:
URL: https://github.com/apache/kafka/pull/16042





Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-06-02 Thread via GitHub


chia7712 commented on PR #15830:
URL: https://github.com/apache/kafka/pull/15830#issuecomment-2143770889

   @pasharik Could you please take a look at failed tests?





Re: [PR] KAFKA-14405: Log a warning when users attempt to set a config controlled by Streams [kafka]

2024-06-02 Thread via GitHub


ashmeet13 commented on PR #12988:
URL: https://github.com/apache/kafka/pull/12988#issuecomment-2143755742

   Hi @mjsax requesting a review on this PR.
   
   There is still one open case that I am not sure how we should handle: the 
check @ableegoldman mentioned in her comment 
[here](https://github.com/apache/kafka/pull/12988/#pullrequestreview-1225470833)
 for handling the `ProducerConfig.PARTITIONER_CLASS_CONFIG` 
(`partitioner.class`) config.
   
   





Re: [PR] KAFKA-16861: Don't convert to group to classic if the size is larger than group max size. [kafka]

2024-06-02 Thread via GitHub


dajac commented on PR #16163:
URL: https://github.com/apache/kafka/pull/16163#issuecomment-2143747462

   Sure. I can review it tomorrow.





Re: [PR] KAFKA-16861: Don't convert to group to classic if the size is larger than group max size. [kafka]

2024-06-02 Thread via GitHub


chia7712 commented on PR #16163:
URL: https://github.com/apache/kafka/pull/16163#issuecomment-2143746525

   @frankvicky @dajac thanks for all your responses. I will take a look at this 
PR later. @dajac it would be great if this PR can have reviews from you :)





Re: [PR] KAFKA-16861: Don't convert to group to classic if the size is larger than group max size. [kafka]

2024-06-02 Thread via GitHub


dajac commented on PR #16163:
URL: https://github.com/apache/kafka/pull/16163#issuecomment-2143745346

   @frankvicky Thanks for this! As I said, I am fine with merging the fix. We 
should definitely keep it. I just wanted to signal that there is an ongoing 
effort from @dongnuo123 to add integration and system tests for the 
upgrade/downgrade paths.




