Re: [PR] KAFKA-16967: NioEchoServer fails to register connection and causes flaky failure. [kafka]

2024-06-23 Thread via GitHub


chia7712 commented on PR #16384:
URL: https://github.com/apache/kafka/pull/16384#issuecomment-2185672343

   @frankvicky could you please take a look at the failed test? It may be related to this PR:
   ```
   org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Metric not updated failed-authentication-total expected:<1.0> but was:<24.0> ==> expected: <true> but was: <false>
   Stack trace
   org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Metric not updated failed-authentication-total expected:<1.0> but was:<24.0> ==> expected: <true> but was: <false>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397)
at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445)
at 
app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394)
at 
app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
at 
app//org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:213)
at 
app//org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:172)
at 
app//org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedCiphers(SslTransportLayerTest.java:650)
at 
java.base@21.0.3/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base@21.0.3/java.lang.reflect.Method.invoke(Method.java:580)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16853) Split RemoteLogManagerScheduledThreadPool

2024-06-23 Thread Abhijeet Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859568#comment-17859568
 ] 

Abhijeet Kumar commented on KAFKA-16853:


Hi [~showuon]. I plan to start this week.

> Split RemoteLogManagerScheduledThreadPool
> -
>
> Key: KAFKA-16853
> URL: https://issues.apache.org/jira/browse/KAFKA-16853
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Abhijeet Kumar
>Priority: Major
>
> *Summary*
> To begin with create just the RemoteDataExpirationThreadPool and move 
> expiration to it. Keep all settings as if the only thread pool was the 
> RemoteLogManagerScheduledThreadPool. Ensure that the new thread pool is wired 
> correctly to the RemoteLogManager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Implemented release acquired records functionality to SharePartition [kafka]

2024-06-23 Thread via GitHub


adixitconfluent opened a new pull request, #16430:
URL: https://github.com/apache/kafka/pull/16430

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Remote fetch throttle metrics [kafka]

2024-06-23 Thread via GitHub


abhijeetk88 commented on PR #16087:
URL: https://github.com/apache/kafka/pull/16087#issuecomment-2185630174

   > @abhijeetk88
   > 
   > Thanks for the patch! Verified that the metric is emitted via JConsole. The metric objectName is set to `kafka.server:type=RemoteLogManager`; should we rename it to `kafka.log.remote:type=RemoteLogManager` instead?
   > 
   > Also, can you add a description to each of the metrics?
   > 
   > [attached screenshot: JConsole view of the RemoteLogManager metrics]
   
   I am facing a problem here. Creating a SensorAccess requires 
_org.apache.kafka.common.metrics.Metrics_, but with that the prefix becomes 
kafka.server and cannot be changed. We would instead need to use 
_KafkaMetricsGroup_, but there is no way to create a SensorAccess with it (see 
the sketch below).
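   To make the prefix issue concrete, here is a minimal sketch assuming a standalone `Metrics` instance and the (deprecated) `JmxReporter(String)` constructor; the sensor and metric names are illustrative, not the PR's actual metrics. The point is that the JMX domain is fixed when the reporter is attached, not per sensor:
   ```java
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.metrics.JmxReporter;
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.Rate;

   public class PrefixSketch {
       public static void main(String[] args) {
           try (Metrics metrics = new Metrics()) {
               // The JMX domain is fixed by the reporter attached to the Metrics
               // instance, not by the sensor, so every sensor created through a
               // shared Metrics instance inherits the same prefix.
               metrics.addReporter(new JmxReporter("kafka.log.remote"));
               Sensor sensor = metrics.sensor("remote-fetch-throttle-time");
               MetricName name = metrics.metricName(
                       "remote-fetch-throttle-time-avg", "RemoteLogManager",
                       "illustrative description");
               sensor.add(name, new Rate());
               // -> registered as kafka.log.remote:type=RemoteLogManager,...
           }
       }
   }
   ```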
   
   @kamalcph @showuon are you aware of a workaround?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17023: add PCollectionsImmutableMap to ConcurrentMapBenchmark [kafka]

2024-06-23 Thread via GitHub


chia7712 commented on code in PR #16425:
URL: https://github.com/apache/kafka/pull/16425#discussion_r1650346962


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java:
##
@@ -189,4 +194,60 @@ public void testCopyOnWriteMapEntrySet(Blackhole 
blackhole) {
 }
 }
 }
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testPCollectionsImmutableMapGet(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent do add new 
entry
+pcollectionsImmutableMap = pcollectionsImmutableMap.updated(i 
+ mapSize, 0);

Review Comment:
   So could you please add a single-write/multi-read scenario for those maps? 
That is an existing use case in Kafka; a sketch of one possible shape is below.
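   A rough sketch (one possible way to do it, not this PR's code) using JMH thread groups, assuming the enclosing benchmark state is `Scope.Group` and a hypothetical `writerIndex` int field:
   ```java
   @Benchmark
   @Group("singleWriteMultiRead")
   @GroupThreads(1) // exactly one writer thread
   public void pcollectionsSingleWrite() {
       pcollectionsImmutableMap = pcollectionsImmutableMap.updated(writerIndex++, 0);
   }

   @Benchmark
   @Group("singleWriteMultiRead")
   @GroupThreads(4) // several concurrent reader threads
   public void pcollectionsMultiRead(Blackhole blackhole) {
       blackhole.consume(pcollectionsImmutableMap.get(0));
   }
   ```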



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]

2024-06-23 Thread via GitHub


chia7712 merged PR #16396:
URL: https://github.com/apache/kafka/pull/16396


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16998) Fix warnings in our Github actions

2024-06-23 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16998.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Fix warnings in our Github actions
> --
>
> Key: KAFKA-16998
> URL: https://issues.apache.org/jira/browse/KAFKA-16998
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: Mickael Maison
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.9.0
>
>
> Most of our Github actions produce warnings, see 
> [https://github.com/apache/kafka/actions/runs/9572915509] for example.
> It looks like we need to bump the version we use for actions/checkout, 
> actions/setup-python, actions/upload-artifact to v4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16998 Fix warnings in our Github actions [kafka]

2024-06-23 Thread via GitHub


chia7712 merged PR #16410:
URL: https://github.com/apache/kafka/pull/16410


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16998 Fix warnings in our Github actions [kafka]

2024-06-23 Thread via GitHub


VedarthConfluent commented on PR #16410:
URL: https://github.com/apache/kafka/pull/16410#issuecomment-2185600693

   LGTM, thanks for the PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17025) KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler

2024-06-23 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated KAFKA-17025:
--
Description: 
When I use KafkaProducer, an OOM occurs, but the KafkaProducer only logs it and 
does nothing:

 
{code:java}
ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
Direct buffer memory
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) 
at org.apache.kafka.clients.producer.internals.Sender.run 
at java.lang.Thread.run
{code}
 

 

I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not caught where 
org.apache.kafka.clients.producer.internals.Sender#run tries to catch 
Exception: 
{code:java}
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

log.debug("Beginning shutdown of Kafka producer I/O thread, sending 
remaining records.");

// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for 
acknowledgment,
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() || 
this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

// Abort the transaction if any commit or abort didn't go through the 
transaction manager's queue
while (!forceClose && transactionManager != null && 
transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

if (forceClose) {
// We need to fail all the incomplete transactional requests and 
batches and wake up the threads waiting on
// the futures.
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced 
shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}

log.debug("Shutdown of Kafka producer I/O thread has completed.");
}


{code}
 

2. Then KafkaThread catches the uncaught exception and just logs it:
{code:java}
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
setDaemon(daemon);
setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in 
thread '{}':", name, e));
}{code}
 

To be honest, I don't understand why KafkaThread does nothing but log when an 
uncaught exception occurs. Why not expose a method to set the 
UncaughtExceptionHandler on KafkaThread or KafkaProducer, so that users can 
decide what to do with an uncaught exception, whether to rethrow it or just 
ignore it?

  was:
When I use KafkaProducer, an OOM occurs, but the KafkaProducer only logs it and 
does nothing:

 
{code:java}
ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
Direct buffer memory. at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) at 
org.apache.kafka.clients.producer.internals.Sender.run at java.lang.Thread.run
{code}
 

 

I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not caught where 
org.apache.kafka.clients.producer.internals.Sender#run tries to catch 
Exception: 
{code:java}
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

log.debug("Beginning shutdown of Kafka producer I/O thread, sending 
remaining records.");

// okay we stopped accepting requests but there 

[jira] [Updated] (KAFKA-17025) KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler

2024-06-23 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated KAFKA-17025:
--
Description: 
When I use KafkaProducer, an OOM occurs, but the KafkaProducer only logs it and 
does nothing:

 
{code:java}
ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
Direct buffer memory. at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) at 
org.apache.kafka.clients.producer.internals.Sender.run at java.lang.Thread.run
{code}
 

 

I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not caught where 
org.apache.kafka.clients.producer.internals.Sender#run tries to catch 
Exception: 
{code:java}
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

log.debug("Beginning shutdown of Kafka producer I/O thread, sending 
remaining records.");

// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for 
acknowledgment,
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() || 
this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

// Abort the transaction if any commit or abort didn't go through the 
transaction manager's queue
while (!forceClose && transactionManager != null && 
transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

if (forceClose) {
// We need to fail all the incomplete transactional requests and 
batches and wake up the threads waiting on
// the futures.
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced 
shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}

log.debug("Shutdown of Kafka producer I/O thread has completed.");
}


{code}
 

2. Then KafkaThread catches the uncaught exception and just logs it:
{code:java}
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
setDaemon(daemon);
setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in 
thread '{}':", name, e));
}{code}
 

To be honest, I don't understand why KafkaThread does nothing but log when an 
uncaught exception occurs. Why not expose a method to set the 
UncaughtExceptionHandler on KafkaThread or KafkaProducer, so that users can 
decide what to do with an uncaught exception, whether to rethrow it or just 
ignore it?

  was:
When I use KafkaProducer, an OOM occurs, but the KafkaProducer only logs it and 
does nothing:

```

ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
Direct buffer memory

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)

at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)

at org.apache.kafka.clients.producer.internals.Sender.run

at java.lang.Thread.run

``` 

 

I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not caught where 
org.apache.kafka.clients.producer.internals.Sender#run tries to catch 
Exception: 

```
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining 
records.");

// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for 

[jira] [Updated] (KAFKA-17025) KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler

2024-06-23 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated KAFKA-17025:
--
Description: 
When I use KafkaProducer, an OOM occurs, but the KafkaProducer only logs it and 
does nothing:

```

ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
Direct buffer memory

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)

at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)

at org.apache.kafka.clients.producer.internals.Sender.run

at java.lang.Thread.run

``` 

 

I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not caught where 
org.apache.kafka.clients.producer.internals.Sender#run tries to catch 
Exception: 

```
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining 
records.");

// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for 
acknowledgment,
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() ||    
this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

// Abort the transaction if any commit or abort didn't go through the 
transaction manager's queue
while (!forceClose && transactionManager != null && 
transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

if (forceClose) {
// We need to fail all the incomplete transactional requests and batches and 
wake up the threads waiting on
// the futures.
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}

log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
```

2. Then KafkaThread catches the uncaught exception and just logs it:

```java
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
setDaemon(daemon);
setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread 
'{}':", name, e));
}
```

 

To be honest, I don't understand why KafkaThread does nothing but log when an 
uncaught exception occurs. Why not expose a method to set the 
UncaughtExceptionHandler on KafkaThread or KafkaProducer, so that users can 
decide what to do with an uncaught exception, whether to rethrow it or just 
ignore it?

  was:
When I use KafkaProducer, an OOM occurs, but the KafkaProducer only logs it and 
does nothing:

```java

ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
Direct buffer memory

``` 

 

I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not caught where 
org.apache.kafka.clients.producer.internals.Sender#run tries to catch 
Exception: 

```
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining 
records.");

// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for 
acknowledgment,
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() ||    
this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

// Abort the transaction if any commit or abort didn't go through the 
transaction manager's queue
while (!forceClose && transactionManager != null && 
transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");

[jira] [Created] (KAFKA-17025) KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler

2024-06-23 Thread Hongshun Wang (Jira)
Hongshun Wang created KAFKA-17025:
-

 Summary: KafkaThread and KafkaProducer expose method to set 
setUncaughtExceptionHandler
 Key: KAFKA-17025
 URL: https://issues.apache.org/jira/browse/KAFKA-17025
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 3.6.2
Reporter: Hongshun Wang
 Fix For: 3.6.3


When I use KafkaProducer, an OOM occurs, but the KafkaProducer only logs it and 
does nothing:

```java

ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
Direct buffer memory

``` 

 

I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not caught where 
org.apache.kafka.clients.producer.internals.Sender#run tries to catch 
Exception: 

```
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining 
records.");

// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for 
acknowledgment,
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() ||    
this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

// Abort the transaction if any commit or abort didn't go through the 
transaction manager's queue
while (!forceClose && transactionManager != null && 
transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

if (forceClose) {
// We need to fail all the incomplete transactional requests and batches and 
wake up the threads waiting on
// the futures.
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}

log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
```

2. Then KafkaThread catches the uncaught exception and just logs it:

```java
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
setDaemon(daemon);
setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread 
'{}':", name, e));
}
```

 

To be honest, I don't understand why KafkaThread does nothing but log when an 
uncaught exception occurs. Why not expose a method to set the 
UncaughtExceptionHandler on KafkaThread or KafkaProducer, so that users can 
decide what to do with an uncaught exception, whether to rethrow it or just 
ignore it?
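
For illustration, a minimal standalone sketch (my own demo, not Kafka code) of both points: `catch (Exception)` does not see an `Error`, and a thread-level `UncaughtExceptionHandler` could let the caller fail fast instead of only logging:

```java
public class UncaughtErrorDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            try {
                throw new OutOfMemoryError("simulated");
            } catch (Exception e) {
                // Never reached: OutOfMemoryError extends Error, not Exception,
                // so it escapes this catch block, just like in Sender#run.
                System.err.println("caught: " + e);
            }
        }, "demo-io-thread");
        // What a caller could do if KafkaThread/KafkaProducer exposed the handler:
        t.setUncaughtExceptionHandler((thread, e) -> {
            System.err.println("Uncaught in " + thread.getName() + ": " + e);
            if (e instanceof Error)
                Runtime.getRuntime().halt(1); // e.g. fail fast instead of only logging
        });
        t.start();
        t.join();
    }
}
```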



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: New year code clean up - misc [kafka]

2024-06-23 Thread via GitHub


github-actions[bot] commented on PR #15071:
URL: https://github.com/apache/kafka/pull/15071#issuecomment-2185520326

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Change config. compressionType to be an enum type instead [kafka]

2024-06-23 Thread via GitHub


github-actions[bot] commented on PR #15403:
URL: https://github.com/apache/kafka/pull/15403#issuecomment-2185520123

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Remote copy throttle metrics [kafka]

2024-06-23 Thread via GitHub


showuon commented on code in PR #16086:
URL: https://github.com/apache/kafka/pull/16086#discussion_r1650291862


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -789,7 +803,10 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 
 copyQuotaManagerLock.lock();
 try {
-while (rlmCopyQuotaManager.isQuotaExceeded()) {
+while (true) {
+long throttleTimeMs = 
rlmCopyQuotaManager.checkQuotaAndGetThrottleTimeMs();
+if (throttleTimeMs <= 0) break;
+
copyThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());

Review Comment:
   @kamalcph, thanks for the explanation! Good to know we've handled it well.
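   For other readers of the thread, a condensed restatement of the pattern in the diff above; the `throttleReleased` Condition and the exact names are stand-ins I introduced, not the actual RemoteLogManager fields (the enclosing method already declares `throws InterruptedException`):
   ```java
   copyQuotaManagerLock.lock();
   try {
       while (true) {
           long throttleTimeMs = rlmCopyQuotaManager.checkQuotaAndGetThrottleTimeMs();
           if (throttleTimeMs <= 0)
               break; // under quota: proceed with the segment copy
           // Record how long this copy is being throttled, then wait instead of spinning.
           copyThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
           throttleReleased.await(throttleTimeMs, TimeUnit.MILLISECONDS);
       }
   } finally {
       copyQuotaManagerLock.unlock();
   }
   ```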



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17013 RequestManager#ConnectionState#toString() should use %s [kafka]

2024-06-23 Thread via GitHub


dujian0068 commented on code in PR #16413:
URL: https://github.com/apache/kafka/pull/16413#discussion_r1650266581


##
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##
@@ -372,12 +372,12 @@ void onRequestSent(long correlationId, long timeMs) {
 @Override
 public String toString() {
 return String.format(
-"ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, 
lastFailTimeMs=%d, inFlightCorrelationId=%d)",
+"ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, 
lastFailTimeMs=%d, inFlightCorrelationId=%s)",
 node,
 state,
 lastSendTimeMs,
 lastFailTimeMs,
-inFlightCorrelationId
+inFlightCorrelationId.isPresent() ? 
inFlightCorrelationId.getAsLong() : null

Review Comment:
   You are right, this makes it easier to understand. It has been modified.
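   As a side note, a tiny standalone demo (my own, not this PR's code) of why `%d` fails for an `OptionalLong` while `%s` works:
   ```java
   import java.util.OptionalLong;

   public class FormatDemo {
       public static void main(String[] args) {
           OptionalLong inFlight = OptionalLong.empty();
           // %d needs a numeric argument: String.format("id=%d", inFlight) throws
           // IllegalFormatConversionException because OptionalLong is not a number.
           // %s falls back to toString() (or an explicit placeholder) and always works:
           System.out.println(String.format("id=%s",
                   inFlight.isPresent() ? inFlight.getAsLong() : "undefined"));
       }
   }
   ```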



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650219136


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -1616,4 +1623,25 @@ public  KStream processValues(
 processNode,
 builder);
 }
+
+@Override
+public KStream markAsPartitioned() {
+final ProcessorParameters 
processorParameters =
+new ProcessorParameters<>(new PassThrough<>(), 
PARTITION_PRESERVE_NAME + name);

Review Comment:
   I will keep this open to remind myself to update the KIP after the review is 
complete. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650218909


##
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##
@@ -685,6 +685,41 @@  KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper,
   final Named named);
 
+/**
+ * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+ * and does not require further repartitioning by downstream key changing 
operations.
+ * 
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with 
interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their 
original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.

Review Comment:
   @mjsax 
   
   Sorry for the confusion, the javadoc here could be better written... Before I 
do that: the composite key notion came from the original discussion 
[here](https://lists.apache.org/thread/r7yqsoqsox0z2mzxt33r9r99tnwvb58o); 
nonetheless I should remove it. Perhaps a brief description that `interactive 
query might fail when trying to guess the original key`.
   
   As for Joins, I might need your help. Most of my understanding of the 
problem came from the discussion thread.  
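   For other readers of this thread, a sketch of the pitfall being discussed, using the `markAsPartitioned()` operator proposed in KIP-759 (it compiles only against this PR's branch; topic names are made up):
   ```java
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.KStream;

   public class MarkAsPartitionedPitfall {
       public static void main(String[] args) {
           StreamsBuilder builder = new StreamsBuilder();
           KStream<String, String> input = builder.stream("input-topic");
           KStream<String, String> rekeyed = input
                   .selectKey((k, v) -> v)   // key-changing operation: normally flags a repartition
                   .markAsPartitioned();     // proposed operator: suppresses that repartition
           // Records keep their original physical partitions, so a downstream join
           // or interactive query that routes by the new key can look in the wrong
           // partition and miss.
           rekeyed.to("output-topic");
       }
   }
   ```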



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650214064


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 
KeyValueMapper selectKeyProcessorNode = 
internalSelectKey(mapper, new NamedInternal(named));
-selectKeyProcessorNode.keyChangingOperation(true);
+selectKeyProcessorNode.keyChangingOperation(repartitionRequired);
 
 builder.addGraphNode(graphNode, selectKeyProcessorNode);
 
 // key serde cannot be preserved
 return new KStreamImpl<>(
-selectKeyProcessorNode.nodeName(),
-null,
-valueSerde,
-subTopologySourceNodes,
-true,
-selectKeyProcessorNode,
-builder);
+selectKeyProcessorNode.nodeName(),

Review Comment:
   Thanks, I reverted these indentation changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16811 Sliding window approach to calculate non-zero punctuate-ratio metric [kafka]

2024-06-23 Thread via GitHub


ganesh-sadanala commented on PR #16162:
URL: https://github.com/apache/kafka/pull/16162#issuecomment-2185276488

   @mjsax @cadonna Thanks for the insights. I will make the necessary changes 
and would love to write a KIP for this change. I recently requested a 
Confluence account for another KIP, and it got approved. 
   
   This is one of my first two KIPs. If there is anything you would like to 
share that would help me, please do. Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16959: ConfigCommand should allow to define both `entity-default` and `entity-name` [kafka]

2024-06-23 Thread via GitHub


m1a2st commented on PR #16381:
URL: https://github.com/apache/kafka/pull/16381#issuecomment-2185156040

   Gentle ping @chia7712, I have updated the title.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-759 Mark as Partitioned [kafka]

2024-06-23 Thread via GitHub


LQXshane commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1650128875


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##
@@ -222,21 +226,21 @@ public  KStream selectKey(final 
KeyValueMapper

Re: [PR] KAFKA-16755: Implement lock timeout functionality in SharePartition [kafka]

2024-06-23 Thread via GitHub


adixitconfluent commented on PR #16414:
URL: https://github.com/apache/kafka/pull/16414#issuecomment-2185094033

   > Thanks for the PR. Looks good overall, with just a request to comment at 
least one of the unit tests a bit more comprehensively to make it easier to see 
what is intended. If the behaviour ever changes subtly, we might have to 
revisit them and making them a little easier to read would be helpful.
   
   Thanks for the review @AndrewJSchofield, I have added a few comments to 
most of these tests to explain the intention. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16650 add integration test for Admin#abortTransaction [kafka]

2024-06-23 Thread via GitHub


brandboat opened a new pull request, #16429:
URL: https://github.com/apache/kafka/pull/16429

   Related to https://issues.apache.org/jira/browse/KAFKA-16650
   As the title says, this adds an integration test for Admin#abortTransaction.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-06-23 Thread via GitHub


chia7712 commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1650089242


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##
@@ -166,6 +170,25 @@ Duration consumerPollTimeout() {
 return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
 }
 
+public boolean validate() {
+Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+LOG.warn("MirrorCheckpointConnector can't run without both " + 
SYNC_GROUP_OFFSETS_ENABLED + ", " +
+EMIT_CHECKPOINTS_ENABLED + " set to false");
+return false;
+}
+
+boolean requireOffsetSyncs = emitCheckpointsValue || 
syncGroupOffsetsValue;

Review Comment:
   As `!emitCheckpointsValue && !syncGroupOffsetsValue` returns earlier, 
`requireOffsetSyncs` is always true at this point, right? 
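   Boiled down (a simplification sketch, not the PR's code):
   ```java
   if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
       return false; // both disabled: already handled here
   }
   // Reaching this point implies (emitCheckpointsValue || syncGroupOffsetsValue) == true,
   // so `requireOffsetSyncs` is a constant and the variable can be dropped.
   ```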



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##
@@ -166,6 +170,25 @@ Duration consumerPollTimeout() {
 return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
 }
 
+public boolean validate() {
+Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+LOG.warn("MirrorCheckpointConnector can't run without both " + 
SYNC_GROUP_OFFSETS_ENABLED + ", " +
+EMIT_CHECKPOINTS_ENABLED + " set to false");
+return false;
+}
+
+boolean requireOffsetSyncs = emitCheckpointsValue || 
syncGroupOffsetsValue;
+if 
(!"true".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true"))
 & requireOffsetSyncs) {

Review Comment:
   Maybe we should use `EMIT_OFFSET_SYNCS_ENABLED_DEFAULT` here? 



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -366,6 +291,19 @@ boolean update(long upstreamOffset, long downstreamOffset) 
{
 return shouldSyncOffsets;
 }
 
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (!(o instanceof PartitionState)) return false;
+PartitionState that = (PartitionState) o;
+return previousUpstreamOffset == that.previousUpstreamOffset && 
previousDownstreamOffset == that.previousDownstreamOffset && 
lastSyncDownstreamOffset == that.lastSyncDownstreamOffset && maxOffsetLag == 
that.maxOffsetLag && shouldSyncOffsets == that.shouldSyncOffsets;

Review Comment:
   Could you please split it into multiple lines?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##
@@ -166,6 +170,25 @@ Duration consumerPollTimeout() {
 return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
 }
 
+public boolean validate() {
+Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+LOG.warn("MirrorCheckpointConnector can't run without both " + 
SYNC_GROUP_OFFSETS_ENABLED + ", " +
+EMIT_CHECKPOINTS_ENABLED + " set to false");
+return false;
+}
+
+boolean requireOffsetSyncs = emitCheckpointsValue || 
syncGroupOffsetsValue;
+if 
(!"true".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true"))
 & requireOffsetSyncs) {
+LOG.warn("MirrorCheckpointConnector can't run with " + 
EMIT_OFFSET_SYNCS_ENABLED + " set to false while, " +
+EMIT_CHECKPOINTS_ENABLED  + " and/o r" + 
SYNC_GROUP_OFFSETS_ENABLED + " set to true");

Review Comment:
   ` and/o r` or ` and/or`?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##
@@ -234,6 +234,30 @@ public ConfigDef config() {
 @Override
 public org.apache.kafka.common.config.Config validate(Map 
props) {
 List configValues = super.validate(props).configValues();
+validateExactlyOnceConfigs(props, configValues);
+validateEmitOffsetSyncConfigs(props, configValues);
+
+return new org.apache.kafka.common.config.Config(configValues);
+}
+
+private static void validateEmitOffsetSyncConfigs(Map 
props, List configValues) {
+boolean offsetSyncsConfigured = configValues.stream()
+.anyMatch(conf -> 
conf.name().startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || 

Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]

2024-06-23 Thread via GitHub


frankvicky commented on PR #16396:
URL: https://github.com/apache/kafka/pull/16396#issuecomment-2185016432

   Hi @chia7712, I have done some cleanup, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]

2024-06-23 Thread via GitHub


frankvicky commented on code in PR #16396:
URL: https://github.com/apache/kafka/pull/16396#discussion_r1650089520


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java:
##
@@ -380,17 +372,31 @@ public void setUp() {
 }
 }
 
-@Parameterized.Parameters(name = "rackAwareStrategy={0}")
-public static Collection getParamStoreType() {
-return asList(new Object[][] {
-{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE},
-{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC},
-{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY},
-});
+//@Parameterized.Parameters(name = "rackAwareStrategy={0}")

Review Comment:
   Oops, I will remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]

2024-06-23 Thread via GitHub


frankvicky commented on code in PR #16396:
URL: https://github.com/apache/kafka/pull/16396#discussion_r1650089120


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java:
##
@@ -71,23 +73,27 @@ public class KeyValueStoreMaterializerTest {
 @Mock
 private StreamsConfig streamsConfig;
 
-@Before

Review Comment:
   Yes, since we don't have any parameters in the `setup` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-23 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650085659


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+@Override
+public void writeTo(ConsumerRecord consumerRecord, 
PrintStream output) {
+Optional.ofNullable(consumerRecord.key())
+.map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+.ifPresent(transactionLogKey -> {
+short version = 
ByteBuffer.wrap(consumerRecord.key()).getShort();
+ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+json.set("version", new TextNode(Short.toString(version)));
+
+if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+&& version <= 
TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+byte[] value = consumerRecord.value();

Review Comment:
   Please handle null keys/values and unmatched versions; the key and the value each carry their own version.
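   One possible shape for that handling (a sketch only; it reuses the names from the quoted class and is not the final code):
   ```java
   byte[] keyBytes = consumerRecord.key();
   if (keyBytes == null)
       return; // no key: nothing to look up or format
   short keyVersion = ByteBuffer.wrap(keyBytes).getShort();
   if (keyVersion < TransactionLogKey.LOWEST_SUPPORTED_VERSION
           || keyVersion > TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
       json.set("key", new TextNode("unknown version " + keyVersion));
   } else if (consumerRecord.value() == null) {
       json.set("value", NullNode.getInstance()); // tombstone for the transaction id
   } else {
       // The value carries its own version, distinct from the key's.
       short valueVersion = ByteBuffer.wrap(consumerRecord.value()).getShort();
       // ... convert with TransactionLogValueJsonConverter only if valueVersion is in range
   }
   ```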



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17013 RequestManager#ConnectionState#toString() should use %s [kafka]

2024-06-23 Thread via GitHub


chia7712 commented on code in PR #16413:
URL: https://github.com/apache/kafka/pull/16413#discussion_r1650083987


##
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##
@@ -372,12 +372,12 @@ void onRequestSent(long correlationId, long timeMs) {
 @Override
 public String toString() {
 return String.format(
-"ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, 
lastFailTimeMs=%d, inFlightCorrelationId=%d)",
+"ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, 
lastFailTimeMs=%d, inFlightCorrelationId=%s)",
 node,
 state,
 lastSendTimeMs,
 lastFailTimeMs,
-inFlightCorrelationId
+inFlightCorrelationId.isPresent() ? 
inFlightCorrelationId.getAsLong() : null

Review Comment:
   How about using `undefined` instead of "null"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]

2024-06-23 Thread via GitHub


chia7712 commented on code in PR #16396:
URL: https://github.com/apache/kafka/pull/16396#discussion_r1650082583


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -180,9 +169,10 @@ private void setupStore() {
 when(store.name()).thenReturn(storeName);
 }
 
-@Test
-public void shouldNotRegisterSameStoreMultipleTimes() {
-setupStateManagerMock();
+@ParameterizedTest
+@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, 
names = {"ACTIVE", "STANDBY"})

Review Comment:
   `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default 
value
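   For instance, the same selection in the shorter, equivalent form (`INCLUDE` is already `@EnumSource`'s default mode):
   ```java
   @ParameterizedTest
   @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
   public void shouldNotRegisterSameStoreMultipleTimes(final Task.TaskType type) {
       // ...
   }
   ```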



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -199,9 +189,10 @@ public void shouldNotRegisterStoreWithoutMetadata() {
 () -> changelogReader.register(new 
TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
 }
 
-@Test
-public void shouldSupportUnregisterChangelogBeforeInitialization() {
-setupStateManagerMock();
+@ParameterizedTest
+@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, 
names = {"ACTIVE", "STANDBY"})

Review Comment:
   `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default 
value



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -280,9 +272,10 @@ public void 
shouldSupportUnregisterChangelogBeforeCompletion() {
 assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
 }
 
-@Test
-public void shouldSupportUnregisterChangelogAfterCompletion() {
-setupStateManagerMock();
+@ParameterizedTest
+@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, 
names = {"ACTIVE", "STANDBY"})

Review Comment:
   `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default 
value



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -549,9 +548,10 @@ public void 
shouldRestoreFromPositionAndCheckForCompletion() {
 }
 }
 
-@Test
-public void shouldRestoreFromBeginningAndCheckCompletion() {
-setupStateManagerMock();
+@ParameterizedTest
+@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, 
names = {"ACTIVE", "STANDBY"})

Review Comment:
   `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default 
value



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java:
##
@@ -380,17 +372,31 @@ public void setUp() {
 }
 }
 
-@Parameterized.Parameters(name = "rackAwareStrategy={0}")
-public static Collection getParamStoreType() {
-return asList(new Object[][] {
-{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE},
-{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC},
-{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY},
-});
+//@Parameterized.Parameters(name = "rackAwareStrategy={0}")

Review Comment:
   please remove those code



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -338,9 +331,10 @@ public void 
shouldSupportUnregisterChangelogAfterCompletion() {
 }
 }
 
-@Test
-public void shouldInitializeChangelogAndCheckForCompletion() {
-setupStateManagerMock();
+@ParameterizedTest
+@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, 
names = {"ACTIVE", "STANDBY"})

Review Comment:
   `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default 
value



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java:
##
@@ -71,23 +73,27 @@ public class KeyValueStoreMaterializerTest {
 @Mock
 private StreamsConfig streamsConfig;
 
-@Before

Review Comment:
   So we can keep using `@BeforeEach` now, right?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -228,9 +219,10 @@ public void 
shouldSupportUnregisterChangelogBeforeInitialization() {
 assertNull(standbyListener.capturedStore(UPDATE_BATCH));
 }
 
-@Test
-public void shouldSupportUnregisterChangelogBeforeCompletion() {
-setupStateManagerMock();
+@ParameterizedTest
+@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, 
names = {"ACTIVE", "STANDBY"})

Review Comment:
   `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default 
value



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -380,11 +374,12 @@ public void 

[jira] [Commented] (KAFKA-17015) ContextualRecord#hashCode()、ProcessorRecordContext#hashCode() Should not be deprecated and throw an exception

2024-06-23 Thread dujian0068 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859473#comment-17859473
 ] 

dujian0068 commented on KAFKA-17015:


Thank you for your reply.

When I studied the Kafka source code, I found that the class `ContextualRecord` has a 
`ProcessorRecordContext` attribute and that `ProcessorRecordContext#toString()` is 
deprecated.

I was considering whether to also deprecate `ContextualRecord#toString()`, but it 
seems unnecessary, so I raised this issue.
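
A tiny standalone demo (unrelated to Kafka's classes) of the "mutable hashCode must not be used for hash addressing" point from the issue:

{code:java}
import java.util.HashMap;
import java.util.Map;

public class MutableHashCodeDemo {
    static final class Holder {
        long offset; // mutable field feeding hashCode()
        Holder(long offset) { this.offset = offset; }
        @Override public int hashCode() { return Long.hashCode(offset); }
        @Override public boolean equals(Object o) {
            return o instanceof Holder && ((Holder) o).offset == offset;
        }
    }

    public static void main(String[] args) {
        Map<Holder, String> map = new HashMap<>();
        Holder key = new Holder(1L);
        map.put(key, "value");
        key.offset = 2L;                  // mutation changes the hash
        System.out.println(map.get(key)); // null: the bucket lookup now misses
    }
}
{code}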

 

 

> ContextualRecord#hashCode()、ProcessorRecordContext#hashCode() Should not be 
> deprecated and throw an exception
> -
>
> Key: KAFKA-17015
> URL: https://issues.apache.org/jira/browse/KAFKA-17015
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dujian0068
>Assignee: dujian0068
>Priority: Minor
>
> While reviewing PR #16970, I found that `ContextualRecord#hashCode()` and 
> `ProcessorRecordContext#hashCode()` are deprecated because they depend on a 
> mutable attribute, which causes the hash code to change.
> I don't think that hashCode should be discarded just because it is mutable. 
> HashCode is a very important property of an object. It just shouldn't be used 
> for hash addressing, like ArrayList
>  
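
A small self-contained sketch (toy classes, not Kafka code) of the distinction the issue draws: a hash code that depends on a mutable field is still well-defined, it just must not be used for hash addressing.

```java
import java.util.HashMap;
import java.util.Map;

class MutableKey {
    int value;
    MutableKey(int value) { this.value = value; }
    @Override public int hashCode() { return value; }
    @Override public boolean equals(Object o) {
        return o instanceof MutableKey && ((MutableKey) o).value == value;
    }
}

public class MutableHashCodeDemo {
    public static void main(String[] args) {
        Map<MutableKey, String> map = new HashMap<>();
        MutableKey key = new MutableKey(1);
        map.put(key, "v");
        key.value = 2;                    // hash code changes after insertion
        System.out.println(map.get(key)); // null: the wrong bucket is probed
    }
}
```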



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KIP-1049: Add config log.summary.interval.ms to Kafka Streams [kafka]

2024-06-23 Thread via GitHub


dujian0068 opened a new pull request, #16428:
URL: https://github.com/apache/kafka/pull/16428

   [KAFKA-16584 Make log processing summary configurable or 
debug](https://issues.apache.org/jira/browse/KAFKA-16584)
   [KIP-1049: Add config log.summary.interval.ms to Kafka 
Streams](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1049%3A+Add+config+log.summary.interval.ms+to+Kafka+Streams)
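   
   A hypothetical usage sketch of the proposed config (the property name comes from the KIP; the value shown is an arbitrary example, not a proposed default):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;
   
   Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   // Proposed in KIP-1049: how often the processing summary is logged
   props.put("log.summary.interval.ms", "120000");
   ```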
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]

2024-06-23 Thread via GitHub


OmniaGM commented on PR #15999:
URL: https://github.com/apache/kafka/pull/15999#issuecomment-2184947365

   @C0urante did you get a chance to review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17013 RequestManager#ConnectionState#toString() should use %s [kafka]

2024-06-23 Thread via GitHub


dujian0068 commented on code in PR #16413:
URL: https://github.com/apache/kafka/pull/16413#discussion_r1650044412


##
raft/src/main/java/org/apache/kafka/raft/RequestManager.java:
##
@@ -372,7 +372,7 @@ void onRequestSent(long correlationId, long timeMs) {
 @Override
 public String toString() {
 return String.format(
-"ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, 
lastFailTimeMs=%d, inFlightCorrelationId=%d)",
+"ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, 
lastFailTimeMs=%d, inFlightCorrelationId=%s)",

Review Comment:
   Of course, it has been modified.
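   
   For context, a small sketch of why the conversion matters (assuming `inFlightCorrelationId` is an `OptionalLong`, which `%d` cannot format):
   
   ```java
   import java.util.OptionalLong;
   
   OptionalLong id = OptionalLong.of(42);
   String.format("inFlightCorrelationId=%s", id); // "inFlightCorrelationId=OptionalLong[42]"
   String.format("inFlightCorrelationId=%d", id); // throws IllegalFormatConversionException
   ```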



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17023: add PCollectionsImmutableMap to ConcurrentMapBenchmark [kafka]

2024-06-23 Thread via GitHub


TaiJuWu commented on code in PR #16425:
URL: https://github.com/apache/kafka/pull/16425#discussion_r1650015837


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java:
##
@@ -189,4 +194,60 @@ public void testCopyOnWriteMapEntrySet(Blackhole 
blackhole) {
 }
 }
 }
+
+@Benchmark
+@OperationsPerInvocation(TIMES)
+public void testPCollectionsImmutableMapGet(Blackhole blackhole) {
+for (int i = 0; i < TIMES; i++) {
+if (i % writePerLoops == 0) {
+// add offset mapSize to ensure computeIfAbsent do add new 
entry
+pcollectionsImmutableMap = pcollectionsImmutableMap.updated(i 
+ mapSize, 0);

Review Comment:
   This method is **not** thread-safe; we can check the result by adding 
`assertEquals(1, pcollectionsImmutableMap.size());` to verify.
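   
   A minimal sketch of the race being described, using PCollections' `HashTreePMap` directly (assuming the benchmark's wrapper delegates to it; two writers with disjoint keys can still lose updates because the field reassignment is a non-atomic read-modify-write):
   
   ```java
   import org.pcollections.HashTreePMap;
   import org.pcollections.PMap;
   
   public class LostUpdateDemo {
       static volatile PMap<Integer, Integer> map = HashTreePMap.empty();
   
       public static void main(String[] args) throws InterruptedException {
           // Each thread writes its own key range, so no entries should collide.
           Thread evens = new Thread(() -> {
               for (int i = 0; i < 10_000; i++) map = map.plus(2 * i, 0);
           });
           Thread odds = new Thread(() -> {
               for (int i = 0; i < 10_000; i++) map = map.plus(2 * i + 1, 0);
           });
           evens.start(); odds.start();
           evens.join(); odds.join();
           System.out.println(map.size()); // expected 20000; typically less due to lost updates
       }
   }
   ```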



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-17009) Add unit test to query nonexistent replica by describeReplicaLogDirs

2024-06-23 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17009.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Add unit test to query nonexistent replica by describeReplicaLogDirs
> 
>
> Key: KAFKA-17009
> URL: https://issues.apache.org/jira/browse/KAFKA-17009
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Minor
> Fix For: 3.9.0
>
>
> our docs[0] says that "currentReplicaLogDir will be null if the replica is 
> not found", so it means `describeReplicaLogDirs` can accept the queries for 
> nonexistent replicas. However, current UT [1] only verify the replica is not 
> found due to storage error. We should add a UT to verify it for nonexistent 
> replica
> [0] 
> https://github.com/apache/kafka/blob/391778b8d737f4af074422ffe61bc494b21e6555/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java#L71
> [1] 
> https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java#L2356
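
A hedged sketch of what such a query could look like against a live cluster (broker id, topic name, and bootstrap address are placeholders; the UT in [1] would mock the response instead):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.common.TopicPartitionReplica;

public class NonexistentReplicaQuery {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Broker 1 exists, but the topic does not, so the replica is not found.
            TopicPartitionReplica replica = new TopicPartitionReplica("no-such-topic", 0, 1);
            DescribeReplicaLogDirsResult result =
                admin.describeReplicaLogDirs(Collections.singleton(replica));
            DescribeReplicaLogDirsResult.ReplicaLogDirInfo info = result.values().get(replica).get();
            System.out.println(info.getCurrentReplicaLogDir()); // documented to be null
        }
    }
}
```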



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17009: Add unit test to query nonexistent replica by describeReplicaLogDirs [kafka]

2024-06-23 Thread via GitHub


chia7712 merged PR #16423:
URL: https://github.com/apache/kafka/pull/16423


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16791: Add thread detection to ClusterTestExtensions [kafka]

2024-06-23 Thread via GitHub


bboyleonp666 opened a new pull request, #16427:
URL: https://github.com/apache/kafka/pull/16427

   `TestUtils.verifyNoUnexpectedThreads()` verifies that there are no leftover 
threads that might affect subsequent test cases; this should be checked 
before and after all test cases.
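   
   A minimal sketch of how such a check could be wired in as a JUnit 5 extension (the package and exact signature of `verifyNoUnexpectedThreads` are assumptions here; the PR's actual wiring into `ClusterTestExtensions` may differ):
   
   ```java
   import org.junit.jupiter.api.extension.AfterAllCallback;
   import org.junit.jupiter.api.extension.BeforeAllCallback;
   import org.junit.jupiter.api.extension.ExtensionContext;
   
   public class NoLeakedThreadsExtension implements BeforeAllCallback, AfterAllCallback {
       @Override
       public void beforeAll(ExtensionContext context) throws Exception {
           // Fail fast if a previous test class leaked threads.
           TestUtils.verifyNoUnexpectedThreads("before " + context.getDisplayName());
       }
   
       @Override
       public void afterAll(ExtensionContext context) throws Exception {
           // Catch threads leaked by this test class.
           TestUtils.verifyNoUnexpectedThreads("after " + context.getDisplayName());
       }
   }
   ```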
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]

2024-06-23 Thread via GitHub


frankvicky commented on PR #16396:
URL: https://github.com/apache/kafka/pull/16396#issuecomment-2184820719

   Hi @chia7712, I have made some changes based on the comments, PTAL  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-23 Thread via GitHub


ashoke-cube commented on PR #16095:
URL: https://github.com/apache/kafka/pull/16095#issuecomment-2184736478

   > Hi @ashoke-cube could you fix the build? Thanks!
   
   Hey @gharris1727 I looked into the build failure. It is a bit weird: it is 
failing because it cannot find JUnit's `Test` class. I am using the 
same annotation as every other test in that class, and local test runs are fine. It's 
not clear to me what the issue is here.
   ```
   Task :connect:runtime:compileTestJava
   
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16095/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:140:
 error: cannot find symbol
   
   @Test
   ^
   symbol:   class Test
   location: class WorkerConnectorTest
   
   
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16095/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:393:
 error: cannot find symbol
   
   @Test
   ^
   symbol:   class Test
   location: class WorkerConnectorTest
   
   ```
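   
   One common cause of this symptom (an assumption here, not confirmed from these logs) is a module whose test classpath has moved to JUnit 5 while a file still imports the JUnit 4 annotation:
   
   ```java
   // JUnit 4 annotation -- unresolvable if the junit:junit dependency was removed:
   // import org.junit.Test;
   
   // JUnit 5 annotation -- what a migrated module's test classpath provides:
   import org.junit.jupiter.api.Test;
   ```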


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]

2024-06-23 Thread via GitHub


frankvicky commented on code in PR #16396:
URL: https://github.com/apache/kafka/pull/16396#discussion_r1649964164


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java:
##
@@ -161,14 +163,14 @@ private void overwriteInternalTopicManagerWithMock() {
 partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 }
 
-@Before
 public void setUp() {
-adminClient = 
createMockAdminClientForAssignor(EMPTY_CHANGELOG_END_OFFSETS);
+adminClient = 
createMockAdminClientForAssignor(EMPTY_CHANGELOG_END_OFFSETS, true);
 }
 
 
 @Test
 public void 
shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceAndTriggerRebalanceIfEndOffsetFetchFailsAndHighAvailabilityEnabled()
 {
+setUp();

Review Comment:
   Indeed, this is the only test case that calls the `setUp` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-06-23 Thread via GitHub


pasharik commented on code in PR #15830:
URL: https://github.com/apache/kafka/pull/15830#discussion_r1649455825


##
core/src/test/scala/unit/kafka/admin/AclCommandTest.scala:
##
@@ -324,12 +348,18 @@ class AclCommandTest extends QuorumTestHarness with 
Logging {
   }
 
   private def withAuthorizer()(f: Authorizer => Unit): Unit = {
-val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false)
-val authZ = new AclAuthorizer
-try {
-  authZ.configure(kafkaConfig.originals)
-  f(authZ)
-} finally authZ.close()
+if (isKRaftTest()) {
+  (servers.map(_.authorizer.get) ++ 
controllerServers.map(_.authorizer.get)).foreach { auth =>

Review Comment:
   > That will create zk authorizer even though the tests don't have zk, right?
   
   Yeah, you are right...
   
   Restored the original implementation :ok_hand: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-06-23 Thread via GitHub


loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1649445442


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java:
##
@@ -49,6 +51,21 @@ public Headers headers() {
 return value.headers();
 }
 
+public ConsumerRecord rawRecord() {
+return rawRecord;
+}
+
+@Override
+public boolean equals(final Object other) {
+// Do not include rawRecord in the comparison
+return super.equals(other);
+}
+
+@Override
+public int hashCode() {
+return super.hashCode();
+}
+

Review Comment:
   @cadonna Correct, it is the exact same behaviour. 
   
   I had a SpotBugs warning on this one to explicitly override and call super 
methods (e.g., like here: 
https://github.com/apache/kafka/blob/trunk/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java#L201).
 
   
   The overrides can be removed and the warning ignored if that is more convenient.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-06-23 Thread via GitHub


brenden20 commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1649440775


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -106,72 +102,99 @@ public class HeartbeatRequestManagerTest {
 private MembershipManager membershipManager;
 private HeartbeatRequestManager.HeartbeatRequestState 
heartbeatRequestState;
 private HeartbeatRequestManager.HeartbeatState heartbeatState;
-private final String memberId = "member-id";
-private final int memberEpoch = 1;
 private BackgroundEventHandler backgroundEventHandler;
-private Metrics metrics;
+private LogContext logContext;
 
 @BeforeEach
 public void setUp() {
-setUp(ConsumerTestBuilder.createDefaultGroupInformation());
-}
-
-private void setUp(Optional 
groupInfo) {
-testBuilder = new ConsumerTestBuilder(groupInfo, true, false);
-time = testBuilder.time;
-coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestManager = 
testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
-heartbeatRequestState = 
testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
-heartbeatState = 
testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
-backgroundEventHandler = testBuilder.backgroundEventHandler;
-subscriptions = testBuilder.subscriptions;
-membershipManager = 
testBuilder.membershipManager.orElseThrow(IllegalStateException::new);
-metadata = testBuilder.metadata;
-metrics = new Metrics(time);
+this.time = new MockTime();
+Metrics metrics = new Metrics(time);
+this.logContext = new LogContext();
+this.pollTimer = mock(Timer.class);
+this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+this.heartbeatState = mock(HeartbeatState.class);
+this.backgroundEventHandler = mock(BackgroundEventHandler.class);
+this.subscriptions = mock(SubscriptionState.class);
+this.membershipManager = mock(MembershipManagerImpl.class);
+this.metadata = mock(ConsumerMetadata.class);
+ConsumerConfig config = mock(ConsumerConfig.class);
+
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
+DEFAULT_HEARTBEAT_INTERVAL_MS,
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
+
+this.heartbeatRequestManager = new HeartbeatRequestManager(
+logContext,
+pollTimer,
+config,
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler,
+metrics);
 
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
+Map> map = new HashMap<>();
+LocalAssignment local = new LocalAssignment(0, map);
+when(membershipManager.currentAssignment()).thenReturn(local);
 }
 
-private void resetWithZeroHeartbeatInterval(Optional 
groupInstanceId) {
-cleanup();
-
-ConsumerTestBuilder.GroupInformation gi = new 
ConsumerTestBuilder.GroupInformation(
-DEFAULT_GROUP_ID,
-groupInstanceId,
+private void createHeartbeatStateWith0HeartbeatInterval() {
+this.heartbeatRequestState = spy(new HeartbeatRequestState(
+logContext,
+time,
 0,
-0.0,
-Optional.of(DEFAULT_REMOTE_ASSIGNOR)
-);
+DEFAULT_RETRY_BACKOFF_MS,
+DEFAULT_RETRY_BACKOFF_MAX_MS,
+DEFAULT_HEARTBEAT_JITTER_MS));
 
-setUp(Optional.of(gi));
+heartbeatRequestManager = createHeartbeatRequestManager(
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 }
 
-@AfterEach
-public void cleanup() {
-if (testBuilder != null) {
-testBuilder.close();
-}
+private void resetWithZeroHeartbeatInterval() {

Review Comment:
   It works for all but one test; for now I am keeping 
`createHeartbeatStateWith0HeartbeatInterval()` for 
`testSkippingHeartbeat(final boolean shouldSkipHeartbeat)` since that test 
requires the initial heartbeat interval to be 0.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -275,29 +316,34 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() {
 

Re: [PR] KAFKA-16000 Migrated MembershipManagerImplTest away from ConsumerTestBuilder [kafka]

2024-06-23 Thread via GitHub


lianetm commented on code in PR #16312:
URL: https://github.com/apache/kafka/pull/16312#discussion_r1649439786


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -747,7 +741,7 @@ public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
 membershipManager.poll(time.milliseconds());
 
 assertEquals(Collections.emptySet(), 
membershipManager.topicsAwaitingReconciliation());
-
verify(subscriptionState).assignFromSubscribed(topicPartitions(topic2Assignment,
 topic2Metadata));
+
verify(subscriptionState).assignFromSubscribedAwaitingCallback(eq(topicPartitions(topic2Assignment,
 topic2Metadata)), eq(topicPartitions(topic2Assignment, topic2Metadata)));

Review Comment:
   is there a reason to use `eq` here, given that we're providing the two specific 
arguments? 
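   
   For reference, Mockito only requires `eq(...)` when another argument in the same call uses a matcher; with plain values throughout, a sketch of the simpler form:
   
   ```java
   verify(subscriptionState).assignFromSubscribedAwaitingCallback(
       topicPartitions(topic2Assignment, topic2Metadata),
       topicPartitions(topic2Assignment, topic2Metadata));
   ```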



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -2489,7 +2490,9 @@ private void 
testRevocationCompleted(MembershipManagerImpl membershipManager,
 verify(subscriptionState).markPendingRevocation(anySet());
 List expectedTopicPartitionAssignment =
 buildTopicPartitions(expectedCurrentAssignment);
-verify(subscriptionState).assignFromSubscribed(new 
HashSet<>(expectedTopicPartitionAssignment));
+HashSet expectedSet = new 
HashSet<>(expectedTopicPartitionAssignment);
+HashSet emptySet = new HashSet<>();

Review Comment:
   we could simplify by removing this and just referencing `Collections.emptySet()` 
as the 2nd param of `assignFromSubscribedAwaitingCallback`
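   
   i.e., a sketch of the suggested simplification:
   
   ```java
   verify(subscriptionState).assignFromSubscribedAwaitingCallback(
       new HashSet<>(expectedTopicPartitionAssignment), Collections.emptySet());
   ```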



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -747,7 +741,7 @@ public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
 membershipManager.poll(time.milliseconds());
 
 assertEquals(Collections.emptySet(), 
membershipManager.topicsAwaitingReconciliation());
-
verify(subscriptionState).assignFromSubscribed(topicPartitions(topic2Assignment,
 topic2Metadata));
+
verify(subscriptionState).assignFromSubscribedAwaitingCallback(eq(topicPartitions(topic2Assignment,
 topic2Metadata)), eq(topicPartitions(topic2Assignment, topic2Metadata)));

Review Comment:
   if we don't need it let's remove it in other places when possible



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-06-23 Thread via GitHub


lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1649427661


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -659,78 +753,38 @@ public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
 
 @Test
 public void testisExpiredByUsedForLogging() {
-Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
-heartbeatRequestManager = new HeartbeatRequestManager(new 
LogContext(), pollTimer, config(),
-coordinatorRequestManager, membershipManager, heartbeatState, 
heartbeatRequestState,
-backgroundEventHandler, metrics);
 when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
 
 int exceededTimeMs = 5;
 time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
 
+when(membershipManager.isLeavingGroup()).thenReturn(false);
+when(pollTimer.isExpired()).thenReturn(true);
 NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
 assertEquals(1, pollResult.unsentRequests.size());
 verify(membershipManager).transitionToSendingLeaveGroup(true);
 verify(pollTimer, never()).isExpiredBy();
-assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
 
 clearInvocations(pollTimer);
 heartbeatRequestManager.resetPollTimer(time.milliseconds());
 verify(pollTimer).isExpiredBy();
 }
 
 @Test
-public void testHeartbeatMetrics() {
-// setup
-coordinatorRequestManager = mock(CoordinatorRequestManager.class);
-membershipManager = mock(MembershipManager.class);
-heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
-time = new MockTime();
-metrics = new Metrics(time);
-heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
-new LogContext(),
-time,
-0, // This initial interval should be 0 to ensure heartbeat on the 
clock
-DEFAULT_RETRY_BACKOFF_MS,
-DEFAULT_RETRY_BACKOFF_MAX_MS,
-0);
-backgroundEventHandler = mock(BackgroundEventHandler.class);
+public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
 heartbeatRequestManager = createHeartbeatRequestManager(
-coordinatorRequestManager,
-membershipManager,
-heartbeatState,
-heartbeatRequestState,
-backgroundEventHandler);
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", )));
-when(membershipManager.state()).thenReturn(MemberState.STABLE);
-
-assertNotNull(getMetric("heartbeat-response-time-max"));
-assertNotNull(getMetric("heartbeat-rate"));
-assertNotNull(getMetric("heartbeat-total"));
-assertNotNull(getMetric("last-heartbeat-seconds-ago"));
-
-// test poll
-assertHeartbeat(heartbeatRequestManager, 0);
-time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
-assertEquals(1.0, getMetric("heartbeat-total").metricValue());
-assertEquals((double) 
TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), 
getMetric("last-heartbeat-seconds-ago").metricValue());
-
-assertHeartbeat(heartbeatRequestManager, 
DEFAULT_HEARTBEAT_INTERVAL_MS);
-assertEquals(0.06d, (double) 
getMetric("heartbeat-rate").metricValue(), 0.005d);
-assertEquals(2.0, getMetric("heartbeat-total").metricValue());
-
-// Randomly sleep for some time
-Random rand = new Random();
-int randomSleepS = rand.nextInt(11);
-time.sleep(randomSleepS * 1000);
-assertEquals((double) randomSleepS, 
getMetric("last-heartbeat-seconds-ago").metricValue());
-}
+coordinatorRequestManager,
+membershipManager,
+heartbeatState,
+heartbeatRequestState,
+backgroundEventHandler);
 
-@Test
-public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
+when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+when(membershipManager.state()).thenReturn(MemberState.STABLE);
 mockStableMember();
 
 time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+when(membershipManager.isLeavingGroup()).thenReturn(true);

Review Comment:
   Hmm, do we need this here? I wouldn't expect so (the membershipMgr is a mock 
now, and the HB manager does not check `isLeavingGroup` to generate a HB)



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##
@@ -659,78 +753,38 @@ public void 
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
 
 @Test
 public void testisExpiredByUsedForLogging() {
-

Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]

2024-06-23 Thread via GitHub


lianetm commented on PR #16200:
URL: https://github.com/apache/kafka/pull/16200#issuecomment-2183475535

   Hey @brenden20, I completed another pass, left some comments. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17011: SupportedFeatures.MinVersion incorrectly blocks v0 (3.8) [kafka]

2024-06-23 Thread via GitHub


jolshan commented on PR #16420:
URL: https://github.com/apache/kafka/pull/16420#issuecomment-2183475015

   Ran the kraft upgrade tests and they passed with this change  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16000 Migrated MembershipManagerImplTest away from ConsumerTestBuilder [kafka]

2024-06-23 Thread via GitHub


brenden20 commented on PR #16312:
URL: https://github.com/apache/kafka/pull/16312#issuecomment-2183474544

   Thank you for the feedback @lianetm, I just made all the suggested 
improvements!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Revert "KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config [kafka]

2024-06-23 Thread via GitHub


jolshan commented on PR #16409:
URL: https://github.com/apache/kafka/pull/16409#issuecomment-2182994898

   I will merge for now to unblock fixing the other issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove unused method 'createDirectoriesFrom' in DirectoryId [kafka]

2024-06-23 Thread via GitHub


wernerdv commented on PR #16419:
URL: https://github.com/apache/kafka/pull/16419#issuecomment-2182969536

   @soarez Hello, please take a look at this small cleanup.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Remove unused method 'createDirectoriesFrom' in DirectoryId [kafka]

2024-06-23 Thread via GitHub


wernerdv opened a new pull request, #16419:
URL: https://github.com/apache/kafka/pull/16419

   Remove unused method 'createDirectoriesFrom' in DirectoryId.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]

2024-06-23 Thread via GitHub


frankvicky commented on PR #16396:
URL: https://github.com/apache/kafka/pull/16396#issuecomment-2182964936

   Hi @chia7712, I have updated the PR, PTAL  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-17024) add integration test for TransactionsCommand

2024-06-23 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17024:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> add integration test for TransactionsCommand
> 
>
> Key: KAFKA-17024
> URL: https://issues.apache.org/jira/browse/KAFKA-17024
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> As the title says: currently we only have unit tests for TransactionsCommand.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)