Re: [PR] KAFKA-16967: NioEchoServer fails to register connection and causes flaky failure. [kafka]
chia7712 commented on PR #16384: URL: https://github.com/apache/kafka/pull/16384#issuecomment-2185672343

@frankvicky could you please take a look at the failed test? It may be related to this PR.

```
org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Metric not updated failed-authentication-total expected:<1.0> but was:<24.0> ==> expected: <true> but was: <false>

Stack trace
org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Metric not updated failed-authentication-total expected:<1.0> but was:<24.0> ==> expected: <true> but was: <false>
	at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
	at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
	at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
	at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
	at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
	at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397)
	at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445)
	at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394)
	at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
	at app//org.apache.kafka.common.network.NioEchoServer.waitForMetrics(NioEchoServer.java:213)
	at app//org.apache.kafka.common.network.NioEchoServer.verifyAuthenticationMetrics(NioEchoServer.java:172)
	at app//org.apache.kafka.common.network.SslTransportLayerTest.testUnsupportedCiphers(SslTransportLayerTest.java:650)
	at java.base@21.0.3/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base@21.0.3/java.lang.reflect.Method.invoke(Method.java:580)
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16853) Split RemoteLogManagerScheduledThreadPool
[ https://issues.apache.org/jira/browse/KAFKA-16853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859568#comment-17859568 ]

Abhijeet Kumar commented on KAFKA-16853:

Hi [~showuon]. I plan to start this week.

> Split RemoteLogManagerScheduledThreadPool
> -----------------------------------------
>
> Key: KAFKA-16853
> URL: https://issues.apache.org/jira/browse/KAFKA-16853
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Christo Lolov
> Assignee: Abhijeet Kumar
> Priority: Major
>
> *Summary*
> To begin with create just the RemoteDataExpirationThreadPool and move expiration to it. Keep all settings as if the only thread pool was the RemoteLogManagerScheduledThreadPool. Ensure that the new thread pool is wired correctly to the RemoteLogManager.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] Implemented release acquired records functionality to SharePartition [kafka]
adixitconfluent opened a new pull request, #16430: URL: https://github.com/apache/kafka/pull/16430 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15265: Remote fetch throttle metrics [kafka]
abhijeetk88 commented on PR #16087: URL: https://github.com/apache/kafka/pull/16087#issuecomment-2185630174

> @abhijeetk88
>
> Thanks for the patch! Verified that the metric is emitted via Jconsole. The metric objectName is set to `kafka.server:type=RemoteLogManager`, should we rename it to `kafka.log.remote:type=RemoteLogManager` instead?
>
> Also, can you add description to each of the metric?
>
> [image]

I am facing a problem here. Creating a SensorAccess requires an _org.apache.kafka.common.metrics.Metrics_ instance, but with that the prefix becomes kafka.server and cannot be changed. We would instead need to use _KafkaMetricsGroup_, but there is no way to create a SensorAccess with it. @kamalcph @showuon are you aware of a workaround?
Re: [PR] KAFKA-17023: add PCollectionsImmutableMap to ConcurrentMapBenchmark [kafka]
chia7712 commented on code in PR #16425: URL: https://github.com/apache/kafka/pull/16425#discussion_r1650346962

## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java:

@@ -189,4 +194,60 @@ public void testCopyOnWriteMapEntrySet(Blackhole blackhole) {
             }
         }
     }
+
+    @Benchmark
+    @OperationsPerInvocation(TIMES)
+    public void testPCollectionsImmutableMapGet(Blackhole blackhole) {
+        for (int i = 0; i < TIMES; i++) {
+            if (i % writePerLoops == 0) {
+                // add offset mapSize to ensure computeIfAbsent do add new entry
+                pcollectionsImmutableMap = pcollectionsImmutableMap.updated(i + mapSize, 0);

Review Comment: So could you please add a single-write/multi-read scenario for those maps? That is an existing use case in Kafka.
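For readers of the archive, the single-write/multi-read access pattern requested above can be sketched in plain Java as follows. This is only an illustration of the pattern, not the JMH benchmark that the PR would add: the class name `SingleWriterMultiReaderSketch`, the method `run`, and its parameters are hypothetical, and `ConcurrentHashMap` is used here simply as a stand-in for the maps under comparison.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not part of ConcurrentMapBenchmark and not a JMH benchmark.
final class SingleWriterMultiReaderSketch {

    // One writer adds `ops` new keys while `readers` threads each perform `ops` reads
    // of pre-populated keys. Returns the total number of successful reads.
    static long run(Map<Integer, Integer> map, int mapSize, int ops, int readers) {
        for (int i = 0; i < mapSize; i++) {
            map.put(i, 0);
        }
        ExecutorService pool = Executors.newFixedThreadPool(readers + 1);
        CountDownLatch start = new CountDownLatch(1);
        AtomicLong hits = new AtomicLong();
        try {
            for (int r = 0; r < readers; r++) {
                pool.execute(() -> {
                    awaitQuietly(start);
                    for (int i = 0; i < ops; i++) {
                        if (map.get(i % mapSize) != null) {
                            hits.incrementAndGet();
                        }
                    }
                });
            }
            // The single writer only inserts keys >= mapSize, so readers never miss.
            pool.execute(() -> {
                awaitQuietly(start);
                for (int i = 0; i < ops; i++) {
                    map.put(mapSize + i, 0);
                }
            });
            start.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return hits.get();
    }

    // Waits for the start signal; restores the interrupt flag if interrupted.
    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> map = new ConcurrentHashMap<>();
        long hits = run(map, 100, 10_000, 4);
        System.out.println("reads=" + hits + " finalSize=" + map.size());
    }
}
```

The sketch checks correctness of the access pattern only; it makes no timing claims, which is what a real benchmark harness would be needed for.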
Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]
chia7712 merged PR #16396: URL: https://github.com/apache/kafka/pull/16396
[jira] [Resolved] (KAFKA-16998) Fix warnings in our Github actions
[ https://issues.apache.org/jira/browse/KAFKA-16998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-16998.

Fix Version/s: 3.9.0
Resolution: Fixed

> Fix warnings in our Github actions
> ----------------------------------
>
> Key: KAFKA-16998
> URL: https://issues.apache.org/jira/browse/KAFKA-16998
> Project: Kafka
> Issue Type: Task
> Components: build
> Reporter: Mickael Maison
> Assignee: Kuan Po Tseng
> Priority: Major
> Fix For: 3.9.0
>
> Most of our Github actions produce warnings, see https://github.com/apache/kafka/actions/runs/9572915509 for example.
> It looks like we need to bump the version we use for actions/checkout, actions/setup-python, actions/upload-artifact to v4.
Re: [PR] KAFKA-16998 Fix warnings in our Github actions [kafka]
chia7712 merged PR #16410: URL: https://github.com/apache/kafka/pull/16410
Re: [PR] KAFKA-16998 Fix warnings in our Github actions [kafka]
VedarthConfluent commented on PR #16410: URL: https://github.com/apache/kafka/pull/16410#issuecomment-2185600693 LGTM, thanks for the PR
[jira] [Updated] (KAFKA-17025) KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler
[ https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hongshun Wang updated KAFKA-17025:

Description:

When I use KafkaProducer and an OOM occurs, the KafkaProducer only logs it and does nothing else:
{code:java}
ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 'kafka-producer-network-thread | producer-1':
java.lang.OutOfMemoryError: Direct buffer memory
    ...
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
    at org.apache.kafka.clients.producer.internals.Sender.run
    at java.lang.Thread.run
{code}
I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not caught, because org.apache.kafka.clients.producer.internals.Sender#run only catches Exception:
{code:java}
@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the transaction manager, accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
    while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
        if (!transactionManager.isCompleting()) {
            log.info("Aborting incomplete transaction due to shutdown");
            transactionManager.beginAbort();
        }
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    if (forceClose) {
        // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
        // the futures.
        if (transactionManager != null) {
            log.debug("Aborting incomplete transactional requests due to forced shutdown");
            transactionManager.close();
        }
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
{code}
2. KafkaThread then handles the uncaught exception by just logging it:
{code:java}
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
    setDaemon(daemon);
    setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
}
{code}
To be honest, I don't understand why KafkaThread does nothing but log when an uncaught exception occurs. Why not expose a method to set the UncaughtExceptionHandler in KafkaThread or KafkaProducer, so that the user can decide what to do with an uncaught exception, whether to rethrow it or just ignore it?
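The behaviour described in the report can be reproduced without Kafka. The following is a minimal sketch, assuming nothing beyond the JDK: the class `UncaughtErrorSketch` and the method `runAndCapture` are hypothetical names, and the OutOfMemoryError is thrown by hand rather than raised by the JVM. It shows that `catch (Exception e)` does not intercept an Error, and that the Error instead reaches whatever handler was installed with `Thread#setUncaughtExceptionHandler`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo; it does not use any Kafka classes.
final class UncaughtErrorSketch {

    // Runs a task that throws an Error inside try/catch (Exception) and returns
    // whatever the thread's uncaught-exception handler received.
    static Throwable runAndCapture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Thread t = new Thread(() -> {
            try {
                // Simulated failure; a real OOM would be raised by the JVM.
                throw new OutOfMemoryError("Direct buffer memory (simulated)");
            } catch (Exception e) {
                // Never reached: OutOfMemoryError extends Error, not Exception.
                System.out.println("caught: " + e);
            }
        }, "sketch-io-thread");

        // Caller-supplied handler, standing in for what the issue asks to be configurable.
        t.setUncaughtExceptionHandler((thread, error) -> {
            seen.set(error);
            done.countDown();
        });
        t.start();
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handler was not invoked");
            }
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("handler received: " + runAndCapture());
    }
}
```

In this sketch the handler is chosen by the caller, which is the capability the issue asks KafkaThread or KafkaProducer to expose; whether and how Kafka should offer it is for the project to decide.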
[jira] [Created] (KAFKA-17025) KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler
Hongshun Wang created KAFKA-17025:

Summary: KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler
Key: KAFKA-17025
URL: https://issues.apache.org/jira/browse/KAFKA-17025
Project: Kafka
Issue Type: Improvement
Components: clients
Affects Versions: 3.6.2
Reporter: Hongshun Wang
Fix For: 3.6.3

When I use KafkaProducer and an OOM occurs, the KafkaProducer only logs it and does nothing else:
```java
ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 'kafka-producer-network-thread | producer-1':
java.lang.OutOfMemoryError: Direct buffer memory
```
I tried to find out what happens:

1. It seems that OutOfMemoryError, being an Error, is not caught, because org.apache.kafka.clients.producer.internals.Sender#run only catches Exception:
```
@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the transaction manager, accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
    while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
        if (!transactionManager.isCompleting()) {
            log.info("Aborting incomplete transaction due to shutdown");
            transactionManager.beginAbort();
        }
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    if (forceClose) {
        // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
        // the futures.
        if (transactionManager != null) {
            log.debug("Aborting incomplete transactional requests due to forced shutdown");
            transactionManager.close();
        }
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
```
2. KafkaThread then handles the uncaught exception by just logging it:
```java
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    configureThread(name, daemon);
}

private void configureThread(final String name, boolean daemon) {
    setDaemon(daemon);
    setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
}
```
To be honest, I don't understand why KafkaThread does nothing but log when an uncaught exception occurs. Why not expose a method to set the UncaughtExceptionHandler in KafkaThread or KafkaProducer, so that the user can decide what to do with an uncaught exception, whether to rethrow it or just ignore it?
Re: [PR] MINOR: New year code clean up - misc [kafka]
github-actions[bot] commented on PR #15071: URL: https://github.com/apache/kafka/pull/15071#issuecomment-2185520326 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] MINOR: Change config. compressionType to be an enum type instead [kafka]
github-actions[bot] commented on PR #15403: URL: https://github.com/apache/kafka/pull/15403#issuecomment-2185520123 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-15265: Remote copy throttle metrics [kafka]
showuon commented on code in PR #16086: URL: https://github.com/apache/kafka/pull/16086#discussion_r1650291862

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -789,7 +803,10 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
                     copyQuotaManagerLock.lock();
                     try {
-                        while (rlmCopyQuotaManager.isQuotaExceeded()) {
+                        while (true) {
+                            long throttleTimeMs = rlmCopyQuotaManager.checkQuotaAndGetThrottleTimeMs();
+                            if (throttleTimeMs <= 0) break;
+                            copyThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());

Review Comment: @kamalcph, thanks for the explanation! Good to know we've handled it well.
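The control flow of the new loop in the diff above can be sketched in isolation. This is an assumption-laden illustration, not the RemoteLogManager code: `ThrottleLoopSketch` and `drainThrottle` are hypothetical names, the quota check is modelled as a plain `LongSupplier`, and a list stands in for the sensor. The quoted excerpt ends at the `record` call, so anything the real loop does after recording is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of the loop shape shown in the diff; no Kafka classes are used.
final class ThrottleLoopSketch {

    // Keeps asking for the throttle time and records every positive value,
    // stopping as soon as the quota check reports zero or less.
    static List<Long> drainThrottle(LongSupplier checkQuotaAndGetThrottleTimeMs) {
        List<Long> recorded = new ArrayList<>();
        while (true) {
            long throttleTimeMs = checkQuotaAndGetThrottleTimeMs.getAsLong();
            if (throttleTimeMs <= 0) break;
            // Stands in for copyThrottleTimeSensor.record(throttleTimeMs, ...).
            recorded.add(throttleTimeMs);
        }
        return recorded;
    }

    public static void main(String[] args) {
        // Scripted answers from a fake quota manager: throttled twice, then free.
        long[] answers = {30L, 10L, 0L};
        int[] next = {0};
        System.out.println(drainThrottle(() -> answers[next[0]++]));
    }
}
```

Compared with the old `while (isQuotaExceeded())` form, the loop obtains the throttle time once per iteration and can therefore report it to a metric before deciding whether to continue.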
Re: [PR] KAFKA-17013 RequestManager#ConnectionState#toString() should use %s [kafka]
dujian0068 commented on code in PR #16413: URL: https://github.com/apache/kafka/pull/16413#discussion_r1650266581

## raft/src/main/java/org/apache/kafka/raft/RequestManager.java:

@@ -372,12 +372,12 @@ void onRequestSent(long correlationId, long timeMs) {
         @Override
         public String toString() {
             return String.format(
-                "ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, lastFailTimeMs=%d, inFlightCorrelationId=%d)",
+                "ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, lastFailTimeMs=%d, inFlightCorrelationId=%s)",
                 node,
                 state,
                 lastSendTimeMs,
                 lastFailTimeMs,
-                inFlightCorrelationId
+                inFlightCorrelationId.isPresent() ? inFlightCorrelationId.getAsLong() : null

Review Comment: You are right, this makes it easier to understand. It has been modified.
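The reason for switching the last conversion from `%d` to `%s` can be shown with a small standalone sketch. It assumes, based on the `getAsLong()` call in the diff, that `inFlightCorrelationId` is a `java.util.OptionalLong`; the class `OptionalLongFormatSketch` and its two methods are hypothetical and use a shortened format string.

```java
import java.util.IllegalFormatConversionException;
import java.util.OptionalLong;

// Hypothetical demo class; only the java.util and String.format calls are real APIs.
final class OptionalLongFormatSketch {

    // Mirrors the pre-fix format string: %d paired with an OptionalLong argument.
    static String formatWithD(OptionalLong id) {
        return String.format("inFlightCorrelationId=%d", id);
    }

    // Mirrors the fix: %s accepts any object, so the unwrapped Long (or null) formats cleanly.
    static String formatWithS(OptionalLong id) {
        return String.format("inFlightCorrelationId=%s", id.isPresent() ? id.getAsLong() : null);
    }

    public static void main(String[] args) {
        try {
            formatWithD(OptionalLong.of(7L));
        } catch (IllegalFormatConversionException e) {
            // %d only accepts integral types, so an OptionalLong is rejected at runtime.
            System.out.println("%d rejected the OptionalLong: " + e.getMessage());
        }
        System.out.println(formatWithS(OptionalLong.of(7L)));
        System.out.println(formatWithS(OptionalLong.empty()));
    }
}
```

Because the mismatch only surfaces when `toString()` is actually called, it compiles without complaint, which is why such a bug can go unnoticed until the string is logged.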
Re: [PR] KIP-759 Mark as Partitioned [kafka]
LQXshane commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1650219136

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:

@@ -1616,4 +1623,25 @@ public KStream processValues(
             processNode,
             builder);
     }
+
+    @Override
+    public KStream markAsPartitioned() {
+        final ProcessorParameters processorParameters =
+            new ProcessorParameters<>(new PassThrough<>(), PARTITION_PRESERVE_NAME + name);

Review Comment: I will keep this open to remind myself to update the KIP after the review is complete.
Re: [PR] KIP-759 Mark as Partitioned [kafka]
LQXshane commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1650218909 ## streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java: ## @@ -685,6 +685,41 @@ KStream flatMapValues(final ValueMapper KStream flatMapValues(final ValueMapperWithKey> mapper, final Named named); +/** + * Marking the {@code KStream} as partitioned signals the stream is partitioned as intended, + * and does not require further repartitioning by downstream key changing operations. + * + * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query(IQ) or {@link KStream#join}. + * For reasons that when repartitions happen, records are physically shuffled by a composite key defined in the stateful operation. + * However, if the repartitions were cancelled, records stayed in their original partition by its original key. IQ or joins + * assumes and uses the composite key instead of the original key. Review Comment: @mjsax Sorry for the confusion; the javadoc here could be better written. Before I do that: the composite key notion came from the original discussion [here](https://lists.apache.org/thread/r7yqsoqsox0z2mzxt33r9r99tnwvb58o); nonetheless, I should remove it. Perhaps a brief description such as `interactive query might fail when trying to guess the original key` would do. As for joins, I might need your help. Most of my understanding of the problem came from the discussion thread.
Re: [PR] KIP-759 Mark as Partitioned [kafka]
LQXshane commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1650214064 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -222,21 +226,21 @@ public KStream selectKey(final KeyValueMapper selectKeyProcessorNode = internalSelectKey(mapper, new NamedInternal(named)); -selectKeyProcessorNode.keyChangingOperation(true); +selectKeyProcessorNode.keyChangingOperation(repartitionRequired); builder.addGraphNode(graphNode, selectKeyProcessorNode); // key serde cannot be preserved return new KStreamImpl<>( -selectKeyProcessorNode.nodeName(), -null, -valueSerde, -subTopologySourceNodes, -true, -selectKeyProcessorNode, -builder); +selectKeyProcessorNode.nodeName(), Review Comment: Thanks, reverted these indentations
Re: [PR] KAFKA-16811 Sliding window approach to calculate non-zero punctuate-ratio metric [kafka]
ganesh-sadanala commented on PR #16162: URL: https://github.com/apache/kafka/pull/16162#issuecomment-2185276488 @mjsax @cadonna Thanks for the insights. I will make the necessary changes and would love to write a KIP for this change. I recently requested a Confluence account for another KIP, and it got approved. This is one of my first two KIPs. If there is anything you would like to share that would help me, please do. Thank you!
Re: [PR] KAFKA-16959: ConfigCommand should allow to define both `entity-default` and `entity-name` [kafka]
m1a2st commented on PR #16381: URL: https://github.com/apache/kafka/pull/16381#issuecomment-2185156040 Gentle ping, @chia7712, I updated the title
Re: [PR] KIP-759 Mark as Partitioned [kafka]
LQXshane commented on code in PR #15740: URL: https://github.com/apache/kafka/pull/15740#discussion_r1650128875 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ## @@ -222,21 +226,21 @@ public KStream selectKey(final KeyValueMapper
Re: [PR] KAFKA-16755: Implement lock timeout functionality in SharePartition [kafka]
adixitconfluent commented on PR #16414: URL: https://github.com/apache/kafka/pull/16414#issuecomment-2185094033 > Thanks for the PR. Looks good overall, with just a request to comment at least one of the unit tests a bit more comprehensively to make it easier to see what is intended. If the behaviour ever changes subtly, we might have to revisit them and making them a little easier to read would be helpful. Thanks for the review @AndrewJSchofield, I have added a few comments to most of these tests to explain the intention. Thanks!
[PR] KAFKA-16650 add integration test for Admin#abortTransaction [kafka]
brandboat opened a new pull request, #16429: URL: https://github.com/apache/kafka/pull/16429 Related to https://issues.apache.org/jira/browse/KAFKA-16650. As the title says, this adds an integration test for Admin#abortTransaction. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
chia7712 commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1650089242 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ## @@ -166,6 +170,25 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } +public boolean validate() { +Boolean emitCheckpointsValue = this.getBoolean(EMIT_CHECKPOINTS_ENABLED); +Boolean syncGroupOffsetsValue = this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED); + +if (!emitCheckpointsValue && !syncGroupOffsetsValue) { +LOG.warn("MirrorCheckpointConnector can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " + +EMIT_CHECKPOINTS_ENABLED + " set to false"); +return false; +} + +boolean requireOffsetSyncs = emitCheckpointsValue || syncGroupOffsetsValue; Review Comment: As `!emitCheckpointsValue && !syncGroupOffsetsValue` return before, `requireOffsetSyncs` is always true, right? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ## @@ -166,6 +170,25 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } +public boolean validate() { +Boolean emitCheckpointsValue = this.getBoolean(EMIT_CHECKPOINTS_ENABLED); +Boolean syncGroupOffsetsValue = this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED); + +if (!emitCheckpointsValue && !syncGroupOffsetsValue) { +LOG.warn("MirrorCheckpointConnector can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " + +EMIT_CHECKPOINTS_ENABLED + " set to false"); +return false; +} + +boolean requireOffsetSyncs = emitCheckpointsValue || syncGroupOffsetsValue; +if (!"true".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true")) & requireOffsetSyncs) { Review Comment: Maybe we should use `EMIT_OFFSET_SYNCS_ENABLED_DEFAULT` here? 
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ## @@ -366,6 +291,19 @@ boolean update(long upstreamOffset, long downstreamOffset) { return shouldSyncOffsets; } +@Override +public boolean equals(Object o) { +if (this == o) return true; +if (!(o instanceof PartitionState)) return false; +PartitionState that = (PartitionState) o; +return previousUpstreamOffset == that.previousUpstreamOffset && previousDownstreamOffset == that.previousDownstreamOffset && lastSyncDownstreamOffset == that.lastSyncDownstreamOffset && maxOffsetLag == that.maxOffsetLag && shouldSyncOffsets == that.shouldSyncOffsets; Review Comment: Could you please split it to multi lines? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ## @@ -166,6 +170,25 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } +public boolean validate() { +Boolean emitCheckpointsValue = this.getBoolean(EMIT_CHECKPOINTS_ENABLED); +Boolean syncGroupOffsetsValue = this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED); + +if (!emitCheckpointsValue && !syncGroupOffsetsValue) { +LOG.warn("MirrorCheckpointConnector can't run without both " + SYNC_GROUP_OFFSETS_ENABLED + ", " + +EMIT_CHECKPOINTS_ENABLED + " set to false"); +return false; +} + +boolean requireOffsetSyncs = emitCheckpointsValue || syncGroupOffsetsValue; +if (!"true".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true")) & requireOffsetSyncs) { +LOG.warn("MirrorCheckpointConnector can't run with " + EMIT_OFFSET_SYNCS_ENABLED + " set to false while, " + +EMIT_CHECKPOINTS_ENABLED + " and/o r" + SYNC_GROUP_OFFSETS_ENABLED + " set to true"); Review Comment: ` and/o r` or ` and/or`? 
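The review comments on `MirrorCheckpointConfig#validate()` above can be illustrated with a small standalone sketch. This is not the PR's code: the class `CheckpointValidationSketch`, the method `validate`, and its three boolean parameters are hypothetical stand-ins for the config lookups, and the sketch returns messages instead of logging. It only shows that once the early return has handled the both-disabled case, a separate `requireOffsetSyncs` flag is always true and can be dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the validation discussed in the review; not Kafka code.
final class CheckpointValidationSketch {

    // Returns the list of validation problems; an empty list means the config is usable.
    static List<String> validate(boolean emitCheckpoints,
                                 boolean syncGroupOffsets,
                                 boolean emitOffsetSyncs) {
        List<String> problems = new ArrayList<>();

        // Case 1: nothing for the checkpoint connector to do.
        if (!emitCheckpoints && !syncGroupOffsets) {
            problems.add("both checkpoint emission and group offset sync are disabled");
            return problems;
        }

        // From here on at least one of the two flags is true, so
        // (emitCheckpoints || syncGroupOffsets) would always evaluate to true
        // and does not need to be checked again.
        if (!emitOffsetSyncs) {
            problems.add("offset syncs are disabled while checkpoints and/or group offset sync are enabled");
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(validate(false, false, true));
        System.out.println(validate(true, false, false));
        System.out.println(validate(true, true, true));
    }
}
```

If a combined condition is kept anyway, the short-circuit operator `&&` is the usual choice for boolean logic; `&` evaluates both operands unconditionally.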
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ## @@ -234,6 +234,30 @@ public ConfigDef config() { @Override public org.apache.kafka.common.config.Config validate(Map props) { List configValues = super.validate(props).configValues(); +validateExactlyOnceConfigs(props, configValues); +validateEmitOffsetSyncConfigs(props, configValues); + +return new org.apache.kafka.common.config.Config(configValues); +} + +private static void validateEmitOffsetSyncConfigs(Map props, List configValues) { +boolean offsetSyncsConfigured = configValues.stream() +.anyMatch(conf -> conf.name().startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) ||
Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]
frankvicky commented on PR #16396: URL: https://github.com/apache/kafka/pull/16396#issuecomment-2185016432 Hi @chia7712, I have done some cleanup, PTAL
Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]
frankvicky commented on code in PR #16396: URL: https://github.com/apache/kafka/pull/16396#discussion_r1650089520 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java: ## @@ -380,17 +372,31 @@ public void setUp() { } } -@Parameterized.Parameters(name = "rackAwareStrategy={0}") -public static Collection getParamStoreType() { -return asList(new Object[][] { -{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE}, -{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC}, -{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY}, -}); +//@Parameterized.Parameters(name = "rackAwareStrategy={0}") Review Comment: Oops, I will remove it.
Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]
frankvicky commented on code in PR #16396: URL: https://github.com/apache/kafka/pull/16396#discussion_r1650089120 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java: ## @@ -71,23 +73,27 @@ public class KeyValueStoreMaterializerTest { @Mock private StreamsConfig streamsConfig; -@Before Review Comment: Yes, since we don't have any parameters in the `setup` method.
Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]
chia7712 commented on code in PR #16019: URL: https://github.com/apache/kafka/pull/16019#discussion_r1650085659 ## tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java: ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue; +import org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; + +import java.io.IOException; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.Optional; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class TransactionLogMessageFormatter implements MessageFormatter { + +@Override +public void writeTo(ConsumerRecord consumerRecord, PrintStream output) { +Optional.ofNullable(consumerRecord.key()) +.map(key -> readToTransactionLogKey(ByteBuffer.wrap(key))) +.ifPresent(transactionLogKey -> { +short version = ByteBuffer.wrap(consumerRecord.key()).getShort(); +ObjectNode json = new ObjectNode(JsonNodeFactory.instance); +json.set("version", new TextNode(Short.toString(version))); + +if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION +&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) { +byte[] value = consumerRecord.value(); Review Comment: Please handle the null value and the unmatched version. The key and the value each have their own version.
Re: [PR] KAFKA-17013 RequestManager#ConnectionState#toString() should use %s [kafka]
chia7712 commented on code in PR #16413: URL: https://github.com/apache/kafka/pull/16413#discussion_r1650083987 ## raft/src/main/java/org/apache/kafka/raft/RequestManager.java: ## @@ -372,12 +372,12 @@ void onRequestSent(long correlationId, long timeMs) { @Override public String toString() { return String.format( -"ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, lastFailTimeMs=%d, inFlightCorrelationId=%d)", +"ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, lastFailTimeMs=%d, inFlightCorrelationId=%s)", node, state, lastSendTimeMs, lastFailTimeMs, -inFlightCorrelationId +inFlightCorrelationId.isPresent() ? inFlightCorrelationId.getAsLong() : null Review Comment: How about using `undefined` instead of "null"?
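A minimal sketch of the formatting issue behind this thread, assuming the field is an `OptionalLong` (as the `isPresent()`/`getAsLong()` calls in the diff suggest). The class `CorrelationIdFormatSketch` and its methods are hypothetical and only reproduce the relevant part of the format string: `%d` rejects an `OptionalLong` argument, while `%s` with an explicit fallback such as `undefined` prints cleanly.

```java
import java.util.IllegalFormatConversionException;
import java.util.OptionalLong;

// Hypothetical illustration; not the RequestManager code itself.
final class CorrelationIdFormatSketch {

    // Formats the optional id with %s and an explicit placeholder when absent.
    static String describe(OptionalLong inFlightCorrelationId) {
        String id = inFlightCorrelationId.isPresent()
            ? Long.toString(inFlightCorrelationId.getAsLong())
            : "undefined";
        return String.format("ConnectionState(inFlightCorrelationId=%s)", id);
    }

    // Returns true if %d refuses the OptionalLong object, which is the failure
    // mode that motivates switching the conversion to %s.
    static boolean percentDRejects(OptionalLong value) {
        try {
            String.format("%d", value);
            return false;
        } catch (IllegalFormatConversionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(OptionalLong.of(42L)));
        System.out.println(describe(OptionalLong.empty()));
        System.out.println(percentDRejects(OptionalLong.of(42L)));
    }
}
```

Whether the placeholder should read `undefined`, `null`, or something else is a style choice for the PR; the sketch only shows that the fallback can be made explicit.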
Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]
chia7712 commented on code in PR #16396: URL: https://github.com/apache/kafka/pull/16396#discussion_r1650082583 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -180,9 +169,10 @@ private void setupStore() { when(store.name()).thenReturn(storeName); } -@Test -public void shouldNotRegisterSameStoreMultipleTimes() { -setupStateManagerMock(); +@ParameterizedTest +@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, names = {"ACTIVE", "STANDBY"}) Review Comment: `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default value ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -199,9 +189,10 @@ public void shouldNotRegisterStoreWithoutMetadata() { () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager)); } -@Test -public void shouldSupportUnregisterChangelogBeforeInitialization() { -setupStateManagerMock(); +@ParameterizedTest +@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, names = {"ACTIVE", "STANDBY"}) Review Comment: `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default value ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -280,9 +272,10 @@ public void shouldSupportUnregisterChangelogBeforeCompletion() { assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH)); } -@Test -public void shouldSupportUnregisterChangelogAfterCompletion() { -setupStateManagerMock(); +@ParameterizedTest +@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, names = {"ACTIVE", "STANDBY"}) Review Comment: `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default value ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -549,9 +548,10 @@ public void shouldRestoreFromPositionAndCheckForCompletion() { } } -@Test 
-public void shouldRestoreFromBeginningAndCheckCompletion() { -setupStateManagerMock(); +@ParameterizedTest +@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, names = {"ACTIVE", "STANDBY"}) Review Comment: `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default value ## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java: ## @@ -380,17 +372,31 @@ public void setUp() { } } -@Parameterized.Parameters(name = "rackAwareStrategy={0}") -public static Collection getParamStoreType() { -return asList(new Object[][] { -{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE}, -{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC}, -{StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY}, -}); +//@Parameterized.Parameters(name = "rackAwareStrategy={0}") Review Comment: please remove those code ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -338,9 +331,10 @@ public void shouldSupportUnregisterChangelogAfterCompletion() { } } -@Test -public void shouldInitializeChangelogAndCheckForCompletion() { -setupStateManagerMock(); +@ParameterizedTest +@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, names = {"ACTIVE", "STANDBY"}) Review Comment: `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default value ## streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java: ## @@ -71,23 +73,27 @@ public class KeyValueStoreMaterializerTest { @Mock private StreamsConfig streamsConfig; -@Before Review Comment: So we can keep using `@BeforeEach` now, right? 
## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -228,9 +219,10 @@ public void shouldSupportUnregisterChangelogBeforeInitialization() { assertNull(standbyListener.capturedStore(UPDATE_BATCH)); } -@Test -public void shouldSupportUnregisterChangelogBeforeCompletion() { -setupStateManagerMock(); +@ParameterizedTest +@EnumSource(value = Task.TaskType.class, mode = EnumSource.Mode.INCLUDE, names = {"ACTIVE", "STANDBY"}) Review Comment: `mode = EnumSource.Mode.INCLUDE` is redundant, since that is the default value ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java: ## @@ -380,11 +374,12 @@ public void
[jira] [Commented] (KAFKA-17015) ContextualRecord#hashCode()、ProcessorRecordContext#hashCode() Should not be deprecated and throw an exception
[ https://issues.apache.org/jira/browse/KAFKA-17015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859473#comment-17859473 ] dujian0068 commented on KAFKA-17015: Thank you for your reply. When I studied the Kafka source code, I found that the class `ContextualRecord` has an attribute that is a `ProcessorRecordContext` object, and that `ProcessorRecordContext#toString()` is deprecated. I was considering whether to also deprecate `ContextualRecord#toString()`, but that seems unnecessary, so I raised this issue. > ContextualRecord#hashCode()、ProcessorRecordContext#hashCode() Should not be deprecated and throw an exception > - > > Key: KAFKA-17015 > URL: https://issues.apache.org/jira/browse/KAFKA-17015 > Project: Kafka > Issue Type: Improvement > Reporter: dujian0068 > Assignee: dujian0068 > Priority: Minor > > When reviewing PR#16970, I found that the functions `ContextualRecord#hashCode()` and `ProcessorRecordContext#hashCode()` are deprecated because they have a mutable attribute, which will cause the hashCode to change. > I don't think that hashCode should be discarded just because it is mutable. HashCode is a very important property of an object. It just shouldn't be used for hash addressing, like ArrayList. -- This message was sent by Atlassian Jira (v8.20.10#820010)
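The trade-off described in the issue can be shown with a short sketch. The class `MutableContext` below is a hypothetical stand-in, not `ProcessorRecordContext`: its `hashCode()` depends on a mutable field, which misleads hash-based lookups after a mutation but is harmless for containers that rely only on `equals()`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of a hashCode that depends on mutable state.
final class MutableHashCodeSketch {

    static final class MutableContext {
        long offset;

        MutableContext(long offset) {
            this.offset = offset;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableContext && ((MutableContext) o).offset == offset;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(offset);
        }
    }

    // The element is stored under the hash of offset=1; after the mutation the
    // lookup uses the hash of offset=2 and no longer finds it.
    static boolean foundInHashSetAfterMutation() {
        MutableContext ctx = new MutableContext(1L);
        Set<MutableContext> set = new HashSet<>();
        set.add(ctx);
        ctx.offset = 2L;
        return set.contains(ctx);
    }

    // A list scans with equals() only, so the mutation does not matter.
    static boolean foundInListAfterMutation() {
        MutableContext ctx = new MutableContext(1L);
        List<MutableContext> list = new ArrayList<>();
        list.add(ctx);
        ctx.offset = 2L;
        return list.contains(ctx);
    }

    public static void main(String[] args) {
        System.out.println(foundInHashSetAfterMutation());
        System.out.println(foundInListAfterMutation());
    }
}
```

`ArrayList` itself is an example of a standard class whose `hashCode()` changes as its contents change, which may be what the comparison in the issue is pointing at.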
[PR] KIP-1049: Add config log.summary.interval.ms to Kafka Streams [kafka]
dujian0068 opened a new pull request, #16428: URL: https://github.com/apache/kafka/pull/16428 [KAFKA-16584 Make log processing summary configurable or debug](https://issues.apache.org/jira/browse/KAFKA-16584) [KIP-1049: Add config log.summary.interval.ms to Kafka Streams](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1049%3A+Add+config+log.summary.interval.ms+to+Kafka+Streams) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16254: Allow MM2 to fully disable offset sync feature [kafka]
OmniaGM commented on PR #15999: URL: https://github.com/apache/kafka/pull/15999#issuecomment-2184947365 @C0urante did you get a chance to review this?
Re: [PR] KAFKA-17013 RequestManager#ConnectionState#toString() should use %s [kafka]
dujian0068 commented on code in PR #16413: URL: https://github.com/apache/kafka/pull/16413#discussion_r1650044412 ## raft/src/main/java/org/apache/kafka/raft/RequestManager.java: ## @@ -372,7 +372,7 @@ void onRequestSent(long correlationId, long timeMs) { @Override public String toString() { return String.format( -"ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, lastFailTimeMs=%d, inFlightCorrelationId=%d)", +"ConnectionState(node=%s, state=%s, lastSendTimeMs=%d, lastFailTimeMs=%d, inFlightCorrelationId=%s)", Review Comment: Of course, it has been modified.
Re: [PR] KAFKA-17023: add PCollectionsImmutableMap to ConcurrentMapBenchmark [kafka]
TaiJuWu commented on code in PR #16425: URL: https://github.com/apache/kafka/pull/16425#discussion_r1650015837 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/ConcurrentMapBenchmark.java: ## @@ -189,4 +194,60 @@ public void testCopyOnWriteMapEntrySet(Blackhole blackhole) { } } } + +@Benchmark +@OperationsPerInvocation(TIMES) +public void testPCollectionsImmutableMapGet(Blackhole blackhole) { +for (int i = 0; i < TIMES; i++) { +if (i % writePerLoops == 0) { +// add offset mapSize to ensure computeIfAbsent do add new entry +pcollectionsImmutableMap = pcollectionsImmutableMap.updated(i + mapSize, 0); Review Comment: This method is **not** thread-safe; we can check the result by adding `assertEquals(1, pcollectionsImmutableMap.size());` to verify.
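A rough sketch of why reassigning a field from an immutable map's `updated(...)` result is not thread-safe, and of one possible way to make the read-copy-write step atomic. It deliberately swaps in a plain JDK copy-on-write helper for the PCollections-backed map, so `ImmutableMapUpdateSketch`, `updated`, and `sizeAfterConcurrentUpdates` are all hypothetical names. Nothing here claims the benchmark should be changed this way.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration using JDK collections instead of PCollections.
final class ImmutableMapUpdateSketch {

    // Stand-in for an immutable map's updated(key, value): returns a new
    // unmodifiable map and leaves the original untouched.
    static Map<Integer, Integer> updated(Map<Integer, Integer> base, int key, int value) {
        Map<Integer, Integer> copy = new HashMap<>(base);
        copy.put(key, value);
        return Collections.unmodifiableMap(copy);
    }

    // A plain "field = updated(field, k, v)" is a read followed by a write, so two
    // threads can both start from the same snapshot and one update is lost.
    // AtomicReference.updateAndGet retries the function until the swap succeeds.
    static int sizeAfterConcurrentUpdates(int threads, int keysPerThread) {
        AtomicReference<Map<Integer, Integer>> ref =
            new AtomicReference<>(Collections.<Integer, Integer>emptyMap());
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int firstKey = t * keysPerThread;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < keysPerThread; i++) {
                    final int key = firstKey + i;
                    ref.updateAndGet(current -> updated(current, key, 0));
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return ref.get().size();
    }

    public static void main(String[] args) {
        // Every thread writes distinct keys, so no update may be lost.
        System.out.println(sizeAfterConcurrentUpdates(4, 250));
    }
}
```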
[jira] [Resolved] (KAFKA-17009) Add unit test to query nonexistent replica by describeReplicaLogDirs
[ https://issues.apache.org/jira/browse/KAFKA-17009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17009. Fix Version/s: 3.9.0 Resolution: Fixed > Add unit test to query nonexistent replica by describeReplicaLogDirs > > > Key: KAFKA-17009 > URL: https://issues.apache.org/jira/browse/KAFKA-17009 > Project: Kafka > Issue Type: Test > Reporter: Chia-Ping Tsai > Assignee: 黃竣陽 > Priority: Minor > Fix For: 3.9.0 > > > Our docs [0] say that "currentReplicaLogDir will be null if the replica is not found", which means `describeReplicaLogDirs` can accept queries for nonexistent replicas. However, the current UT [1] only verifies that the replica is not found due to a storage error. We should add a UT to verify it for a nonexistent replica. > [0] https://github.com/apache/kafka/blob/391778b8d737f4af074422ffe61bc494b21e6555/clients/src/main/java/org/apache/kafka/clients/admin/DescribeReplicaLogDirsResult.java#L71 > [1] https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java#L2356
Re: [PR] KAFKA-17009: Add unit test to query nonexistent replica by describeReplicaLogDirs [kafka]
chia7712 merged PR #16423: URL: https://github.com/apache/kafka/pull/16423
[PR] KAFKA-16791: Add thread detection to ClusterTestExtensions [kafka]
bboyleonp666 opened a new pull request, #16427: URL: https://github.com/apache/kafka/pull/16427 `TestUtils.verifyNoUnexpectedThreads()` verifies that there are no remaining threads that might affect subsequent test cases; it should be checked before and after all test cases. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]
frankvicky commented on PR #16396: URL: https://github.com/apache/kafka/pull/16396#issuecomment-2184820719 Hi @chia7712, I have made some changes based on the comments, PTAL
Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]
ashoke-cube commented on PR #16095: URL: https://github.com/apache/kafka/pull/16095#issuecomment-2184736478 > Hi @ashoke-cube could you fix the build? Thanks! Hey @gharris1727 I looked into the build failure. It is a bit weird. It is failing because it is not able to find JUnit's `Test` class. I am using the same annotation as every other test in that class. The test runs fine locally. It's not clear to me what the issue is here. ``` Task :connect:runtime:compileTestJava /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16095/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:140: error: cannot find symbol @Test ^ symbol: class Test location: class WorkerConnectorTest /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16095/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:393: error: cannot find symbol @Test ^ symbol: class Test location: class WorkerConnectorTest ```
Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]
frankvicky commented on code in PR #16396: URL: https://github.com/apache/kafka/pull/16396#discussion_r1649964164
## streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java: ##
@@ -161,14 +163,14 @@ private void overwriteInternalTopicManagerWithMock() {
         partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
     }
-    @Before
     public void setUp() {
-        adminClient = createMockAdminClientForAssignor(EMPTY_CHANGELOG_END_OFFSETS);
+        adminClient = createMockAdminClientForAssignor(EMPTY_CHANGELOG_END_OFFSETS, true);
     }
     @Test
     public void shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceAndTriggerRebalanceIfEndOffsetFetchFailsAndHighAvailabilityEnabled() {
+        setUp();
Review Comment: Indeed, this is the only test case that calls the `setUp` method.
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
pasharik commented on code in PR #15830: URL: https://github.com/apache/kafka/pull/15830#discussion_r1649455825
## core/src/test/scala/unit/kafka/admin/AclCommandTest.scala: ##
@@ -324,12 +348,18 @@ class AclCommandTest extends QuorumTestHarness with Logging {
   }
   private def withAuthorizer()(f: Authorizer => Unit): Unit = {
-    val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false)
-    val authZ = new AclAuthorizer
-    try {
-      authZ.configure(kafkaConfig.originals)
-      f(authZ)
-    } finally authZ.close()
+    if (isKRaftTest()) {
+      (servers.map(_.authorizer.get) ++ controllerServers.map(_.authorizer.get)).foreach { auth =>
Review Comment:
> That will create zk authorizer even though the tests don't have zk, right?
Yeah, you are right... Restored the original implementation :ok_hand:
Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]
loicgreffier commented on code in PR #16093: URL: https://github.com/apache/kafka/pull/16093#discussion_r1649445442
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java: ##
@@ -49,6 +51,21 @@ public Headers headers() {
         return value.headers();
     }
+    public ConsumerRecord rawRecord() {
+        return rawRecord;
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+        // Do not include rawRecord in the comparison
+        return super.equals(other);
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
+    }
+
Review Comment: @cadonna Correct, the behaviour is exactly the same. SpotBugs raised a warning on this class asking for these methods to be overridden explicitly and to call the super implementations (e.g., as done here: https://github.com/apache/kafka/blob/trunk/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java#L201). The overrides can be removed and the warning ignored if that is more convenient.
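To illustrate why the overrides in the diff above change nothing at runtime, here is a minimal, self-contained sketch. The classes `Stamped` and `StampedWithRaw` are hypothetical stand-ins, not Kafka's `Stamped` or `StampedRecord`; the only assumption carried over from the diff is that the subclass adds a field that is deliberately excluded from equality.

```java
import java.util.Objects;

// Hypothetical classes for illustration only; not Kafka's Stamped/StampedRecord.
public class EqualsDelegationSketch {

    /** Base class: equality is defined by value and timestamp. */
    public static class Stamped<V> {
        final V value;
        final long timestamp;

        public Stamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) return true;
            if (other == null || getClass() != other.getClass()) return false;
            Stamped<?> that = (Stamped<?>) other;
            return timestamp == that.timestamp && Objects.equals(value, that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value, timestamp);
        }
    }

    /** Subclass: adds a field that is intentionally left out of equality. */
    public static class StampedWithRaw<V> extends Stamped<V> {
        final String raw;

        public StampedWithRaw(V value, long timestamp, String raw) {
            super(value, timestamp);
            this.raw = raw;
        }

        // Explicit delegation documents the intent; behaviour equals plain inheritance.
        @Override
        public boolean equals(Object other) {
            return super.equals(other);
        }

        @Override
        public int hashCode() {
            return super.hashCode();
        }
    }
}
```

With these definitions, two `StampedWithRaw` instances that share a value and timestamp compare equal even if their `raw` fields differ, which is the behaviour the code comment in the diff describes. Deleting both overrides would give the same result; they exist only to make the exclusion explicit to readers and static analysis.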
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
brenden20 commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1649440775 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -106,72 +102,99 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; -private final String memberId = "member-id"; -private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; -private Metrics metrics; +private LogContext logContext; @BeforeEach public void setUp() { -setUp(ConsumerTestBuilder.createDefaultGroupInformation()); -} - -private void setUp(Optional groupInfo) { -testBuilder = new ConsumerTestBuilder(groupInfo, true, false); -time = testBuilder.time; -coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); -heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); -heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); -backgroundEventHandler = testBuilder.backgroundEventHandler; -subscriptions = testBuilder.subscriptions; -membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); -metadata = testBuilder.metadata; -metrics = new Metrics(time); +this.time = new MockTime(); +Metrics metrics = new Metrics(time); +this.logContext = new LogContext(); +this.pollTimer = mock(Timer.class); +this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); +this.heartbeatState = mock(HeartbeatState.class); +this.backgroundEventHandler = mock(BackgroundEventHandler.class); +this.subscriptions = mock(SubscriptionState.class); +this.membershipManager = 
mock(MembershipManagerImpl.class); +this.metadata = mock(ConsumerMetadata.class); +ConsumerConfig config = mock(ConsumerConfig.class); + +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); + +this.heartbeatRequestManager = new HeartbeatRequestManager( +logContext, +pollTimer, +config, +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler, +metrics); when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); +Map> map = new HashMap<>(); +LocalAssignment local = new LocalAssignment(0, map); +when(membershipManager.currentAssignment()).thenReturn(local); } -private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) { -cleanup(); - -ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation( -DEFAULT_GROUP_ID, -groupInstanceId, +private void createHeartbeatStateWith0HeartbeatInterval() { +this.heartbeatRequestState = spy(new HeartbeatRequestState( +logContext, +time, 0, -0.0, -Optional.of(DEFAULT_REMOTE_ASSIGNOR) -); +DEFAULT_RETRY_BACKOFF_MS, +DEFAULT_RETRY_BACKOFF_MAX_MS, +DEFAULT_HEARTBEAT_JITTER_MS)); -setUp(Optional.of(gi)); +heartbeatRequestManager = createHeartbeatRequestManager( +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); } -@AfterEach -public void cleanup() { -if (testBuilder != null) { -testBuilder.close(); -} +private void resetWithZeroHeartbeatInterval() { Review Comment: It works for all but one test, for now I am keeping ```createHeartbeatStateWith0HeartbeatInterval()``` for ```testSkippingHeartbeat(final boolean shouldSkipHeartbeat)``` since this test requires the initial heartbeatInterval be 0 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ 
-275,29 +316,34 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() {
Re: [PR] KAFKA-16000 Migrated MembershipManagerImplTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16312: URL: https://github.com/apache/kafka/pull/16312#discussion_r1649439786
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -747,7 +741,7 @@ public void testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
         membershipManager.poll(time.milliseconds());
         assertEquals(Collections.emptySet(), membershipManager.topicsAwaitingReconciliation());
-        verify(subscriptionState).assignFromSubscribed(topicPartitions(topic2Assignment, topic2Metadata));
+        verify(subscriptionState).assignFromSubscribedAwaitingCallback(eq(topicPartitions(topic2Assignment, topic2Metadata)), eq(topicPartitions(topic2Assignment, topic2Metadata)));
Review Comment: is there a reason to use `eq` here, given that we're providing the two specific arguments?
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -2489,7 +2490,9 @@ private void testRevocationCompleted(MembershipManagerImpl membershipManager,
         verify(subscriptionState).markPendingRevocation(anySet());
         List expectedTopicPartitionAssignment = buildTopicPartitions(expectedCurrentAssignment);
-        verify(subscriptionState).assignFromSubscribed(new HashSet<>(expectedTopicPartitionAssignment));
+        HashSet expectedSet = new HashSet<>(expectedTopicPartitionAssignment);
+        HashSet emptySet = new HashSet<>();
Review Comment: we could simplify this by removing it and just passing `Collections.emptySet()` as the 2nd param of `assignFromSubscribedAwaitingCallback`
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ##
@@ -747,7 +741,7 @@ public void testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataU
         membershipManager.poll(time.milliseconds());
         assertEquals(Collections.emptySet(), membershipManager.topicsAwaitingReconciliation());
-        verify(subscriptionState).assignFromSubscribed(topicPartitions(topic2Assignment, topic2Metadata));
+        verify(subscriptionState).assignFromSubscribedAwaitingCallback(eq(topicPartitions(topic2Assignment, topic2Metadata)), eq(topicPartitions(topic2Assignment, topic2Metadata)));
Review Comment: if we don't need it, let's remove it in other places when possible
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on code in PR #16200: URL: https://github.com/apache/kafka/pull/16200#discussion_r1649427661 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -659,78 +753,38 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { -Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS)); -heartbeatRequestManager = new HeartbeatRequestManager(new LogContext(), pollTimer, config(), -coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, -backgroundEventHandler, metrics); when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); int exceededTimeMs = 5; time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); +when(membershipManager.isLeavingGroup()).thenReturn(false); +when(pollTimer.isExpired()).thenReturn(true); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); verify(membershipManager).transitionToSendingLeaveGroup(true); verify(pollTimer, never()).isExpiredBy(); -assertEquals(exceededTimeMs, pollTimer.isExpiredBy()); clearInvocations(pollTimer); heartbeatRequestManager.resetPollTimer(time.milliseconds()); verify(pollTimer).isExpiredBy(); } @Test -public void testHeartbeatMetrics() { -// setup -coordinatorRequestManager = mock(CoordinatorRequestManager.class); -membershipManager = mock(MembershipManager.class); -heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); -time = new MockTime(); -metrics = new Metrics(time); -heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( -new LogContext(), -time, -0, // This initial interval should be 0 to ensure heartbeat on the clock -DEFAULT_RETRY_BACKOFF_MS, -DEFAULT_RETRY_BACKOFF_MAX_MS, -0); -backgroundEventHandler = mock(BackgroundEventHandler.class); +public void 
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { heartbeatRequestManager = createHeartbeatRequestManager( -coordinatorRequestManager, -membershipManager, -heartbeatState, -heartbeatRequestState, -backgroundEventHandler); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", ))); -when(membershipManager.state()).thenReturn(MemberState.STABLE); - -assertNotNull(getMetric("heartbeat-response-time-max")); -assertNotNull(getMetric("heartbeat-rate")); -assertNotNull(getMetric("heartbeat-total")); -assertNotNull(getMetric("last-heartbeat-seconds-ago")); - -// test poll -assertHeartbeat(heartbeatRequestManager, 0); -time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); -assertEquals(1.0, getMetric("heartbeat-total").metricValue()); -assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), getMetric("last-heartbeat-seconds-ago").metricValue()); - -assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); -assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d); -assertEquals(2.0, getMetric("heartbeat-total").metricValue()); - -// Randomly sleep for some time -Random rand = new Random(); -int randomSleepS = rand.nextInt(11); -time.sleep(randomSleepS * 1000); -assertEquals((double) randomSleepS, getMetric("last-heartbeat-seconds-ago").metricValue()); -} +coordinatorRequestManager, +membershipManager, +heartbeatState, +heartbeatRequestState, +backgroundEventHandler); -@Test -public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { +when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); +when(membershipManager.state()).thenReturn(MemberState.STABLE); mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); +when(membershipManager.isLeavingGroup()).thenReturn(true); Review Comment: uhm do we need this here? 
I wouldn't expect so (the membershipMgr is a mock now, and the HB mgr does not check the isLeavingGroup to generate a HB) ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ## @@ -659,78 +753,38 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { -
Re: [PR] KAFKA-15999 Migrate HeartbeatRequestManagerTest away from ConsumerTestBuilder [kafka]
lianetm commented on PR #16200: URL: https://github.com/apache/kafka/pull/16200#issuecomment-2183475535 Hey @brenden20, I completed another pass, left some comments. Thanks!
Re: [PR] KAFKA-17011: SupportedFeatures.MinVersion incorrectly blocks v0 (3.8) [kafka]
jolshan commented on PR #16420: URL: https://github.com/apache/kafka/pull/16420#issuecomment-2183475015 Ran the kraft upgrade tests and they passed with this change
Re: [PR] KAFKA-16000 Migrated MembershipManagerImplTest away from ConsumerTestBuilder [kafka]
brenden20 commented on PR #16312: URL: https://github.com/apache/kafka/pull/16312#issuecomment-2183474544 Thank you for the feedback @lianetm, I just made all the suggested improvements!
Re: [PR] Revert "KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config [kafka]
jolshan commented on PR #16409: URL: https://github.com/apache/kafka/pull/16409#issuecomment-2182994898 I will merge for now to unblock fixing the other issue.
Re: [PR] MINOR: Remove unused method 'createDirectoriesFrom' in DirectoryId [kafka]
wernerdv commented on PR #16419: URL: https://github.com/apache/kafka/pull/16419#issuecomment-2182969536 @soarez Hello, please take a look at this small cleanup.
[PR] MINOR: Remove unused method 'createDirectoriesFrom' in DirectoryId [kafka]
wernerdv opened a new pull request, #16419: URL: https://github.com/apache/kafka/pull/16419 Remove unused method 'createDirectoriesFrom' in DirectoryId. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15623: Migrate streams tests (processor) module to JUnit 5 [kafka]
frankvicky commented on PR #16396: URL: https://github.com/apache/kafka/pull/16396#issuecomment-2182964936 Hi @chia7712, I have updated the PR, PTAL
[jira] [Assigned] (KAFKA-17024) add integration test for TransactionsCommand
[ https://issues.apache.org/jira/browse/KAFKA-17024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chia-Ping Tsai reassigned KAFKA-17024:
Assignee: PoAn Yang (was: Chia-Ping Tsai)
> add integration test for TransactionsCommand
>
> Key: KAFKA-17024
> URL: https://issues.apache.org/jira/browse/KAFKA-17024
> Project: Kafka
> Issue Type: Test
> Reporter: Chia-Ping Tsai
> Assignee: PoAn Yang
> Priority: Minor
>
> As the title says. Currently we have only unit tests for TransactionsCommand.
-- This message was sent by Atlassian Jira (v8.20.10#820010)