[jira] [Resolved] (KAFKA-17015) ContextualRecord#hashCode()、ProcessorRecordContext#hashCode() Should not be deprecated and throw an exception

2024-06-24 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17015.

Resolution: Won't Fix

> ContextualRecord#hashCode()、ProcessorRecordContext#hashCode() Should not be 
> deprecated and throw an exception
> -
>
> Key: KAFKA-17015
> URL: https://issues.apache.org/jira/browse/KAFKA-17015
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dujian0068
>Assignee: dujian0068
>Priority: Minor
>
> While reviewing PR#16970, I found that `ContextualRecord#hashCode()` and 
> `ProcessorRecordContext#hashCode()` were deprecated because they have a 
> mutable attribute, which causes the hashCode to change.
> I don't think hashCode should be discarded just because it is mutable. 
> HashCode is a very important property of an object; it just shouldn't be 
> used for hash addressing, the same as with ArrayList.
>  
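
For illustration, a minimal sketch (not taken from the Kafka code base) of why a hashCode that depends on a mutable field is still meaningful but must not be used for hash addressing; the class names here are made up:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class MutableKey {
    int value; // mutable field that participates in equals/hashCode

    MutableKey(int value) { this.value = value; }

    @Override
    public boolean equals(Object o) {
        return o instanceof MutableKey && ((MutableKey) o).value == value;
    }

    @Override
    public int hashCode() { return Objects.hash(value); }
}

public class MutableHashCodeDemo {
    public static void main(String[] args) {
        Map<MutableKey, String> map = new HashMap<>();
        MutableKey key = new MutableKey(1);
        map.put(key, "record");

        key.value = 2; // mutation changes the hashCode after insertion

        // The entry is now unreachable: it sits in the bucket of the old hash.
        System.out.println(map.get(key));         // null
        System.out.println(map.containsKey(key)); // false
    }
}
{code}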



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-17015) ContextualRecord#hashCode()、ProcessorRecordContext#hashCode() Should not be deprecated and throw an exception

2024-06-24 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reopened KAFKA-17015:


> ContextualRecord#hashCode()、ProcessorRecordContext#hashCode() Should not be 
> deprecated and throw an exception
> -
>
> Key: KAFKA-17015
> URL: https://issues.apache.org/jira/browse/KAFKA-17015
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dujian0068
>Assignee: dujian0068
>Priority: Minor
>
> While reviewing PR#16970, I found that `ContextualRecord#hashCode()` and 
> `ProcessorRecordContext#hashCode()` were deprecated because they have a 
> mutable attribute, which causes the hashCode to change.
> I don't think hashCode should be discarded just because it is mutable. 
> HashCode is a very important property of an object; it just shouldn't be 
> used for hash addressing, the same as with ArrayList.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-17015) ContextualRecord#hashCode()、ProcessorRecordContext#hashCode() Should not be deprecated and throw an exception

2024-06-24 Thread dujian0068 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dujian0068 resolved KAFKA-17015.

Resolution: Fixed

> ContextualRecord#hashCode()、ProcessorRecordContext#hashCode() Should not be 
> deprecated and throw an exception
> -
>
> Key: KAFKA-17015
> URL: https://issues.apache.org/jira/browse/KAFKA-17015
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dujian0068
>Assignee: dujian0068
>Priority: Minor
>
> While reviewing PR#16970, I found that `ContextualRecord#hashCode()` and 
> `ProcessorRecordContext#hashCode()` were deprecated because they have a 
> mutable attribute, which causes the hashCode to change.
> I don't think hashCode should be discarded just because it is mutable. 
> HashCode is a very important property of an object; it just shouldn't be 
> used for hash addressing, the same as with ArrayList.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17025) KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler

2024-06-24 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859799#comment-17859799
 ] 

Hongshun Wang commented on KAFKA-17025:
---

[~gharris1727] Yes, I'd like to do it!

> KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler
> --
>
> Key: KAFKA-17025
> URL: https://issues.apache.org/jira/browse/KAFKA-17025
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.6.2
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: 3.6.3
>
>
> When I use KafkaProducer, an OOM occurs but the KafkaProducer only logs it and does 
> nothing:
>  
> {code:java}
> ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
> 'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
> Direct buffer memory
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) 
> at org.apache.kafka.clients.producer.internals.Sender.run 
> at java.lang.Thread.run
> {code}
>  
>  
> I tried to find out what happens:
> 1. It seems that OutOfMemoryError, being an Error, is not caught, because 
> org.apache.kafka.clients.producer.internals.Sender#run only catches 
> Exception: 
> {code:java}
> @Override
> public void run() {
>     log.debug("Starting Kafka producer I/O thread.");
> 
>     // main loop, runs until close is called
>     while (running) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
> 
>     log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
> 
>     // okay we stopped accepting requests but there may still be
>     // requests in the transaction manager, accumulator or waiting for acknowledgment,
>     // wait until these are completed.
>     while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
> 
>     // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
>     while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
>         if (!transactionManager.isCompleting()) {
>             log.info("Aborting incomplete transaction due to shutdown");
>             transactionManager.beginAbort();
>         }
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
> 
>     if (forceClose) {
>         // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
>         // the futures.
>         if (transactionManager != null) {
>             log.debug("Aborting incomplete transactional requests due to forced shutdown");
>             transactionManager.close();
>         }
>         log.debug("Aborting incomplete batches due to forced shutdown");
>         this.accumulator.abortIncompleteBatches();
>     }
> 
>     try {
>         this.client.close();
>     } catch (Exception e) {
>         log.error("Failed to close network client", e);
>     }
> 
>     log.debug("Shutdown of Kafka producer I/O thread has completed.");
> }
> {code}
>  
> 2. Then KafkaThread catches the uncaught exception and just logs it:
> {code:java}
> public KafkaThread(final String name, Runnable runnable, boolean daemon) {
>     super(runnable, name);
>     configureThread(name, daemon);
> }
> 
> private void configureThread(final String name, boolean daemon) {
>     setDaemon(daemon);
>     setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
> }
> {code}
>  
> To be honest, I don't understand why KafkaThread does nothing but log it 
> when an uncaught exception occurs. Why not expose a method to set the 
> UncaughtExceptionHandler in KafkaThread or KafkaProducer so that the user can 
> decide what to do with an uncaught exception, whether to rethrow it or just 
> ignore it?
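
For illustration only, a hedged sketch of the kind of hook the reporter seems to be asking for; the class and the setter below are hypothetical and do not exist in the current KafkaThread/KafkaProducer API:

{code:java}
import java.lang.Thread.UncaughtExceptionHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical variant of KafkaThread that lets the caller decide how to react
// to an uncaught Throwable (e.g. an OutOfMemoryError) instead of hard-coding
// "log and swallow".
public class ConfigurableKafkaThread extends Thread {

    private static final Logger log = LoggerFactory.getLogger(ConfigurableKafkaThread.class);

    public ConfigurableKafkaThread(String name, Runnable runnable, boolean daemon) {
        super(runnable, name);
        setDaemon(daemon);
        // Default behaviour mirrors today's KafkaThread: just log the error.
        setUncaughtExceptionHandler((t, e) ->
                log.error("Uncaught exception in thread '{}':", name, e));
    }

    // Hypothetical public hook: callers could fail fast, alert, or restart.
    public void setFatalErrorHandler(UncaughtExceptionHandler handler) {
        setUncaughtExceptionHandler(handler);
    }
}
{code}

With such a hook a caller could, for example, terminate the process on an Error like OutOfMemoryError instead of letting the producer keep running with a dead I/O thread. Whether that hook belongs on the public API is exactly the question raised elsewhere in this thread.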



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16928) Test all of the request and response methods in RaftUtil

2024-06-24 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859796#comment-17859796
 ] 

黃竣陽 commented on KAFKA-16928:
-

I'm interested in this issue. Could you assign it to me? Thanks.

> Test all of the request and response methods in RaftUtil
> 
>
> Key: KAFKA-16928
> URL: https://issues.apache.org/jira/browse/KAFKA-16928
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Priority: Major
>
> Add a RaftUtilTest test suite that checks that the requests and responses 
> constructed by RaftUtil can be serialized to all of the supported KRaft RPC 
> versions.
> The RPCs are:
>  # Fetch
>  # FetchSnapshot
>  # Vote
>  # BeginQuorumEpoch
>  # EndQuorumEpoch
> At the moment some of the RPCs are missing; this should be worked on after 
> RaftUtil implements the creation of all KRaft RPCs.
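
A hedged sketch of what one such check could look like for the Vote RPC, assuming the request data would come from a RaftUtil factory method (the exact RaftUtil signatures need to be checked against the actual class; the plain constructor below is a stand-in):

{code:java}
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.MessageUtil;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

class RaftUtilTest {

    @Test
    void testVoteRequestSerializesToAllSupportedVersions() {
        // Stand-in for the request that RaftUtil would construct.
        VoteRequestData request = new VoteRequestData().setClusterId("test-cluster");

        // Verify the message serializes at every version the Vote RPC supports.
        for (short version = ApiKeys.VOTE.oldestVersion(); version <= ApiKeys.VOTE.latestVersion(); version++) {
            final short v = version;
            assertDoesNotThrow(() -> MessageUtil.toByteBuffer(request, v));
        }
    }
}
{code}

The same loop over ApiKeys.FETCH, FETCH_SNAPSHOT, BEGIN_QUORUM_EPOCH and END_QUORUM_EPOCH would cover the other RPCs listed above.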



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16379) Coordinator flush time and event purgatory time metrics

2024-06-24 Thread Jeff Kim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Kim updated KAFKA-16379:
-
Summary: Coordinator flush time and event purgatory time metrics  (was: add 
metric to measure time spent in purgatory)

> Coordinator flush time and event purgatory time metrics
> ---
>
> Key: KAFKA-16379
> URL: https://issues.apache.org/jira/browse/KAFKA-16379
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jeff Kim
>Assignee: Jeff Kim
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17025) KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler

2024-06-24 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859793#comment-17859793
 ] 

Greg Harris commented on KAFKA-17025:
-

I don't think that it is appropriate for the Kafka Producer to expose an 
uncaught exception handler; the background thread should be an implementation 
detail and not part of the public API.

I do think that uncaught exceptions in the producer's background thread should 
propagate through the API somehow. For example, the death of the background 
thread could interrupt and/or throw an exception from in-progress and future 
calls to the producer. This would propagate the background error to whatever 
thread is calling the producer. It could behave as if the producer was closed.

I found previous reports of this issue, and linked the oldest ticket that 
seemed relevant.

[~loserwang1024] Are you interested in working on this?

> KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler
> --
>
> Key: KAFKA-17025
> URL: https://issues.apache.org/jira/browse/KAFKA-17025
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.6.2
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: 3.6.3
>
>
> When I use KafkaProducer, an OOM occurs but the KafkaProducer only logs it and does 
> nothing:
>  
> {code:java}
> ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
> 'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
> Direct buffer memory
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) 
> at org.apache.kafka.clients.producer.internals.Sender.run 
> at java.lang.Thread.run
> {code}
>  
>  
> I tried to find out what happens:
> 1. It seems that OutOfMemoryError, being an Error, is not caught, because 
> org.apache.kafka.clients.producer.internals.Sender#run only catches 
> Exception: 
> {code:java}
> @Override
> public void run() {
>     log.debug("Starting Kafka producer I/O thread.");
> 
>     // main loop, runs until close is called
>     while (running) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
> 
>     log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
> 
>     // okay we stopped accepting requests but there may still be
>     // requests in the transaction manager, accumulator or waiting for acknowledgment,
>     // wait until these are completed.
>     while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
> 
>     // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
>     while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
>         if (!transactionManager.isCompleting()) {
>             log.info("Aborting incomplete transaction due to shutdown");
>             transactionManager.beginAbort();
>         }
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
> 
>     if (forceClose) {
>         // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
>         // the futures.
>         if (transactionManager != null) {
>             log.debug("Aborting incomplete transactional requests due to forced shutdown");
>             transactionManager.close();
>         }
>         log.debug("Aborting incomplete batches due to forced shutdown");
>         this.accumulator.abortIncompleteBatches();
>     }
> 
>     try {
>         this.client.close();
>     } catch (Exception e) {
>         log.error("Failed to close network client", e);
>     }
> 
>     log.debug("Shutdown of Kafka producer I/O thread has completed.");
> }
> {code}
>  
> 2. Then KafkaThread catches the uncaught exception and just logs it:
> {code:java}
> public KafkaThread(final String name, Runnable runnable, boolean daemon) {
>     super(runnable, name);
>     configureThread(name, daemon);
> }
> 
> private void configureThread(final String name, boolean daemon) {
>     setDaemon(daemon);
>     setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
> }
> {code}
>  
> To be honest, I don't understand why KafkaThread does nothing but log 

[jira] [Created] (KAFKA-17033) Consider replacing the directory id Optional with just Uuid

2024-06-24 Thread Jira
José Armando García Sancio created KAFKA-17033:
--

 Summary: Consider replacing the directory id Optional with 
just Uuid
 Key: KAFKA-17033
 URL: https://issues.apache.org/jira/browse/KAFKA-17033
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio


One way to handle this is to introduce a type called `DirectoryId` that just 
encapsulates a Uuid but is able to better handle the Uuid.ZERO_UUID case.
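
A rough sketch of what such a wrapper could look like, assuming the goal is simply to make the "no directory assigned" case explicit instead of passing Uuid.ZERO_UUID around; the names and behaviour here are illustrative, not a settled design:

{code:java}
import java.util.Objects;

import org.apache.kafka.common.Uuid;

// Illustrative wrapper: encapsulates a Uuid and gives ZERO_UUID an explicit meaning.
public final class DirectoryId {

    public static final DirectoryId UNASSIGNED = new DirectoryId(Uuid.ZERO_UUID);

    private final Uuid id;

    private DirectoryId(Uuid id) {
        this.id = Objects.requireNonNull(id);
    }

    public static DirectoryId of(Uuid id) {
        return Uuid.ZERO_UUID.equals(id) ? UNASSIGNED : new DirectoryId(id);
    }

    public boolean isAssigned() {
        return this != UNASSIGNED;
    }

    public Uuid uuid() {
        return id;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof DirectoryId && ((DirectoryId) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }

    @Override
    public String toString() {
        return isAssigned() ? id.toString() : "UNASSIGNED";
    }
}
{code}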



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16965) Add a "root cause" exception as a nested exception to TimeoutException for Producer

2024-06-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16965:

Component/s: clients

> Add a "root cause" exception as a nested exception to TimeoutException for 
> Producer
> ---
>
> Key: KAFKA-16965
> URL: https://issues.apache.org/jira/browse/KAFKA-16965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Minor
> Fix For: 3.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16965) Add a "root cause" exception as a nested exception to TimeoutException for Producer

2024-06-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16965.
-
Fix Version/s: 3.9.0
   Resolution: Fixed

> Add a "root cause" exception as a nested exception to TimeoutException for 
> Producer
> ---
>
> Key: KAFKA-16965
> URL: https://issues.apache.org/jira/browse/KAFKA-16965
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Minor
> Fix For: 3.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16991) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2024-06-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859780#comment-17859780
 ] 

Matthias J. Sax commented on KAFKA-16991:
-

Failed again: 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16344/10/tests]

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-16991
> URL: https://issues.apache.org/jira/browse/KAFKA-16991
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: 
> 5owo5xbyzjnao-org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest-shouldRestoreState()-1-output.txt
>
>
> We see this test running into timeouts more frequently recently.
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms. ==> expected:  but was: 
> at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367)
> at org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:220)
>  {code}
> There was no ERROR or WARN log...



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17001) Consider using another class to replace `AbstractConfig` to be class which always returns the up-to-date configs

2024-06-24 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859766#comment-17859766
 ] 

Greg Harris commented on KAFKA-17001:
-

[~chia7712] That sounds reasonable. The Supplier.get implementation can be very 
lightweight, like AtomicReference::get.

> Consider using another class to replace `AbstractConfig` to be class which 
> always returns the up-to-date configs
> 
>
> Key: KAFKA-17001
> URL: https://issues.apache.org/jira/browse/KAFKA-17001
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> from https://github.com/apache/kafka/pull/16394#discussion_r1647321514
> We are starting to have separate config classes (e.g. RemoteLogManagerConfig), 
> and those configs are initialized with an AbstractConfig. By calling 
> `AbstractConfig` getters, those individual configs can always return the 
> up-to-date configs. Behind the magic behavior is the instance of 
> `AbstractConfig` ... yes, we use the `KafkaConfig` to construct those config 
> classes. We call `KafkaConfig#updateCurrentConfig` to update the inner configs, 
> so the config classes using `AbstractConfig` can see the latest 
> configs too.
> However, this mechanism is not obvious from `AbstractConfig`. Maybe we 
> should add enough docs for it. Or we can move 
> `KafkaConfig#updateCurrentConfig` into a new class with better naming.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17001) Consider using another class to replace `AbstractConfig` to be class which always returns the up-to-date configs

2024-06-24 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859764#comment-17859764
 ] 

Chia-Ping Tsai commented on KAFKA-17001:


{quote}
In situations with "poll-based" reconfiguration, where we want a lazy update 
whenever we ask for it, the mutability can be expressed via `Supplier`.
{quote}

This idea is interesting, so let me sketch what I imagine:
 # `RemoteLogManagerConfig` should be immutable
 # remote-related configs should come from `RemoteLogManagerConfig`

That means the classes which need access to remote-related configs should 
hold a `Supplier<RemoteLogManagerConfig>` instead of a `RemoteLogManagerConfig`, 
and call `Supplier.get` whenever they need the remote-related configs.

In order to avoid creating a lot of `RemoteLogManagerConfig` instances, the 
implementation of `Supplier<RemoteLogManagerConfig>` should return the 
identical `RemoteLogManagerConfig` unless `KafkaConfig` gets updated.

[~gharris1727] WDYT?
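
For illustration, a minimal sketch of such a caching supplier, assuming RemoteLogManagerConfig can be built from an AbstractConfig as described in the ticket; the updateCount() accessor on KafkaConfig is hypothetical and only stands in for "the dynamic config was updated":

{code:java}
import java.util.function.Supplier;

import kafka.server.KafkaConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;

// Sketch: rebuild the immutable RemoteLogManagerConfig only when KafkaConfig changed.
public class RemoteLogManagerConfigSupplier implements Supplier<RemoteLogManagerConfig> {

    private final KafkaConfig kafkaConfig;
    private volatile long cachedVersion = -1;
    private volatile RemoteLogManagerConfig cached;

    public RemoteLogManagerConfigSupplier(KafkaConfig kafkaConfig) {
        this.kafkaConfig = kafkaConfig;
    }

    @Override
    public RemoteLogManagerConfig get() {
        long currentVersion = kafkaConfig.updateCount(); // hypothetical update marker
        if (cached == null || currentVersion != cachedVersion) {
            // Take a fresh immutable snapshot of the remote-related configs.
            cached = new RemoteLogManagerConfig(kafkaConfig);
            cachedVersion = currentVersion;
        }
        return cached; // identical instance until KafkaConfig gets updated
    }
}
{code}

Callers would then hold the Supplier and call get() on every access, which matches Greg's point that the get() implementation stays about as cheap as an AtomicReference::get in the common case.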

> Consider using another class to replace `AbstractConfig` to be class which 
> always returns the up-to-date configs
> 
>
> Key: KAFKA-17001
> URL: https://issues.apache.org/jira/browse/KAFKA-17001
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> from https://github.com/apache/kafka/pull/16394#discussion_r1647321514
> We are starting to have separate config classes (e.g. RemoteLogManagerConfig), 
> and those configs are initialized with an AbstractConfig. By calling 
> `AbstractConfig` getters, those individual configs can always return the 
> up-to-date configs. Behind the magic behavior is the instance of 
> `AbstractConfig` ... yes, we use the `KafkaConfig` to construct those config 
> classes. We call `KafkaConfig#updateCurrentConfig` to update the inner configs, 
> so the config classes using `AbstractConfig` can see the latest 
> configs too.
> However, this mechanism is not obvious from `AbstractConfig`. Maybe we 
> should add enough docs for it. Or we can move 
> `KafkaConfig#updateCurrentConfig` into a new class with better naming.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17032) NioEchoServer should generate meaningful id instead of incremental number

2024-06-24 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17032:
--

 Summary: NioEchoServer should generate meaningful id instead of 
incremental number
 Key: KAFKA-17032
 URL: https://issues.apache.org/jira/browse/KAFKA-17032
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: TengYao Chi


see discussion: 
https://github.com/apache/kafka/pull/16384#issuecomment-2187071751



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17001) Consider using another class to replace `AbstractConfig` to be class which always returns the up-to-date configs

2024-06-24 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859753#comment-17859753
 ] 

Greg Harris commented on KAFKA-17001:
-

IMHO mutable config classes are undesirable, both for concurrency problems and 
the inherent complexity.

In situations with "poll-based" reconfiguration, where we want a lazy update 
whenever we ask for it, the mutability can be expressed via 
`Supplier`.
In situations with "push-based" reconfiguration, where we want an eager 
callback whenever a reconfiguration takes place, the mutability can be 
expressed via the `Reconfigurable` interface.

> Consider using another class to replace `AbstractConfig` to be class which 
> always returns the up-to-date configs
> 
>
> Key: KAFKA-17001
> URL: https://issues.apache.org/jira/browse/KAFKA-17001
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> from https://github.com/apache/kafka/pull/16394#discussion_r1647321514
> We are starting to have separate config classes (e.g. RemoteLogManagerConfig), 
> and those configs are initialized with an AbstractConfig. By calling 
> `AbstractConfig` getters, those individual configs can always return the 
> up-to-date configs. Behind the magic behavior is the instance of 
> `AbstractConfig` ... yes, we use the `KafkaConfig` to construct those config 
> classes. We call `KafkaConfig#updateCurrentConfig` to update the inner configs, 
> so the config classes using `AbstractConfig` can see the latest 
> configs too.
> However, this mechanism is not obvious from `AbstractConfig`. Maybe we 
> should add enough docs for it. Or we can move 
> `KafkaConfig#updateCurrentConfig` into a new class with better naming.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16990) Unrecognised flag passed to kafka-storage.sh in system test

2024-06-24 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat updated KAFKA-16990:
---
Priority: Major  (was: Blocker)

> Unrecognised flag passed to kafka-storage.sh in system test
> ---
>
> Key: KAFKA-16990
> URL: https://issues.apache.org/jira/browse/KAFKA-16990
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Gaurav Narula
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.8.0
>
>
> Running 
> {{TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade"
>  bash tests/docker/run_tests.sh}} on trunk (c4a3d2475f) fails with the 
> following:
> {code:java}
> [INFO:2024-06-18 09:16:03,139]: Triggering test 2 of 32...
> [INFO:2024-06-18 09:16:03,147]: RunnerClient: Loading test {'directory': 
> '/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
> 'kraft_upgrade_test.py', 'cls_name': 'TestKRaftUpgrade', 'method_name': 
> 'test_isolated_mode_upgrade', 'injected_args': {'from_kafka_version': 
> '3.1.2', 'use_new_coordinator': True, 'metadata_quorum': 'ISOLATED_KRAFT'}}
> [INFO:2024-06-18 09:16:03,151]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  on run 1/1
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Setting up...
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Running...
> [INFO:2024-06-18 09:16:05,999]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Tearing down...
> [INFO:2024-06-18 09:16:12,366]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  FAIL: RemoteCommandError({'ssh_config': {'host': 'ducker10', 'hostname': 
> 'ducker10', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
> '/home/ducker/.ssh/id_rsa', 'connecttimeout': None}, 'hostname': 'ducker10', 
> 'ssh_hostname': 'ducker10', 'user': 'ducker', 'externally_routable_ip': 
> 'ducker10', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT-2
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x85bccc70>, '_sftp_client':  0x85bccdf0>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-3.1.2/bin/kafka-storage.sh format --ignore-formatted --config 
> /mnt/kafka/kafka.properties --cluster-id I2eXt9rvSnyhct8BYmW6-w -f 
> group.version=1', 1, b"usage: kafka-storage format [-h] --config CONFIG 
> --cluster-id CLUSTER_ID\n                     
> [--ignore-formatted]\nkafka-storage: error: unrecognized arguments: '-f'\n")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 132, in test_isolated_mode_upgrade
>     self.run_upgrade(from_kafka_version, group_protocol)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 96, in run_upgrade
>     self.kafka.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 669, in 
> start
>     self.isolated_controller_quorum.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 671, in 
> start
>     Service.start(self, **kwargs)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
> line 265, in start
>     self.start_node(node, **kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 902, in 
> start_node
>     node.account.ssh(cmd)
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", 
> line 35, in wrapper
>     return method(self, *args, **kwargs)
>   File 
> 

[jira] [Commented] (KAFKA-16990) Unrecognised flag passed to kafka-storage.sh in system test

2024-06-24 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859746#comment-17859746
 ] 

Josep Prat commented on KAFKA-16990:


Then I'll change the priority to "major" as it's not a blocker anymore for 3.8, 
unless you want to change the "fix version".

> Unrecognised flag passed to kafka-storage.sh in system test
> ---
>
> Key: KAFKA-16990
> URL: https://issues.apache.org/jira/browse/KAFKA-16990
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Gaurav Narula
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.8.0
>
>
> Running 
> {{TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade"
>  bash tests/docker/run_tests.sh}} on trunk (c4a3d2475f) fails with the 
> following:
> {code:java}
> [INFO:2024-06-18 09:16:03,139]: Triggering test 2 of 32...
> [INFO:2024-06-18 09:16:03,147]: RunnerClient: Loading test {'directory': 
> '/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
> 'kraft_upgrade_test.py', 'cls_name': 'TestKRaftUpgrade', 'method_name': 
> 'test_isolated_mode_upgrade', 'injected_args': {'from_kafka_version': 
> '3.1.2', 'use_new_coordinator': True, 'metadata_quorum': 'ISOLATED_KRAFT'}}
> [INFO:2024-06-18 09:16:03,151]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  on run 1/1
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Setting up...
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Running...
> [INFO:2024-06-18 09:16:05,999]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Tearing down...
> [INFO:2024-06-18 09:16:12,366]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  FAIL: RemoteCommandError({'ssh_config': {'host': 'ducker10', 'hostname': 
> 'ducker10', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
> '/home/ducker/.ssh/id_rsa', 'connecttimeout': None}, 'hostname': 'ducker10', 
> 'ssh_hostname': 'ducker10', 'user': 'ducker', 'externally_routable_ip': 
> 'ducker10', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT-2
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x85bccc70>, '_sftp_client':  0x85bccdf0>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-3.1.2/bin/kafka-storage.sh format --ignore-formatted --config 
> /mnt/kafka/kafka.properties --cluster-id I2eXt9rvSnyhct8BYmW6-w -f 
> group.version=1', 1, b"usage: kafka-storage format [-h] --config CONFIG 
> --cluster-id CLUSTER_ID\n                     
> [--ignore-formatted]\nkafka-storage: error: unrecognized arguments: '-f'\n")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 132, in test_isolated_mode_upgrade
>     self.run_upgrade(from_kafka_version, group_protocol)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 96, in run_upgrade
>     self.kafka.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 669, in 
> start
>     self.isolated_controller_quorum.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 671, in 
> start
>     Service.start(self, **kwargs)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
> line 265, in start
>     self.start_node(node, **kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 902, in 
> start_node
>     node.account.ssh(cmd)
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", 
> 

[jira] [Reopened] (KAFKA-16990) Unrecognised flag passed to kafka-storage.sh in system test

2024-06-24 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reopened KAFKA-16990:


> Unrecognised flag passed to kafka-storage.sh in system test
> ---
>
> Key: KAFKA-16990
> URL: https://issues.apache.org/jira/browse/KAFKA-16990
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Gaurav Narula
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Running 
> {{TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade"
>  bash tests/docker/run_tests.sh}} on trunk (c4a3d2475f) fails with the 
> following:
> {code:java}
> [INFO:2024-06-18 09:16:03,139]: Triggering test 2 of 32...
> [INFO:2024-06-18 09:16:03,147]: RunnerClient: Loading test {'directory': 
> '/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
> 'kraft_upgrade_test.py', 'cls_name': 'TestKRaftUpgrade', 'method_name': 
> 'test_isolated_mode_upgrade', 'injected_args': {'from_kafka_version': 
> '3.1.2', 'use_new_coordinator': True, 'metadata_quorum': 'ISOLATED_KRAFT'}}
> [INFO:2024-06-18 09:16:03,151]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  on run 1/1
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Setting up...
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Running...
> [INFO:2024-06-18 09:16:05,999]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Tearing down...
> [INFO:2024-06-18 09:16:12,366]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  FAIL: RemoteCommandError({'ssh_config': {'host': 'ducker10', 'hostname': 
> 'ducker10', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
> '/home/ducker/.ssh/id_rsa', 'connecttimeout': None}, 'hostname': 'ducker10', 
> 'ssh_hostname': 'ducker10', 'user': 'ducker', 'externally_routable_ip': 
> 'ducker10', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT-2
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x85bccc70>, '_sftp_client':  0x85bccdf0>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-3.1.2/bin/kafka-storage.sh format --ignore-formatted --config 
> /mnt/kafka/kafka.properties --cluster-id I2eXt9rvSnyhct8BYmW6-w -f 
> group.version=1', 1, b"usage: kafka-storage format [-h] --config CONFIG 
> --cluster-id CLUSTER_ID\n                     
> [--ignore-formatted]\nkafka-storage: error: unrecognized arguments: '-f'\n")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 132, in test_isolated_mode_upgrade
>     self.run_upgrade(from_kafka_version, group_protocol)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 96, in run_upgrade
>     self.kafka.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 669, in 
> start
>     self.isolated_controller_quorum.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 671, in 
> start
>     Service.start(self, **kwargs)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
> line 265, in start
>     self.start_node(node, **kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 902, in 
> start_node
>     node.account.ssh(cmd)
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", 
> line 35, in wrapper
>     return method(self, *args, **kwargs)
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", 
> 

[jira] [Commented] (KAFKA-16990) Unrecognised flag passed to kafka-storage.sh in system test

2024-06-24 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859743#comment-17859743
 ] 

Justine Olshan commented on KAFKA-16990:


Sorry, I will actually reopen this until the trunk fix is merged. 

> Unrecognised flag passed to kafka-storage.sh in system test
> ---
>
> Key: KAFKA-16990
> URL: https://issues.apache.org/jira/browse/KAFKA-16990
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Gaurav Narula
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Running 
> {{TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade"
>  bash tests/docker/run_tests.sh}} on trunk (c4a3d2475f) fails with the 
> following:
> {code:java}
> [INFO:2024-06-18 09:16:03,139]: Triggering test 2 of 32...
> [INFO:2024-06-18 09:16:03,147]: RunnerClient: Loading test {'directory': 
> '/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
> 'kraft_upgrade_test.py', 'cls_name': 'TestKRaftUpgrade', 'method_name': 
> 'test_isolated_mode_upgrade', 'injected_args': {'from_kafka_version': 
> '3.1.2', 'use_new_coordinator': True, 'metadata_quorum': 'ISOLATED_KRAFT'}}
> [INFO:2024-06-18 09:16:03,151]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  on run 1/1
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Setting up...
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Running...
> [INFO:2024-06-18 09:16:05,999]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Tearing down...
> [INFO:2024-06-18 09:16:12,366]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  FAIL: RemoteCommandError({'ssh_config': {'host': 'ducker10', 'hostname': 
> 'ducker10', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
> '/home/ducker/.ssh/id_rsa', 'connecttimeout': None}, 'hostname': 'ducker10', 
> 'ssh_hostname': 'ducker10', 'user': 'ducker', 'externally_routable_ip': 
> 'ducker10', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT-2
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x85bccc70>, '_sftp_client':  0x85bccdf0>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-3.1.2/bin/kafka-storage.sh format --ignore-formatted --config 
> /mnt/kafka/kafka.properties --cluster-id I2eXt9rvSnyhct8BYmW6-w -f 
> group.version=1', 1, b"usage: kafka-storage format [-h] --config CONFIG 
> --cluster-id CLUSTER_ID\n                     
> [--ignore-formatted]\nkafka-storage: error: unrecognized arguments: '-f'\n")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 132, in test_isolated_mode_upgrade
>     self.run_upgrade(from_kafka_version, group_protocol)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 96, in run_upgrade
>     self.kafka.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 669, in 
> start
>     self.isolated_controller_quorum.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 671, in 
> start
>     Service.start(self, **kwargs)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
> line 265, in start
>     self.start_node(node, **kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 902, in 
> start_node
>     node.account.ssh(cmd)
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", 
> line 35, in wrapper
>     return method(self, *args, 

[jira] [Updated] (KAFKA-17031) Make configurations public

2024-06-24 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-17031:
--
Description: 
*Summary*

We introduce 3 new configurations as part of this KIP. Up to now they have been 
defined as internal only. We need to change said configurations to public prior 
to the release.

  was:
### Summary

We introduce 3 new configurations as part of this KIP. Up to now they have been 
defined as internal only. We need to change said configurations to public prior 
to the release.


> Make configurations public
> --
>
> Key: KAFKA-17031
> URL: https://issues.apache.org/jira/browse/KAFKA-17031
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Priority: Major
>
> *Summary*
> We introduce 3 new configurations as part of this KIP. Up to now they have 
> been defined as internal only. We need to change said configurations to 
> public prior to the release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17031) Make configurations public

2024-06-24 Thread Christo Lolov (Jira)
Christo Lolov created KAFKA-17031:
-

 Summary: Make configurations public
 Key: KAFKA-17031
 URL: https://issues.apache.org/jira/browse/KAFKA-17031
 Project: Kafka
  Issue Type: Sub-task
Reporter: Christo Lolov


### Summary

We introduce 3 new configurations as part of this KIP. Up to now they have been 
defined as internal only. We need to change said configurations to public prior 
to the release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16853) Split RemoteLogManagerScheduledThreadPool

2024-06-24 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859742#comment-17859742
 ] 

Christo Lolov commented on KAFKA-16853:
---

Could I also be looped into the review please?

> Split RemoteLogManagerScheduledThreadPool
> -
>
> Key: KAFKA-16853
> URL: https://issues.apache.org/jira/browse/KAFKA-16853
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Abhijeet Kumar
>Priority: Major
>
> *Summary*
> To begin with, create just the RemoteDataExpirationThreadPool and move 
> expiration to it. Keep all settings as if the only thread pool were the 
> RemoteLogManagerScheduledThreadPool. Ensure that the new thread pool is wired 
> correctly to the RemoteLogManager.
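
For illustration, a minimal sketch of what the new pool could look like, keeping the same kind of settings (pool size, daemon threads, thread naming, periodic scheduling) a scheduled pool inside RemoteLogManager uses today; the class and method names below are illustrative, not the final design:

{code:java}
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: a dedicated pool for remote data expiration tasks,
// configured like the existing RemoteLogManagerScheduledThreadPool.
public class RemoteDataExpirationThreadPool {

    private final ScheduledThreadPoolExecutor executor;

    public RemoteDataExpirationThreadPool(int poolSize) {
        AtomicInteger sequence = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable,
                    "kafka-remote-data-expiration-thread-pool-" + sequence.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        this.executor = new ScheduledThreadPoolExecutor(poolSize, factory);
        this.executor.setRemoveOnCancelPolicy(true);
    }

    // RemoteLogManager would submit one recurring expiration task per leader partition.
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelayMs, long delayMs) {
        return executor.scheduleWithFixedDelay(task, initialDelayMs, delayMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        executor.shutdownNow();
    }
}
{code}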



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16990) Unrecognised flag passed to kafka-storage.sh in system test

2024-06-24 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859733#comment-17859733
 ] 

Justine Olshan commented on KAFKA-16990:


[https://github.com/apache/kafka/commit/35b34a85a7bed1f2e9bd23ae961cac85602435e6]
I will need to do a similar fix for trunk, but was trying to unblock 3.8 
quickly.

> Unrecognised flag passed to kafka-storage.sh in system test
> ---
>
> Key: KAFKA-16990
> URL: https://issues.apache.org/jira/browse/KAFKA-16990
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Gaurav Narula
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Running 
> {{TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade"
>  bash tests/docker/run_tests.sh}} on trunk (c4a3d2475f) fails with the 
> following:
> {code:java}
> [INFO:2024-06-18 09:16:03,139]: Triggering test 2 of 32...
> [INFO:2024-06-18 09:16:03,147]: RunnerClient: Loading test {'directory': 
> '/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
> 'kraft_upgrade_test.py', 'cls_name': 'TestKRaftUpgrade', 'method_name': 
> 'test_isolated_mode_upgrade', 'injected_args': {'from_kafka_version': 
> '3.1.2', 'use_new_coordinator': True, 'metadata_quorum': 'ISOLATED_KRAFT'}}
> [INFO:2024-06-18 09:16:03,151]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  on run 1/1
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Setting up...
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Running...
> [INFO:2024-06-18 09:16:05,999]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Tearing down...
> [INFO:2024-06-18 09:16:12,366]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  FAIL: RemoteCommandError({'ssh_config': {'host': 'ducker10', 'hostname': 
> 'ducker10', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
> '/home/ducker/.ssh/id_rsa', 'connecttimeout': None}, 'hostname': 'ducker10', 
> 'ssh_hostname': 'ducker10', 'user': 'ducker', 'externally_routable_ip': 
> 'ducker10', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT-2
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x85bccc70>, '_sftp_client':  0x85bccdf0>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-3.1.2/bin/kafka-storage.sh format --ignore-formatted --config 
> /mnt/kafka/kafka.properties --cluster-id I2eXt9rvSnyhct8BYmW6-w -f 
> group.version=1', 1, b"usage: kafka-storage format [-h] --config CONFIG 
> --cluster-id CLUSTER_ID\n                     
> [--ignore-formatted]\nkafka-storage: error: unrecognized arguments: '-f'\n")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 132, in test_isolated_mode_upgrade
>     self.run_upgrade(from_kafka_version, group_protocol)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 96, in run_upgrade
>     self.kafka.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 669, in 
> start
>     self.isolated_controller_quorum.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 671, in 
> start
>     Service.start(self, **kwargs)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
> line 265, in start
>     self.start_node(node, **kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 902, in 
> start_node
>     node.account.ssh(cmd)
>   File 
> 

[jira] [Commented] (KAFKA-16990) Unrecognised flag passed to kafka-storage.sh in system test

2024-06-24 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859730#comment-17859730
 ] 

Josep Prat commented on KAFKA-16990:


Which was the PR fixing this?

> Unrecognised flag passed to kafka-storage.sh in system test
> ---
>
> Key: KAFKA-16990
> URL: https://issues.apache.org/jira/browse/KAFKA-16990
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Gaurav Narula
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Running 
> {{TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade"
>  bash tests/docker/run_tests.sh}} on trunk (c4a3d2475f) fails with the 
> following:
> {code:java}
> [INFO:2024-06-18 09:16:03,139]: Triggering test 2 of 32...
> [INFO:2024-06-18 09:16:03,147]: RunnerClient: Loading test {'directory': 
> '/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
> 'kraft_upgrade_test.py', 'cls_name': 'TestKRaftUpgrade', 'method_name': 
> 'test_isolated_mode_upgrade', 'injected_args': {'from_kafka_version': 
> '3.1.2', 'use_new_coordinator': True, 'metadata_quorum': 'ISOLATED_KRAFT'}}
> [INFO:2024-06-18 09:16:03,151]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  on run 1/1
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Setting up...
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Running...
> [INFO:2024-06-18 09:16:05,999]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Tearing down...
> [INFO:2024-06-18 09:16:12,366]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  FAIL: RemoteCommandError({'ssh_config': {'host': 'ducker10', 'hostname': 
> 'ducker10', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
> '/home/ducker/.ssh/id_rsa', 'connecttimeout': None}, 'hostname': 'ducker10', 
> 'ssh_hostname': 'ducker10', 'user': 'ducker', 'externally_routable_ip': 
> 'ducker10', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT-2
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x85bccc70>, '_sftp_client':  0x85bccdf0>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-3.1.2/bin/kafka-storage.sh format --ignore-formatted --config 
> /mnt/kafka/kafka.properties --cluster-id I2eXt9rvSnyhct8BYmW6-w -f 
> group.version=1', 1, b"usage: kafka-storage format [-h] --config CONFIG 
> --cluster-id CLUSTER_ID\n                     
> [--ignore-formatted]\nkafka-storage: error: unrecognized arguments: '-f'\n")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 132, in test_isolated_mode_upgrade
>     self.run_upgrade(from_kafka_version, group_protocol)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 96, in run_upgrade
>     self.kafka.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 669, in 
> start
>     self.isolated_controller_quorum.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 671, in 
> start
>     Service.start(self, **kwargs)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
> line 265, in start
>     self.start_node(node, **kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 902, in 
> start_node
>     node.account.ssh(cmd)
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", 
> line 35, in wrapper
>     return method(self, *args, **kwargs)
>   File 
> 

[jira] [Updated] (KAFKA-16142) Update metrics documentation for errors

2024-06-24 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16142:
--
Summary: Update metrics documentation for errors  (was: Update metrics 
documentation for errors and new metrics)

> Update metrics documentation for errors
> ---
>
> Key: KAFKA-16142
> URL: https://issues.apache.org/jira/browse/KAFKA-16142
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, documentation, metrics
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, metrics
> Fix For: 3.9.0
>
>
> We need to identify the “errors” that exist in the current JMX documentation 
> and resolve them. Per [~pnee] there are errors on the JMX web page, which he 
> will identify and resolve.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16143) New JMX metrics for AsyncKafkaConsumer

2024-06-24 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16143:
-

Assignee: Brenden DeLuna  (was: Philip Nee)

> New JMX metrics for AsyncKafkaConsumer
> --
>
> Key: KAFKA-16143
> URL: https://issues.apache.org/jira/browse/KAFKA-16143
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Brenden DeLuna
>Priority: Minor
>  Labels: kip-848-client-support, metrics, needs-kip
> Fix For: 3.9.0
>
>
> This task is to consider what _new_ metrics we need from the KIP-848 protocol 
> that aren't already exposed by the current set of metrics. This will require 
> a KIP to introduce the new metrics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16142) Update metrics documentation for errors and new metrics

2024-06-24 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16142:
--
Description: We need to identify the “errors” that exist in the current JMX 
documentation and resolve them. Per [~pnee] there are errors on the JMX web 
page, which he will identify and resolve.  (was: What is the documentation to 
update here? AFAIUI, we're not changing or adding metrics for 3.8.0, so there 
wouldn't be anything to add/change.)

> Update metrics documentation for errors and new metrics
> ---
>
> Key: KAFKA-16142
> URL: https://issues.apache.org/jira/browse/KAFKA-16142
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, documentation, metrics
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, metrics
> Fix For: 3.9.0
>
>
> We need to identify the “errors” that exist in the current JMX documentation 
> and resolve them. Per [~pnee] there are errors on the JMX web page, which he 
> will identify and resolve.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16143) New JMX metrics for AsyncKafkaConsumer

2024-06-24 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16143:
--
Summary: New JMX metrics for AsyncKafkaConsumer  (was: New metrics for 
KIP-848 protocol)

> New JMX metrics for AsyncKafkaConsumer
> --
>
> Key: KAFKA-16143
> URL: https://issues.apache.org/jira/browse/KAFKA-16143
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: kip-848-client-support, metrics, needs-kip
> Fix For: 3.9.0
>
>
> This task is to consider what _new_ metrics we need from the KIP-848 protocol 
> that aren't already exposed by the current set of metrics. This will require 
> a KIP to introduce the new metrics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16990) Unrecognised flag passed to kafka-storage.sh in system test

2024-06-24 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-16990.

Resolution: Fixed

> Unrecognised flag passed to kafka-storage.sh in system test
> ---
>
> Key: KAFKA-16990
> URL: https://issues.apache.org/jira/browse/KAFKA-16990
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Gaurav Narula
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Running 
> {{TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade"
>  bash tests/docker/run_tests.sh}} on trunk (c4a3d2475f) fails with the 
> following:
> {code:java}
> [INFO:2024-06-18 09:16:03,139]: Triggering test 2 of 32...
> [INFO:2024-06-18 09:16:03,147]: RunnerClient: Loading test {'directory': 
> '/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
> 'kraft_upgrade_test.py', 'cls_name': 'TestKRaftUpgrade', 'method_name': 
> 'test_isolated_mode_upgrade', 'injected_args': {'from_kafka_version': 
> '3.1.2', 'use_new_coordinator': True, 'metadata_quorum': 'ISOLATED_KRAFT'}}
> [INFO:2024-06-18 09:16:03,151]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  on run 1/1
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Setting up...
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Running...
> [INFO:2024-06-18 09:16:05,999]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Tearing down...
> [INFO:2024-06-18 09:16:12,366]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  FAIL: RemoteCommandError({'ssh_config': {'host': 'ducker10', 'hostname': 
> 'ducker10', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
> '/home/ducker/.ssh/id_rsa', 'connecttimeout': None}, 'hostname': 'ducker10', 
> 'ssh_hostname': 'ducker10', 'user': 'ducker', 'externally_routable_ip': 
> 'ducker10', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT-2
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x85bccc70>, '_sftp_client':  0x85bccdf0>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-3.1.2/bin/kafka-storage.sh format --ignore-formatted --config 
> /mnt/kafka/kafka.properties --cluster-id I2eXt9rvSnyhct8BYmW6-w -f 
> group.version=1', 1, b"usage: kafka-storage format [-h] --config CONFIG 
> --cluster-id CLUSTER_ID\n                     
> [--ignore-formatted]\nkafka-storage: error: unrecognized arguments: '-f'\n")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 132, in test_isolated_mode_upgrade
>     self.run_upgrade(from_kafka_version, group_protocol)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 96, in run_upgrade
>     self.kafka.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 669, in 
> start
>     self.isolated_controller_quorum.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 671, in 
> start
>     Service.start(self, **kwargs)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
> line 265, in start
>     self.start_node(node, **kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 902, in 
> start_node
>     node.account.ssh(cmd)
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", 
> line 35, in wrapper
>     return method(self, *args, **kwargs)
>   File 
> 

[jira] [Comment Edited] (KAFKA-16990) Unrecognised flag passed to kafka-storage.sh in system test

2024-06-24 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859727#comment-17859727
 ] 

Justine Olshan edited comment on KAFKA-16990 at 6/24/24 4:08 PM:
-

This was a blocker, but I merged the fix. (Believe it or not there were two 
bugs in this test :( )
I will mark this as complete. The other Jira is open and hopefully the PR will 
be merged soon.


was (Author: jolshan):
This was a blocker, but I merged the fix. (Believe it or not there were two 
bugs in this test :( )
I will mark this as complete.

> Unrecognised flag passed to kafka-storage.sh in system test
> ---
>
> Key: KAFKA-16990
> URL: https://issues.apache.org/jira/browse/KAFKA-16990
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Gaurav Narula
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Running 
> {{TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade"
>  bash tests/docker/run_tests.sh}} on trunk (c4a3d2475f) fails with the 
> following:
> {code:java}
> [INFO:2024-06-18 09:16:03,139]: Triggering test 2 of 32...
> [INFO:2024-06-18 09:16:03,147]: RunnerClient: Loading test {'directory': 
> '/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
> 'kraft_upgrade_test.py', 'cls_name': 'TestKRaftUpgrade', 'method_name': 
> 'test_isolated_mode_upgrade', 'injected_args': {'from_kafka_version': 
> '3.1.2', 'use_new_coordinator': True, 'metadata_quorum': 'ISOLATED_KRAFT'}}
> [INFO:2024-06-18 09:16:03,151]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  on run 1/1
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Setting up...
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Running...
> [INFO:2024-06-18 09:16:05,999]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Tearing down...
> [INFO:2024-06-18 09:16:12,366]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  FAIL: RemoteCommandError({'ssh_config': {'host': 'ducker10', 'hostname': 
> 'ducker10', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
> '/home/ducker/.ssh/id_rsa', 'connecttimeout': None}, 'hostname': 'ducker10', 
> 'ssh_hostname': 'ducker10', 'user': 'ducker', 'externally_routable_ip': 
> 'ducker10', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT-2
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x85bccc70>, '_sftp_client':  0x85bccdf0>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-3.1.2/bin/kafka-storage.sh format --ignore-formatted --config 
> /mnt/kafka/kafka.properties --cluster-id I2eXt9rvSnyhct8BYmW6-w -f 
> group.version=1', 1, b"usage: kafka-storage format [-h] --config CONFIG 
> --cluster-id CLUSTER_ID\n                     
> [--ignore-formatted]\nkafka-storage: error: unrecognized arguments: '-f'\n")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 132, in test_isolated_mode_upgrade
>     self.run_upgrade(from_kafka_version, group_protocol)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 96, in run_upgrade
>     self.kafka.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 669, in 
> start
>     self.isolated_controller_quorum.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 671, in 
> start
>     Service.start(self, **kwargs)
>   File 

[jira] [Commented] (KAFKA-16990) Unrecognised flag passed to kafka-storage.sh in system test

2024-06-24 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859727#comment-17859727
 ] 

Justine Olshan commented on KAFKA-16990:


This was a blocker, but I merged the fix. (Believe it or not there were two 
bugs in this test :( )
I will mark this as complete.

> Unrecognised flag passed to kafka-storage.sh in system test
> ---
>
> Key: KAFKA-16990
> URL: https://issues.apache.org/jira/browse/KAFKA-16990
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Gaurav Narula
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.8.0
>
>
> Running 
> {{TC_PATHS="tests/kafkatest/tests/core/kraft_upgrade_test.py::TestKRaftUpgrade"
>  bash tests/docker/run_tests.sh}} on trunk (c4a3d2475f) fails with the 
> following:
> {code:java}
> [INFO:2024-06-18 09:16:03,139]: Triggering test 2 of 32...
> [INFO:2024-06-18 09:16:03,147]: RunnerClient: Loading test {'directory': 
> '/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
> 'kraft_upgrade_test.py', 'cls_name': 'TestKRaftUpgrade', 'method_name': 
> 'test_isolated_mode_upgrade', 'injected_args': {'from_kafka_version': 
> '3.1.2', 'use_new_coordinator': True, 'metadata_quorum': 'ISOLATED_KRAFT'}}
> [INFO:2024-06-18 09:16:03,151]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  on run 1/1
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Setting up...
> [INFO:2024-06-18 09:16:03,153]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Running...
> [INFO:2024-06-18 09:16:05,999]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  Tearing down...
> [INFO:2024-06-18 09:16:12,366]: RunnerClient: 
> kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT:
>  FAIL: RemoteCommandError({'ssh_config': {'host': 'ducker10', 'hostname': 
> 'ducker10', 'user': 'ducker', 'port': 22, 'password': '', 'identityfile': 
> '/home/ducker/.ssh/id_rsa', 'connecttimeout': None}, 'hostname': 'ducker10', 
> 'ssh_hostname': 'ducker10', 'user': 'ducker', 'externally_routable_ip': 
> 'ducker10', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=3.1.2.use_new_coordinator=True.metadata_quorum=ISOLATED_KRAFT-2
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x85bccc70>, '_sftp_client':  0x85bccdf0>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-3.1.2/bin/kafka-storage.sh format --ignore-formatted --config 
> /mnt/kafka/kafka.properties --cluster-id I2eXt9rvSnyhct8BYmW6-w -f 
> group.version=1', 1, b"usage: kafka-storage format [-h] --config CONFIG 
> --cluster-id CLUSTER_ID\n                     
> [--ignore-formatted]\nkafka-storage: error: unrecognized arguments: '-f'\n")
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 132, in test_isolated_mode_upgrade
>     self.run_upgrade(from_kafka_version, group_protocol)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/kraft_upgrade_test.py", 
> line 96, in run_upgrade
>     self.kafka.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 669, in 
> start
>     self.isolated_controller_quorum.start()
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 671, in 
> start
>     Service.start(self, **kwargs)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
> line 265, in start
>     self.start_node(node, **kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/services/kafka/kafka.py", line 902, in 
> start_node
>     node.account.ssh(cmd)
>   File 
> 

[jira] [Commented] (KAFKA-14460) In-memory store iterators can return results with null values

2024-06-24 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859725#comment-17859725
 ] 

Ayoub Omari commented on KAFKA-14460:
-

[~ableegoldman] Is this ticket only about KeyValueStore? I see that for Window 
and Session stores, iterators work directly on the underlying segments.

> In-memory store iterators can return results with null values
> -
>
> Key: KAFKA-14460
> URL: https://issues.apache.org/jira/browse/KAFKA-14460
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ayoub Omari
>Priority: Major
>
> Due to the thread-safety model we adopted in our in-memory stores to avoid 
> scaling issues, we synchronize all read/write methods and then during range 
> scans, copy the keyset of all results rather than returning a direct iterator 
> over the underlying map. When users call #next to read out the iterator 
> results, we issue a point lookup on the next key and then simply return a new 
> KeyValue<>(key, get(key))
> This lets the range scan return results without blocking access to the store 
> by other threads and without risk of ConcurrentModification, as a writer can 
> modify the real store without affecting the keyset copy of the iterator. This 
> also means that those changes won't be reflected in what the iterator sees or 
> returns, which in itself is fine as we don't guarantee consistency semantics 
> of any kind.
> However, we _do_ guarantee that range scans "must not return null values" – 
> and this contract may be violated if the StreamThread deletes a record that 
> the iterator was going to return.
> tl;dr we should check get(key) for null and skip to the next result if 
> necessary in the in-memory store iterators. See for example 
> InMemoryKeyValueIterator (note that we'll probably need to buffer one record 
> in advance before we return true from #hasNext)
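
A minimal lookahead-iterator sketch of the idea described above, assuming a keyset snapshot plus point lookups against the live map; the class and field names are illustrative and this is not the actual InMemoryKeyValueIterator code:

{code:java}
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;

class SkippingKeyValueIterator implements Iterator<KeyValue<Bytes, byte[]>> {
    private final Iterator<Bytes> keys;      // snapshot of the key set taken at scan time
    private final Map<Bytes, byte[]> store;  // live backing map, may be modified concurrently
    private KeyValue<Bytes, byte[]> next;    // buffered result for hasNext()

    SkippingKeyValueIterator(Iterator<Bytes> keys, Map<Bytes, byte[]> store) {
        this.keys = keys;
        this.store = store;
    }

    @Override
    public boolean hasNext() {
        // Buffer the next non-null entry; keys deleted since the snapshot are skipped.
        while (next == null && keys.hasNext()) {
            Bytes key = keys.next();
            byte[] value = store.get(key);
            if (value != null) {
                next = KeyValue.pair(key, value);
            }
        }
        return next != null;
    }

    @Override
    public KeyValue<Bytes, byte[]> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        KeyValue<Bytes, byte[]> result = next;
        next = null;
        return result;
    }
}
{code}

With this shape, hasNext() only answers true once a non-null value has been buffered, so a concurrent delete simply causes that entry to be skipped rather than returned with a null value.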



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-17013) RequestManager#ConnectionState#toString() should use %s

2024-06-24 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17013.

Fix Version/s: 3.9.0
   Resolution: Fixed

> RequestManager#ConnectionState#toString() should use %s
> ---
>
> Key: KAFKA-17013
> URL: https://issues.apache.org/jira/browse/KAFKA-17013
> Project: Kafka
>  Issue Type: Bug
>Reporter: dujian0068
>Assignee: dujian0068
>Priority: Minor
> Fix For: 3.9.0
>
>
> RequestManager#ConnectionState#toString() should use %s
> [https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/RequestManager.java#L375]
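
A generic illustration of why %s is the safer conversion in toString() output; the snippet below is not the actual ConnectionState code and the formatted value is made up:

{code:java}
public class FormatExample {
    public static void main(String[] args) {
        Object lastAttemptMs = java.util.OptionalLong.of(42L);
        // %d only accepts integral arguments and throws IllegalFormatConversionException
        // for anything else; %s calls String.valueOf() on any argument, so it never fails.
        System.out.println(String.format("ConnectionState(lastAttemptMs=%s)", lastAttemptMs));
    }
}
{code}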



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17027) Inconsistent casing in Selector metrics tags

2024-06-24 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859717#comment-17859717
 ] 

Chia-Ping Tsai commented on KAFKA-17027:


[~yangpoan] thanks for taking over this jira!

> Inconsistent casing in Selector metrics tags
> 
>
> Key: KAFKA-17027
> URL: https://issues.apache.org/jira/browse/KAFKA-17027
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, metrics
>Reporter: Mickael Maison
>Assignee: PoAn Yang
>Priority: Major
>
> When creating metric tags for a Selector instance, we use "broker-id" in 
> ControllerChannelManager, BrokerBlockingSender and ReplicaFetcherBlockingSend 
> but we use "BrokerId" in NodeToControllerChannelManagerImpl.
> Not only these casing are inconsistent for metrics tags for the same 
> component (Selector) but it looks like neither match the casing the use for 
> other broker metrics!
> We seem to always use lower camel case for tags for broker metrics. For 
> example, we have "networkProcessor", "clientId", "delayedOperation", 
> "clientSoftwareName", "clientSoftwareVersion" as tags on other metrics.
> Fixing this will require a KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17027) Inconsistent casing in Selector metrics tags

2024-06-24 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17027:
--

Assignee: PoAn Yang

> Inconsistent casing in Selector metrics tags
> 
>
> Key: KAFKA-17027
> URL: https://issues.apache.org/jira/browse/KAFKA-17027
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, metrics
>Reporter: Mickael Maison
>Assignee: PoAn Yang
>Priority: Major
>
> When creating metric tags for a Selector instance, we use "broker-id" in 
> ControllerChannelManager, BrokerBlockingSender and ReplicaFetcherBlockingSend 
> but we use "BrokerId" in NodeToControllerChannelManagerImpl.
> Not only these casing are inconsistent for metrics tags for the same 
> component (Selector) but it looks like neither match the casing the use for 
> other broker metrics!
> We seem to always use lower camel case for tags for broker metrics. For 
> example, we have "networkProcessor", "clientId", "delayedOperation", 
> "clientSoftwareName", "clientSoftwareVersion" as tags on other metrics.
> Fixing this will require a KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17030) Voters should not assume that the leader will send them BeginQuorumEpoch requests

2024-06-24 Thread Jira
José Armando García Sancio created KAFKA-17030:
--

 Summary: Voters should not assume that the leader will send them 
BeginQuorumEpoch requests
 Key: KAFKA-17030
 URL: https://issues.apache.org/jira/browse/KAFKA-17030
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio


This issue depends on implementing 
https://issues.apache.org/jira/browse/KAFKA-16164

Because of reconfiguration it is possible for a local replica to think that 
it is a voter even though the KRaft leader doesn't have it in the voter set. Think 
of a voter removal that committed before it was replicated to the removed voter.

To address this issue KIP-853 suggests using Pre-Vote to fence the local replica 
from increasing its epoch. But in this state the local replica will be stuck 
because it will never discover the new leader. To fix this, voters must send 
Fetch requests to the bootstrap servers and/or the known leader while in the 
prospective state. This is similar to how observers discover the leader in the 
unattached and follower states.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16334) Remove Deprecated command line option from reset tool

2024-06-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859713#comment-17859713
 ] 

Matthias J. Sax commented on KAFKA-16334:
-

Thanks for picking this up. Note that this ticket can only go into the 4.0 
release. The next releases are 3.8 and 3.9, so we cannot merge any PR at this 
point. 4.0 is currently planned after 3.9.

> Remove Deprecated command line option from reset tool
> -
>
> Key: KAFKA-16334
> URL: https://issues.apache.org/jira/browse/KAFKA-16334
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, tools
>Reporter: Matthias J. Sax
>Assignee: Caio César
>Priority: Blocker
> Fix For: 4.0.0
>
>
> --bootstrap-server (singular) was deprecated in 3.4 release via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-865%3A+Support+--bootstrap-server+in+kafka-streams-application-reset]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17027) Inconsistent casing in Selector metrics tags

2024-06-24 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859714#comment-17859714
 ] 

PoAn Yang commented on KAFKA-17027:
---

Hi [~chiacyu], I saw you deleted the comment. If you're not working on it, may 
I take the issue?  Thank you.

> Inconsistent casing in Selector metrics tags
> 
>
> Key: KAFKA-17027
> URL: https://issues.apache.org/jira/browse/KAFKA-17027
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, metrics
>Reporter: Mickael Maison
>Priority: Major
>
> When creating metric tags for a Selector instance, we use "broker-id" in 
> ControllerChannelManager, BrokerBlockingSender and ReplicaFetcherBlockingSend 
> but we use "BrokerId" in NodeToControllerChannelManagerImpl.
> Not only these casing are inconsistent for metrics tags for the same 
> component (Selector) but it looks like neither match the casing the use for 
> other broker metrics!
> We seem to always use lower camel case for tags for broker metrics. For 
> example, we have "networkProcessor", "clientId", "delayedOperation", 
> "clientSoftwareName", "clientSoftwareVersion" as tags on other metrics.
> Fixing this will require a KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16327) Remove Deprecated variable StreamsConfig#TOPOLOGY_OPTIMIZATION

2024-06-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859711#comment-17859711
 ] 

Matthias J. Sax commented on KAFKA-16327:
-

[~ksolves.kafka] Thanks for picking this up. The next planned releases are 3.8 
and 3.9 – This change can only go into 4.0, so we cannot merge the PR yet. – We 
will cycle back to your PR when 4.0 comes along (most likely after 3.9 release).

> Remove Deprecated variable StreamsConfig#TOPOLOGY_OPTIMIZATION 
> ---
>
> Key: KAFKA-16327
> URL: https://issues.apache.org/jira/browse/KAFKA-16327
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Ksolves
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Deprecated in 2.7 release via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17029) SetSchemaMetadata smt throws DataException when avro schema is primitive type

2024-06-24 Thread Mark McDonald (Jira)
Mark McDonald created KAFKA-17029:
-

 Summary: SetSchemaMetadata smt throws DataException when avro 
schema is primitive type
 Key: KAFKA-17029
 URL: https://issues.apache.org/jira/browse/KAFKA-17029
 Project: Kafka
  Issue Type: Bug
Reporter: Mark McDonald


The SetSchemaMetadata SMT can be used to set the schema name and version on 
Connect records. However, if the Connect record's schema is an Avro primitive type, the 
SMT throws a DataException. The reason is that there is a call to 
the Schema [fields() 
method|https://kafka.apache.org/20/javadoc/org/apache/kafka/connect/data/Schema.html#fields--].
 This SMT should be able to handle the case when the schema is not a struct. 
[This is 
where|https://github.com/apache/kafka/blob/42058462ac9ef65e0cad4240358bbb5db9cebcd4/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L90]
 the DataException is encountered.
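
A minimal sketch of how a transform could copy schema metadata without assuming a struct schema. This is not the actual SetSchemaMetadata code; the helper name is made up, and array/map element schemas are glossed over:

{code:java}
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

final class SchemaMetadataHelper {
    // Rebuild a schema with a new name/version without calling fields() unconditionally.
    static Schema withUpdatedMetadata(Schema schema, String newName, Integer newVersion) {
        final SchemaBuilder builder;
        if (schema.type() == Schema.Type.STRUCT) {
            builder = SchemaBuilder.struct();
            for (Field field : schema.fields()) {
                builder.field(field.name(), field.schema());
            }
        } else {
            // Primitive schemas have no fields to copy; array/map element schemas
            // are omitted from this sketch for brevity.
            builder = new SchemaBuilder(schema.type());
        }
        if (schema.isOptional()) {
            builder.optional();
        }
        return builder
                .name(newName != null ? newName : schema.name())
                .version(newVersion != null ? newVersion : schema.version())
                .build();
    }
}
{code}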

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16000 Migrated MembershipManagerImplTest away from ConsumerTestBuilder [kafka]

2024-06-24 Thread via GitHub


philipnee commented on code in PR #16312:
URL: https://github.com/apache/kafka/pull/16312#discussion_r1651126643


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -1161,8 +1155,9 @@ public void 
testNewAssignmentReplacesPreviousOneWaitingOnMetadata() {
 membershipManager.poll(time.milliseconds());
 
 Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 0));
+HashSet<TopicPartition> expectedSet = new HashSet<>(expectedAssignment);

Review Comment:
   hey why exactly do we need this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17028) FindCoordinator v6 initial implementation

2024-06-24 Thread Andrew Schofield (Jira)
Andrew Schofield created KAFKA-17028:


 Summary: FindCoordinator v6 initial implementation
 Key: KAFKA-17028
 URL: https://issues.apache.org/jira/browse/KAFKA-17028
 Project: Kafka
  Issue Type: Sub-task
Reporter: Andrew Schofield
Assignee: Andrew Schofield






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16000 Migrated MembershipManagerImplTest away from ConsumerTestBuilder [kafka]

2024-06-24 Thread via GitHub


philipnee commented on code in PR #16312:
URL: https://github.com/apache/kafka/pull/16312#discussion_r1651126643


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##
@@ -1161,8 +1155,9 @@ public void 
testNewAssignmentReplacesPreviousOneWaitingOnMetadata() {
 membershipManager.poll(time.milliseconds());
 
 Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 0));
+HashSet<TopicPartition> expectedSet = new HashSet<>(expectedAssignment);

Review Comment:
   `Set` instead
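
   A generic illustration of the suggestion, with String standing in for TopicPartition so the snippet stays self-contained; declaring against the interface keeps the concrete collection a construction-time detail:
   
   ```java
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;
   
   public class DeclareAgainstInterface {
       public static void main(String[] args) {
           Set<String> expectedAssignment = Collections.singleton("topic-0");
           // Left-hand side uses the interface type; only the constructor names HashSet.
           Set<String> expectedSet = new HashSet<>(expectedAssignment);
           System.out.println(expectedSet);
       }
   }
   ```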



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-17004) MINOR: Remove extra synchronized blocks in SharePartitionManager

2024-06-24 Thread Abhinav Dixit (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhinav Dixit resolved KAFKA-17004.
---
Fix Version/s: 4.0.0
   3.9.0
   Resolution: Fixed

> MINOR: Remove extra synchronized blocks in SharePartitionManager
> 
>
> Key: KAFKA-17004
> URL: https://issues.apache.org/jira/browse/KAFKA-17004
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Abhinav Dixit
>Assignee: Abhinav Dixit
>Priority: Major
> Fix For: 4.0.0, 3.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16536: Use BeginQuorumEpoch as leader heartbeat [kafka]

2024-06-24 Thread via GitHub


jsancio commented on code in PR #16399:
URL: https://github.com/apache/kafka/pull/16399#discussion_r1651117067


##
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##
@@ -628,7 +629,7 @@ void deliverResponse(int correlationId, Node source, 
ApiMessage response) {
 RaftRequest.Outbound assertSentBeginQuorumEpochRequest(int epoch, int 
numBeginEpochRequests) {
 List<RaftRequest.Outbound> requests = collectBeginEpochRequests(epoch);
 assertEquals(numBeginEpochRequests, requests.size());
-return requests.get(0);
+return !requests.isEmpty() ? requests.get(0) : null;

Review Comment:
   Let's avoid returning `null` from "public" methods.
   
   We normally don't test the absence of a side effect because those checks are 
technically infinite. Maybe we can expose `collectBeginEpochRequests` as 
package private and check the size of the list.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17001) Consider using another class to replace `AbstractConfig` to be class which always returns the up-to-date configs

2024-06-24 Thread Kuan Po Tseng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kuan Po Tseng updated KAFKA-17001:
--
Summary: Consider using another class to replace `AbstractConfig` to be 
class which always returns the up-to-date configs  (was: Consider using another 
class to replace `AbstractConfig` to be class which alwasy returns the 
up-to-date configs)

> Consider using another class to replace `AbstractConfig` to be class which 
> always returns the up-to-date configs
> 
>
> Key: KAFKA-17001
> URL: https://issues.apache.org/jira/browse/KAFKA-17001
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> from https://github.com/apache/kafka/pull/16394#discussion_r1647321514
> We are starting to have separate config classes (i.e. RemoteLogManagerConfig), 
> and those configs will be initialized with an AbstractConfig. By calling 
> `AbstractConfig` getters, those individual configs can always return the 
> up-to-date configs. Behind the magic behavior is the instance of 
> `AbstractConfig` ... yes, we use the `KafkaConfig` to construct those config 
> classes. We call `KafkaConfig#updateCurrentConfig` to update inner configs, 
> so those config classes which use `AbstractConfig` can see the latest 
> configs too.
> However, this mechanism is not readable from `AbstractConfig`. Maybe we 
> should add enough docs for it. Or we can move 
> `KafkaConfig#updateCurrentConfig` into a new class with better naming.
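
A hypothetical sketch of the naming idea above: a small holder whose getters always read the latest snapshot, making the dynamic behaviour explicit instead of hiding it behind AbstractConfig. The class name and config key below are made up for illustration:

{code:java}
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

final class DynamicConfigView {
    private final AtomicReference<Map<String, Object>> current = new AtomicReference<>(Map.of());

    // Called whenever a dynamic config update is applied.
    void updateCurrentConfig(Map<String, Object> newConfig) {
        current.set(Map.copyOf(newConfig));
    }

    // Getters read the latest snapshot, so callers always see up-to-date values.
    long exampleTimeoutMs() {
        return ((Number) current.get().getOrDefault("some.example.timeout.ms", -1L)).longValue();
    }
}
{code}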



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Remove extra synchronized block in SharePartitionManager [kafka]

2024-06-24 Thread via GitHub


omkreddy merged PR #16436:
URL: https://github.com/apache/kafka/pull/16436


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16536: Use BeginQuorumEpoch as leader heartbeat [kafka]

2024-06-24 Thread via GitHub


jsancio commented on code in PR #16399:
URL: https://github.com/apache/kafka/pull/16399#discussion_r1651079179


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2096,6 +2096,26 @@ private long maybeAppendBatches(
 return timeUntilDrain;
 }
 
+private long maybeSendBeginQuorumEpochRequests(
+LeaderState state,
+long currentTimeMs
+) {
+long timeUntilNextBeginQuorumSend = 
state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs);
+if (timeUntilNextBeginQuorumSend == 0) {
+timeUntilNextBeginQuorumSend = maybeSendRequests(
+currentTimeMs,
+partitionState
+.lastVoterSet()
+.voterNodes(state.votersExcludingLeader().stream(), 
channel.listenerName()),

Review Comment:
   The latest voters are in `partitionState.lastVoterSet()`. It is odd that 
`KafkaRaftClient` needs to query `LeaderState` to exclude itself from the 
`VoterSet`. How about `voters.voterKeys().stream().filter(!self)`?
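   
   A generic expansion of the pseudocode above; `voterIds` and `localId` are placeholders rather than the real VoterSet API:
   
   ```java
   import java.util.Set;
   import java.util.stream.Collectors;
   
   final class VoterFilterExample {
       // Derive "all voters except the local replica" from the voter set itself.
       static Set<Integer> votersExcludingSelf(Set<Integer> voterIds, int localId) {
           return voterIds.stream()
                   .filter(id -> id != localId)
                   .collect(Collectors.toSet());
       }
   }
   ```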



##
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##
@@ -628,7 +629,7 @@ void deliverResponse(int correlationId, Node source, 
ApiMessage response) {
 RaftRequest.Outbound assertSentBeginQuorumEpochRequest(int epoch, int 
numBeginEpochRequests) {
 List<RaftRequest.Outbound> requests = collectBeginEpochRequests(epoch);
 assertEquals(numBeginEpochRequests, requests.size());
-return requests.get(0);
+return !requests.isEmpty() ? requests.get(0) : null;

Review Comment:
   Let's avoid returning `null` from "public" methods. We normally don't test 
the absence of a side effect because those checks are technically infinite. 
Maybe we can expose `collectBeginEpochRequests` as package private and check 
the size of the list.



##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -497,6 +497,32 @@ public void 
testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio
 context.listener.currentLeaderAndEpoch());
 }
 
+@Test
+public void testBeginQuorumHeartbeat() throws Exception {
+int localId = 0;
+int remoteId1 = 1;
+int remoteId2 = 2;
+Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
+
+RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters).build();
+
+context.becomeLeader();
+assertEquals(OptionalInt.of(localId), context.currentLeader());
+
+// begin epoch requests should be sent out every 
beginQuorumEpochTimeoutMs
+context.time.sleep(context.beginQuorumEpochTimeoutMs);
+context.client.poll();
+context.assertSentBeginQuorumEpochRequest(context.currentEpoch(), 2);

Review Comment:
   Does this method check that it was sent to `remoteId1` and `remoteId2`? If 
not, we should check that begin quorum epoch request was sent to all of the 
remote voters.



##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2096,6 +2096,26 @@ private long maybeAppendBatches(
 return timeUntilDrain;
 }
 
+private long maybeSendBeginQuorumEpochRequests(
+LeaderState state,
+long currentTimeMs
+) {
+long timeUntilNextBeginQuorumSend = 
state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs);
+if (timeUntilNextBeginQuorumSend == 0) {
+timeUntilNextBeginQuorumSend = maybeSendRequests(
+currentTimeMs,
+partitionState
+.lastVoterSet()
+.voterNodes(state.votersExcludingLeader().stream(), 
channel.listenerName()),
+this::buildBeginQuorumEpochRequest
+);
+state.resetBeginQuorumEpochTimer(currentTimeMs);
+logger.trace("Attempted to send BeginQuorumEpochRequest as 
heartbeat to all voters. " +
+"Request can be retried in {} ms", 
timeUntilNextBeginQuorumSend);

Review Comment:
   This doesn't match the formatting used in this file or module:
   ```java
   logger.trace(
   "Attempted to send BeginQuorumEpochRequest as heartbeat to 
all voters. " +
   "Request can be retried in {} ms",
   timeUntilNextBeginQuorumSend
   );
   ```



##
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##
@@ -866,6 +872,27 @@ public void testGrantVote(boolean isLogUpToDate) {
 assertFalse(state.canGrantVote(ReplicaKey.of(3, Optional.empty()), 
isLogUpToDate));
 }
 
+@Test
+public void testBeginQuorumEpochTimer() {
+int leader = localId;
+int follower1 = 1;
+long epochStartOffset = 10L;
+
+Set<Integer> voterSet = mkSet(leader, follower1);
+LeaderState state = newLeaderState(voterSet, epochStartOffset);
+assertEquals(0, 

Re: [PR] KAFKA-16536: Use BeginQuorumEpoch as leader heartbeat [kafka]

2024-06-24 Thread via GitHub


jsancio commented on code in PR #16399:
URL: https://github.com/apache/kafka/pull/16399#discussion_r1651084728


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -220,7 +234,14 @@ public Uuid localDirectoryId() {
 return localDirectoryId;
 }
 
-public Set<Integer> nonAcknowledgingVoters() {
+public Set<Integer> votersExcludingLeader() {
+Set<Integer> voters = new HashSet<>(voterStates.keySet());
+voters.remove(localId);
+return voters;
+}

Review Comment:
   See my other comment but you should be able to remove this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14348) Consider renaming MetadataBatchProcessingTimeUs to MetadataDeltaProcessingTimeUs

2024-06-24 Thread Ksolves (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859698#comment-17859698
 ] 

Ksolves edited comment on KAFKA-14348 at 6/24/24 2:13 PM:
--

[~cmccabe] It's an old ticket but would like to confirm if the change should be 
made in the 
[BrokerServerMetrics.scala#L36-L39.|https://github.com/apache/kafka/blob/3.7.0/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala#L36-L39]
{code:java}
private val batchProcessingTimeHistName = 
KafkaMetricsGroup.explicitMetricName("kafka.server",
    "BrokerMetadataListener",
      "MetadataBatchProcessingTimeUs",
    Collections.emptyMap()){code}
 


was (Author: JIRAUSER305714):
[~cmccabe] It's an old ticket but would like to confirm if the change should be 
made in 
the`[BrokerServerMetrics.scala#L36-L39|https://github.com/apache/kafka/blob/3.7.0/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala#L36-L39]`?

```
private val batchProcessingTimeHistName = 
KafkaMetricsGroup.explicitMetricName("kafka.server",
    "BrokerMetadataListener",
      "{*}MetadataBatchProcessingTimeUs{*}",
    Collections.emptyMap())

```

> Consider renaming MetadataBatchProcessingTimeUs to 
> MetadataDeltaProcessingTimeUs
> 
>
> Key: KAFKA-14348
> URL: https://issues.apache.org/jira/browse/KAFKA-14348
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
>
> We should consider renaming kafka.server.MetadataBatchProcessingTimeUs to 
> kafka.server.MetadataDeltaProcessingTimeUs. The reason is because this metric 
> isn't the time to process a single batch, but the time to process a group of 
> batches given to us by the raft layer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14348) Consider renaming MetadataBatchProcessingTimeUs to MetadataDeltaProcessingTimeUs

2024-06-24 Thread Ksolves (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859698#comment-17859698
 ] 

Ksolves commented on KAFKA-14348:
-

[~cmccabe] It's an old ticket but would like to confirm if the change should be 
made in 
the`[BrokerServerMetrics.scala#L36-L39|https://github.com/apache/kafka/blob/3.7.0/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala#L36-L39]`?

```
private val batchProcessingTimeHistName = 
KafkaMetricsGroup.explicitMetricName("kafka.server",
    "BrokerMetadataListener",
      "{*}MetadataBatchProcessingTimeUs{*}",
    Collections.emptyMap())

```

> Consider renaming MetadataBatchProcessingTimeUs to 
> MetadataDeltaProcessingTimeUs
> 
>
> Key: KAFKA-14348
> URL: https://issues.apache.org/jira/browse/KAFKA-14348
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
>
> We should consider renaming kafka.server.MetadataBatchProcessingTimeUs to 
> kafka.server.MetadataDeltaProcessingTimeUs. The reason is because this metric 
> isn't the time to process a single batch, but the time to process a group of 
> batches given to us by the raft layer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17027) Inconsistent casing in Selector metrics tags

2024-06-24 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859695#comment-17859695
 ] 

Mickael Maison commented on KAFKA-17027:


[~chia7712]  Since we'll have a 3.9 release, yeah we may be able to squeeze 
this in, and get rid of the inconsistent metrics in 4.0.

> Inconsistent casing in Selector metrics tags
> 
>
> Key: KAFKA-17027
> URL: https://issues.apache.org/jira/browse/KAFKA-17027
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, metrics
>Reporter: Mickael Maison
>Priority: Major
>
> When creating metric tags for a Selector instance, we use "broker-id" in 
> ControllerChannelManager, BrokerBlockingSender and ReplicaFetcherBlockingSend 
> but we use "BrokerId" in NodeToControllerChannelManagerImpl.
> Not only these casing are inconsistent for metrics tags for the same 
> component (Selector) but it looks like neither match the casing the use for 
> other broker metrics!
> We seem to always use lower camel case for tags for broker metrics. For 
> example, we have "networkProcessor", "clientId", "delayedOperation", 
> "clientSoftwareName", "clientSoftwareVersion" as tags on other metrics.
> Fixing this will require a KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-17027) Inconsistent casing in Selector metrics tags

2024-06-24 Thread Chia Chuan Yu (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-17027 ]


Chia Chuan Yu deleted comment on KAFKA-17027:
---

was (Author: JIRAUSER304417):
Hi, [~mimaison]

Can I have this one? I'm interested in this task. Thanks! 

> Inconsistent casing in Selector metrics tags
> 
>
> Key: KAFKA-17027
> URL: https://issues.apache.org/jira/browse/KAFKA-17027
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, metrics
>Reporter: Mickael Maison
>Priority: Major
>
> When creating metric tags for a Selector instance, we use "broker-id" in 
> ControllerChannelManager, BrokerBlockingSender and ReplicaFetcherBlockingSend 
> but we use "BrokerId" in NodeToControllerChannelManagerImpl.
> Not only these casing are inconsistent for metrics tags for the same 
> component (Selector) but it looks like neither match the casing the use for 
> other broker metrics!
> We seem to always use lower camel case for tags for broker metrics. For 
> example, we have "networkProcessor", "clientId", "delayedOperation", 
> "clientSoftwareName", "clientSoftwareVersion" as tags on other metrics.
> Fixing this will require a KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17027) Inconsistent casing in Selector metrics tags

2024-06-24 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859693#comment-17859693
 ] 

Mickael Maison commented on KAFKA-17027:


[~chiacyu] If a ticket is not assigned to anybody, feel free to grab it.

> Inconsistent casing in Selector metrics tags
> 
>
> Key: KAFKA-17027
> URL: https://issues.apache.org/jira/browse/KAFKA-17027
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, metrics
>Reporter: Mickael Maison
>Priority: Major
>
> When creating metric tags for a Selector instance, we use "broker-id" in 
> ControllerChannelManager, BrokerBlockingSender and ReplicaFetcherBlockingSend 
> but we use "BrokerId" in NodeToControllerChannelManagerImpl.
> Not only these casing are inconsistent for metrics tags for the same 
> component (Selector) but it looks like neither match the casing the use for 
> other broker metrics!
> We seem to always use lower camel case for tags for broker metrics. For 
> example, we have "networkProcessor", "clientId", "delayedOperation", 
> "clientSoftwareName", "clientSoftwareVersion" as tags on other metrics.
> Fixing this will require a KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13867) Improve JavaDoc for MetadataVersion.java

2024-06-24 Thread Ksolves (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859692#comment-17859692
 ] 

Ksolves commented on KAFKA-13867:
-

[~cmccabe] There is no variable named "version" defined in the 
`MetadataVersion.java` class (it was probably removed later). The most similar 
variables present are VERSIONS and ibpVersion. Please confirm whether any of 
these variables and their occurrences need to be changed.

> Improve JavaDoc for MetadataVersion.java
> 
>
> Key: KAFKA-13867
> URL: https://issues.apache.org/jira/browse/KAFKA-13867
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17027) Inconsistent casing in Selector metrics tags

2024-06-24 Thread Chia Chuan Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859690#comment-17859690
 ] 

Chia Chuan Yu commented on KAFKA-17027:
---

Hi, [~mimaison]

Can I have this one? I'm interested in this task. Thanks! 

> Inconsistent casing in Selector metrics tags
> 
>
> Key: KAFKA-17027
> URL: https://issues.apache.org/jira/browse/KAFKA-17027
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, metrics
>Reporter: Mickael Maison
>Priority: Major
>
> When creating metric tags for a Selector instance, we use "broker-id" in 
> ControllerChannelManager, BrokerBlockingSender and ReplicaFetcherBlockingSend 
> but we use "BrokerId" in NodeToControllerChannelManagerImpl.
> Not only these casing are inconsistent for metrics tags for the same 
> component (Selector) but it looks like neither match the casing the use for 
> other broker metrics!
> We seem to always use lower camel case for tags for broker metrics. For 
> example, we have "networkProcessor", "clientId", "delayedOperation", 
> "clientSoftwareName", "clientSoftwareVersion" as tags on other metrics.
> Fixing this will require a KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17027) Inconsistent casing in Selector metrics tags

2024-06-24 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859689#comment-17859689
 ] 

Chia-Ping Tsai commented on KAFKA-17027:


Should we deprecate the non-lower-camel-case metric tags? If so, it would be great 
to complete this in 3.9 :)

> Inconsistent casing in Selector metrics tags
> 
>
> Key: KAFKA-17027
> URL: https://issues.apache.org/jira/browse/KAFKA-17027
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, metrics
>Reporter: Mickael Maison
>Priority: Major
>
> When creating metric tags for a Selector instance, we use "broker-id" in 
> ControllerChannelManager, BrokerBlockingSender and ReplicaFetcherBlockingSend 
> but we use "BrokerId" in NodeToControllerChannelManagerImpl.
> Not only these casing are inconsistent for metrics tags for the same 
> component (Selector) but it looks like neither match the casing the use for 
> other broker metrics!
> We seem to always use lower camel case for tags for broker metrics. For 
> example, we have "networkProcessor", "clientId", "delayedOperation", 
> "clientSoftwareName", "clientSoftwareVersion" as tags on other metrics.
> Fixing this will require a KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17027) Inconsistent casing in Selector metrics tags

2024-06-24 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-17027:
--

 Summary: Inconsistent casing in Selector metrics tags
 Key: KAFKA-17027
 URL: https://issues.apache.org/jira/browse/KAFKA-17027
 Project: Kafka
  Issue Type: Improvement
  Components: core, metrics
Reporter: Mickael Maison


When creating metric tags for a Selector instance, we use "broker-id" in 
ControllerChannelManager, BrokerBlockingSender and ReplicaFetcherBlockingSend 
but we use "BrokerId" in NodeToControllerChannelManagerImpl.

Not only are these casings inconsistent across metrics tags for the same component 
(Selector), but it looks like neither matches the casing we use for other broker 
metrics!

We seem to always use lower camel case for tags for broker metrics. For 
example, we have "networkProcessor", "clientId", "delayedOperation", 
"clientSoftwareName", "clientSoftwareVersion" as tags on other metrics.

Fixing this will require a KIP.
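
A hypothetical side-by-side of the two spellings described above, only to make the inconsistency concrete; the values are made up:

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class SelectorTagsExample {
    public static void main(String[] args) {
        Map<String, String> fetcherTags = new LinkedHashMap<>();
        fetcherTags.put("broker-id", "1");   // ControllerChannelManager / ReplicaFetcherBlockingSend style

        Map<String, String> controllerTags = new LinkedHashMap<>();
        controllerTags.put("BrokerId", "1"); // NodeToControllerChannelManagerImpl style

        // Other broker metrics use lower camel case (e.g. "clientId"), so converging
        // on something like "brokerId" would need a KIP, as noted above.
        System.out.println(fetcherTags + " vs " + controllerTags);
    }
}
{code}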



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17003: Implemented SharePartitionManager close() functionality [kafka]

2024-06-24 Thread via GitHub


adixitconfluent commented on code in PR #16431:
URL: https://github.com/apache/kafka/pull/16431#discussion_r1650928682


##
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##
@@ -1221,6 +1224,36 @@ public void testReplicaManagerFetchShouldProceed() {
 any(), any(), any(ReplicaQuota.class), any());
 }
 
+@Test
+public void testCloseSharePartitionManager() throws Exception {
+SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+.withShareGroupPersister(mock(Persister.class)).build();
+
+List mockList = new ArrayList<>();
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Allowing first timer task to expire. The timer task will add an 
element to mockList.
+TestUtils.waitForCondition(
+() -> mockList.size() == 1,
+DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS,
+() -> "Timer task never got timed out.");
+
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Closing the sharePartitionManager closes timer object in 
sharePartitionManager.
+sharePartitionManager.close();
+// Allowing second timer task to expire. It won't be able to add an 
element to mockList.
+Thread.sleep(DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS);
+assertEquals(1, mockList.size());
+}
+
+private TimerTask createTimerTask(List mockList) {
+return new TimerTask(100) {
+@Override

Review Comment:
   Since this functionality is required in two places, I created it as 
a separate helper function, `createTimerTask`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-06-24 Thread via GitHub


jsancio commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1651072253


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -102,6 +109,11 @@ object StorageTool extends Logging {
   setClusterId(clusterId).
   setNodeId(config.nodeId).
   build()
+val standaloneMode = namespace.getBoolean("standalone")
+val advertisedListenerEndpoints: 
scala.collection.Seq[kafka.cluster.EndPoint] = 
config.effectiveAdvertisedListeners
+
+// effectiveAdvertisedControllerListeners to be added

Review Comment:
   I think we should wait. The bootstrap checkpoint file is incorrect without 
https://github.com/apache/kafka/pull/16235



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16967) NioEchoServer fails to register connection and causes flaky failure

2024-06-24 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16967.

Fix Version/s: 3.9.0
   Resolution: Fixed

> NioEchoServer fails to register connection and causes flaky failure
> ---
>
> Key: KAFKA-16967
> URL: https://issues.apache.org/jira/browse/KAFKA-16967
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Greg Harris
>Assignee: TengYao Chi
>Priority: Minor
>  Labels: flaky-test, newbie
> Fix For: 3.9.0
>
>
> The NioEchoServer calls Selector#register for new connections. This call can 
> throw exceptions, which then kill the NioEchoServer. This has been observed 
> in the SslTransportLayerTest testUngracefulRemoteCloseDuringHandshake* 
> methods.
> {noformat}
> Exception in thread "echoserver" java.lang.IllegalStateException: There is 
> already a connection for id 127.0.0.1:40007-127.0.0.1:43710
>   at 
> org.apache.kafka.common.network.Selector.ensureNotRegistered(Selector.java:322)
>   at org.apache.kafka.common.network.Selector.register(Selector.java:310)
>   at 
> org.apache.kafka.common.network.NioEchoServer.run(NioEchoServer.java:229){noformat}
> This causes the test to fail with essentially a timeout, when the connection 
> is expired for becoming idle unexpectedly:
> {noformat}
> org.opentest4j.AssertionFailedError: Unexpected channel state EXPIRED ==> 
> expected:  but was: 
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
> at 
> org.apache.kafka.common.network.SslTransportLayerTest.testIOExceptionsDuringHandshake(SslTransportLayerTest.java:898)
> at 
> org.apache.kafka.common.network.SslTransportLayerTest.testUngracefulRemoteCloseDuringHandshakeRead(SslTransportLayerTest.java:837){noformat}
> Instead, the NioEchoServer should handle exceptions from register in a 
> similar fashion to the SocketServer.
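
A hedged sketch of that suggestion (illustrative names, not the actual NioEchoServer 
code): catch the failure per connection and close the socket instead of letting the 
exception terminate the echo server thread.

```java
// Sketch: register() can throw (e.g. IllegalStateException for a duplicate id),
// so handle the failure locally rather than letting it kill the acceptor loop.
try {
    selector.register(id, socketChannel);
} catch (Exception e) {
    log.error("Failed to register connection {}", id, e);
    socketChannel.close();
}
```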



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16967: NioEchoServer fails to register connection and causes flaky failure. [kafka]

2024-06-24 Thread via GitHub


chia7712 merged PR #16384:
URL: https://github.com/apache/kafka/pull/16384


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-24 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1651048314


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+private static final String VERSION = "version";
+private static final String DATA = "data";
+
+
+@Override
+public void writeTo(ConsumerRecord consumerRecord, 
PrintStream output) {
+Optional.ofNullable(consumerRecord.key())
+.ifPresent(key -> {
+short keyVersion = ByteBuffer.wrap(key).getShort();

Review Comment:
   please fix the indent size



##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogKeyJsonConverter;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+private static final String VERSION = "version";
+private static final String DATA = "data";
+
+
+@Override
+public void writeTo(ConsumerRecord consumerRecord, 
PrintStream output) {
+Optional.ofNullable(consumerRecord.key())
+.ifPresent(key -> {
+short keyVersion = ByteBuffer.wrap(key).getShort();
+byte[] value = consumerRecord.value();
+short valueVersion = ByteBuffer.wrap(value).getShort();
+
+TransactionLogKey transactionLogKey = 

Re: [PR] KAFKA-16855 : Part 1 - New fields tieredEpoch and tieredState [kafka]

2024-06-24 Thread via GitHub


muralibasani commented on PR #16257:
URL: https://github.com/apache/kafka/pull/16257#issuecomment-2186577649

   > Heya @muralibasani, could you just fix
   > 
   > ```
   > [2024-06-21T10:19:19.373Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16257/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:2105:
  error: no suitable constructor found for 
RemoteLogSegmentMetadata(RemoteLogSegmentId,long,long,long,int,long,int,Optional,RemoteLogSegmentState,NavigableMap)
   > ```
   > 
   > from the build?
   
   Seems like a new test was added. Updated it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-06-24 Thread via GitHub


sebastienviale commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1651019264


##
streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java:
##
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Category(IntegrationTest.class)
+public class ProcessingExceptionHandlerIntegrationTest {
+private final String threadId = Thread.currentThread().getName();
+
+@Test
+public void shouldContinueWhenProcessingExceptionOccursAtBeginning() {

Review Comment:
   I changed the test according to your remarks. Indeed, two tests are 
sufficient.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16754: Implemented release acquired records functionality to SharePartition [kafka]

2024-06-24 Thread via GitHub


adixitconfluent commented on code in PR #16430:
URL: https://github.com/apache/kafka/pull/16430#discussion_r1651002843


##
checkstyle/suppressions.xml:
##
@@ -39,6 +39,7 @@
 
 
+

Review Comment:
   Actually, I had refactored and reduced the complexity in one of my earlier 
commits; I just forgot to remove this change from `suppressions.xml`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16855 : Part 1 - New fields tieredEpoch and tieredState [kafka]

2024-06-24 Thread via GitHub


clolov commented on PR #16257:
URL: https://github.com/apache/kafka/pull/16257#issuecomment-2186519073

   Heya @muralibasani, could you just fix
   ```
   [2024-06-21T10:19:19.373Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16257/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:2105:
  error: no suitable constructor found for 
RemoteLogSegmentMetadata(RemoteLogSegmentId,long,long,long,int,long,int,Optional,RemoteLogSegmentState,NavigableMap)
   ```
   from the build?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16855 : Part 1 - New fields tieredEpoch and tieredState [kafka]

2024-06-24 Thread via GitHub


clolov commented on code in PR #16257:
URL: https://github.com/apache/kafka/pull/16257#discussion_r1650984849


##
metadata/src/main/resources/common/metadata/TopicRecord.json:
##
@@ -17,12 +17,16 @@
   "apiKey": 2,
   "type": "metadata",
   "name": "TopicRecord",
-  "validVersions": "0",
+  "validVersions": "0-1",

Review Comment:
   I see, I have not fully caught up on KIP-1022, which appears to have changed 
the approach for unstable metadata versions. Okay, let's continue with what is 
proposed in this PR. Thank you for checking!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16967: NioEchoServer fails to register connection and causes flaky failure. [kafka]

2024-06-24 Thread via GitHub


frankvicky commented on PR #16384:
URL: https://github.com/apache/kafka/pull/16384#issuecomment-2186497190

   Hi @chia7712,
   
   I have run the failing test on both trunk and the 16967 branch on my 
local machine, but I couldn't reproduce the failure. I also reviewed the 
failing test, and it seems that my changes are unrelated to it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17003: Implemented SharePartitionManager close() functionality [kafka]

2024-06-24 Thread via GitHub


adixitconfluent commented on code in PR #16431:
URL: https://github.com/apache/kafka/pull/16431#discussion_r1650969451


##
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##
@@ -1221,6 +1224,36 @@ public void testReplicaManagerFetchShouldProceed() {
 any(), any(), any(ReplicaQuota.class), any());
 }
 
+@Test
+public void testCloseSharePartitionManager() throws Exception {
+SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+.withShareGroupPersister(mock(Persister.class)).build();
+
+List mockList = new ArrayList<>();
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Allowing first timer task to expire. The timer task will add an 
element to mockList.
+TestUtils.waitForCondition(
+() -> mockList.size() == 1,
+DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS,
+() -> "Timer task never got timed out.");
+
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Closing the sharePartitionManager closes timer object in 
sharePartitionManager.
+sharePartitionManager.close();
+// Allowing second timer task to expire. It won't be able to add an 
element to mockList.
+Thread.sleep(DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS);

Review Comment:
   For that, we need to have a function in SharePartitionManager to set the 
`timer` data member to this mockTimer object. We can mark it as `// Only for 
testing.` Do we want to take that route?
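   
   A rough sketch of that route (the `withTimer(...)` builder hook below is 
hypothetical; it stands for the test-only setter being discussed, and MockTimer is 
assumed to be the existing test timer utility):
   
   ```java
   // Sketch only: inject a controllable timer so the test can advance time
   // deterministically instead of calling Thread.sleep().
   MockTimer mockTimer = new MockTimer();
   SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
       .withTimer(mockTimer)   // hypothetical, test-only setter
       .build();

   mockTimer.advanceClock(DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS); // fire due tasks without sleeping
   ```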



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16887: Update docs for add metrics values [kafka]

2024-06-24 Thread via GitHub


Nancy-ksolves opened a new pull request, #16438:
URL: https://github.com/apache/kafka/pull/16438

   Update the documentation to add the remote copy/fetch quota metric values.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17003: Implemented SharePartitionManager close() functionality [kafka]

2024-06-24 Thread via GitHub


adixitconfluent commented on code in PR #16431:
URL: https://github.com/apache/kafka/pull/16431#discussion_r1650969451


##
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##
@@ -1221,6 +1224,36 @@ public void testReplicaManagerFetchShouldProceed() {
 any(), any(), any(ReplicaQuota.class), any());
 }
 
+@Test
+public void testCloseSharePartitionManager() throws Exception {
+SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+.withShareGroupPersister(mock(Persister.class)).build();
+
+List mockList = new ArrayList<>();
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Allowing first timer task to expire. The timer task will add an 
element to mockList.
+TestUtils.waitForCondition(
+() -> mockList.size() == 1,
+DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS,
+() -> "Timer task never got timed out.");
+
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Closing the sharePartitionManager closes timer object in 
sharePartitionManager.
+sharePartitionManager.close();
+// Allowing second timer task to expire. It won't be able to add an 
element to mockList.
+Thread.sleep(DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS);

Review Comment:
   For that, we need to have a function in SharePartitionManager to set the 
timer object to this mockTimer object. We can mark it as `// Only for testing.` 
Do we want to take that route?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17003: Implemented SharePartitionManager close() functionality [kafka]

2024-06-24 Thread via GitHub


adixitconfluent commented on code in PR #16431:
URL: https://github.com/apache/kafka/pull/16431#discussion_r1650969451


##
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##
@@ -1221,6 +1224,36 @@ public void testReplicaManagerFetchShouldProceed() {
 any(), any(), any(ReplicaQuota.class), any());
 }
 
+@Test
+public void testCloseSharePartitionManager() throws Exception {
+SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+.withShareGroupPersister(mock(Persister.class)).build();
+
+List mockList = new ArrayList<>();
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Allowing first timer task to expire. The timer task will add an 
element to mockList.
+TestUtils.waitForCondition(
+() -> mockList.size() == 1,
+DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS,
+() -> "Timer task never got timed out.");
+
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Closing the sharePartitionManager closes timer object in 
sharePartitionManager.
+sharePartitionManager.close();
+// Allowing second timer task to expire. It won't be able to add an 
element to mockList.
+Thread.sleep(DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS);

Review Comment:
   For that, we need to have a function in SharePartitionManager to set the 
timer data member to this mockTimer object. We can mark it as `// Only for 
testing.` Do we want to take that route?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-24 Thread via GitHub


chia7712 commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650961363


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+@Override
+public void writeTo(ConsumerRecord consumerRecord, 
PrintStream output) {
+Optional.ofNullable(consumerRecord.key())
+.map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+.ifPresent(transactionLogKey -> {
+short version = 
ByteBuffer.wrap(consumerRecord.key()).getShort();
+ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+json.set("version", new TextNode(Short.toString(version)));
+
+if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+&& version <= 
TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+byte[] value = consumerRecord.value();
+TransactionLogValue transactionLogValue = 
+new TransactionLogValue(new 
ByteBufferAccessor(ByteBuffer.wrap(value)), version);
+JsonNode jsonNode = 
TransactionLogValueJsonConverter.write(transactionLogValue, version);
+json.set("transactionalId", new 
TextNode(transactionLogKey.transactionalId()));
+json.set("data", jsonNode);
+} else {
+json.set("data", new TextNode("unknown"));
+}
+try {
+output.write(json.toString().getBytes(UTF_8));
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+});
+}
+
+private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+short version = byteBuffer.getShort();
+if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), 
version);
+} else {
+return new TransactionLogKey();

Review Comment:
   That is good and please add tests for it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17011: SupportedFeatures.MinVersion incorrectly blocks v0 (3.8) [kafka]

2024-06-24 Thread via GitHub


chia7712 commented on PR #16420:
URL: https://github.com/apache/kafka/pull/16420#issuecomment-2186476992

   BTW, there is a description in KIP-584
   
   > Each broker’s supported dictionary of {feature → version range} will be 
defined in the broker code. For each supported feature, the supported version 
range is defined by a min_version (an int64 starting always from 1) and 
max_version (an int64 >=1 and >= min_version).
   
   not sure why #15671 changed the start version from 1 to 0
   
   https://github.com/apache/kafka/pull/15671/files#r1576879728
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-06-24 Thread via GitHub


sebastienviale commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1650933888


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##
@@ -63,6 +95,36 @@ public void 
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
 assertThrows(StreamsException.class, () -> node.init(null));
 }
 
+@Test
+public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
+final ProcessorNode node =
+new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
+node.setProcessingExceptionHandler(new 
LogAndFailProcessingExceptionHandler());

Review Comment:
   I added one test to compare results in the ProcessingExceptionHandlerMock (I 
removed the Test)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-06-24 Thread via GitHub


sebastienviale commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1650931604


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##
@@ -99,30 +81,57 @@ public void 
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
 public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
 final ProcessorNode node =
 new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-node.setProcessingExceptionHandler(new 
LogAndFailProcessingExceptionHandler());
+node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL));
+
+final InternalProcessorContext 
internalProcessorContext = mockInternalProcessorContext();
 node.init(internalProcessorContext);
 
-assertThrows(StreamsException.class, () -> node.process(new 
Record<>("key", "value", 0)));
+final StreamsException processingException = 
assertThrows(StreamsException.class,
+() -> node.process(new Record<>("key", "value", 0)));
+
+assertEquals("Processing exception handler is set to fail upon" +
+" a processing error. If you would rather have the streaming 
pipeline" +
+" continue after a processing error, please set the " +
+PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", 
processingException.getMessage());
+
+assertTrue(processingException.getCause() instanceof RuntimeException);
+assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
+processingException.getCause().getMessage());
 }
 
 @Test
 public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() 
{
 final ProcessorNode node =
 new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-node.setProcessingExceptionHandler(new 
LogAndContinueProcessingExceptionHandler());
+node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE));
+
+final InternalProcessorContext 
internalProcessorContext = mockInternalProcessorContext();
 node.init(internalProcessorContext);
 
 assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 
0)));
 }
 
 @Test
+@SuppressWarnings("unchecked")
 public void shouldNotHandleStreamsExceptionAsProcessingException() {
+final ProcessingExceptionHandler processingExceptionHandler = 
spy(ProcessingExceptionHandler.class);

Review Comment:
   I changed it to mock(ProcessingExceptionHandler.class) to check whether the 
handle() method is called



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17003: Implemented SharePartitionManager close() functionality [kafka]

2024-06-24 Thread via GitHub


adixitconfluent commented on code in PR #16431:
URL: https://github.com/apache/kafka/pull/16431#discussion_r1650931188


##
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##
@@ -1221,6 +1224,36 @@ public void testReplicaManagerFetchShouldProceed() {
 any(), any(), any(ReplicaQuota.class), any());
 }
 
+@Test
+public void testCloseSharePartitionManager() throws Exception {
+SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+.withShareGroupPersister(mock(Persister.class)).build();

Review Comment:
   you're right, we don't need to mock it. I have removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17003: Implemented SharePartitionManager close() functionality [kafka]

2024-06-24 Thread via GitHub


adixitconfluent commented on code in PR #16431:
URL: https://github.com/apache/kafka/pull/16431#discussion_r1650928682


##
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##
@@ -1221,6 +1224,36 @@ public void testReplicaManagerFetchShouldProceed() {
 any(), any(), any(ReplicaQuota.class), any());
 }
 
+@Test
+public void testCloseSharePartitionManager() throws Exception {
+SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+.withShareGroupPersister(mock(Persister.class)).build();
+
+List mockList = new ArrayList<>();
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Allowing first timer task to expire. The timer task will add an 
element to mockList.
+TestUtils.waitForCondition(
+() -> mockList.size() == 1,
+DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS,
+() -> "Timer task never got timed out.");
+
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Closing the sharePartitionManager closes timer object in 
sharePartitionManager.
+sharePartitionManager.close();
+// Allowing second timer task to expire. It won't be able to add an 
element to mockList.
+Thread.sleep(DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS);
+assertEquals(1, mockList.size());
+}
+
+private TimerTask createTimerTask(List mockList) {
+return new TimerTask(100) {
+@Override

Review Comment:
   Since this functionality is required in two places, I thought to extract it into 
a separate function, `createTimerTask`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15138) Java kafka-clients compression dependencies should be optional

2024-06-24 Thread Romain Quinio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859653#comment-17859653
 ] 

Romain Quinio commented on KAFKA-15138:
---

Any news on this? lz4-java has not been maintained since 2021 
(https://github.com/lz4/lz4-java/issues/196), so kafka-clients is pulling in the 
transitive dependency lz4-java with security vulnerabilities.

Or is there a workaround of excluding, via dependency management, the dependencies 
for the compression protocols that are not used? Or would that cause classloading 
errors in Kafka?
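
For reference, a hedged sketch of the exclusion workaround being asked about, for a 
Maven consumer of kafka-clients (coordinates taken from the issue description below; 
whether excluding a codec breaks anything depends on the compression types actually 
in use):

```xml
<!-- Sketch only: exclude an unused compression codec from kafka-clients. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.4.0</version>
  <exclusions>
    <exclusion>
      <groupId>org.lz4</groupId>
      <artifactId>lz4-java</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```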

 

> Java kafka-clients compression dependencies should be optional
> --
>
> Key: KAFKA-15138
> URL: https://issues.apache.org/jira/browse/KAFKA-15138
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: Joe DiPol
>Priority: Major
>
> If you look at
> [https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.pom]
> You see that the dependencies for the compression libraries (like lz4-java) 
> do NOT have "<optional>true</optional>". That means that these 
> libraries are transitive dependencies which will be pulled (and potentially 
> security scanned) for any project that uses kafka-clients. 
> This is not correct. These compression libraries are optional and should not 
> be transitive dependencies of kafka-clients. Therefore the above pom should 
> state {{optional}} like:
> {{
> <dependency>
>   <groupId>org.lz4</groupId>
>   <artifactId>lz4-java</artifactId>
>   <version>1.8.0</version>
>   <scope>runtime</scope>
>   <optional>true</optional>
> </dependency>
> }}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15138) Java kafka-clients compression dependencies should be optional

2024-06-24 Thread Romain Quinio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859653#comment-17859653
 ] 

Romain Quinio edited comment on KAFKA-15138 at 6/24/24 12:07 PM:
-

Any news on this ?  Lz4-java is no longer maintained since 2021 
(https://github.com/lz4/lz4-java/issues/196),
 so kafka-client is bringing transitive dependency lz4-java with security 
vulnerabilities.

Or is there a workaround of excluding via dependency management the 
dependencies to compression protocols that are not used ? Or would that cause 
classloading error into Kafka ?


was (Author: rquinio):
Any news on this ?  Lz4-java is no longer maintained since 2021 
(https://github.com/lz4/lz4-java/issues/196),
 so kafka-client is bringing transitive dependency lz4-java with security 
vulnerabilities.

Or is there a workaround of excluding via dependency management the 
dependencies to compression protocols that are not used ? Or would that cause 
classloading error into Kafka ?

 

> Java kafka-clients compression dependencies should be optional
> --
>
> Key: KAFKA-15138
> URL: https://issues.apache.org/jira/browse/KAFKA-15138
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.0
>Reporter: Joe DiPol
>Priority: Major
>
> If you look at
> [https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.0/kafka-clients-3.4.0.pom]
> You see that the dependencies for the compression libraries (like lz4-java) 
> do NOT have "<optional>true</optional>". That means that these 
> libraries are transitive dependencies which will be pulled (and potentially 
> security scanned) for any project that uses kafka-clients. 
> This is not correct. These compression libraries are optional and should not 
> be transitive dependencies of kafka-clients. Therefore the above pom should 
> state {{optional}} like:
> {{
> <dependency>
>   <groupId>org.lz4</groupId>
>   <artifactId>lz4-java</artifactId>
>   <version>1.8.0</version>
>   <scope>runtime</scope>
>   <optional>true</optional>
> </dependency>
> }}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16754: Implemented release acquired records functionality to SharePartition [kafka]

2024-06-24 Thread via GitHub


apoorvmittal10 commented on code in PR #16430:
URL: https://github.com/apache/kafka/pull/16430#discussion_r1650900179


##
checkstyle/suppressions.xml:
##
@@ -39,6 +39,7 @@
 
 
+

Review Comment:
   Can we reduce the cyclomatic complexity? I have refactored acquire and 
acknowledge methods to sub methods to decrease the complexity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17003: Implemented SharePartitionManager close() functionality [kafka]

2024-06-24 Thread via GitHub


apoorvmittal10 commented on code in PR #16431:
URL: https://github.com/apache/kafka/pull/16431#discussion_r1650896425


##
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##
@@ -1221,6 +1224,36 @@ public void testReplicaManagerFetchShouldProceed() {
 any(), any(), any(ReplicaQuota.class), any());
 }
 
+@Test
+public void testCloseSharePartitionManager() throws Exception {
+SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+.withShareGroupPersister(mock(Persister.class)).build();
+
+List mockList = new ArrayList<>();
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Allowing first timer task to expire. The timer task will add an 
element to mockList.
+TestUtils.waitForCondition(
+() -> mockList.size() == 1,
+DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS,
+() -> "Timer task never got timed out.");
+
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Closing the sharePartitionManager closes timer object in 
sharePartitionManager.
+sharePartitionManager.close();
+// Allowing second timer task to expire. It won't be able to add an 
element to mockList.
+Thread.sleep(DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS);

Review Comment:
   I don't think it's a good idea to sleep. Can't we use a MockTimer?



##
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##
@@ -1221,6 +1224,36 @@ public void testReplicaManagerFetchShouldProceed() {
 any(), any(), any(ReplicaQuota.class), any());
 }
 
+@Test
+public void testCloseSharePartitionManager() throws Exception {
+SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+.withShareGroupPersister(mock(Persister.class)).build();
+
+List mockList = new ArrayList<>();
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Allowing first timer task to expire. The timer task will add an 
element to mockList.
+TestUtils.waitForCondition(
+() -> mockList.size() == 1,
+DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS,
+() -> "Timer task never got timed out.");
+
+sharePartitionManager.timer().add(createTimerTask(mockList));
+// Closing the sharePartitionManager closes timer object in 
sharePartitionManager.
+sharePartitionManager.close();
+// Allowing second timer task to expire. It won't be able to add an 
element to mockList.
+Thread.sleep(DEFAULT_MAX_WAIT_TIMER_TASK_TIMEOUT_MS);
+assertEquals(1, mockList.size());
+}
+
+private TimerTask createTimerTask(List mockList) {
+return new TimerTask(100) {
+@Override

Review Comment:
   nit: It can be a lambda method.



##
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##
@@ -1221,6 +1224,36 @@ public void testReplicaManagerFetchShouldProceed() {
 any(), any(), any(ReplicaQuota.class), any());
 }
 
+@Test
+public void testCloseSharePartitionManager() throws Exception {
+SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+.withShareGroupPersister(mock(Persister.class)).build();

Review Comment:
   Why do we require a persister mock when we are not using the mock instance?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16666: Migrate `TransactionLogMessageFormatter` to tools module [kafka]

2024-06-24 Thread via GitHub


m1a2st commented on code in PR #16019:
URL: https://github.com/apache/kafka/pull/16019#discussion_r1650898140


##
tools/src/main/java/org/apache/kafka/tools/consumer/TransactionLogMessageFormatter.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
+import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
+import 
org.apache.kafka.coordinator.transaction.generated.TransactionLogValueJsonConverter;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class TransactionLogMessageFormatter implements MessageFormatter {
+
+@Override
+public void writeTo(ConsumerRecord consumerRecord, 
PrintStream output) {
+Optional.ofNullable(consumerRecord.key())
+.map(key -> readToTransactionLogKey(ByteBuffer.wrap(key)))
+.ifPresent(transactionLogKey -> {
+short version = 
ByteBuffer.wrap(consumerRecord.key()).getShort();
+ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+json.set("version", new TextNode(Short.toString(version)));
+
+if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION
+&& version <= 
TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
+byte[] value = consumerRecord.value();
+TransactionLogValue transactionLogValue = 
+new TransactionLogValue(new 
ByteBufferAccessor(ByteBuffer.wrap(value)), version);
+JsonNode jsonNode = 
TransactionLogValueJsonConverter.write(transactionLogValue, version);
+json.set("transactionalId", new 
TextNode(transactionLogKey.transactionalId()));
+json.set("data", jsonNode);
+} else {
+json.set("data", new TextNode("unknown"));
+}
+try {
+output.write(json.toString().getBytes(UTF_8));
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+});
+}
+
+private TransactionLogKey readToTransactionLogKey(ByteBuffer byteBuffer) {
+short version = byteBuffer.getShort();
+if (version >= TransactionLogKey.LOWEST_SUPPORTED_VERSION
+&& version <= TransactionLogKey.HIGHEST_SUPPORTED_VERSION) {
+return new TransactionLogKey(new ByteBufferAccessor(byteBuffer), 
version);
+} else {
+return new TransactionLogKey();

Review Comment:
   I test the formatter, and the result is 
   ```json
   {
 "key": {
   "version": "0",
   "data": {
 "transactionalId": "TXNID"
   }
 },
 "value": {
   "version": "1",
   "data": {
 "producerId": 100,
 "producerEpoch": 50,
 "transactionTimeoutMs": 500,
 "transactionStatus": 4,
 "transactionPartitions": [],
 "transactionLastUpdateTimestampMs": 1000,
 "transactionStartTimestampMs": 750
   }
 }
   }
   ```
   ```json
   {
 "key": {
   "version": "10",
   "data": "unknown"
 },
 "value": {
   "version": "10",
   "data": "unknown"
 }
   }
   ```
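   
   For context, a hedged example of how the formatter would typically be invoked once 
merged (the bootstrap address is a placeholder; the class name matches the package in 
this PR):
   
   ```
   bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic __transaction_state --from-beginning \
     --formatter org.apache.kafka.tools.consumer.TransactionLogMessageFormatter
   ```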



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this 

[jira] [Resolved] (KAFKA-16749) Implement share fetch messages

2024-06-24 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal resolved KAFKA-16749.
---
Resolution: Done

> Implement share fetch messages
> --
>
> Key: KAFKA-16749
> URL: https://issues.apache.org/jira/browse/KAFKA-16749
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16999) Integrate persister read API in Partition leader initilization

2024-06-24 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal resolved KAFKA-16999.
---
Resolution: Done

> Integrate persister read API in Partition leader initilization
> --
>
> Key: KAFKA-16999
> URL: https://issues.apache.org/jira/browse/KAFKA-16999
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16748) Implement share response handling in SharePartitionManager

2024-06-24 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal resolved KAFKA-16748.
---
Resolution: Fixed

> Implement share response handling in SharePartitionManager
> --
>
> Key: KAFKA-16748
> URL: https://issues.apache.org/jira/browse/KAFKA-16748
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Abhinav Dixit
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17026) Implement updateCacheAndOffsets functionality on LSO movement

2024-06-24 Thread Abhinav Dixit (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhinav Dixit updated KAFKA-17026:
--
Summary: Implement updateCacheAndOffsets functionality on LSO movement  
(was: Implement updateCachedAndOffsets functionality on LSO movement)

> Implement updateCacheAndOffsets functionality on LSO movement
> -
>
> Key: KAFKA-17026
> URL: https://issues.apache.org/jira/browse/KAFKA-17026
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Abhinav Dixit
>Assignee: Abhinav Dixit
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: pass in timeout to Admin.close() [kafka]

2024-06-24 Thread via GitHub


apoorvmittal10 commented on code in PR #16422:
URL: https://github.com/apache/kafka/pull/16422#discussion_r1650869359


##
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##
@@ -1513,7 +1513,7 @@ private Thread shutdownHelper(final boolean error, final 
long timeoutMs, final b
 }
 
 stateDirectory.close();
-adminClient.close();
+adminClient.close(Duration.ofMillis(timeoutMs));

Review Comment:
   Query: Shouldn't we account for the time already spent in the earlier close steps, 
i.e. the timeoutMs passed to the different close calls might not be a constant but 
rather a decreasing value? I have seen this decreasing-timeout approach in 
KafkaConsumer: 
https://github.com/apache/kafka/blob/3a0c83e49030074271dd34c20fa99baad6270c93/clients/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer.java#L1141
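   
   A small sketch of the decreasing-timeout idea (illustrative only; `time` is assumed 
to be the Kafka `Time` instance available in KafkaStreams, and the surrounding shutdown 
code is not shown):
   
   ```java
   // Sketch: deduct the time already spent from the remaining close budget,
   // similar to the decreasing-timeout handling in LegacyKafkaConsumer#close.
   final long startMs = time.milliseconds();
   stateDirectory.close();
   final long remainingMs = Math.max(0L, timeoutMs - (time.milliseconds() - startMs));
   adminClient.close(Duration.ofMillis(remainingMs));
   ```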
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-06-24 Thread via GitHub


cadonna commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1650830032


##
streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java:
##
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Category(IntegrationTest.class)
+public class ProcessingExceptionHandlerIntegrationTest {
+private final String threadId = Thread.currentThread().getName();
+
+@Test
+public void shouldContinueWhenProcessingExceptionOccursAtBeginning() {

Review Comment:
   My proposal would be the following:
   FAIL:
   - a stream of (`good1`, `bad1`, `good2`, `good3`): with this you can verify 
that the processor processed `good1`, failed with `bad1`, and did not process 
`good2` and `good3`. Is there something else you need to test? 
   
   CONTINUE:
   - a stream of (`good1`, `bad1`, `good2`, `good3`, `bad2`, `good4`): with 
this you can verify that the good ones are all processed. Is there something 
else you need to test? 
   
   I might have missed some cases. So please thoroughly verify my proposal.
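   
   To make the proposal concrete, a possible shape for the FAIL-case input (types 
follow the test's existing imports; the records themselves are illustrative):
   
   ```java
   // Illustrative input: one bad record between good ones.
   final List<KeyValue<String, String>> events = Arrays.asList(
       new KeyValue<>("ID1", "good1"),
       new KeyValue<>("ID2", "bad1"),   // triggers the processing exception
       new KeyValue<>("ID3", "good2"),
       new KeyValue<>("ID4", "good3")
   );
   // With a FAIL handler, only "good1" should be processed and piping should stop at "bad1";
   // with a CONTINUE handler, every good record should be processed.
   ```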



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-06-24 Thread via GitHub


cadonna commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1650830032


##
streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java:
##
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Category(IntegrationTest.class)
+public class ProcessingExceptionHandlerIntegrationTest {
+private final String threadId = Thread.currentThread().getName();
+
+@Test
+public void shouldContinueWhenProcessingExceptionOccursAtBeginning() {

Review Comment:
   My proposal would be the following:
   FAIL:
   - a stream of (`good1`, `bad1`, `good2`, `good3`): with this you can verify 
that the processor processed `good1`, failed with `bad1`, and did not process 
`good2` and `good3`. Is there something else you need to test? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16788 - Fix resource leakage during connector start() failure [kafka]

2024-06-24 Thread via GitHub


ashoke-cube commented on PR #16095:
URL: https://github.com/apache/kafka/pull/16095#issuecomment-2186292726

   > > Hi @ashoke-cube could you fix the build? Thanks!
   > 
   > Hey @gharris1727 I looked into the build failure. It is a bit weird. It is 
failing because it is not able to find JUnit's `Test` class. The tests run fine 
locally. It's not clear to me what the issue is here. Is there anything else 
to be done after adding new test cases?
   > 
   > ```
   > Task :connect:runtime:compileTestJava
   > 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16095/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:140:
 error: cannot find symbol
   > 
   > @Test
   > ^
   > symbol:   class Test
   > location: class WorkerConnectorTest
   > 
   > 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16095/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java:393:
 error: cannot find symbol
   > 
   > @Test
   > ^
   > symbol:   class Test
   > location: class WorkerConnectorTest
   > ```
   
   Figured it. Fixing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-06-24 Thread via GitHub


cadonna commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1650786044


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##
@@ -99,30 +81,57 @@ public void 
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
 public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
 final ProcessorNode node =
 new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-node.setProcessingExceptionHandler(new 
LogAndFailProcessingExceptionHandler());
+node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL));
+
+final InternalProcessorContext 
internalProcessorContext = mockInternalProcessorContext();
 node.init(internalProcessorContext);
 
-assertThrows(StreamsException.class, () -> node.process(new 
Record<>("key", "value", 0)));
+final StreamsException processingException = 
assertThrows(StreamsException.class,
+() -> node.process(new Record<>("key", "value", 0)));
+
+assertEquals("Processing exception handler is set to fail upon" +
+" a processing error. If you would rather have the streaming 
pipeline" +
+" continue after a processing error, please set the " +
+PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", 
processingException.getMessage());
+
+assertTrue(processingException.getCause() instanceof RuntimeException);
+assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
+processingException.getCause().getMessage());
 }
 
 @Test
 public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() 
{
 final ProcessorNode node =
 new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-node.setProcessingExceptionHandler(new 
LogAndContinueProcessingExceptionHandler());
+node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE));
+
+final InternalProcessorContext 
internalProcessorContext = mockInternalProcessorContext();
 node.init(internalProcessorContext);
 
 assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 
0)));
 }
 
 @Test
+@SuppressWarnings("unchecked")
 public void shouldNotHandleStreamsExceptionAsProcessingException() {
+final ProcessingExceptionHandler processingExceptionHandler = 
spy(ProcessingExceptionHandler.class);

Review Comment:
   Please do not use spies! They are bad practice, because they do not really 
decouple the code under test from other code. Here, a 
`mock(ProcessingExceptionHandler.class)` should be fine since 
`ProcessingExceptionHandler` is an interface.
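   As a minimal sketch of that suggestion (the three-argument `handle(...)` 
signature is assumed from KIP-1033 and may differ from the final interface):
   
   ```java
   final ProcessingExceptionHandler handler = mock(ProcessingExceptionHandler.class);
   when(handler.handle(any(), any(), any()))
       .thenReturn(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE);

   final ProcessorNode node =
       new ProcessorNode<>("name", new ProcessingExceptionProcessor(), Collections.emptySet());
   node.setProcessingExceptionHandler(handler);
   node.init(mockInternalProcessorContext());

   assertDoesNotThrow(() -> node.process(new Record<>("key", "value", 0)));
   verify(handler).handle(any(), any(), any());   // the handler should be consulted exactly once
   ```
   
   Unlike a spy, the mock never touches a real handler implementation, so the 
test exercises only the `ProcessorNode` logic.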



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##
@@ -99,30 +81,57 @@ public void 
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
 public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
 final ProcessorNode node =
 new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-node.setProcessingExceptionHandler(new 
LogAndFailProcessingExceptionHandler());
+node.setProcessingExceptionHandler(new 
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL));
+
+final InternalProcessorContext 
internalProcessorContext = mockInternalProcessorContext();
 node.init(internalProcessorContext);
 
-assertThrows(StreamsException.class, () -> node.process(new 
Record<>("key", "value", 0)));
+final StreamsException processingException = 
assertThrows(StreamsException.class,
+() -> node.process(new Record<>("key", "value", 0)));
+
+assertEquals("Processing exception handler is set to fail upon" +
+" a processing error. If you would rather have the streaming 
pipeline" +
+" continue after a processing error, please set the " +
+PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.", 
processingException.getMessage());
+
+assertTrue(processingException.getCause() instanceof RuntimeException);
+assertEquals("Processing exception should be caught and handled by the 
processing exception handler.",
+processingException.getCause().getMessage());
 }
 
 @Test
 public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue() 
{
 final ProcessorNode node =
 new ProcessorNode<>("name", new ProcessingExceptionProcessor(), 
Collections.emptySet());
-node.setProcessingExceptionHandler(new 
LogAndContinueProcessingExceptionHandler());

[jira] [Commented] (KAFKA-16334) Remove Deprecated command line option from reset tool

2024-06-24 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859641#comment-17859641
 ] 

Caio César commented on KAFKA-16334:


I would like to work on this task. :)

> Remove Deprecated command line option from reset tool
> -
>
> Key: KAFKA-16334
> URL: https://issues.apache.org/jira/browse/KAFKA-16334
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, tools
>Reporter: Matthias J. Sax
>Assignee: Caio César
>Priority: Blocker
> Fix For: 4.0.0
>
>
> --bootstrap-server (singular) was deprecated in 3.4 release via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-865%3A+Support+--bootstrap-server+in+kafka-streams-application-reset]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16334) Remove Deprecated command line option from reset tool

2024-06-24 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caio César reassigned KAFKA-16334:
--

Assignee: Caio César

> Remove Deprecated command line option from reset tool
> -
>
> Key: KAFKA-16334
> URL: https://issues.apache.org/jira/browse/KAFKA-16334
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, tools
>Reporter: Matthias J. Sax
>Assignee: Caio César
>Priority: Blocker
> Fix For: 4.0.0
>
>
> --bootstrap-server (singular) was deprecated in 3.4 release via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-865%3A+Support+--bootstrap-server+in+kafka-streams-application-reset]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16448: Catch and handle processing exceptions [kafka]

2024-06-24 Thread via GitHub


cadonna commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1650767327


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java:
##
@@ -49,6 +51,21 @@ public Headers headers() {
 return value.headers();
 }
 
+public ConsumerRecord rawRecord() {
+return rawRecord;
+}
+
+@Override
+public boolean equals(final Object other) {
+// Do not include rawRecord in the comparison
+return super.equals(other);
+}
+
+@Override
+public int hashCode() {
+return super.hashCode();
+}
+

Review Comment:
   Interesting! Did not know about this! Let's make spotbugs happy then.
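   (Side note for readers of the archive: the usual SpotBugs complaint here is 
that a class overriding `equals()` should also override `hashCode()`. Delegating 
both to the superclass, as the diff does, keeps the pair consistent while leaving 
`rawRecord` out of the comparison. A stand-alone illustration with hypothetical 
`Base`/`Wrapper` classes, not code from the PR:)
   
   ```java
   class Base {
       private final String id;
       Base(final String id) { this.id = id; }
       @Override public boolean equals(final Object o) {
           return o instanceof Base && ((Base) o).id.equals(id);
       }
       @Override public int hashCode() { return id.hashCode(); }
   }

   class Wrapper extends Base {
       private final Object attachment;   // deliberately excluded from equality
       Wrapper(final String id, final Object attachment) {
           super(id);
           this.attachment = attachment;
       }
       // Overridden together, both delegating to Base, so the equals/hashCode
       // contract stays consistent and the static analysis stays quiet.
       @Override public boolean equals(final Object o) { return super.equals(o); }
       @Override public int hashCode() { return super.hashCode(); }
   }
   ```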



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17011: SupportedFeatures.MinVersion incorrectly blocks v0 (3.8) [kafka]

2024-06-24 Thread via GitHub


chia7712 commented on code in PR #16420:
URL: https://github.com/apache/kafka/pull/16420#discussion_r1650736148


##
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##
@@ -258,11 +259,22 @@ private static ApiVersionsResponseData 
createApiVersionsResponseData(
 final long finalizedFeaturesEpoch,
 final boolean zkMigrationEnabled
 ) {
+Features backwardsCompatibleFeatures = 
Features.supportedFeatures(latestSupportedFeatures.features().entrySet()
+.stream().filter(entry -> {
+SupportedVersionRange supportedVersionRange = entry.getValue();
+return supportedVersionRange.min() != 0;

Review Comment:
   Not sure whether I am failing to catch your description.
   
   > so for now we will change 0 to 1 in the response in order to be backwards 
compatible.
   
the code looks like it gets rid of "0" instead of changing it from 0 to 1.
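   A small sketch of the two behaviours being contrasted. `latest` stands for 
the `Features` instance above, and the two-argument `SupportedVersionRange` 
constructor is assumed for the sake of the example:
   
   ```java
   // (a) what the diff appears to do: drop any feature whose minimum version is 0
   final Map<String, SupportedVersionRange> dropped = latest.features().entrySet().stream()
       .filter(e -> e.getValue().min() != 0)
       .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

   // (b) the alternative the description talks about: keep the feature, advertise min = 1
   final Map<String, SupportedVersionRange> clamped = latest.features().entrySet().stream()
       .collect(Collectors.toMap(
           Map.Entry::getKey,
           e -> e.getValue().min() == 0
               ? new SupportedVersionRange((short) 1, e.getValue().max())
               : e.getValue()));
   ```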



##
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##
@@ -258,11 +259,22 @@ private static ApiVersionsResponseData 
createApiVersionsResponseData(
 final long finalizedFeaturesEpoch,
 final boolean zkMigrationEnabled
 ) {
+Features backwardsCompatibleFeatures = 
Features.supportedFeatures(latestSupportedFeatures.features().entrySet()
+.stream().filter(entry -> {
+SupportedVersionRange supportedVersionRange = entry.getValue();
+return supportedVersionRange.min() != 0;

Review Comment:
   For another, does this PR mean `Admin#describeFeatures` can't see the 
feature "group.version" from the broker running in 3.8.0?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17026) Implement updateCachedAndOffsets functionality on LSO movement

2024-06-24 Thread Abhinav Dixit (Jira)
Abhinav Dixit created KAFKA-17026:
-

 Summary: Implement updateCachedAndOffsets functionality on LSO 
movement
 Key: KAFKA-17026
 URL: https://issues.apache.org/jira/browse/KAFKA-17026
 Project: Kafka
  Issue Type: Sub-task
Reporter: Abhinav Dixit
Assignee: Abhinav Dixit






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16965: Throw cause of TimeoutException [kafka]

2024-06-24 Thread via GitHub


aliehsaeedii commented on code in PR #16344:
URL: https://github.com/apache/kafka/pull/16344#discussion_r1650696721


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1175,19 +1176,26 @@ private ClusterAndWaitTime waitOnMetadata(String topic, 
Integer partition, long
 try {
 metadata.awaitUpdate(version, remainingWaitMs);
 } catch (TimeoutException ex) {
+final String errorMessage = String.format("Topic %s not 
present in metadata after %d ms.",
+topic, maxWaitMs);
+if (metadata.getError(topic) != null) {
+throw new TimeoutException(errorMessage, 
metadata.getError(topic).exception());
+}
 // Rethrow with original maxWaitMs to prevent logging 
exception with remainingWaitMs

Review Comment:
   > Also just to be clear -- this code path is not one where we see anything 
but retriable exceptions? I noticed the other path checks that but not this one.
   
   Please see https://github.com/apache/kafka/pull/16344#discussion_r1648061530
   I personally think that fatal exceptions are thrown 
[here](https://github.com/apache/kafka/blob/3a0c83e49030074271dd34c20fa99baad6270c93/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java#L123).
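   For context, a hedged caller-side sketch of what the cause-wrapping in the 
diff would give an application. The topic, records and logging are made up, and 
it is assumed the timeout surfaces through the `send()` future, as producer API 
errors normally do:
   
   ```java
   try {
       producer.send(new ProducerRecord<>("some-topic", "key", "value")).get();
   } catch (final ExecutionException | InterruptedException e) {
       // TimeoutException here is org.apache.kafka.common.errors.TimeoutException.
       if (e.getCause() instanceof TimeoutException) {
           // With the proposed change, the underlying per-topic metadata error
           // (if any) is attached as the cause of the TimeoutException.
           final Throwable topicError = e.getCause().getCause();
           System.err.println("Metadata wait failed"
               + (topicError != null ? " because of: " + topicError : ""));
       }
   }
   ```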



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16551) add integration test for ClusterTool

2024-06-24 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859634#comment-17859634
 ] 

PoAn Yang commented on KAFKA-16551:
---

Yes, let me handle it. Thank you.

> add integration test for ClusterTool
> 
>
> Key: KAFKA-16551
> URL: https://issues.apache.org/jira/browse/KAFKA-16551
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 3.9.0
>
>
> as title



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

