[jira] [Updated] (KAFKA-17097) Add replace.null.with.default configuration to ValueToKey and ReplaceField (KIP-1040)

2024-07-08 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-17097:

Description: 
{color:#172b4d}See 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=303794677] 
for motivation and design.

{color}These are the final remaining transformations which still need this 
configuration added.

> Add replace.null.with.default configuration to ValueToKey and ReplaceField 
> (KIP-1040)
> -
>
> Key: KAFKA-17097
> URL: https://issues.apache.org/jira/browse/KAFKA-17097
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Reporter: Greg Harris
>Priority: Major
>
> {color:#172b4d}See 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=303794677] 
> for motivation and design.
> {color}These are the final remaining transformations which still need this 
> configuration added.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17097) Add replace.null.with.default configuration to ValueToKey and ReplaceField (KIP-1040)

2024-07-08 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-17097:

Labels: newbie  (was: )

> Add replace.null.with.default configuration to ValueToKey and ReplaceField 
> (KIP-1040)
> -
>
> Key: KAFKA-17097
> URL: https://issues.apache.org/jira/browse/KAFKA-17097
> Project: Kafka
>  Issue Type: Task
>  Components: connect
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> {color:#172b4d}See 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=303794677] 
> for motivation and design.
> {color}These are the final remaining transformations which still need this 
> configuration added.





[jira] [Created] (KAFKA-17097) Add replace.null.with.default configuration to ValueToKey and ReplaceField (KIP-1040)

2024-07-08 Thread Greg Harris (Jira)
Greg Harris created KAFKA-17097:
---

 Summary: Add replace.null.with.default configuration to ValueToKey 
and ReplaceField (KIP-1040)
 Key: KAFKA-17097
 URL: https://issues.apache.org/jira/browse/KAFKA-17097
 Project: Kafka
  Issue Type: Task
  Components: connect
Reporter: Greg Harris








[jira] [Resolved] (KAFKA-15838) [Connect] ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields

2024-07-08 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-15838.
-
Fix Version/s: 3.9.0
   Resolution: Fixed

The InsertField, ExtractField, HeaderFrom, Cast, SetSchemaMetadata,
TimestampConverter, and MaskField transformations now have replace.null.with.default configurations.

> [Connect] ExtractField and InsertField NULL Values are replaced by default 
> value even in NULLABLE fields
> 
>
> Key: KAFKA-15838
> URL: https://issues.apache.org/jira/browse/KAFKA-15838
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Eric Pangiawan
>Assignee: Mario Fiore Vitale
>Priority: Major
> Fix For: 3.9.0
>
>
> ExtractField: Line 116-119
> [https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java#L61-L68]
> InsertField: Line 163 - 195
> [https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L163-L195]
> h1. Expect:
> Value `null` is valid for an optional field, even though the field has a
> default value.
> Only when the field is required should the class fall back to the default
> value when the value is `null`.
> h1. Actual:
> The default value is always returned if `null` was given.
> h1. Example:
> PostgreSQL DDL:
> {code:java}
> CREATE TABLE products(
>     id varchar(255),
>     color varchar(255),
>     quantity float8
> );
> -- Set Default
> ALTER TABLE products ALTER COLUMN quantity SET  DEFAULT 1.0; {code}
> Insert A Record:
> {code:java}
> INSERT INTO public.products VALUES('1', 'Blue', null); {code}
> Table Select *:
> {code:java}
>  id | color | quantity
> ----+-------+----------
>  1  | Blue  | {code}
> Debezium Behavior when using ExtractField and InsertField class (in the event 
> flattening SMT):
> {code:java}
> {
>     "id":"1",
>     "color":"Blue",
>     "quantity":1.0,
>     "__op":"c",
>     "__ts_ms":1698127432079,
>     "__source_ts_ms":1698127431679,
>     "__db":"testing_db",
>     "__schema":"public",
>     "__table":"products",
>     "__lsn":24470112,
>     "__txId":957,
>     "__snapshot":null,
>     "__deleted":"false"
> } {code}
> The debezium code can be found 
> [here|https://github.com/debezium/debezium/blob/2.4/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java#L116-L119]
> h1. Expected Output:
> {code:java}
> {
>     "id":"1",
>     "color":"Blue",
>     "quantity":null,
>     "__op":"c",
>     "__ts_ms":1698127432079,
>     "__source_ts_ms":1698127431679,
>     "__db":"testing_db",
>     "__schema":"public",
>     "__table":"products",
>     "__lsn":24470112,
>     "__txId":957,
>     "__snapshot":null,
>     "__deleted":"false"
> }{code}
> h1. Temporary Solution:
> Use getWithoutDefault() in ExtractField and InsertField instead of get().
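For illustration, here is a minimal stand-alone sketch (my own simplification, not Connect's actual Struct implementation) of the difference between a default-substituting get() and getWithoutDefault():

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in (not Connect's real Struct) contrasting get(), which
// substitutes the schema default for a null value, with getWithoutDefault(),
// which preserves the literal null.
public class NullVsDefault {
    static final Map<String, Object> values = new HashMap<>();
    static final Map<String, Object> defaults = new HashMap<>();
    static {
        defaults.put("quantity", 1.0); // column default from the DDL
        values.put("quantity", null);  // explicit NULL inserted by the user
    }

    static Object get(String field) {
        Object v = values.get(field);
        return v != null ? v : defaults.get(field); // default substituted
    }

    static Object getWithoutDefault(String field) {
        return values.get(field); // literal value, even if null
    }

    public static void main(String[] args) {
        System.out.println(get("quantity"));               // 1.0
        System.out.println(getWithoutDefault("quantity")); // null
    }
}
```

This is why the event-flattening output above shows "quantity":1.0 instead of the inserted NULL: the transformation reads through the default-substituting accessor.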





[jira] [Commented] (KAFKA-17095) Fix the typo: CreateableTopicConfig -> CreatableTopicConfig

2024-07-08 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863890#comment-17863890
 ] 

Greg Harris commented on KAFKA-17095:
-

Is this a backwards compatible change? If someone with the old definition 
connects to a broker with the new definition, do the two interoperate?
If someone is relying on the generated type directly, does this break 
compilation?

> Fix the typo: CreateableTopicConfig -> CreatableTopicConfig
> ---
>
> Key: KAFKA-17095
> URL: https://issues.apache.org/jira/browse/KAFKA-17095
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Ming-Yen Chung
>Priority: Minor
>
> source: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/CreateTopicsRequest.json#L51





[jira] [Assigned] (KAFKA-14648) Do not fail clients if bootstrap servers is not immediately resolvable

2024-07-08 Thread Brenden DeLuna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brenden DeLuna reassigned KAFKA-14648:
--

Assignee: Brenden DeLuna

> Do not fail clients if bootstrap servers is not immediately resolvable
> --
>
> Key: KAFKA-14648
> URL: https://issues.apache.org/jira/browse/KAFKA-14648
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Brenden DeLuna
>Priority: Major
>
> In dynamic environments, such as system tests, there is sometimes a delay 
> between when a client is initialized and when the configured bootstrap 
> servers become available in DNS. Currently clients will fail immediately if 
> none of the bootstrap servers can resolve. It would be more convenient for 
> these environments to provide a grace period to give more time for 
> initialization. 
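One possible shape for such a grace period (a sketch of mine, not the actual client code) is to retry bootstrap resolution for a bounded window before giving up:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class BootstrapResolver {
    // Retry DNS resolution until it succeeds or the grace period elapses;
    // returns null if the host never resolves within graceMs.
    static InetAddress resolveWithGrace(String host, long graceMs, long pollMs) {
        long deadline = System.currentTimeMillis() + graceMs;
        while (true) {
            try {
                return InetAddress.getByName(host);
            } catch (UnknownHostException e) {
                if (System.currentTimeMillis() >= deadline) return null;
                try {
                    Thread.sleep(pollMs); // wait for DNS to catch up
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
    }

    public static void main(String[] args) {
        // localhost resolves immediately; an unknown name exhausts the grace period
        System.out.println(resolveWithGrace("localhost", 5_000, 200) != null);
    }
}
```

The names and parameters here are hypothetical; the actual fix would live inside the client's bootstrap logic rather than in user code.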





[jira] [Assigned] (KAFKA-14648) Do not fail clients if bootstrap servers is not immediately resolvable

2024-07-08 Thread Brenden DeLuna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brenden DeLuna reassigned KAFKA-14648:
--

Assignee: (was: Brenden DeLuna)

> Do not fail clients if bootstrap servers is not immediately resolvable
> --
>
> Key: KAFKA-14648
> URL: https://issues.apache.org/jira/browse/KAFKA-14648
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> In dynamic environments, such as system tests, there is sometimes a delay 
> between when a client is initialized and when the configured bootstrap 
> servers become available in DNS. Currently clients will fail immediately if 
> none of the bootstrap servers can resolve. It would be more convenient for 
> these environments to provide a grace period to give more time for 
> initialization. 





[jira] [Assigned] (KAFKA-14648) Do not fail clients if bootstrap servers is not immediately resolvable

2024-07-08 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-14648:
--

Assignee: Brenden DeLuna  (was: Philip Nee)

> Do not fail clients if bootstrap servers is not immediately resolvable
> --
>
> Key: KAFKA-14648
> URL: https://issues.apache.org/jira/browse/KAFKA-14648
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Brenden DeLuna
>Priority: Major
>
> In dynamic environments, such as system tests, there is sometimes a delay 
> between when a client is initialized and when the configured bootstrap 
> servers become available in DNS. Currently clients will fail immediately if 
> none of the bootstrap servers can resolve. It would be more convenient for 
> these environments to provide a grace period to give more time for 
> initialization. 





[jira] [Created] (KAFKA-17096) Fix kafka_log4j_appender.py

2024-07-08 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17096:
--

 Summary: Fix kafka_log4j_appender.py
 Key: KAFKA-17096
 URL: https://issues.apache.org/jira/browse/KAFKA-17096
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see [https://github.com/apache/kafka/pull/12148#issuecomment-2214584773] for 
root cause





[jira] [Commented] (KAFKA-17091) Add @FunctionalInterface to Streams interfaces

2024-07-08 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863835#comment-17863835
 ] 

Bill Bejeck commented on KAFKA-17091:
-

Great! Thanks for the quick response.

> Add @FunctionalInterface to Streams interfaces
> --
>
> Key: KAFKA-17091
> URL: https://issues.apache.org/jira/browse/KAFKA-17091
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ray McDermott
>Assignee: Ray McDermott
>Priority: Minor
>  Labels: need-kip
>
> Clojure version 1.12 (currently in beta) has many updates to Java interop.
> Unfortunately, it does not quite deliver what we need with respect to 
> thinning down Kafka Streams interop.
> We were specifically hoping that passing {{(fn [] ...)}} to SAM interfaces 
> would just work and we would not need to {{reify}} the interface.
> Sadly it only works for interfaces that have been explicitly annotated with 
> {{@FunctionalInterface}}  - and the Kafka Streams DSL does not have those 
> annotations.
> Details here
> https://ask.clojure.org/index.php/13908/expand-fi-adapting-to-sam-types-not-marked-as-fi
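What the ticket asks for can be sketched on a simplified stand-in for a Streams SAM interface (the real ones live in org.apache.kafka.streams.kstream; this one is illustrative only):

```java
// @FunctionalInterface is a compile-time marker: it changes nothing for Java
// callers, but tooling such as Clojure 1.12's fn-to-SAM adaption keys off it.
@FunctionalInterface
interface Initializer<VA> {
    VA apply();
}

public class FunctionalInterfaceDemo {
    public static void main(String[] args) {
        Initializer<Long> init = () -> 0L; // lambdas work either way in Java
        System.out.println(init.apply());
    }
}
```

Since the annotation has no runtime or binary effect, adding it to the existing interfaces should be behavior-preserving for Java and JVM-language callers; the KIP is needed because it is still a public-API change.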





[jira] [Updated] (KAFKA-17088) REQUEST_TIMED_OUT occurs intermittently in the kafka Producer client

2024-07-08 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17088:
--
Component/s: clients
 producer 
 (was: core)

> REQUEST_TIMED_OUT occurs intermittently in the kafka Producer client 
> -
>
> Key: KAFKA-17088
> URL: https://issues.apache.org/jira/browse/KAFKA-17088
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.5.1
>Reporter: Janardhana Gopalachar
>Priority: Major
>
> Hi,
> We observe that the producer intermittently receives a request timeout when
> trying to send messages to the Kafka broker. Below are the properties set for
> the Kafka producer.
> producerProps.put("acks", "all");
> producerProps.put("linger.ms", 0);
> producerProps.put("max.block.ms", 5000);
> producerProps.put("metadata.max.idle.ms", 5000); 
> producerProps.put("delivery.timeout.ms", 1); 
> producerProps.put("request.timeout.ms", 1000);
> producerProps.put("key.serializer", BYTE_SERIALIZER);
> producerProps.put("value.serializer", BYTE_SERIALIZER);
>  
>  
> We receive the below message intermittently. We need to know the reason for
> this timeout.
> _[kafka-producer-network-thread | producer-1] 
> o.a.k.c.u.LogContext$LocationAwareKafkaLogger:434 writeLog [Producer 
> clientId=producer-1] Got error produce response with correlation id 231972 on 
> topic-partition {*}health_check_topic_msg2-0{*}, retrying (2147483646 
> attempts left). Error: REQUEST_TIMED_OUT. Error Message: Disconnected from 
> node 1 due to timeout_





[jira] [Updated] (KAFKA-17093) KafkaConsumer.seekToEnd should return LSO

2024-07-08 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17093:
--
Component/s: clients

> KafkaConsumer.seekToEnd should return LSO 
> --
>
> Key: KAFKA-17093
> URL: https://issues.apache.org/jira/browse/KAFKA-17093
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.6.1
> Environment: Ubuntu,  IntelliJ, Scala   "org.apache.kafka" % 
> "kafka-clients" % "3.6.1"
>Reporter: Tom Kalmijn
>Priority: Major
>
>  
> Expected
> When using a transactional producer then the method 
> KafkaConsumer.seekToEnd(...) of a consumer configured with isolation level 
> "read_committed" should return the LSO. 
> Observed
> The offset returned is always the actual last offset of the partition, which 
> is not the LSO if the latest offsets are occupied by transaction markers.
> Also see this Slack thread:
> https://confluentcommunity.slack.com/archives/C499EFQS0/p1720088282557559





[jira] [Updated] (KAFKA-17092) Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout` for AsyncConsumer

2024-07-08 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17092:
--
Component/s: clients
 consumer
 unit tests

> Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout` for AsyncConsumer
> -
>
> Key: KAFKA-17092
> URL: https://issues.apache.org/jira/browse/KAFKA-17092
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Major
>
> Sometimes it hangs in my Jenkins ... not sure whether the Kafka Jenkins
> encounters the same issue or not.





[jira] [Commented] (KAFKA-17091) Add @FunctionalInterface to Streams interfaces

2024-07-08 Thread Ray McDermott (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863811#comment-17863811
 ] 

Ray McDermott commented on KAFKA-17091:
---

[~bbejeck] I have added a comment making the request

> Add @FunctionalInterface to Streams interfaces
> --
>
> Key: KAFKA-17091
> URL: https://issues.apache.org/jira/browse/KAFKA-17091
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ray McDermott
>Assignee: Ray McDermott
>Priority: Minor
>  Labels: need-kip
>
> Clojure version 1.12 (currently in beta) has many updates to Java interop.
> Unfortunately, it does not quite deliver what we need with respect to 
> thinning down Kafka Streams interop.
> We were specifically hoping that passing {{(fn [] ...)}} to SAM interfaces 
> would just work and we would not need to {{reify}} the interface.
> Sadly it only works for interfaces that have been explicitly annotated with 
> {{@FunctionalInterface}}  - and the Kafka Streams DSL does not have those 
> annotations.
> Details here
> https://ask.clojure.org/index.php/13908/expand-fi-adapting-to-sam-types-not-marked-as-fi





[jira] [Commented] (KAFKA-17091) Add @FunctionalInterface to Streams interfaces

2024-07-08 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863799#comment-17863799
 ] 

Bill Bejeck commented on KAFKA-17091:
-

Hi [~genraiy]  thanks for the contribution idea.  Since this is a change to the 
public interface code of Kafka Streams, this PR will require a 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals].
 Ping me here once you create a wiki ID (step 2 from the getting started 
section in the page referenced above) and I'll go in and give you edit 
permissions to create a KIP.

> Add @FunctionalInterface to Streams interfaces
> --
>
> Key: KAFKA-17091
> URL: https://issues.apache.org/jira/browse/KAFKA-17091
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ray McDermott
>Assignee: Ray McDermott
>Priority: Minor
>  Labels: need-kip
>
> Clojure version 1.12 (currently in beta) has many updates to Java interop.
> Unfortunately, it does not quite deliver what we need with respect to 
> thinning down Kafka Streams interop.
> We were specifically hoping that passing {{(fn [] ...)}} to SAM interfaces 
> would just work and we would not need to {{reify}} the interface.
> Sadly it only works for interfaces that have been explicitly annotated with 
> {{@FunctionalInterface}}  - and the Kafka Streams DSL does not have those 
> annotations.
> Details here
> https://ask.clojure.org/index.php/13908/expand-fi-adapting-to-sam-types-not-marked-as-fi





[jira] [Updated] (KAFKA-17066) New consumer updateFetchPositions should perform all operations in background thread

2024-07-08 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-17066:
---
Labels: consumer-threading-refactor kip-848-client-support  (was: )

> New consumer updateFetchPositions should perform all operations in background 
> thread
> 
>
> Key: KAFKA-17066
> URL: https://issues.apache.org/jira/browse/KAFKA-17066
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
>
> The updateFetchPositions func in the new consumer performs several actions 
> based on the assigned partitions from the subscriptionState. The way it's 
> currently implemented, it fetches committed offsets for partitions that 
> required a position (retrieved from subscription state in the app thread), 
> and then resets positions for the partitions still needing one (retrieved
> from the subscription state but in the background thread).
> This is problematic, given that the assignment/subscriptionState may change
> in the background thread at any time (e.g. new partitions reconciled), so we
> could end up resetting positions to the partition offsets for a partition for
> which we never even attempted to retrieve committed offsets.
> Consider this sequence for a consumer that owns partition tp0:
>  * consumer owns tp0
>  * app thread -> updateFetchPositions triggers 
> initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned 
> partitions requiring a position (taking them from 
> subscriptions.initializingPartitions()). This will fetch committed offsets 
> for tp0 only.
>  * background thread -> receives new partition tp1 and completes 
> reconciliation (adds it to the subscription state as INITIALIZING, requires a 
> position)
>  * app thread -> updateFetchPositions resets positions for all partitions 
> that still don't have a valid position after initWithCommittedOffsetsIfNeeded 
> (taking them from subscriptionState.partitionsNeedingReset). This will 
> mistakenly consider that it should reset tp1 to the partition offsets, when 
> in reality it never even tried fetching the committed offsets for it because 
> it wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 
> We should consider moving updateFetchPositions to the background as a single
> event, which would safely use the subscriptionState object and apply all
> actions involved in updateFetchPositions to the same consistent set of
> partitions assigned at that moment.
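The race in the bullet sequence above can be modeled with a toy simulation (my own, not Kafka code): the app thread snapshots the partitions needing committed offsets, the background thread reconciles tp1 in between, and the reset step then covers tp1 even though its committed offsets were never fetched.

```java
import java.util.HashSet;
import java.util.Set;

public class FetchPositionsRace {
    static Set<String> mistakenlyReset() {
        Set<String> needingPosition = new HashSet<>();
        needingPosition.add("tp0");
        // App thread: snapshot for initWithCommittedOffsetsIfNeeded -> {tp0}
        Set<String> fetchedCommitted = new HashSet<>(needingPosition);
        // Background thread: tp1 reconciled mid-flight, added as INITIALIZING
        needingPosition.add("tp1");
        // App thread: reset everything still lacking a valid position,
        // including tp1, whose committed offsets were never even requested
        Set<String> reset = new HashSet<>(needingPosition);
        reset.removeAll(fetchedCommitted);
        return reset;
    }

    public static void main(String[] args) {
        System.out.println(mistakenlyReset()); // [tp1]
    }
}
```

Running both steps as one background event against a single snapshot of the assignment would make the two sets consistent by construction.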





[jira] [Updated] (KAFKA-17091) Add @FunctionalInterface to Streams interfaces

2024-07-08 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-17091:

Labels: need-kip  (was: )

> Add @FunctionalInterface to Streams interfaces
> --
>
> Key: KAFKA-17091
> URL: https://issues.apache.org/jira/browse/KAFKA-17091
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ray McDermott
>Assignee: Ray McDermott
>Priority: Minor
>  Labels: need-kip
>
> Clojure version 1.12 (currently in beta) has many updates to Java interop.
> Unfortunately, it does not quite deliver what we need with respect to 
> thinning down Kafka Streams interop.
> We were specifically hoping that passing {{(fn [] ...)}} to SAM interfaces 
> would just work and we would not need to {{reify}} the interface.
> Sadly it only works for interfaces that have been explicitly annotated with 
> {{@FunctionalInterface}}  - and the Kafka Streams DSL does not have those 
> annotations.
> Details here
> https://ask.clojure.org/index.php/13908/expand-fi-adapting-to-sam-types-not-marked-as-fi





[jira] [Created] (KAFKA-17095) Fix the typo: CreateableTopicConfig -> CreatableTopicConfig

2024-07-08 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17095:
--

 Summary: Fix the typo: CreateableTopicConfig -> 
CreatableTopicConfig
 Key: KAFKA-17095
 URL: https://issues.apache.org/jira/browse/KAFKA-17095
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Ming-Yen Chung


source: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/CreateTopicsRequest.json#L51





[jira] [Commented] (KAFKA-16057) Admin Client connections.max.idle.ms should be 9 minutes by default

2024-07-08 Thread Ksolves (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863765#comment-17863765
 ] 

Ksolves commented on KAFKA-16057:
-

Changed the default to 9 minutes for the admin client as well. PR:
https://github.com/apache/kafka/pull/16548

> Admin Client connections.max.idle.ms should be 9 minutes by default
> ---
>
> Key: KAFKA-16057
> URL: https://issues.apache.org/jira/browse/KAFKA-16057
> Project: Kafka
>  Issue Type: Bug
>Reporter: Marcin Kuthan
>Assignee: Ksolves
>Priority: Minor
>
> Producer and consumer default connections.max.idle.ms to 9 minutes, but Admin
> uses 5 minutes by default.
> When connections.max.idle.ms is equal to metadata.max.age.ms (5 minutes), the
> admin client disconnects frequently. I observe the following log in a Kafka
> Connect cluster:
> {code:java}
> [AdminClient clientId=MyClientName--shared-admin] Node XYVZ disconnected. 
> {code}
> AdminClient tries to fetch metadata every 5 minutes, but the connection has
> already been closed due to connections.max.idle.ms.
> As a workaround I set the connections.max.idle.ms property explicitly to 9
> minutes in the Kafka Connect configuration. This way admin, producers and
> consumers use the same configuration.
> I'm wondering why Admin uses a different default for connections.max.idle.ms
> than Consumer / Producer. Bug or feature?
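The workaround from the ticket, as a sketch (config keys are the real Kafka ones; the class and method names are mine): raise the admin client's idle timeout to the 9-minute producer/consumer default so idle connections outlive the 5-minute metadata refresh interval.

```java
import java.util.Properties;

public class AdminIdleWorkaround {
    static Properties adminProps() {
        Properties props = new Properties();
        // Values are in milliseconds.
        props.put("connections.max.idle.ms", "540000"); // 9 min (admin default: 300000)
        props.put("metadata.max.age.ms", "300000");     // 5 min (default)
        return props;
    }

    public static void main(String[] args) {
        Properties p = adminProps();
        long idle = Long.parseLong(p.getProperty("connections.max.idle.ms"));
        long metadata = Long.parseLong(p.getProperty("metadata.max.age.ms"));
        // The connection now survives until the next metadata refresh reuses it.
        System.out.println(idle > metadata);
    }
}
```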





[jira] [Assigned] (KAFKA-16057) Admin Client connections.max.idle.ms should be 9 minutes by default

2024-07-08 Thread Ksolves (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ksolves reassigned KAFKA-16057:
---

Assignee: Ksolves

> Admin Client connections.max.idle.ms should be 9 minutes by default
> ---
>
> Key: KAFKA-16057
> URL: https://issues.apache.org/jira/browse/KAFKA-16057
> Project: Kafka
>  Issue Type: Bug
>Reporter: Marcin Kuthan
>Assignee: Ksolves
>Priority: Minor
>
> Producer and consumer default connections.max.idle.ms to 9 minutes, but Admin
> uses 5 minutes by default.
> When connections.max.idle.ms is equal to metadata.max.age.ms (5 minutes), the
> admin client disconnects frequently. I observe the following log in a Kafka
> Connect cluster:
> {code:java}
> [AdminClient clientId=MyClientName--shared-admin] Node XYVZ disconnected. 
> {code}
> AdminClient tries to fetch metadata every 5 minutes, but the connection has
> already been closed due to connections.max.idle.ms.
> As a workaround I set the connections.max.idle.ms property explicitly to 9
> minutes in the Kafka Connect configuration. This way admin, producers and
> consumers use the same configuration.
> I'm wondering why Admin uses a different default for connections.max.idle.ms
> than Consumer / Producer. Bug or feature?





[jira] [Assigned] (KAFKA-3737) Closing connection during produce request should be log with WARN level.

2024-07-08 Thread Ksolves (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ksolves reassigned KAFKA-3737:
--

Assignee: Ksolves

> Closing connection during produce request should be log with WARN level.
> 
>
> Key: KAFKA-3737
> URL: https://issues.apache.org/jira/browse/KAFKA-3737
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Florian Hussonnois
>Assignee: Ksolves
>Priority: Trivial
>
> Currently, if an error occurs during a produce request, the exception is
> logged at INFO level:
> INFO [KafkaApi-0] Closing connection due to error during produce request with 
> correlation id 24 from client id console-producer with ack=0
> Topic and partition to exceptions: [test,0] -> 
> kafka.common.MessageSizeTooLargeException (kafka.server.KafkaApis)
> It would be more convenient to use WARN level to ease the tracing of these
> errors.





[jira] [Commented] (KAFKA-10872) Log broker configuration prefixed with "listener.name.*"

2024-07-08 Thread Ksolves (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863727#comment-17863727
 ] 

Ksolves commented on KAFKA-10872:
-

[~badai] All the required configurations prefixed with "listener.name.*" are
already displayed in the logs. I have attached a screenshot for reference.

It's an old ticket, so this may have been added later on. Let me know if it
still needs to be implemented.

> Log broker configuration prefixed with "listener.name.*"
> 
>
> Key: KAFKA-10872
> URL: https://issues.apache.org/jira/browse/KAFKA-10872
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Assignee: Ksolves
>Priority: Minor
> Attachments: listener-property.png
>
>
> When configuring broker listeners with "listener.name.*" prefix, it is very 
> hard to verify in the log if we are passing the correct value or if we're 
> missing any values.
> Can we log these configuration at INFO level? For example:
> {code:java}
> Configuration for listener.name.internal:
> listener.name.internal.ssl.truststore.location=/etc/ssl/truststore.jks
> listener.name.internal.ssl.truststore.password=changeit
> listener.name.internal.ssl.keystore.location=/etc/ssl/keystore.jks
> listener.name.internal.ssl.keystore.password=changeit
> {code}
>  
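The requested logging could be sketched like this (my own illustration, not broker code): collect every config sharing a listener prefix and render it as one INFO-friendly block in the format shown above.

```java
import java.util.Map;
import java.util.TreeMap;

public class ListenerConfigLogger {
    // Render all configs under "listener.name.<listener>." as one block,
    // sorted for stable output.
    static String render(Map<String, String> configs, String listener) {
        String prefix = "listener.name." + listener + ".";
        StringBuilder sb = new StringBuilder("Configuration for listener.name." + listener + ":\n");
        for (Map.Entry<String, String> e : new TreeMap<>(configs).entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> configs = Map.of(
                "listener.name.internal.ssl.truststore.location", "/etc/ssl/truststore.jks",
                "listener.name.external.ssl.keystore.location", "/etc/ssl/other.jks");
        System.out.print(render(configs, "internal"));
    }
}
```

In a real broker this would need to redact password-typed values rather than printing them verbatim as in the ticket's example.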





[jira] [Updated] (KAFKA-10872) Log broker configuration prefixed with "listener.name.*"

2024-07-08 Thread Ksolves (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ksolves updated KAFKA-10872:

Attachment: listener-property.png

> Log broker configuration prefixed with "listener.name.*"
> 
>
> Key: KAFKA-10872
> URL: https://issues.apache.org/jira/browse/KAFKA-10872
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Assignee: Ksolves
>Priority: Minor
> Attachments: listener-property.png
>
>
> When configuring broker listeners with "listener.name.*" prefix, it is very 
> hard to verify in the log if we are passing the correct value or if we're 
> missing any values.
> Can we log these configuration at INFO level? For example:
> {code:java}
> Configuration for listener.name.internal:
> listener.name.internal.ssl.truststore.location=/etc/ssl/truststore.jks
> listener.name.internal.ssl.truststore.password=changeit
> listener.name.internal.ssl.keystore.location=/etc/ssl/keystore.jks
> listener.name.internal.ssl.keystore.password=changeit
> {code}
>  





[jira] [Commented] (KAFKA-17093) KafkaConsumer.seekToEnd should return LSO

2024-07-08 Thread Tom Kalmijn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863713#comment-17863713
 ] 

Tom Kalmijn commented on KAFKA-17093:
-

According to the documentation: "If isolation.level=read_committed, the end
offset will be the Last Stable Offset, i.e., the offset of the first message
with an open transaction."

In my case I don't think there are any open transactions, as no new messages
are coming in. However, there are trailing transaction markers from previous
(possibly aborted) transactional messages. So the documentation may not be
clear about which offset should be returned in this scenario.

> KafkaConsumer.seekToEnd should return LSO 
> --
>
> Key: KAFKA-17093
> URL: https://issues.apache.org/jira/browse/KAFKA-17093
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.6.1
> Environment: Ubuntu,  IntelliJ, Scala   "org.apache.kafka" % 
> "kafka-clients" % "3.6.1"
>Reporter: Tom Kalmijn
>Priority: Major
>
>  
> Expected
> When using a transactional producer then the method 
> KafkaConsumer.seekToEnd(...) of a consumer configured with isolation level 
> "read_committed" should return the LSO. 
> Observed
> The offset returned is always the actual last offset of the partition, which 
> is not the LSO if the latest offsets are occupied by transaction markers.
> Also see this Slack thread:
> https://confluentcommunity.slack.com/archives/C499EFQS0/p1720088282557559





[jira] [Updated] (KAFKA-17083) KRaft Upgrade Failures in SystemTests

2024-07-08 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat updated KAFKA-17083:
---
Fix Version/s: 3.8.0

> KRaft Upgrade Failures in SystemTests
> -
>
> Key: KAFKA-17083
> URL: https://issues.apache.org/jira/browse/KAFKA-17083
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Assignee: Igor Soarez
>Priority: Critical
> Fix For: 3.8.0
>
>
> 2 system tests for "TestKRaftUpgrade" are consistently failing on the 3.8 
> branch.
> {noformat}
> Module: kafkatest.tests.core.kraft_upgrade_test
> Class:  TestKRaftUpgrade
> Method: test_isolated_mode_upgrade
> Arguments:
> {
>   "from_kafka_version": "dev",
>   "metadata_quorum": "ISOLATED_KRAFT"
> }
> {noformat}
>  
> and 
>  
> {code:java}
> Module: kafkatest.tests.core.kraft_upgrade_test
> Class:  TestKRaftUpgrade
> Method: test_combined_mode_upgrade
> Arguments:
> {
>   "from_kafka_version": "dev",
>   "metadata_quorum": "COMBINED_KRAFT"
> }
> {code}
>  
> Failure for Isolated is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker15', 'hostname': 
> '10.140.39.207', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker15', 'ssh_hostname': '10.140.39.207', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.39.207', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=dev.metadata_quorum=ISOLATED_KRAFT-674
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f07e2e800a0>, '_sftp_client':  0x7f07e2f3d070>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server 
> worker15:9092,worker16:9092,worker17:9092 upgrade --metadata 3.7', 1, 
> b'SLF4J: Class path contains multiple SLF4J bindings.\nSLF4J: Found binding 
> in 
> [jar:file:/vagrant/tools/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
>  Found binding in 
> [jar:file:/vagrant/trogdor/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
>  See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.\nSLF4J: Actual binding is of type 
> [org.slf4j.impl.Reload4jLoggerFactory]\n1 out of 1 operation(s) failed.\n')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 121, in test_isolated_mode_upgrade
> self.run_upgrade(from_kafka_version)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 105, in run_upgrade
> self.run_produce_consume_validate(core_test_action=lambda: 
> self.perform_version_change(from_kafka_version))
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 105, in run_produce_consume_validate
> core_test_action(*args)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 105, in 
> self.run_produce_consume_validate(core_test_action=lambda: 
> self.perform_version_change(from_kafka_version))
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 75, in perform_version_change
> self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/services/kafka/kafka.py",
>  line 920, in upgrade_metadata_version
> self.run_features_command("upgrade", new_version)
>   File 
> "/home/semaphore/kafka-overla

[jira] [Commented] (KAFKA-17084) Network Degrade Test fails in System Tests

2024-07-08 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863694#comment-17863694
 ] 

Luke Chen commented on KAFKA-17084:
---

This passed in 3 other separate environments, so I think it's good to close 
it. Thanks all!

> Network Degrade Test fails in System Tests
> --
>
> Key: KAFKA-17084
> URL: https://issues.apache.org/jira/browse/KAFKA-17084
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Priority: Critical
> Attachments: TEST-kafka.xml
>
>
> Tests for NetworkDegradeTest fail consistently on the 3.8 branch.
>  
> Tests failing are:
>  
> {noformat}
> Module: kafkatest.tests.core.network_degrade_test
> Class:  NetworkDegradeTest
> Method: test_latency
> Arguments:
> {
>   "device_name": "eth0",
>   "latency_ms": 50,
>   "rate_limit_kbit": 1000,
>   "task_name": "latency-100-rate-1000"
> }
> {noformat}
>  
> and 
>  
> {noformat}
> Module: kafkatest.tests.core.network_degrade_test
> Class:  NetworkDegradeTest
> Method: test_latency
> Arguments:
> {
>   "device_name": "eth0",
>   "latency_ms": 50,
>   "rate_limit_kbit": 0,
>   "task_name": "latency-100"
> }
> {noformat}
>  
> Failure for the first one is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker30', 'hostname': 
> '10.140.34.105', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker30', 'ssh_hostname': '10.140.34.105', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.34.105', '_logger':  kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000-1790
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f17a237dc10>, '_sftp_client':  0x7f17a2393910>, '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
> worker21', 1, b'')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/network_degrade_test.py",
>  line 66, in test_latency
> for line in zk0.account.ssh_capture("ping -i 1 -c 20 %s" % 
> zk1.account.hostname):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 680, in next
> return next(self.iter_obj)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 347, in output_generator
> raise RemoteCommandError(self, cmd, exit_status, stderr.read())
> ducktape.cluster.remoteaccount.RemoteCommandError: ubuntu@worker30: Command 
> 'ping -i 1 -c 20 worker21' returned non-zero exit status 1.{noformat}
> And for the second one is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker28', 'hostname': 
> '10.140.41.79', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker28', 'ssh_hostname': '10.140.41.79', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.41.79', '_logger':  kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0-1791
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f17a1c7b7c0>, '_sftp_client':  0x7f17a1c7b2b0>, '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
> worker27', 1, b'')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.

[jira] [Resolved] (KAFKA-17084) Network Degrade Test fails in System Tests

2024-07-08 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-17084.
---
Fix Version/s: 3.8.0
   Resolution: Not A Problem

> Network Degrade Test fails in System Tests
> --
>
> Key: KAFKA-17084
> URL: https://issues.apache.org/jira/browse/KAFKA-17084
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Priority: Critical
> Fix For: 3.8.0
>
> Attachments: TEST-kafka.xml
>
>
> Tests for NetworkDegradeTest fail consistently on the 3.8 branch.
>  
> Tests failing are:
>  
> {noformat}
> Module: kafkatest.tests.core.network_degrade_test
> Class:  NetworkDegradeTest
> Method: test_latency
> Arguments:
> {
>   "device_name": "eth0",
>   "latency_ms": 50,
>   "rate_limit_kbit": 1000,
>   "task_name": "latency-100-rate-1000"
> }
> {noformat}
>  
> and 
>  
> {noformat}
> Module: kafkatest.tests.core.network_degrade_test
> Class:  NetworkDegradeTest
> Method: test_latency
> Arguments:
> {
>   "device_name": "eth0",
>   "latency_ms": 50,
>   "rate_limit_kbit": 0,
>   "task_name": "latency-100"
> }
> {noformat}
>  
> Failure for the first one is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker30', 'hostname': 
> '10.140.34.105', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker30', 'ssh_hostname': '10.140.34.105', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.34.105', '_logger':  kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000-1790
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f17a237dc10>, '_sftp_client':  0x7f17a2393910>, '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
> worker21', 1, b'')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/network_degrade_test.py",
>  line 66, in test_latency
> for line in zk0.account.ssh_capture("ping -i 1 -c 20 %s" % 
> zk1.account.hostname):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 680, in next
> return next(self.iter_obj)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 347, in output_generator
> raise RemoteCommandError(self, cmd, exit_status, stderr.read())
> ducktape.cluster.remoteaccount.RemoteCommandError: ubuntu@worker30: Command 
> 'ping -i 1 -c 20 worker21' returned non-zero exit status 1.{noformat}
> And for the second one is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker28', 'hostname': 
> '10.140.41.79', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker28', 'ssh_hostname': '10.140.41.79', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.41.79', '_logger':  kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0-1791
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f17a1c7b7c0>, '_sftp_client':  0x7f17a1c7b2b0>, '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
> worker27', 1, b'')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/r

[jira] [Commented] (KAFKA-17093) KafkaConsumer.seekToEnd should return LSO

2024-07-08 Thread Tom Kalmijn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863690#comment-17863690
 ] 

Tom Kalmijn commented on KAFKA-17093:
-

Regarding the apiVersion you mention: is this something I could check on my 
end?

> KafkaConsumer.seekToEnd should return LSO 
> --
>
> Key: KAFKA-17093
> URL: https://issues.apache.org/jira/browse/KAFKA-17093
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.6.1
> Environment: Ubuntu,  IntelliJ, Scala   "org.apache.kafka" % 
> "kafka-clients" % "3.6.1"
>Reporter: Tom Kalmijn
>Priority: Major
>
>  
> Expected
> When using a transactional producer then the method 
> KafkaConsumer.seekToEnd(...) of a consumer configured with isolation level 
> "read_committed" should return the LSO. 
> Observed
> The offset returned is always the actual last offset of the partition, which 
> is not the LSO if the latest offsets are occupied by transaction markers.
> Also see this Slack thread:
> https://confluentcommunity.slack.com/archives/C499EFQS0/p1720088282557559





[jira] [Commented] (KAFKA-17093) KafkaConsumer.seekToEnd should return LSO

2024-07-08 Thread Tom Kalmijn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863689#comment-17863689
 ] 

Tom Kalmijn commented on KAFKA-17093:
-

kangning, thank you so much for your time and effort! I do apologise if my 
issue makes you double-check things which are actually OK.

Maybe I am mistaken about the meaning of LSO. I understand "read_committed" 
instructs a consumer to look at committed messages only. If so, I find it 
surprising that seekToEnd() takes me to an offset where there is nothing to be 
read. I am after the offset of the last existing message in a partition. If 
seekToEnd() does not take me there, then I have to resort to backtracking and 
polling to figure out where this message is.

> KafkaConsumer.seekToEnd should return LSO 
> --
>
> Key: KAFKA-17093
> URL: https://issues.apache.org/jira/browse/KAFKA-17093
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.6.1
> Environment: Ubuntu,  IntelliJ, Scala   "org.apache.kafka" % 
> "kafka-clients" % "3.6.1"
>Reporter: Tom Kalmijn
>Priority: Major
>
>  
> Expected
> When using a transactional producer then the method 
> KafkaConsumer.seekToEnd(...) of a consumer configured with isolation level 
> "read_committed" should return the LSO. 
> Observed
> The offset returned is always the actual last offset of the partition, which 
> is not the LSO if the latest offsets are occupied by transaction markers.
> Also see this Slack thread:
> https://confluentcommunity.slack.com/archives/C499EFQS0/p1720088282557559





[jira] [Commented] (KAFKA-17093) KafkaConsumer.seekToEnd should return LSO

2024-07-08 Thread kangning.li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863672#comment-17863672
 ] 

kangning.li commented on KAFKA-17093:
-

Another possibility is {{apiVersion}}. If {{apiVersion == 0}}, the consumer's 
transaction (isolation level) configuration will be ignored, and it will 
always return the {{HW}} (high watermark).

> KafkaConsumer.seekToEnd should return LSO 
> --
>
> Key: KAFKA-17093
> URL: https://issues.apache.org/jira/browse/KAFKA-17093
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.6.1
> Environment: Ubuntu,  IntelliJ, Scala   "org.apache.kafka" % 
> "kafka-clients" % "3.6.1"
>Reporter: Tom Kalmijn
>Priority: Major
>
>  
> Expected
> When using a transactional producer then the method 
> KafkaConsumer.seekToEnd(...) of a consumer configured with isolation level 
> "read_committed" should return the LSO. 
> Observed
> The offset returned is always the actual last offset of the partition, which 
> is not the LSO if the latest offsets are occupied by transaction markers.
> Also see this Slack thread:
> https://confluentcommunity.slack.com/archives/C499EFQS0/p1720088282557559





[jira] [Commented] (KAFKA-17093) KafkaConsumer.seekToEnd should return LSO

2024-07-07 Thread kangning.li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863664#comment-17863664
 ] 

kangning.li commented on KAFKA-17093:
-

I have run it on both the trunk branch and the 3.6 branch, and both returned 
the LSO, which meets expectations.

Is there anything else that could explain it?

> KafkaConsumer.seekToEnd should return LSO 
> --
>
> Key: KAFKA-17093
> URL: https://issues.apache.org/jira/browse/KAFKA-17093
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.6.1
> Environment: Ubuntu,  IntelliJ, Scala   "org.apache.kafka" % 
> "kafka-clients" % "3.6.1"
>Reporter: Tom Kalmijn
>Priority: Major
>
>  
> Expected
> When using a transactional producer then the method 
> KafkaConsumer.seekToEnd(...) of a consumer configured with isolation level 
> "read_committed" should return the LSO. 
> Observed
> The offset returned is always the actual last offset of the partition, which 
> is not the LSO if the latest offsets are occupied by transaction markers.
> Also see this Slack thread:
> https://confluentcommunity.slack.com/archives/C499EFQS0/p1720088282557559





[jira] [Commented] (KAFKA-13499) Avoid restoring outdated records

2024-07-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863639#comment-17863639
 ] 

Matthias J. Sax commented on KAFKA-13499:
-

It's been a while since I filed these tickets... I am not even sure if I 
looked at (i.e., remembered) KAFKA-7934 when filing this one, so I am not sure 
whether they are _substantively_ different or the same.

As for your second question, the main difference is that stream-stream join 
state stores are not exposed via IQ, so we can be more aggressive and restore 
less data compared to windowed and session stores, for which we need to 
restore a longer history to make the data available for IQ queries.

> Avoid restoring outdated records
> 
>
> Key: KAFKA-13499
> URL: https://issues.apache.org/jira/browse/KAFKA-13499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Danica Fine
>Priority: Major
>
> Kafka Streams has the config `windowstore.changelog.additional.retention.ms` 
> to allow for an increased retention time.
> While an increased retention time can be useful, it can also lead to 
> unnecessary restore cost, especially for stream-stream joins. Assume a 
> stream-stream join with 1h window size and a grace period of 1h. For this 
> case, we only need 2h of data to restore. If we lag, the 
> `windowstore.changelog.additional.retention.ms` helps to prevent the broker 
> from truncating data too early. However, if we don't lag and we need to 
> restore, we restore everything from the changelog.
> Instead of doing a seek-to-beginning, we could use the timestamp index to 
> seek to the first offset of the 2h "window" of data that we need to 
> restore, to avoid unnecessary work.
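The proposed optimization can be sketched as follows, under the ticket's example assumptions (1h window, 1h grace). `restore_start_timestamp` is a hypothetical helper used only for illustration; the resulting timestamp would be handed to the broker's time index (e.g. `offsetsForTimes` in the Java consumer) to find the starting offset.

```python
# Sketch of the restore optimization: instead of seeking to the beginning
# of the changelog, compute the oldest timestamp that can still affect a
# stream-stream join (window size + grace period) and look up its offset
# via the broker's time index.

WINDOW_MS = 60 * 60 * 1000   # 1h join window, as in the ticket's example
GRACE_MS = 60 * 60 * 1000    # 1h grace period

def restore_start_timestamp(now_ms, window_ms=WINDOW_MS, grace_ms=GRACE_MS):
    """Oldest record timestamp that still needs to be restored."""
    return now_ms - (window_ms + grace_ms)

now_ms = 1_720_400_000_000           # any wall-clock time in epoch millis
start_ts = restore_start_timestamp(now_ms)

# A restore would then seek to the first offset at/after start_ts, e.g.
# (Java client, illustrative): consumer.offsetsForTimes(Map.of(tp, start_ts))
print((now_ms - start_ts) // (60 * 60 * 1000))  # 2 -> only 2h of data
```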





[jira] [Assigned] (KAFKA-17085) Streams Cooperative Rebalance Upgrade Test fails in System Tests

2024-07-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-17085:
---

Assignee: Matthias J. Sax

> Streams Cooperative Rebalance Upgrade Test fails in System Tests
> 
>
> Key: KAFKA-17085
> URL: https://issues.apache.org/jira/browse/KAFKA-17085
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Assignee: Matthias J. Sax
>Priority: Critical
>
> StreamsCooperativeRebalanceUpgradeTest fails in system tests when upgrading 
> from 2.1.1, 2.2.2, and 2.3.1.
> Tests that fail:
>  
> {noformat}
> Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
> Class:  StreamsCooperativeRebalanceUpgradeTest
> Method: test_upgrade_to_cooperative_rebalance
> Arguments:
> {
>   "upgrade_from_version": "2.1.1"
> }
>  
> {noformat}
> and
>  
> {noformat}
> Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
> Class:  StreamsCooperativeRebalanceUpgradeTest
> Method: test_upgrade_to_cooperative_rebalance
> Arguments:
> {
>   "upgrade_from_version": "2.2.2"
> }
> {noformat}
> and
>  
>  
> {noformat}
> Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
> Class:  StreamsCooperativeRebalanceUpgradeTest
> Method: test_upgrade_to_cooperative_rebalance
> Arguments:
> {
>   "upgrade_from_version": "2.3.1"
> }
> {noformat}
>  
> Failure for 2.1.1 is:
> {noformat}
> TimeoutError("Never saw 'first_bounce_phase-Processed [0-9]* records so far' 
> message ubuntu@worker28")
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
>  line 101, in test_upgrade_to_cooperative_rebalance
> self.maybe_upgrade_rolling_bounce_and_verify(processors,
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
>  line 182, in maybe_upgrade_rolling_bounce_and_verify
> stdout_monitor.wait_until(verify_processing_msg,
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 736, in wait_until
> return wait_until(lambda: self.acct.ssh("tail -c +%d %s | grep '%s'" % 
> (self.offset + 1, self.log, pattern),
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Never saw 'first_bounce_phase-Processed [0-9]* 
> records so far' message ubuntu@worker28{noformat}
> Failure for 2.2.2 is:
> {noformat}
> TimeoutError("Never saw 'first_bounce_phase-Processed [0-9]* records so far' 
> message ubuntu@worker5")
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
>  line 101, in test_upgrade_to_cooperative_rebalance
> self.maybe_upgrade_rolling_bounce_and_v

[jira] [Resolved] (KAFKA-17081) Tweak GroupCoordinatorConfig: re-introduce local attributes and validation

2024-07-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17081.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Tweak GroupCoordinatorConfig: re-introduce local attributes and validation
> --
>
> Key: KAFKA-17081
> URL: https://issues.apache.org/jira/browse/KAFKA-17081
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
> Fix For: 3.9.0
>
>
> see discussion: 
> https://github.com/apache/kafka/pull/16458#issuecomment-2206822223





[jira] [Assigned] (KAFKA-15773) Group protocol configuration should be validated

2024-07-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-15773:
--

Assignee: PoAn Yang

> Group protocol configuration should be validated
> 
>
> Key: KAFKA-15773
> URL: https://issues.apache.org/jira/browse/KAFKA-15773
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the user specifies the generic group, or does not specify the 
> group.protocol config at all, we should invalidate any group.remote.assignor.
>  
> If group.local.assignor and group.remote.assignor are both configured, we 
> should also invalidate the configuration.
>  
> This is an optimization/user experience improvement.





[jira] [Resolved] (KAFKA-12899) Support --bootstrap-server in ReplicaVerificationTool

2024-07-07 Thread Dongjin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjin Lee resolved KAFKA-12899.
-
Resolution: Won't Fix

Replaced by KAFKA-17073

> Support --bootstrap-server in ReplicaVerificationTool
> -
>
> Key: KAFKA-12899
> URL: https://issues.apache.org/jira/browse/KAFKA-12899
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Minor
>  Labels: needs-kip
>
> kafka.tools.ReplicaVerificationTool still uses --broker-list, breaking 
> consistency with other (already migrated) tools.





[jira] [Commented] (KAFKA-17073) Deprecate ReplicaVerificationTool in 3.9

2024-07-07 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863568#comment-17863568
 ] 

Dongjin Lee commented on KAFKA-17073:
-

KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=311627623

> Deprecate ReplicaVerificationTool in 3.9
> 
>
> Key: KAFKA-17073
> URL: https://issues.apache.org/jira/browse/KAFKA-17073
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Dongjin Lee
>Priority: Minor
>  Labels: need-kip
> Fix For: 3.9.0
>
>
> see discussion 
> https://lists.apache.org/thread/6zz7xwps8lq2lxfo5bhyl4cggh64c5py
> In short, the tool is useless, so it is a good time to deprecate it in 3.9. 
> That enables us to remove it in 4.0.





[jira] [Comment Edited] (KAFKA-17044) Connector deletion can lead to resource leak during a long running connector startup

2024-07-07 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863559#comment-17863559
 ] 

Chris Egerton edited comment on KAFKA-17044 at 7/7/24 11:31 AM:


[~bgoyal] I think you should reconsider the implementation of your connector. 
Instead of blocking in {{{}start{}}}, you can perform retries in a separate 
thread, and whenever a new set of task configurations needs to be generated 
(e.g., when a db connection has finally been established), invoke 
[context.requestTaskReconfiguration|https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
 to signal to the runtime that {{taskConfigs}} should be called again.

 

It's possible for us to interrupt connector threads when they appear blocked, 
but this doesn't guarantee much: ultimately it's up to your connector, the 
libraries it calls, and even your JVM to respond to thread interrupts 
correctly, and in several cases that simply doesn't happen.
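The pattern in the comment above can be sketched language-agnostically. This is a minimal runnable Python sketch of the control flow only: in a real connector the code is Java and the signal is `ConnectorContext.requestTaskReconfiguration()`; here that call, `NonBlockingConnector`, and `connect_fn` are all hypothetical stand-ins.

```python
# Sketch: Connector.start() returns immediately, a background thread retries
# the expensive setup, and only once it succeeds does it ask the runtime to
# (re)generate task configs. The reconfiguration signal is stubbed as a
# plain callback so the control flow is runnable here.
import threading
import time

class NonBlockingConnector:
    def __init__(self, connect_fn, request_task_reconfiguration):
        self._connect_fn = connect_fn              # e.g. open a DB connection
        self._reconfigure = request_task_reconfiguration
        self._stopped = threading.Event()
        self._thread = None

    def start(self):
        # Do NOT block here; kick off retries in the background.
        self._thread = threading.Thread(target=self._retry_loop, daemon=True)
        self._thread.start()

    def _retry_loop(self):
        while not self._stopped.is_set():
            try:
                self._connect_fn()
            except Exception:
                time.sleep(0.01)   # back off before the next attempt
                continue
            self._reconfigure()    # runtime would call taskConfigs() again
            return

    def stop(self):
        # A DELETE can now take effect promptly: nothing is stuck in start().
        self._stopped.set()
        if self._thread is not None:
            self._thread.join(timeout=1)

attempts = []
def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("db not ready yet")

reconfigured = threading.Event()
c = NonBlockingConnector(flaky_connect, reconfigured.set)
c.start()
reconfigured.wait(timeout=2)
c.stop()
print(reconfigured.is_set(), len(attempts))  # True 3
```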


was (Author: chrisegerton):
[~bgoyal] I think you should reconsider the implementation of your connector. 
Instead of blocking in {{{}start{}}}, you can perform retries in a separate 
thread, and whenever a new set of task configurations needs to be generated 
(e.g., when a db connection has finally been established), invoke 
[context.requestTaskReconfiguration|https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
 to signal to the runtime that {{taskConfigs}} should be called again.

> Connector deletion can lead to resource leak during a long running connector 
> startup
> 
>
> Key: KAFKA-17044
> URL: https://issues.apache.org/jira/browse/KAFKA-17044
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Bhagyashree
>Priority: Major
>
> We have identified a gap in the shutdown flow for the connector worker. If 
> the connector is in 
> [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
>  state and still executing the 
> [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
>  method, a DELETE API call would invoke the 
> [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
>  and [notify() 
> |https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297]but
>  the connector worker would not shutdown immediately. This happens because 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  is a blocking call and the control reaches 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  in 
> [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
>  after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call has completed. This results in a gap in the delete flow where the 
> connector is not immediately shut down, leaving its resources running. 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  keeps running and only when the execution of 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes does control reach 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  and then 
> [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
>  of the connector worker is invoked.
> This seems similar to what has been identified for connector tasks as part of 
> https://issues.apache.org/jira/browse/KAFKA-14725.
> *Steps to repro*
> 1. Start a connector with a time-consuming operation in the 
> [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/run

[jira] [Comment Edited] (KAFKA-17044) Connector deletion can lead to resource leak during a long running connector startup

2024-07-07 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863559#comment-17863559
 ] 

Chris Egerton edited comment on KAFKA-17044 at 7/7/24 11:31 AM:


[~bgoyal] I think you should reconsider the implementation of your connector. 
Instead of blocking in {{start}}, you can perform retries in a separate 
thread, and whenever a new set of task configurations needs to be generated 
(e.g., when a db connection has finally been established), invoke 
[context.requestTaskReconfiguration|https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
 to signal to the runtime that {{taskConfigs}} should be called again.

It's possible for us to interrupt connector threads when they appear blocked, 
but this doesn't guarantee much: ultimately it's up to your connector, the 
libraries it calls, and even your JVM to respond to thread interrupts 
correctly, and in several cases that simply doesn't happen.
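The suggested pattern can be sketched in plain Java. The ConnectorContext interface below is a minimal stand-in for the real org.apache.kafka.connect.connector.ConnectorContext, and tryConnect() is a hypothetical placeholder for the connector's connection logic; the point is that start() returns immediately while retries happen on a background thread that the worker can interrupt:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class NonBlockingStartDemo {
    // Minimal stand-in for org.apache.kafka.connect.connector.ConnectorContext.
    interface ConnectorContext {
        void requestTaskReconfiguration();
    }

    static class RetryingConnector {
        private final ConnectorContext context;
        private Thread retryThread;
        private int attempts = 0; // simulated connection attempts
        final AtomicBoolean connected = new AtomicBoolean(false);

        RetryingConnector(ConnectorContext context) {
            this.context = context;
        }

        // start() returns immediately; connection attempts run on a background
        // thread, so the worker can shut the connector down at any time.
        void start() {
            retryThread = new Thread(() -> {
                while (!connected.get() && !Thread.currentThread().isInterrupted()) {
                    if (tryConnect()) {
                        connected.set(true);
                        // Tell the runtime that taskConfigs() should be called again.
                        context.requestTaskReconfiguration();
                    }
                }
            });
            retryThread.start();
        }

        // Hypothetical helper: a real connector would dial its db here.
        private boolean tryConnect() {
            return ++attempts >= 3;
        }

        void stop() throws InterruptedException {
            retryThread.interrupt();
            retryThread.join(1000);
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch reconfigured = new CountDownLatch(1);
        RetryingConnector connector = new RetryingConnector(reconfigured::countDown);
        connector.start();                       // returns without blocking
        reconfigured.await(5, TimeUnit.SECONDS); // background thread connected
        connector.stop();
        System.out.println("connected=" + connector.connected.get());
    }
}
```

With this shape, a DELETE call can stop the connector promptly because start() never blocks the worker thread.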


was (Author: chrisegerton):
[~bgoyal] I think you should reconsider the implementation of your connector. 
Instead of blocking in {{{}start{}}}, you can perform retries in a separate 
thread, and whenever a new set of task configurations needs to be generated 
(e.g., when a db connection has finally been established), invoke 
[context.requestTaskReconfiguration|https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
 to signal to the runtime that {{taskConfigs}} should be called again.

 

It's possible for us to interrupt connector threads when they appear blocked, 
but this doesn't guarantee much because ultimately it's up to your connector, 
the libraries it calls, and even your JVM to respond to thread interrupts 
correctly and in several cases this simply doesn't happen.


[jira] [Commented] (KAFKA-17044) Connector deletion can lead to resource leak during a long running connector startup

2024-07-07 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863559#comment-17863559
 ] 

Chris Egerton commented on KAFKA-17044:
---

[~bgoyal] I think you should reconsider the implementation of your connector. 
Instead of blocking in {{start}}, you can perform retries in a separate 
thread, and whenever a new set of task configurations needs to be generated 
(e.g., when a db connection has finally been established), invoke 
[context.requestTaskReconfiguration|https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/connector/ConnectorContext.html#requestTaskReconfiguration()]
 to signal to the runtime that {{taskConfigs}} should be called again.

> Connector deletion can lead to resource leak during a long running connector 
> startup
> 
>
> Key: KAFKA-17044
> URL: https://issues.apache.org/jira/browse/KAFKA-17044
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Bhagyashree
>Priority: Major
>
> We have identified a gap in the shutdown flow for the connector worker. If 
> the connector is in 
> [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
>  state and still executing the 
> [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
>  method, a DELETE API call would invoke the 
> [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
>  and [notify() 
> |https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297] but
>  the connector worker would not shut down immediately. This happens because 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  is a blocking call and the control reaches 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  in 
> [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
>  after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call has completed. This results in a gap in the delete flow where the 
> connector is not immediately shut down, leaving its resources running. 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  keeps running and only when the execution of 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes does control reach 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  and then 
> [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
>  of the connector worker is invoked.
> This seems similar to what has been identified for connector tasks as part of 
> https://issues.apache.org/jira/browse/KAFKA-14725.
> *Steps to repro*
> 1. Start a connector with a time-consuming operation in the 
> [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call
> 2. Call DELETE API to delete this connector
> 3. The connector would be deleted only after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes.
> The issue was observed when a connector was configured to retry a db 
> connection for some time. 
> {*}Current Behaviour{*}: The connector did not shut down until the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  method completed.
> {*}Expected Behaviour{*}: The connector should abort what it is doing and 
> shut down as requested by the DELETE call.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15773) Group protocol configuration should be validated

2024-07-07 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863546#comment-17863546
 ] 

PoAn Yang commented on KAFKA-15773:
---

Hi [~pnee], I just created a draft PR 
[https://github.com/apache/kafka/pull/16543]. Feel free to close it, if you're 
working on it. Thank you.

> Group protocol configuration should be validated
> 
>
> Key: KAFKA-15773
> URL: https://issues.apache.org/jira/browse/KAFKA-15773
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the user specifies the generic group protocol, or does not specify the 
> group.protocol config at all, we should reject any group.remote.assignor 
> setting.
>  
> If group.local.assignor and group.remote.assignor are both configured, we 
> should also reject the configuration.
>  
> This is an optimization/user experience improvement.
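The checks described in the ticket could look roughly like the following sketch. The config names follow the ticket text, but the method shape and error messages are illustrative, not the actual Kafka implementation:

```java
import java.util.Map;

public class GroupProtocolConfigCheck {
    // Hypothetical validation sketch: reject assignor settings that only make
    // sense for the new "consumer" group protocol, and reject conflicting
    // local/remote assignor combinations.
    static void validate(Map<String, String> cfg) {
        String protocol = cfg.getOrDefault("group.protocol", "classic");
        if (!"consumer".equals(protocol) && cfg.containsKey("group.remote.assignor")) {
            throw new IllegalArgumentException(
                "group.remote.assignor requires group.protocol=consumer");
        }
        if (cfg.containsKey("group.local.assignor")
                && cfg.containsKey("group.remote.assignor")) {
            throw new IllegalArgumentException(
                "group.local.assignor and group.remote.assignor are mutually exclusive");
        }
    }

    public static void main(String[] args) {
        // A remote assignor with the classic protocol should be rejected.
        try {
            validate(Map.of("group.protocol", "classic",
                            "group.remote.assignor", "uniform"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // A remote assignor with the consumer protocol is fine.
        validate(Map.of("group.protocol", "consumer",
                        "group.remote.assignor", "uniform"));
        System.out.println("valid consumer config accepted");
    }
}
```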



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-17089) Incorrect JWT parsing in OAuthBearerUnsecuredJws

2024-07-07 Thread Jira


[ https://issues.apache.org/jira/browse/KAFKA-17089 ]


黃竣陽 deleted comment on KAFKA-17089:
-

was (Author: JIRAUSER305187):
I'm interested in this issue. Could you assign it to me?

> Incorrect JWT parsing in OAuthBearerUnsecuredJws
> 
>
> Key: KAFKA-17089
> URL: https://issues.apache.org/jira/browse/KAFKA-17089
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.6.2
>Reporter: Björn Löfroth
>Priority: Major
>
> The documentation for the `OAuthBearerUnsecuredJws.toMap` function correctly 
> describes that the input is Base64URL, but then goes ahead and does a simple 
> base64 decode.
> [https://github.com/apache/kafka/blob/9a7eee60727dc73f09075e971ea35909d2245f19/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java#L295]
>  
> It should probably be 
> ```
> byte[] decode = Base64.getUrlDecoder().decode(split);
> ```
> The error I get when using Confluent Schema Registry clients:
> ```
> org.apache.kafka.common.errors.SerializationException: Error serializing JSON 
> message
>     at 
> io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:171)
>     at 
> io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer.serialize(KafkaJsonSchemaSerializer.java:95)
>     at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000)
>     at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
>     at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:832)
>     at 
> se.ica.icc.schemaregistry.example.confluent.ProducerJsonExample.main(ProducerJsonExample.java:87)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
>     at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.exceptions.SchemaRegistryOauthTokenRetrieverException:
>  Error while fetching Oauth Token for Schema Registry: OAuth Token for Schema 
> Registry is Invalid
>     at 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.CachedOauthTokenRetriever.getToken(CachedOauthTokenRetriever.java:74)
>     at 
> io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getBearerToken(OauthCredentialProvider.java:53)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.setAuthRequestHeaders(RestService.java:1336)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.buildConnection(RestService.java:361)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:300)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:409)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:981)
>     at 
> io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:972)
>     at 
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:574)
>     at 
> io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:571)
>     at 
> io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:554)
>     at 
> io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:151)
>     ... 11 more
> Caused by: 
> org.apache.kafka.common.security.oauthbearer.internals.secured.Vali
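The distinction matters because JWT segments are base64url-encoded (RFC 4648 section 5), which uses '-' and '_' in place of '+' and '/'. A small self-contained demonstration of why the basic decoder fails on such input (the segment string here is an arbitrary example, not a real token):

```java
import java.util.Base64;

public class Base64UrlDemo {
    public static void main(String[] args) {
        // "__8" is the unpadded base64url encoding of bytes 0xFF 0xFF.
        // '_' is legal in the URL-safe alphabet but illegal in basic base64.
        String segment = "__8";

        byte[] ok = Base64.getUrlDecoder().decode(segment);
        System.out.println("url decoder: " + ok.length + " bytes");

        try {
            Base64.getDecoder().decode(segment); // basic decoder
        } catch (IllegalArgumentException e) {
            System.out.println("basic decoder rejects base64url input");
        }
    }
}
```

Any JWT segment containing '-' or '_' will trigger the same IllegalArgumentException from the basic decoder, which is consistent with the invalid-token error reported above.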

[jira] [Commented] (KAFKA-15773) Group protocol configuration should be validated

2024-07-07 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863541#comment-17863541
 ] 

PoAn Yang commented on KAFKA-15773:
---

Hi [~pnee], I'm interested in this issue. If you're not working on it, may I 
take it? Thank you.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16355) ConcurrentModificationException in InMemoryTimeOrderedKeyValueBuffer.evictWhile

2024-07-07 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang reassigned KAFKA-16355:
-

Assignee: (was: PoAn Yang)

> ConcurrentModificationException in 
> InMemoryTimeOrderedKeyValueBuffer.evictWhile
> ---
>
> Key: KAFKA-16355
> URL: https://issues.apache.org/jira/browse/KAFKA-16355
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Mickael Maison
>Priority: Major
>
> While a Streams application was restoring its state after an outage, it hit 
> the following:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_16, processor=KSTREAM-SOURCE-00, topic=, 
> partition=16, offset=454875695, 
> stacktrace=java.util.ConcurrentModificationException
> at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507)
> at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178)
> at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200)
> at 
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(Processor
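The fail-fast behavior at the top of that trace can be reproduced with a plain TreeMap, independent of Streams: structurally modifying the map while an iterator is open invalidates the iterator, and its next operation throws.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class CmeDemo {
    public static void main(String[] args) {
        TreeMap<Integer, String> buffer =
            new TreeMap<>(Map.of(1, "a", 2, "b", 3, "c"));

        try {
            Iterator<Map.Entry<Integer, String>> it = buffer.entrySet().iterator();
            it.next();
            // Structural modification while the iterator is open, analogous to
            // the buffer being mutated during evictWhile's iteration.
            buffer.put(4, "d");
            it.next(); // fail-fast check trips here
        } catch (java.util.ConcurrentModificationException e) {
            System.out.println("ConcurrentModificationException reproduced");
        }
    }
}
```

This suggests the bug is a re-entrant or concurrent mutation of the buffer's TreeMap during eviction, rather than a problem in TreeMap itself.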

[jira] [Commented] (KAFKA-17089) Incorrect JWT parsing in OAuthBearerUnsecuredJws

2024-07-06 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-17089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863532#comment-17863532
 ] 

黃竣陽 commented on KAFKA-17089:
-

I'm interested in this issue. Could you assign it to me?


[jira] [Commented] (KAFKA-17084) Network Degrade Test fails in System Tests

2024-07-06 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863442#comment-17863442
 ] 

Chia-Ping Tsai commented on KAFKA-17084:


It passes on my local machine:


{code:java}
docker exec ducker01 bash -c "cd /opt/kafka-dev && ducktape --cluster-file 
/opt/kafka-dev/tests/docker/build/cluster.json  
./tests/kafkatest/tests/core/network_degrade_test.py "
/usr/local/lib/python3.9/dist-packages/paramiko/transport.py:236: 
CryptographyDeprecationWarning: Blowfish has been deprecated and will be 
removed in a future release
  "class": algorithms.Blowfish,
[INFO:2024-07-05 18:26:22,698]: starting test run with session id 
2024-07-05--001...
[INFO:2024-07-05 18:26:22,698]: running 4 tests...
[INFO:2024-07-05 18:26:22,699]: Triggering test 1 of 4...
[INFO:2024-07-05 18:26:22,705]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
'network_degrade_test.py', 'cls_name': 'NetworkDegradeTest', 'method_name': 
'test_latency', 'injected_args': {'task_name': 'latency-100-rate-1000', 
'device_name': 'eth0', 'latency_ms': 50, 'rate_limit_kbit': 1000}}
[INFO:2024-07-05 18:26:22,712]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 on run 1/1
[INFO:2024-07-05 18:26:22,713]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Setting up...
[INFO:2024-07-05 18:26:33,111]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Running...
[INFO:2024-07-05 18:26:53,298]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Tearing down...
[INFO:2024-07-05 18:27:02,302]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 PASS
[WARNING - 2024-07-05 18:27:02,302 - runner_client - log - lineno:294]: 
RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Test requested 5 nodes, used only 4
[WARNING:2024-07-05 18:27:02,303]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Test requested 5 nodes, used only 4
[INFO:2024-07-05 18:27:02,305]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000:
 Data: None
[INFO:2024-07-05 18:27:02,313]: 
~
[INFO:2024-07-05 18:27:02,313]: Triggering test 2 of 4...
[INFO:2024-07-05 18:27:02,320]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/core', 'file_name': 
'network_degrade_test.py', 'cls_name': 'NetworkDegradeTest', 'method_name': 
'test_latency', 'injected_args': {'task_name': 'latency-100', 'device_name': 
'eth0', 'latency_ms': 50, 'rate_limit_kbit': 0}}
[INFO:2024-07-05 18:27:02,323]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 on run 1/1
[INFO:2024-07-05 18:27:02,324]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 Setting up...
[INFO:2024-07-05 18:27:13,280]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 Running...
[INFO:2024-07-05 18:27:33,398]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 Tearing down...
[INFO:2024-07-05 18:27:42,431]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 PASS
[WARNING - 2024-07-05 18:27:42,432 - runner_client - log - lineno:294]: 
RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0:
 Test requested 5 nodes, used only 4
[WARNING:2024-07-05 18:27:42,433]: RunnerClient: 
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100

[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2024-07-05 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863422#comment-17863422
 ] 

Greg Harris commented on KAFKA-10370:
-

Another option is to pull the exception forward, so that tasks calling 
offsets() for partitions they don't own receive the exception at the point 
the bug first happens. The runtime can then discard offsets that were correctly 
set by the task but became invalid later because a rebalance took place.
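To make the idea concrete, here is a minimal, self-contained sketch of that eager validation (class and method names are hypothetical, not the actual WorkerSinkTaskContext code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of "pulling the exception forward": reject offsets for
// partitions the task does not currently own at the offset() call itself,
// instead of failing later inside consumer.seek() during rewind().
final class RewindContext {
    private final Set<String> assignedPartitions;
    private final Map<String, Long> requestedOffsets = new HashMap<>();

    RewindContext(Set<String> assignedPartitions) {
        this.assignedPartitions = assignedPartitions;
    }

    void offset(String topicPartition, long offset) {
        if (!assignedPartitions.contains(topicPartition)) {
            // Fail where the bug happens, not at the deferred seek().
            throw new IllegalStateException(
                "No current assignment for partition " + topicPartition);
        }
        requestedOffsets.put(topicPartition, offset);
    }

    Map<String, Long> offsets() {
        return requestedOffsets;
    }
}
```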

> WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: connect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Priority: Major
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provided a way to supply the offsets from external (e.g. implementation of 
> SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first call 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext, if the offsets are not 
> empty, (2) consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply the specific offsets by +"context.offset(supplied_offsets);+" 
> in start() method, so that when the consumer does the first poll, it should 
> rewind to the specific offsets in rewind() method. However in practice, we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets);
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.ja

[jira] [Resolved] (KAFKA-16806) Explicitly declare JUnit dependencies for all test modules

2024-07-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16806.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Explicitly declare JUnit dependencies for all test modules
> --
>
> Key: KAFKA-16806
> URL: https://issues.apache.org/jira/browse/KAFKA-16806
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Greg Harris
>Assignee: TengYao Chi
>Priority: Major
> Fix For: 3.9.0
>
>
> The automatic loading of test framework implementation dependencies has been 
> deprecated.    
> This is scheduled to be removed in Gradle 9.0.    
> Declare the desired test framework directly on the test suite or explicitly 
> declare the test framework implementation dependencies on the test's runtime 
> classpath.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_framework_implementation_dependencies]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17092) Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout` for AsyncConsumer

2024-07-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-17092:
---
Issue Type: Test  (was: Bug)

> Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout` for AsyncConsumer
> -
>
> Key: KAFKA-17092
> URL: https://issues.apache.org/jira/browse/KAFKA-17092
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Major
>
> Sometimes it hangs in my jenkins ... not sure whether Kafka jenkins 
> encounters same issue or not.





[jira] [Updated] (KAFKA-17092) Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout` for AsyncConsumer

2024-07-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-17092:
---
Summary: Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout` for 
AsyncConsumer  (was: Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout`)

> Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout` for AsyncConsumer
> -
>
> Key: KAFKA-17092
> URL: https://issues.apache.org/jira/browse/KAFKA-17092
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Major
>
> Sometimes it hangs in my jenkins ... not sure whether Kafka jenkins 
> encounters same issue or not.





[jira] [Commented] (KAFKA-17092) Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout`

2024-07-05 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863417#comment-17863417
 ] 

Chia-Ping Tsai commented on KAFKA-17092:


It seems those offset-related timeout tests expect that the consumer SHOULD 
retry the LIST_OFFSETS request [0]. However, AsyncConsumer retries the 
LIST_OFFSETS requests only if the metadata gets updated. Unfortunately, this 
scenario has no metadata update, and hence no retry can happen.

one more thing: it can pass sometimes!!! The root cause is that `AsyncConsumer` 
sends a FIND_COORDINATOR request first. If we are lucky and the prepared 
responses are ready before the FIND_COORDINATOR request is handled, the mock 
time can get advanced to trigger the timeout exception.

In short, we should adjust `consumerForCheckingTimeoutException` to have 
different prepared responses for each protocol:

1. classic consumer: keep 10 prepared responses
2. new consumer: one prepared response is good enough :)

[0] 
[https://github.com/apache/kafka/blob/1bec3811adcfeca8973f89dc4ff23d3a35d08d74/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3323]
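Sketched as a helper (illustrative only; this does not exist in KafkaConsumerTest), the adjustment would make the number of canned responses depend on the protocol under test:

```java
// Illustrative only. The classic consumer retries LIST_OFFSETS, so the mock
// network needs several canned responses; the new (async) consumer sends the
// request once unless metadata is updated, so one canned response suffices.
final class PreparedResponses {
    static int listOffsetsResponses(boolean classicProtocol) {
        return classicProtocol ? 10 : 1;
    }
}
```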

> Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout`
> ---
>
> Key: KAFKA-17092
> URL: https://issues.apache.org/jira/browse/KAFKA-17092
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Major
>
> Sometimes it hangs in my jenkins ... not sure whether Kafka jenkins 
> encounters same issue or not.





[jira] [Commented] (KAFKA-17084) Network Degrade Test fails in System Tests

2024-07-05 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863411#comment-17863411
 ] 

Igor Soarez commented on KAFKA-17084:
-

It's passing for me locally too.
{code:java}
$ 
TC_PATHS="tests/kafkatest/tests/core/network_degrade_test.py::NetworkDegradeTest.test_latency"
 bash tests/docker/run_tests.sh     
(...)

SESSION REPORT (ALL TESTS)
ducktape version: 0.11.4
session_id:       2024-07-05--007
run time:         1 minute 12.856 seconds
tests run:        2
passed:           2
flaky:            0
failed:           0
ignored:          0

test_id:    
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000
status:     PASS
run time:   36.671 seconds

test_id:    
kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0
status:     PASS
run time:   36.136 seconds

{code}
 

> Network Degrade Test fails in System Tests
> --
>
> Key: KAFKA-17084
> URL: https://issues.apache.org/jira/browse/KAFKA-17084
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Priority: Critical
> Attachments: TEST-kafka.xml
>
>
> Tests for NetworkDegradeTest fail consistently on the 3.8 branch.
>  
> Tests failing are:
>  
> {noformat}
> Module: kafkatest.tests.core.network_degrade_test
> Class:  NetworkDegradeTest
> Method: test_latency
> Arguments:
> {
>   "device_name": "eth0",
>   "latency_ms": 50,
>   "rate_limit_kbit": 1000,
>   "task_name": "latency-100-rate-1000"
> }
> {noformat}
>  
> and 
>  
> {noformat}
> Module: kafkatest.tests.core.network_degrade_test
> Class:  NetworkDegradeTest
> Method: test_latency
> Arguments:
> {
>   "device_name": "eth0",
>   "latency_ms": 50,
>   "rate_limit_kbit": 0,
>   "task_name": "latency-100"
> }
> {noformat}
>  
> Failure for the first one is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker30', 'hostname': 
> '10.140.34.105', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker30', 'ssh_hostname': '10.140.34.105', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.34.105', '_logger':  kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000-1790
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f17a237dc10>, '_sftp_client':  0x7f17a2393910>, '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
> worker21', 1, b'')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/network_degrade_test.py",
>  line 66, in test_latency
> for line in zk0.account.ssh_capture("ping -i 1 -c 20 %s" % 
> zk1.account.hostname):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 680, in next
> return next(self.iter_obj)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 347, in output_generator
> raise RemoteCommandError(self, cmd, exit_status, stderr.read())
> ducktape.cluster.remoteaccount.RemoteCommandError: ubuntu@worker30: Command 
&

[jira] [Commented] (KAFKA-17086) Java 21 support in Kafka

2024-07-05 Thread Swathi Mocharla (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863401#comment-17863401
 ] 

Swathi Mocharla commented on KAFKA-17086:
-

yes please, i'll raise a PR ASAP

> Java 21 support in Kafka
> 
>
> Key: KAFKA-17086
> URL: https://issues.apache.org/jira/browse/KAFKA-17086
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Reporter: Swathi Mocharla
>Priority: Major
>
> When does Apache Kafka plan to support Java 21 from.
> Currently there seem to be some known issues that are already fixed in the 
> community.
> A timeline would be helpful.





[jira] [Created] (KAFKA-17094) Make it possible to list registered KRaft nodes in order to know which nodes should be unregistered

2024-07-05 Thread Jakub Scholz (Jira)
Jakub Scholz created KAFKA-17094:


 Summary: Make it possible to list registered KRaft nodes in order 
to know which nodes should be unregistered
 Key: KAFKA-17094
 URL: https://issues.apache.org/jira/browse/KAFKA-17094
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.1
Reporter: Jakub Scholz


Kafka seems to require nodes that are removed from the cluster to be 
unregistered using the Kafka Admin API. If they are not unregistered, you might 
run into problems later. For example, after an upgrade, when you try to bump the 
KRaft metadata version, you might get an error like this:

 
{code:java}
org.apache.kafka.common.errors.InvalidUpdateVersionException: Invalid update 
version 19 for feature metadata.version. Broker 3002 only supports versions 
1-14 {code}
In this case, 3002 is an old node that was removed before the upgrade and 
doesn't support the KRaft metadata version 19 and blocks the metadata update.

 

However, it seems to be impossible to list the registered nodes in order to 
unregister them:
 * The describe cluster metadata request in the Admin API seems to return only 
the IDs of running brokers
 * The describe metadata quorum command seems to list the removed nodes in the 
list of observers. But it does so only until the controller nodes are restarted.

If Kafka expects inactive nodes to be unregistered, it should provide a way to 
list the registered nodes so that users can check which nodes to unregister.





[jira] [Assigned] (KAFKA-16801) Streams upgrade :test target doesn't find any junit tests

2024-07-05 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-16801:
---

Assignee: Evgeny Ushakov

> Streams upgrade :test target doesn't find any junit tests
> -
>
> Key: KAFKA-16801
> URL: https://issues.apache.org/jira/browse/KAFKA-16801
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Greg Harris
>Assignee: Evgeny Ushakov
>Priority: Major
>  Labels: newbie
>
> No test executed. This behavior has been deprecated.    
> This will fail with an error in Gradle 9.0.    
> There are test sources present but no test was executed. Please check your 
> test configuration.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_task_fail_on_no_test_executed]
>     
> 23 usages
>  
> Task::streams:upgrade-system-tests-0100:test    
> Task::streams:upgrade-system-tests-0101:test    
> Task::streams:upgrade-system-tests-0102:test    
> Task::streams:upgrade-system-tests-0110:test    
> Task::streams:upgrade-system-tests-10:test    
> Task::streams:upgrade-system-tests-11:test    
> Task::streams:upgrade-system-tests-20:test    
> Task::streams:upgrade-system-tests-21:test    
> Task::streams:upgrade-system-tests-22:test    
> Task::streams:upgrade-system-tests-23:test    
> Task::streams:upgrade-system-tests-24:test    
> Task::streams:upgrade-system-tests-25:test    
> Task::streams:upgrade-system-tests-26:test    
> Task::streams:upgrade-system-tests-27:test    
> Task::streams:upgrade-system-tests-28:test    
> Task::streams:upgrade-system-tests-30:test    
> Task::streams:upgrade-system-tests-31:test    
> Task::streams:upgrade-system-tests-32:test    
> Task::streams:upgrade-system-tests-33:test    
> Task::streams:upgrade-system-tests-34:test    
> Task::streams:upgrade-system-tests-35:test    
> Task::streams:upgrade-system-tests-36:test    
> Task::streams:upgrade-system-tests-37:test





[jira] [Commented] (KAFKA-16801) Streams upgrade :test target doesn't find any junit tests

2024-07-05 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863396#comment-17863396
 ] 

Greg Harris commented on KAFKA-16801:
-

Hi [~eugene.u] Thanks for volunteering.

I've given you permissions to assign tickets, and i'll assign this one to you 
so you can start working. Ping me when you have a PR that's ready for review.

Thanks!

> Streams upgrade :test target doesn't find any junit tests
> -
>
> Key: KAFKA-16801
> URL: https://issues.apache.org/jira/browse/KAFKA-16801
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> No test executed. This behavior has been deprecated.    
> This will fail with an error in Gradle 9.0.    
> There are test sources present but no test was executed. Please check your 
> test configuration.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_task_fail_on_no_test_executed]
>     
> 23 usages
>  
> Task::streams:upgrade-system-tests-0100:test    
> Task::streams:upgrade-system-tests-0101:test    
> Task::streams:upgrade-system-tests-0102:test    
> Task::streams:upgrade-system-tests-0110:test    
> Task::streams:upgrade-system-tests-10:test    
> Task::streams:upgrade-system-tests-11:test    
> Task::streams:upgrade-system-tests-20:test    
> Task::streams:upgrade-system-tests-21:test    
> Task::streams:upgrade-system-tests-22:test    
> Task::streams:upgrade-system-tests-23:test    
> Task::streams:upgrade-system-tests-24:test    
> Task::streams:upgrade-system-tests-25:test    
> Task::streams:upgrade-system-tests-26:test    
> Task::streams:upgrade-system-tests-27:test    
> Task::streams:upgrade-system-tests-28:test    
> Task::streams:upgrade-system-tests-30:test    
> Task::streams:upgrade-system-tests-31:test    
> Task::streams:upgrade-system-tests-32:test    
> Task::streams:upgrade-system-tests-33:test    
> Task::streams:upgrade-system-tests-34:test    
> Task::streams:upgrade-system-tests-35:test    
> Task::streams:upgrade-system-tests-36:test    
> Task::streams:upgrade-system-tests-37:test





[jira] [Commented] (KAFKA-17086) Java 21 support in Kafka

2024-07-05 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863391#comment-17863391
 ] 

Chia-Ping Tsai commented on KAFKA-17086:


[~yangpoan] thanks for the help! [~mswathi] WDYT? Do you plan to file PR for it?

> Java 21 support in Kafka
> 
>
> Key: KAFKA-17086
> URL: https://issues.apache.org/jira/browse/KAFKA-17086
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Reporter: Swathi Mocharla
>Priority: Major
>
> When does Apache Kafka plan to support Java 21 from.
> Currently there seem to be some known issues that are already fixed in the 
> community.
> A timeline would be helpful.





[jira] [Commented] (KAFKA-17086) Java 21 support in Kafka

2024-07-05 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863390#comment-17863390
 ] 

PoAn Yang commented on KAFKA-17086:
---

Hi [~chia7712] / [~mswathi], I can help to update document if you're not 
working on it. Thank you.

> Java 21 support in Kafka
> 
>
> Key: KAFKA-17086
> URL: https://issues.apache.org/jira/browse/KAFKA-17086
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Reporter: Swathi Mocharla
>Priority: Major
>
> When does Apache Kafka plan to support Java 21 from.
> Currently there seem to be some known issues that are already fixed in the 
> community.
> A timeline would be helpful.





[jira] [Commented] (KAFKA-17086) Java 21 support in Kafka

2024-07-05 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863387#comment-17863387
 ] 

Chia-Ping Tsai commented on KAFKA-17086:


{quote}
Apache Kafka documentation only mentions Java 8,11 and 17. 
https://kafka.apache.org/documentation.html#java 

{quote}

nice finding. we should fix the docs ipso facto.

{quote}
Where can I find more documentation regarding "Kafka CI and release run with 
JDK 8, 11, 17, and 21."
{quote}

please check the README in github (https://github.com/apache/kafka)

> Java 21 support in Kafka
> 
>
> Key: KAFKA-17086
> URL: https://issues.apache.org/jira/browse/KAFKA-17086
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Reporter: Swathi Mocharla
>Priority: Major
>
> When does Apache Kafka plan to support Java 21 from.
> Currently there seem to be some known issues that are already fixed in the 
> community.
> A timeline would be helpful.





[jira] [Commented] (KAFKA-17062) RemoteLogManager - RemoteStorageException causes data loss

2024-07-05 Thread Guillaume Mallet (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863384#comment-17863384
 ] 

Guillaume Mallet commented on KAFKA-17062:
--

[~satish.duggana] Thanks for the context.

 

> Any retried operation for copying a segment will have a unique id and it will 
>create a new file/object in the remote store. This will make sure we will not 
>reference partially copied data in later reads and these semantics may vary 
>across different remote storages like object stores. The safe way is to 
>generate a new uuid and try copying the failed segment again.

If my understanding is correct, we could drop the expected idempotency of the 
[copy 
operation|https://github.com/apache/kafka/blob/trunk/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L82-L86]
 since we never rely on it (the {{RemoteLogSegmentMetadata}} will always have a 
different ID).

>  One possible tradeoff that can be explored is do not delete the data sooner 
> but we may end up occupying more than the targeted storage in a few scenarios 
> which will eventually be cleaned up.

You said above that "Any retried operation for copying a segment will have a 
unique id and it will create a new file/object in the remote store." Given that, 
I fail to see the drawbacks of attempting to delete those segments earlier, 
since they are expected to be independent from the ones for which the copy 
succeeded. Could you help me understand why we wouldn't want to do that?

It would make the tradeoff easier to accept as the few scenarios where we would 
end up using more than targeted would also get resolved faster.

> RemoteLogManager - RemoteStorageException causes data loss
> --
>
> Key: KAFKA-17062
>     URL: https://issues.apache.org/jira/browse/KAFKA-17062
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.8.0, 3.7.1, 3.9.0
>Reporter: Guillaume Mallet
>Assignee: Guillaume Mallet
>Priority: Major
>  Labels: tiered-storage
>
> When Tiered Storage is configured, retention.bytes defines the limit for the 
> amount of data stored in the filesystem and in remote storage. However a 
> failure while offloading to remote storage can cause segments to be dropped 
> before the retention limit is met.
> What happens
> Assuming a topic configured with {{retention.bytes=4294967296}} (4GB) and a 
> {{local.retention.bytes=1073741824}} (1GB, equal to segment.bytes) we would 
> expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment 
> locally (the local segment) and possibly more if the remote storage is 
> offline. i.e. segments in the following RemoteLogSegmentStates in the 
> RemoteLogMetadataManager (RLMM) :
>  * Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
> Let's assume the RLMM starts failing when segment 4 rolls. At the first 
> iteration of an RLMTask we will have -
>  * 
> [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773]
>  : is called first
>  ** RLMM becomes aware of Segment 4 and adds it to the metadata:
>  *** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}),
>  *** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
>  ** An exception is raised during the copy operation 
> ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93]
>  in RemoteStorageManager) which is caught with the error message “{{Error 
> occurred while copying log segments of partition}}” and no further copy will 
> be attempted for the duration of this RLMTask.
>  ** At that point the Segment will never move to {{COPY_SEGMENT_FINISHED}} 
> but will transition to {{DELETE_SEGMENT_STARTED}} eventually before being 
> cleaned up when the associated segment is deleted.
>  * 
> [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122]
>  is then called
>  ** Retention size is computed in 
> [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296]
>  as the sum of a

[jira] [Comment Edited] (KAFKA-17086) Java 21 support in Kafka

2024-07-05 Thread Swathi Mocharla (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863379#comment-17863379
 ] 

Swathi Mocharla edited comment on KAFKA-17086 at 7/5/24 4:23 PM:
-

hello [~chia7712] , thank you for your response.

I am aware that the release notes for 3.7 do mention "Support building with 
Java 21 (LTS release)" as part of KAFKA-15485.

Apache Kafka documentation only mentions Java 8,11 and 17. 
[https://kafka.apache.org/documentation.html#java] 

Where can I find more documentation regarding "Kafka CI and release run with 
JDK 8, 11, 17, and 21"?


was (Author: mswathi):
hello [~chia7712] , thank you for your response.

Release notes of 3.7 does mention the support for Support building with Java 21 
(LTS release)

Apache Kafka documentation only mentions Java 8,11 and 17. 
https://kafka.apache.org/documentation.html#java 

Where can I find more documentation regarding "Kafka CI and release run with 
JDK 8, 11, 17, and 21."

> Java 21 support in Kafka
> 
>
> Key: KAFKA-17086
>     URL: https://issues.apache.org/jira/browse/KAFKA-17086
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Reporter: Swathi Mocharla
>Priority: Major
>
> When does Apache Kafka plan to support Java 21 from.
> Currently there seem to be some known issues that are already fixed in the 
> community.
> A timeline would be helpful.





[jira] [Commented] (KAFKA-17086) Java 21 support in Kafka

2024-07-05 Thread Swathi Mocharla (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863379#comment-17863379
 ] 

Swathi Mocharla commented on KAFKA-17086:
-

hello [~chia7712] , thank you for your response.

Release notes of 3.7 does mention the support for Support building with Java 21 
(LTS release)

Apache Kafka documentation only mentions Java 8,11 and 17. 
https://kafka.apache.org/documentation.html#java 

Where can I find more documentation regarding "Kafka CI and release run with 
JDK 8, 11, 17, and 21."

> Java 21 support in Kafka
> 
>
> Key: KAFKA-17086
> URL: https://issues.apache.org/jira/browse/KAFKA-17086
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Reporter: Swathi Mocharla
>Priority: Major
>
> When does Apache Kafka plan to support Java 21 from.
> Currently there seem to be some known issues that are already fixed in the 
> community.
> A timeline would be helpful.





[jira] [Created] (KAFKA-17093) KafkaConsumer.seekToEnd should return LSO

2024-07-05 Thread Tom Kalmijn (Jira)
Tom Kalmijn created KAFKA-17093:
---

 Summary: KafkaConsumer.seekToEnd should return LSO 
 Key: KAFKA-17093
 URL: https://issues.apache.org/jira/browse/KAFKA-17093
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.6.1
 Environment: Ubuntu,  IntelliJ, Scala   "org.apache.kafka" % 
"kafka-clients" % "3.6.1"

Reporter: Tom Kalmijn


 

Expected

When using a transactional producer then the method 
KafkaConsumer.seekToEnd(...) of a consumer configured with isolation level 
"read_committed" should return the LSO. 

Observed

The offset returned is always the actual last offset of the partition, which is 
not the LSO if the latest offsets are occupied by transaction markers.

Also see this Slack thread:

https://confluentcommunity.slack.com/archives/C499EFQS0/p1720088282557559
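As a toy illustration of the gap (plain arithmetic, not the consumer API): the log end offset that seekToEnd reports includes transaction control records, while a read_committed consumer can only consume up to the last stable offset:

```java
// Toy model, not the KafkaConsumer API: if the tail of a partition holds
// transaction markers (commit/abort control records), the log end offset
// reported by seekToEnd sits past the last stable offset (LSO) that a
// read_committed consumer can actually reach.
final class LsoModel {
    static long logEndOffset(long dataRecords, long trailingMarkers) {
        return dataRecords + trailingMarkers; // what seekToEnd reports
    }

    static long lastStableOffset(long dataRecords) {
        return dataRecords; // what a read_committed consumer expects
    }
}
```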





[jira] [Assigned] (KAFKA-17083) KRaft Upgrade Failures in SystemTests

2024-07-05 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-17083:
---

Assignee: Igor Soarez

> KRaft Upgrade Failures in SystemTests
> -
>
> Key: KAFKA-17083
> URL: https://issues.apache.org/jira/browse/KAFKA-17083
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Assignee: Igor Soarez
>Priority: Critical
>
> 2 System tests for "TestKRaftUpgrade are consistently failing on 3.8 in the 
> system tests.
> {noformat}
> Module: kafkatest.tests.core.kraft_upgrade_test
> Class:  TestKRaftUpgrade
> Method: test_isolated_mode_upgrade
> Arguments:
> {
>   "from_kafka_version": "dev",
>   "metadata_quorum": "ISOLATED_KRAFT"
> }
> {noformat}
>  
> and 
>  
> {code:java}
> Module: kafkatest.tests.core.kraft_upgrade_test
> Class:  TestKRaftUpgrade
> Method: test_combined_mode_upgrade
> Arguments:
> {
>   "from_kafka_version": "dev",
>   "metadata_quorum": "COMBINED_KRAFT"
> }
> {code}
>  
> Failure for Isolated is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker15', 'hostname': 
> '10.140.39.207', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker15', 'ssh_hostname': '10.140.39.207', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.39.207', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=dev.metadata_quorum=ISOLATED_KRAFT-674
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f07e2e800a0>, '_sftp_client':  0x7f07e2f3d070>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server 
> worker15:9092,worker16:9092,worker17:9092 upgrade --metadata 3.7', 1, 
> b'SLF4J: Class path contains multiple SLF4J bindings.\nSLF4J: Found binding 
> in 
> [jar:file:/vagrant/tools/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
>  Found binding in 
> [jar:file:/vagrant/trogdor/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
>  See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.\nSLF4J: Actual binding is of type 
> [org.slf4j.impl.Reload4jLoggerFactory]\n1 out of 1 operation(s) failed.\n')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 121, in test_isolated_mode_upgrade
> self.run_upgrade(from_kafka_version)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 105, in run_upgrade
> self.run_produce_consume_validate(core_test_action=lambda: 
> self.perform_version_change(from_kafka_version))
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 105, in run_produce_consume_validate
> core_test_action(*args)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 105, in 
> self.run_produce_consume_validate(core_test_action=lambda: 
> self.perform_version_change(from_kafka_version))
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 75, in perform_version_change
> self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/services/kafka/kafka.py",
>  line 920, in upgrade_metadata_version
> self.run_features_command("upgrade", new_version)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafka

[jira] [Commented] (KAFKA-17084) Network Degrade Test fails in System Tests

2024-07-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863361#comment-17863361
 ] 

Mickael Maison commented on KAFKA-17084:


I can get both of these tests to pass in our CI
[^TEST-kafka.xml]

> Network Degrade Test fails in System Tests
> --
>
> Key: KAFKA-17084
> URL: https://issues.apache.org/jira/browse/KAFKA-17084
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Priority: Critical
> Attachments: TEST-kafka.xml
>
>
> Tests for NetworkDegradeTest fail consistently on the 3.8 branch.
>  
> Tests failing are:
>  
> {noformat}
> Module: kafkatest.tests.core.network_degrade_test
> Class:  NetworkDegradeTest
> Method: test_latency
> Arguments:
> {
>   "device_name": "eth0",
>   "latency_ms": 50,
>   "rate_limit_kbit": 1000,
>   "task_name": "latency-100-rate-1000"
> }
> {noformat}
>  
> and 
>  
> {noformat}
> Module: kafkatest.tests.core.network_degrade_test
> Class:  NetworkDegradeTest
> Method: test_latency
> Arguments:
> {
>   "device_name": "eth0",
>   "latency_ms": 50,
>   "rate_limit_kbit": 0,
>   "task_name": "latency-100"
> }
> {noformat}
>  
> Failure for the first one is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker30', 'hostname': 
> '10.140.34.105', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker30', 'ssh_hostname': '10.140.34.105', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.34.105', '_logger':  kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000-1790
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f17a237dc10>, '_sftp_client':  0x7f17a2393910>, '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
> worker21', 1, b'')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/network_degrade_test.py",
>  line 66, in test_latency
> for line in zk0.account.ssh_capture("ping -i 1 -c 20 %s" % 
> zk1.account.hostname):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 680, in next
> return next(self.iter_obj)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 347, in output_generator
> raise RemoteCommandError(self, cmd, exit_status, stderr.read())
> ducktape.cluster.remoteaccount.RemoteCommandError: ubuntu@worker30: Command 
> 'ping -i 1 -c 20 worker21' returned non-zero exit status 1.{noformat}
> And for the second one is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker28', 'hostname': 
> '10.140.41.79', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker28', 'ssh_hostname': '10.140.41.79', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.41.79', '_logger':  kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0-1791
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f17a1c7b7c0>, '_sftp_client':  0x7f17a1c7b2b0>, '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
> worker27', 1, b'')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-pac

[jira] [Updated] (KAFKA-17084) Network Degrade Test fails in System Tests

2024-07-05 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-17084:
---
Attachment: TEST-kafka.xml

> Network Degrade Test fails in System Tests
> --
>
> Key: KAFKA-17084
> URL: https://issues.apache.org/jira/browse/KAFKA-17084
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Priority: Critical
> Attachments: TEST-kafka.xml
>
>
> Tests for NetworkDegradeTest fail consistently on the 3.8 branch.
>  
> Tests failing are:
>  
> {noformat}
> Module: kafkatest.tests.core.network_degrade_test
> Class:  NetworkDegradeTest
> Method: test_latency
> Arguments:
> {
>   "device_name": "eth0",
>   "latency_ms": 50,
>   "rate_limit_kbit": 1000,
>   "task_name": "latency-100-rate-1000"
> }
> {noformat}
>  
> and 
>  
> {noformat}
> Module: kafkatest.tests.core.network_degrade_test
> Class:  NetworkDegradeTest
> Method: test_latency
> Arguments:
> {
>   "device_name": "eth0",
>   "latency_ms": 50,
>   "rate_limit_kbit": 0,
>   "task_name": "latency-100"
> }
> {noformat}
>  
> Failure for the first one is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker30', 'hostname': 
> '10.140.34.105', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker30', 'ssh_hostname': '10.140.34.105', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.34.105', '_logger':  kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100-rate-1000.device_name=eth0.latency_ms=50.rate_limit_kbit=1000-1790
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f17a237dc10>, '_sftp_client':  0x7f17a2393910>, '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
> worker21', 1, b'')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/network_degrade_test.py",
>  line 66, in test_latency
> for line in zk0.account.ssh_capture("ping -i 1 -c 20 %s" % 
> zk1.account.hostname):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 680, in next
> return next(self.iter_obj)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
>  line 347, in output_generator
> raise RemoteCommandError(self, cmd, exit_status, stderr.read())
> ducktape.cluster.remoteaccount.RemoteCommandError: ubuntu@worker30: Command 
> 'ping -i 1 -c 20 worker21' returned non-zero exit status 1.{noformat}
> And for the second one is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker28', 'hostname': 
> '10.140.41.79', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker28', 'ssh_hostname': '10.140.41.79', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.41.79', '_logger':  kafkatest.tests.core.network_degrade_test.NetworkDegradeTest.test_latency.task_name=latency-100.device_name=eth0.latency_ms=50.rate_limit_kbit=0-1791
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f17a1c7b7c0>, '_sftp_client':  0x7f17a1c7b2b0>, '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
> worker27', 1, b'')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
>

[jira] [Commented] (KAFKA-17083) KRaft Upgrade Failures in SystemTests

2024-07-05 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863359#comment-17863359
 ] 

Igor Soarez commented on KAFKA-17083:
-

Found the exception:

 
{code:java}
org.apache.kafka.common.errors.InvalidUpdateVersionException: Invalid update 
version 19 for feature metadata.version. Can't downgrade the version of this 
feature without setting the upgrade type to either safe or unsafe downgrade. 
{code}
[Thrown in 
FeatureControlManager.updateFeature|https://github.com/apache/kafka/blob/3.8/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java#L228-L232].
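The guard that produces this error can be sketched as a small model (illustrative Python, not the actual Java FeatureControlManager code; version numbers are examples): a feature update whose target version is below the currently finalized version is rejected unless the caller explicitly requests a safe or unsafe downgrade, which is why `kafka-features.sh ... upgrade --metadata 3.7` fails against a cluster whose dev brokers already finalized a newer metadata.version.

```python
# Illustrative model of the version guard in FeatureControlManager.updateFeature
# (the real implementation is Java; names here are hypothetical).

def update_feature(current_version, new_version, upgrade_type="upgrade"):
    """Return the newly finalized version, or raise on an implicit downgrade."""
    if new_version < current_version and upgrade_type not in (
        "safe_downgrade",
        "unsafe_downgrade",
    ):
        raise ValueError(
            f"Invalid update version {new_version} for feature metadata.version. "
            "Can't downgrade the version of this feature without setting the "
            "upgrade type to either safe or unsafe downgrade."
        )
    return new_version

# An 'upgrade' to a lower version is rejected; an explicit downgrade is not:
try:
    update_feature(current_version=20, new_version=19)
except ValueError as e:
    print(e)
print(update_feature(current_version=20, new_version=19,
                     upgrade_type="safe_downgrade"))
```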

 

 

> KRaft Upgrade Failures in SystemTests
> -
>
> Key: KAFKA-17083
> URL: https://issues.apache.org/jira/browse/KAFKA-17083
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Priority: Critical
>
> Two system tests for "TestKRaftUpgrade" are consistently failing on the 3.8 
> branch.
> {noformat}
> Module: kafkatest.tests.core.kraft_upgrade_test
> Class:  TestKRaftUpgrade
> Method: test_isolated_mode_upgrade
> Arguments:
> {
>   "from_kafka_version": "dev",
>   "metadata_quorum": "ISOLATED_KRAFT"
> }
> {noformat}
>  
> and 
>  
> {code:java}
> Module: kafkatest.tests.core.kraft_upgrade_test
> Class:  TestKRaftUpgrade
> Method: test_combined_mode_upgrade
> Arguments:
> {
>   "from_kafka_version": "dev",
>   "metadata_quorum": "COMBINED_KRAFT"
> }
> {code}
>  
> Failure for Isolated is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker15', 'hostname': 
> '10.140.39.207', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker15', 'ssh_hostname': '10.140.39.207', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.39.207', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=dev.metadata_quorum=ISOLATED_KRAFT-674
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f07e2e800a0>, '_sftp_client':  0x7f07e2f3d070>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server 
> worker15:9092,worker16:9092,worker17:9092 upgrade --metadata 3.7', 1, 
> b'SLF4J: Class path contains multiple SLF4J bindings.\nSLF4J: Found binding 
> in 
> [jar:file:/vagrant/tools/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
>  Found binding in 
> [jar:file:/vagrant/trogdor/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
>  See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.\nSLF4J: Actual binding is of type 
> [org.slf4j.impl.Reload4jLoggerFactory]\n1 out of 1 operation(s) failed.\n')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 121, in test_isolated_mode_upgrade
> self.run_upgrade(from_kafka_version)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 105, in run_upgrade
> self.run_produce_consume_validate(core_test_action=lambda: 
> self.perform_version_change(from_kafka_version))
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 105, in run_produce_consume_validate
> core_test_action(*args)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 105, in 
> self.run_produce_consume_validate(core_test_action=lambda: 
> self.perform_version_change(from_kafka_version))
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_t

[jira] [Commented] (KAFKA-17083) KRaft Upgrade Failures in SystemTests

2024-07-05 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863353#comment-17863353
 ] 

Igor Soarez commented on KAFKA-17083:
-

In both cases, this is the command that fails:

 
{code:java}
/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server 
worker15:9092,worker16:9092,worker17:9092 upgrade --metadata 3.7 {code}
ducktape shows this as the output from the command:

 

 
{code:java}
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/vagrant/tools/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/vagrant/trogdor/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
1 out of 1 operation(s) failed. {code}
Ignoring the SLF4J class conflict, there's just one line, and it seems to come 
from 
[here|https://github.com/apache/kafka/blob/3.8/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java#L318]
 – the `update` method in FeatureCommand. It is odd that we don't [see the 
specific 
error|https://github.com/apache/kafka/blob/3.8/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java#L308]
 in the output.

 

> KRaft Upgrade Failures in SystemTests
> -
>
> Key: KAFKA-17083
> URL: https://issues.apache.org/jira/browse/KAFKA-17083
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 3.8.0
>Reporter: Josep Prat
>Priority: Critical
>
> Two system tests for "TestKRaftUpgrade" are consistently failing on the 3.8 
> branch.
> {noformat}
> Module: kafkatest.tests.core.kraft_upgrade_test
> Class:  TestKRaftUpgrade
> Method: test_isolated_mode_upgrade
> Arguments:
> {
>   "from_kafka_version": "dev",
>   "metadata_quorum": "ISOLATED_KRAFT"
> }
> {noformat}
>  
> and 
>  
> {code:java}
> Module: kafkatest.tests.core.kraft_upgrade_test
> Class:  TestKRaftUpgrade
> Method: test_combined_mode_upgrade
> Arguments:
> {
>   "from_kafka_version": "dev",
>   "metadata_quorum": "COMBINED_KRAFT"
> }
> {code}
>  
> Failure for Isolated is:
> {noformat}
> RemoteCommandError({'ssh_config': {'host': 'worker15', 'hostname': 
> '10.140.39.207', 'user': 'ubuntu', 'port': 22, 'password': None, 
> 'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
> 'hostname': 'worker15', 'ssh_hostname': '10.140.39.207', 'user': 'ubuntu', 
> 'externally_routable_ip': '10.140.39.207', '_logger':  kafkatest.tests.core.kraft_upgrade_test.TestKRaftUpgrade.test_isolated_mode_upgrade.from_kafka_version=dev.metadata_quorum=ISOLATED_KRAFT-674
>  (DEBUG)>, 'os': 'linux', '_ssh_client':  0x7f07e2e800a0>, '_sftp_client':  0x7f07e2f3d070>, '_custom_ssh_exception_checks': None}, 
> '/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server 
> worker15:9092,worker16:9092,worker17:9092 upgrade --metadata 3.7', 1, 
> b'SLF4J: Class path contains multiple SLF4J bindings.\nSLF4J: Found binding 
> in 
> [jar:file:/vagrant/tools/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
>  Found binding in 
> [jar:file:/vagrant/trogdor/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
>  See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.\nSLF4J: Actual binding is of type 
> [org.slf4j.impl.Reload4jLoggerFactory]\n1 out of 1 operation(s) failed.\n')
> Traceback (most recent call last):
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
>  line 121, in test_isolated_mode_upgrade
> self.run_upgrade(from_kafka_version)
>   File 
> "/home/semaphore/kafka-overlay/kafka/tests/

[jira] [Assigned] (KAFKA-17092) Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout`

2024-07-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17092:
--

Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout`
> ---
>
> Key: KAFKA-17092
> URL: https://issues.apache.org/jira/browse/KAFKA-17092
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Major
>
> Sometimes it hangs in my Jenkins ... not sure whether the Kafka Jenkins 
> encounters the same issue or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17092) Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout`

2024-07-05 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-17092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863348#comment-17863348
 ] 

黃竣陽 commented on KAFKA-17092:
-

I'm interested in this issue. Please assign it to me.

> Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout`
> ---
>
> Key: KAFKA-17092
> URL: https://issues.apache.org/jira/browse/KAFKA-17092
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> Sometimes it hangs in my Jenkins ... not sure whether the Kafka Jenkins 
> encounters the same issue or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17092) Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout`

2024-07-05 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17092:
--

 Summary: Revisit `KafkaConsumerTest#testBeginningOffsetsTimeout`
 Key: KAFKA-17092
 URL: https://issues.apache.org/jira/browse/KAFKA-17092
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Sometimes it hangs in my Jenkins ... not sure whether the Kafka Jenkins 
encounters the same issue or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17090) Add documentation to CreateTopicsResult#config to remind users that both "type" and "document" are null

2024-07-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17090:
--

Assignee: Ming-Yen Chung  (was: Chia-Ping Tsai)

> Add documentation to CreateTopicsResult#config to remind users that both 
> "type" and "document" are null 
> 
>
> Key: KAFKA-17090
>     URL: https://issues.apache.org/jira/browse/KAFKA-17090
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Ming-Yen Chung
>Priority: Minor
>
> CreateTopicsResult#config [0] always has a null type and null documentation, 
> since the Kafka protocol does not declare those fields [1]. However, 
> CreateTopicsResult#config reuses the class `ConfigEntry`, so users may 
> expect those fields to be defined as well.
> [0] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java#L68
> [1] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/CreateTopicsResponse.json#L55



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17090) Add documentation to CreateTopicsResult#config to remind users that both "type" and "document" are null

2024-07-05 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863251#comment-17863251
 ] 

Chia-Ping Tsai commented on KAFKA-17090:


[~yangpoan] this issue is inspired by [~mingyen066], so we can be the reviewers 
together :)

> Add documentation to CreateTopicsResult#config to remind users that both 
> "type" and "document" are null 
> 
>
> Key: KAFKA-17090
>     URL: https://issues.apache.org/jira/browse/KAFKA-17090
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> CreateTopicsResult#config [0] always has a null type and null documentation, 
> since the Kafka protocol does not declare those fields [1]. However, 
> CreateTopicsResult#config reuses the class `ConfigEntry`, so users may 
> expect those fields to be defined as well.
> [0] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java#L68
> [1] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/CreateTopicsResponse.json#L55



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17090) Add documentation to CreateTopicsResult#config to remind users that both "type" and "document" are null

2024-07-05 Thread Ming-Yen Chung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863249#comment-17863249
 ] 

Ming-Yen Chung commented on KAFKA-17090:


I am also interested in this issue. Could [~chia7712]  assign this ticket to 
me? Thanks.

> Add documentation to CreateTopicsResult#config to remind users that both 
> "type" and "document" are null 
> 
>
> Key: KAFKA-17090
>     URL: https://issues.apache.org/jira/browse/KAFKA-17090
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> CreateTopicsResult#config [0] always has a null type and null documentation, 
> since the Kafka protocol does not declare those fields [1]. However, 
> CreateTopicsResult#config reuses the class `ConfigEntry`, so users may 
> expect those fields to be defined as well.
> [0] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java#L68
> [1] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/CreateTopicsResponse.json#L55



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17091) Add @FunctionalInterface to Streams interfaces

2024-07-05 Thread Ray McDermott (Jira)
Ray McDermott created KAFKA-17091:
-

 Summary: Add @FunctionalInterface to Streams interfaces
 Key: KAFKA-17091
 URL: https://issues.apache.org/jira/browse/KAFKA-17091
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Ray McDermott
Assignee: Ray McDermott


Clojure version 1.12 (currently in beta) has many updates to Java interop.

Unfortunately, it does not quite deliver what we need with respect to thinning 
down Kafka Streams interop.

We were specifically hoping that passing {{(fn [] ...)}} to SAM interfaces 
would just work and we would not need to {{reify}} the interface.

Sadly it only works for interfaces that have been explicitly annotated with 
{{@FunctionalInterface}}  - and the Kafka Streams DSL does not have those 
annotations.

Details here

https://ask.clojure.org/index.php/13908/expand-fi-adapting-to-sam-types-not-marked-as-fi



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17090) Add documentation to CreateTopicsResult#config to remind users that both "type" and "document" are null

2024-07-05 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863240#comment-17863240
 ] 

PoAn Yang commented on KAFKA-17090:
---

Hi [~chia7712], I'm interested in this. May I take it? Thank you.

> Add documentation to CreateTopicsResult#config to remind users that both 
> "type" and "document" are null 
> 
>
> Key: KAFKA-17090
>     URL: https://issues.apache.org/jira/browse/KAFKA-17090
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> CreateTopicsResult#config [0] always has a null type and null documentation, 
> since the Kafka protocol does not declare those fields [1]. However, 
> CreateTopicsResult#config reuses the class `ConfigEntry`, so users may 
> expect those fields to be defined as well.
> [0] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java#L68
> [1] 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/CreateTopicsResponse.json#L55



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17090) Add documentation to CreateTopicsResult#config to remind users that both "type" and "document" are null

2024-07-05 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17090:
--

 Summary: Add documentation to CreateTopicsResult#config to remind 
users that both "type" and "document" are null 
 Key: KAFKA-17090
 URL: https://issues.apache.org/jira/browse/KAFKA-17090
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


CreateTopicsResult#config [0] always has a null type and null documentation, 
since the Kafka protocol does not declare those fields [1]. However, 
CreateTopicsResult#config reuses the class `ConfigEntry`, so users may expect 
those fields to be defined as well.



[0] 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java#L68
[1] 
https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/CreateTopicsResponse.json#L55



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17089) Incorrect JWT parsing in OAuthBearerUnsecuredJws

2024-07-05 Thread Jira
Björn Löfroth created KAFKA-17089:
-

 Summary: Incorrect JWT parsing in OAuthBearerUnsecuredJws
 Key: KAFKA-17089
 URL: https://issues.apache.org/jira/browse/KAFKA-17089
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.6.2
Reporter: Björn Löfroth


The documentation for the `OAuthBearerUnsecuredJws.toMap` function correctly 
describes the input as Base64URL, but the implementation then performs a plain 
Base64 decode.


[https://github.com/apache/kafka/blob/9a7eee60727dc73f09075e971ea35909d2245f19/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java#L295]

 

It should probably be:
```
byte[] decode = Base64.getUrlDecoder().decode(split);
```
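The distinction matters because the Base64URL alphabet replaces '+' and '/' with '-' and '_', so a JWT segment containing those characters cannot be decoded with the standard alphabet. A quick illustration in Python (standalone, independent of the Kafka code; `base64.urlsafe_b64decode` is the analogue of Java's `Base64.getUrlDecoder()`):

```python
import base64
import binascii

# Bytes whose encoding exercises the URL-safe alphabet:
raw = b"\xfb\xef\xbe"
token_segment = base64.urlsafe_b64encode(raw).decode()  # '----'

# The URL-safe decoder round-trips the segment:
assert base64.urlsafe_b64decode(token_segment) == raw

# A strict standard-alphabet decode rejects the same segment -- the failure
# mode described above for JWT header/payload segments:
try:
    base64.b64decode(token_segment, validate=True)
    print("decoded")
except binascii.Error:
    print("rejected by standard Base64 decoder")
```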

The error I get when using Confluent Schema Registry clients:
```
org.apache.kafka.common.errors.SerializationException: Error serializing JSON message
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:171)
    at io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer.serialize(KafkaJsonSchemaSerializer.java:95)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:832)
    at se.ica.icc.schemaregistry.example.confluent.ProducerJsonExample.main(ProducerJsonExample.java:87)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.exceptions.SchemaRegistryOauthTokenRetrieverException: Error while fetching Oauth Token for Schema Registry: OAuth Token for Schema Registry is Invalid
    at io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.CachedOauthTokenRetriever.getToken(CachedOauthTokenRetriever.java:74)
    at io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider.getBearerToken(OauthCredentialProvider.java:53)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.setAuthRequestHeaders(RestService.java:1336)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.buildConnection(RestService.java:361)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:300)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:409)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:981)

    at 
io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:972)

    at 
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:574)

    at 
io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:571)

    at 
io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:554)

    at 
io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaSerializer.serializeImpl(AbstractKafkaJsonSchemaSerializer.java:151)

    ... 11 more

Caused by: 
org.apache.kafka.common.security.oauthbearer.internals.secured.ValidateException:
 Could not validate the access token: malformed Base64 URL encoded value

    at 
org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator.validate(LoginAccessTokenValidator.java:93)

    at 
io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.CachedOauthTokenRetriever.getToken(CachedOauthTokenRetriever.java:72)

    ... 22 more

Caused by: 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException:
 malformed Base64 URL encoded value

    at 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.toMap

[jira] [Commented] (KAFKA-17080) bump metadata version for topic Record

2024-07-05 Thread Muralidhar Basani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863232#comment-17863232
 ] 

Muralidhar Basani commented on KAFKA-17080:
---

Thanks, will look at it.

> bump metadata version for topic Record
> --
>
> Key: KAFKA-17080
> URL: https://issues.apache.org/jira/browse/KAFKA-17080
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Muralidhar Basani
>Priority: Major
>






[jira] [Created] (KAFKA-17088) REQUEST_TIMED_OUT occurs intermittently in the kafka Producer client

2024-07-05 Thread Janardhana Gopalachar (Jira)
Janardhana Gopalachar created KAFKA-17088:
-

 Summary: REQUEST_TIMED_OUT occurs intermittently in the kafka 
Producer client 
 Key: KAFKA-17088
 URL: https://issues.apache.org/jira/browse/KAFKA-17088
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.5.1
Reporter: Janardhana Gopalachar


Hi 

We observe that the producer intermittently receives a request timeout when 
trying to send messages to the Kafka broker. Below are the properties set for the 
Kafka producer. 

producerProps.put("acks", "all");

producerProps.put("linger.ms", 0);

producerProps.put("max.block.ms", 5000);

producerProps.put("metadata.max.idle.ms", 5000); 
producerProps.put("delivery.timeout.ms", 1); 
producerProps.put("request.timeout.ms", 1000);

producerProps.put("key.serializer", BYTE_SERIALIZER);

producerProps.put("value.serializer", BYTE_SERIALIZER);

 

 

We receive the below message intermittently, and we need to know the reason for 
this timeout.

_[kafka-producer-network-thread | producer-1] 
o.a.k.c.u.LogContext$LocationAwareKafkaLogger:434 writeLog [Producer 
clientId=producer-1] Got error produce response with correlation id 231972 on 
topic-partition {*}health_check_topic_msg2-0{*}, retrying (2147483646 attempts 
left). Error: REQUEST_TIMED_OUT. Error Message: Disconnected from node 1 due to 
timeout_
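For reference, the settings above collected into a `Properties` object — a sketch only: the serializer class name and the `delivery.timeout.ms` value of 10000 are assumptions, since the report's `BYTE_SERIALIZER` constant and the truncated `delivery.timeout.ms` value of "1" are not shown in full. Note that `request.timeout.ms=1000` is aggressive: any broker response slower than one second is counted as REQUEST_TIMED_OUT and retried.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("linger.ms", "0");
        props.put("max.block.ms", "5000");
        props.put("metadata.max.idle.ms", "5000");
        // Assumed value: the original report shows a truncated "1". The producer
        // expects delivery.timeout.ms >= linger.ms + request.timeout.ms.
        props.put("delivery.timeout.ms", "10000");
        // 1000 ms means any broker response slower than 1 s surfaces as
        // REQUEST_TIMED_OUT and is retried.
        props.put("request.timeout.ms", "1000");
        // Assumed serializer class (the BYTE_SERIALIZER constant is not shown).
        String byteSerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
        props.put("key.serializer", byteSerializer);
        props.put("value.serializer", byteSerializer);
        return props;
    }
}
```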





[jira] [Assigned] (KAFKA-17087) Deprecate `delete-config` of TopicCommand

2024-07-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17087:
--

Assignee: TengYao Chi  (was: Chia-Ping Tsai)

> Deprecate `delete-config` of TopicCommand
> -
>
> Key: KAFKA-17087
> URL: https://issues.apache.org/jira/browse/KAFKA-17087
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
>  Labels: need-kip
>
> TopicCommand `delete-config` is a no-op, so we should deprecate it in 3.9 
> and then remove it in 4.0





[jira] [Commented] (KAFKA-17087) Deprecate `delete-config` of TopicCommand

2024-07-05 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863231#comment-17863231
 ] 

TengYao Chi commented on KAFKA-17087:
-

Hi [~chia7712] 
If you have not started working on this issue, I would like to handle it.

> Deprecate `delete-config` of TopicCommand
> -
>
> Key: KAFKA-17087
> URL: https://issues.apache.org/jira/browse/KAFKA-17087
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: need-kip
>
> TopicCommand `delete-config` is a no-op, so we should deprecate it in 3.9 
> and then remove it in 4.0





[jira] [Created] (KAFKA-17087) Deprecate `delete-config` of TopicCommand

2024-07-05 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17087:
--

 Summary: Deprecate `delete-config` of TopicCommand
 Key: KAFKA-17087
 URL: https://issues.apache.org/jira/browse/KAFKA-17087
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


TopicCommand `delete-config` is a no-op, so we should deprecate it in 3.9 and 
then remove it in 4.0





[jira] [Commented] (KAFKA-17080) bump metadata version for topic Record

2024-07-05 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863227#comment-17863227
 ] 

Luke Chen commented on KAFKA-17080:
---

It's recorded in MetadataVersion.java. [~christo_lolov] has more experience on 
it. You can refer to this PR: [https://github.com/apache/kafka/pull/15673/] for 
more info.

> bump metadata version for topic Record
> --
>
> Key: KAFKA-17080
> URL: https://issues.apache.org/jira/browse/KAFKA-17080
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Muralidhar Basani
>Priority: Major
>






[jira] [Commented] (KAFKA-17086) Java 21 support in Kafka

2024-07-05 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863223#comment-17863223
 ] 

Chia-Ping Tsai commented on KAFKA-17086:


Pardon me, what do you mean by "support"?

Currently, both Kafka CI and releases run with JDK 8, 11, 17, and 21. Does that 
satisfy you? :)

> Java 21 support in Kafka
> 
>
> Key: KAFKA-17086
> URL: https://issues.apache.org/jira/browse/KAFKA-17086
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Reporter: Swathi Mocharla
>Priority: Major
>
> When does Apache Kafka plan to support Java 21?
> Currently there seem to be some known issues that are already fixed in the 
> community.
> A timeline would be helpful.





[jira] [Created] (KAFKA-17086) Kafka support for java 21

2024-07-05 Thread Swathi Mocharla (Jira)
Swathi Mocharla created KAFKA-17086:
---

 Summary: Kafka support for java 21
 Key: KAFKA-17086
 URL: https://issues.apache.org/jira/browse/KAFKA-17086
 Project: Kafka
  Issue Type: Wish
  Components: core
Reporter: Swathi Mocharla


When does Apache Kafka plan to support Java 21?

Currently there seem to be some known issues that are already fixed in the 
community.

A timeline would be helpful.





[jira] [Updated] (KAFKA-17086) Java 21 support in Kafka

2024-07-05 Thread Swathi Mocharla (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Swathi Mocharla updated KAFKA-17086:

Summary: Java 21 support in Kafka  (was: Kafka support for java 21)

> Java 21 support in Kafka
> 
>
> Key: KAFKA-17086
> URL: https://issues.apache.org/jira/browse/KAFKA-17086
> Project: Kafka
>  Issue Type: Wish
>  Components: core
>Reporter: Swathi Mocharla
>Priority: Major
>
> When does Apache Kafka plan to support Java 21?
> Currently there seem to be some known issues that are already fixed in the 
> community.
> A timeline would be helpful.





[jira] [Assigned] (KAFKA-16731) Support for share-group-metrics in the broker

2024-07-05 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield reassigned KAFKA-16731:


Assignee: Sushant Mahajan  (was: Andrew Schofield)

> Support for share-group-metrics in the broker
> -
>
> Key: KAFKA-16731
> URL: https://issues.apache.org/jira/browse/KAFKA-16731
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Sushant Mahajan
>Priority: Major
>






[jira] [Commented] (KAFKA-17061) KafkaController takes long time to connect to newly added broker after registration on large cluster

2024-07-05 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863197#comment-17863197
 ] 

Haruki Okada commented on KAFKA-17061:
--

[~showuon] Hi, I submitted a 
[patch|https://github.com/apache/kafka/pull/16529]. Could you take a look?

> KafkaController takes long time to connect to newly added broker after 
> registration on large cluster
> 
>
> Key: KAFKA-17061
> URL: https://issues.apache.org/jira/browse/KAFKA-17061
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
> Attachments: image-2024-07-02-17-22-06-100.png, 
> image-2024-07-02-17-24-11-861.png
>
>
> h2. Environment
>  * Kafka version: 3.3.2
>  * Cluster: 200~ brokers
>  * Total num partitions: 40k
>  * ZK-based cluster
> h2. Phenomenon
> When a broker left the cluster due to a long STW pause and came back after a 
> while, the controller took 6 seconds to connect to the broker after znode 
> registration, which caused significant message delivery delay.
> {code:java}
> [2024-06-22 23:59:38,202] INFO [Controller id=1] Newly added brokers: 2, 
> deleted brokers: , bounced brokers: , all live brokers: 1,... 
> (kafka.controller.KafkaController)
> [2024-06-22 23:59:38,203] DEBUG [Channel manager on controller 1]: Controller 
> 1 trying to connect to broker 2 (kafka.controller.ControllerChannelManager)
> [2024-06-22 23:59:38,205] INFO [RequestSendThread controllerId=1] Starting 
> (kafka.controller.RequestSendThread)
> [2024-06-22 23:59:38,205] INFO [Controller id=1] New broker startup callback 
> for 2 (kafka.controller.KafkaController)
> [2024-06-22 23:59:44,524] INFO [RequestSendThread controllerId=1] Controller 
> 1 connected to broker-2:9092 (id: 2 rack: rack-2) for sending state change 
> requests (kafka.controller.RequestSendThread)
> {code}
> h2. Analysis
> From the flamegraph at that time, we can see that 
> [liveBrokerIds|https://github.com/apache/kafka/blob/3.3.2/core/src/main/scala/kafka/controller/ControllerContext.scala#L217]
>  calculation takes significant time.
> !image-2024-07-02-17-24-11-861.png|width=541,height=303!





[jira] [Resolved] (KAFKA-17042) the migration docs should remind users to set "broker.id.generation.enable" when adding broker.id

2024-07-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17042.

Fix Version/s: 3.9.0
   Resolution: Fixed

> the migration docs should remind users to set "broker.id.generation.enable" 
> when adding broker.id
> -
>
> Key: KAFKA-17042
> URL: https://issues.apache.org/jira/browse/KAFKA-17042
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
> Fix For: 3.9.0
>
>
> in the section: Enter Migration Mode on the Brokers
> it requires users to add "broker.id", but it can produce the error "broker.id 
> must be greater than or equal to -1 and not greater than 
> reserved.broker.max.id". That is caused by the ZK broker using a 
> generated broker id.
> As this phase is temporary, the simple solution is to remind users to add 
> "broker.id.generation.enable=false" if the ZK broker is using a generated 
> broker id.
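For illustration, the migration-phase broker config the ticket describes might look like this (the `broker.id` value is hypothetical — it must match the id the ZK broker previously generated):

```
broker.id=1001
broker.id.generation.enable=false
```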





[jira] [Created] (KAFKA-17085) Streams Cooperative Rebalance Upgrade Test fails in System Tests

2024-07-05 Thread Josep Prat (Jira)
Josep Prat created KAFKA-17085:
--

 Summary: Streams Cooperative Rebalance Upgrade Test fails in 
System Tests
 Key: KAFKA-17085
 URL: https://issues.apache.org/jira/browse/KAFKA-17085
 Project: Kafka
  Issue Type: Bug
  Components: system tests
Affects Versions: 3.8.0
Reporter: Josep Prat


StreamsCooperativeRebalanceUpgradeTest fails in system tests when upgrading 
from versions 2.1.1, 2.2.2, and 2.3.1.


Tests that fail:

 
{noformat}
Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
Class:  StreamsCooperativeRebalanceUpgradeTest
Method: test_upgrade_to_cooperative_rebalance
Arguments:
{
  "upgrade_from_version": "2.1.1"
}
 
{noformat}
and

 
{noformat}
Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
Class:  StreamsCooperativeRebalanceUpgradeTest
Method: test_upgrade_to_cooperative_rebalance
Arguments:
{
  "upgrade_from_version": "2.2.2"
}
{noformat}
and

 

 
{noformat}
Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
Class:  StreamsCooperativeRebalanceUpgradeTest
Method: test_upgrade_to_cooperative_rebalance
Arguments:
{
  "upgrade_from_version": "2.3.1"
}
{noformat}
 

Failure for 2.1.1 is:
{noformat}
TimeoutError("Never saw 'first_bounce_phase-Processed [0-9]* records so far' 
message ubuntu@worker28")
Traceback (most recent call last):
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
 line 101, in test_upgrade_to_cooperative_rebalance
self.maybe_upgrade_rolling_bounce_and_verify(processors,
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
 line 182, in maybe_upgrade_rolling_bounce_and_verify
stdout_monitor.wait_until(verify_processing_msg,
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
 line 736, in wait_until
return wait_until(lambda: self.acct.ssh("tail -c +%d %s | grep '%s'" % 
(self.offset + 1, self.log, pattern),
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: Never saw 'first_bounce_phase-Processed [0-9]* 
records so far' message ubuntu@worker28{noformat}
Failure for 2.2.2 is:
{noformat}
TimeoutError("Never saw 'first_bounce_phase-Processed [0-9]* records so far' 
message ubuntu@worker5")
Traceback (most recent call last):
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
 line 101, in test_upgrade_to_cooperative_rebalance
self.maybe_upgrade_rolling_bounce_and_verify(processors,
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py",
 line 182, in maybe_upgrade_rolling_bounce_and_verify
stdout_monitor.wait_until(verify_processing_msg,
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
 line 736, in wait_until
return wait_until(lambda: self.acct.ssh("tail -c +%d %s | grep '%s'" % 
(self.offset + 1, self.log, pattern),
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: Never saw 'first_boun

[jira] [Created] (KAFKA-17084) Network Degrade Test fails in System Tests

2024-07-05 Thread Josep Prat (Jira)
Josep Prat created KAFKA-17084:
--

 Summary: Network Degrade Test fails in System Tests
 Key: KAFKA-17084
 URL: https://issues.apache.org/jira/browse/KAFKA-17084
 Project: Kafka
  Issue Type: Bug
  Components: system tests
Affects Versions: 3.8.0
Reporter: Josep Prat


Tests for NetworkDegradeTest fail consistently on the 3.8 branch.

 

Tests failing are:

 
{noformat}
Module: kafkatest.tests.core.network_degrade_test
Class:  NetworkDegradeTest
Method: test_latency
Arguments:
{
  "device_name": "eth0",
  "latency_ms": 50,
  "rate_limit_kbit": 1000,
  "task_name": "latency-100-rate-1000"
}
{noformat}
 

and 

 
{noformat}
Module: kafkatest.tests.core.network_degrade_test
Class:  NetworkDegradeTest
Method: test_latency
Arguments:
{
  "device_name": "eth0",
  "latency_ms": 50,
  "rate_limit_kbit": 0,
  "task_name": "latency-100"
}
{noformat}
 

Failure for the first one is:
{noformat}
RemoteCommandError({'ssh_config': {'host': 'worker30', 'hostname': 
'10.140.34.105', 'user': 'ubuntu', 'port': 22, 'password': None, 
'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
'hostname': 'worker30', 'ssh_hostname': '10.140.34.105', 'user': 'ubuntu', 
'externally_routable_ip': '10.140.34.105', '_logger': , 'os': 'linux', '_ssh_client': , '_sftp_client': , '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
worker21', 1, b'')
Traceback (most recent call last):
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/network_degrade_test.py",
 line 66, in test_latency
for line in zk0.account.ssh_capture("ping -i 1 -c 20 %s" % 
zk1.account.hostname):
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
 line 680, in next
return next(self.iter_obj)
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
 line 347, in output_generator
raise RemoteCommandError(self, cmd, exit_status, stderr.read())
ducktape.cluster.remoteaccount.RemoteCommandError: ubuntu@worker30: Command 
'ping -i 1 -c 20 worker21' returned non-zero exit status 1.{noformat}
And for the second one is:
{noformat}
RemoteCommandError({'ssh_config': {'host': 'worker28', 'hostname': 
'10.140.41.79', 'user': 'ubuntu', 'port': 22, 'password': None, 'identityfile': 
'/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 'hostname': 
'worker28', 'ssh_hostname': '10.140.41.79', 'user': 'ubuntu', 
'externally_routable_ip': '10.140.41.79', '_logger': , 'os': 'linux', '_ssh_client': , '_sftp_client': , '_custom_ssh_exception_checks': None}, 'ping -i 1 -c 20 
worker27', 1, b'')
Traceback (most recent call last):
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/network_degrade_test.py",
 line 66, in test_latency
for line in zk0.account.ssh_capture("ping -i 1 -c 20 %s" % 
zk1.account.hostname):
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
 line 680, in next
return next(self.iter_obj)
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
 line 347, in output_generator
    raise RemoteCommandError(self, cmd, exit_status, stderr.read())
ducktape.cluster.remoteaccount.RemoteCommandError: ubuntu@worker28: Command 
'ping -i 1 -c 20 worker27' returned non-zero exit status 1.{noformat}
 





[jira] [Created] (KAFKA-17083) KRaft Upgrade Failures in SystemTests

2024-07-05 Thread Josep Prat (Jira)
Josep Prat created KAFKA-17083:
--

 Summary: KRaft Upgrade Failures in SystemTests
 Key: KAFKA-17083
 URL: https://issues.apache.org/jira/browse/KAFKA-17083
 Project: Kafka
  Issue Type: Bug
  Components: system tests
Affects Versions: 3.8.0
Reporter: Josep Prat


Two system tests for "TestKRaftUpgrade" are consistently failing on 3.8 in the 
system tests.
{noformat}
Module: kafkatest.tests.core.kraft_upgrade_test
Class:  TestKRaftUpgrade
Method: test_isolated_mode_upgrade
Arguments:
{
  "from_kafka_version": "dev",
  "metadata_quorum": "ISOLATED_KRAFT"
}
{noformat}
 

and 

 
{code:java}
Module: kafkatest.tests.core.kraft_upgrade_test
Class:  TestKRaftUpgrade
Method: test_combined_mode_upgrade
Arguments:
{
  "from_kafka_version": "dev",
  "metadata_quorum": "COMBINED_KRAFT"
}
{code}
 

Failure for Isolated is:
{noformat}
RemoteCommandError({'ssh_config': {'host': 'worker15', 'hostname': 
'10.140.39.207', 'user': 'ubuntu', 'port': 22, 'password': None, 
'identityfile': '/home/semaphore/kafka-overlay/semaphore-muckrake.pem'}, 
'hostname': 'worker15', 'ssh_hostname': '10.140.39.207', 'user': 'ubuntu', 
'externally_routable_ip': '10.140.39.207', '_logger': , 'os': 'linux', '_ssh_client': , '_sftp_client': , '_custom_ssh_exception_checks': None}, 
'/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server 
worker15:9092,worker16:9092,worker17:9092 upgrade --metadata 3.7', 1, b'SLF4J: 
Class path contains multiple SLF4J bindings.\nSLF4J: Found binding in 
[jar:file:/vagrant/tools/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
 Found binding in 
[jar:file:/vagrant/trogdor/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\nSLF4J:
 See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.\nSLF4J: Actual binding is of type 
[org.slf4j.impl.Reload4jLoggerFactory]\n1 out of 1 operation(s) failed.\n')
Traceback (most recent call last):
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
 line 184, in _do_run
data = self.run_test()
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py",
 line 262, in run_test
return self.test_context.function(self.test)
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
 line 121, in test_isolated_mode_upgrade
self.run_upgrade(from_kafka_version)
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
 line 105, in run_upgrade
self.run_produce_consume_validate(core_test_action=lambda: 
self.perform_version_change(from_kafka_version))
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/produce_consume_validate.py",
 line 105, in run_produce_consume_validate
core_test_action(*args)
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
 line 105, in 
self.run_produce_consume_validate(core_test_action=lambda: 
self.perform_version_change(from_kafka_version))
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/core/kraft_upgrade_test.py",
 line 75, in perform_version_change
self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/services/kafka/kafka.py", 
line 920, in upgrade_metadata_version
self.run_features_command("upgrade", new_version)
  File 
"/home/semaphore/kafka-overlay/kafka/tests/kafkatest/services/kafka/kafka.py", 
line 930, in run_features_command
self.nodes[0].account.ssh(cmd)
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
 line 35, in wrapper
return method(self, *args, **kwargs)
  File 
"/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py",
 line 293, in ssh
raise RemoteCommandError(self, cmd, exit_status, stderr.read())
ducktape.cluster.remoteaccount.RemoteCommandError: ubuntu@worker15: Command 
'/opt/kafka-dev/bin/kafka-features.sh --bootstrap-server 
worker15:9092,worker16:9092,worker17:9092 upgrade --metadata 3.7' returned 
non-zero exit status 1. Remote error message: b'SLF4J: Class path contains 
multiple SLF4J bindings.\nSLF4J: Found binding in 
[jar:file:/vagrant/tools/build/dependant-libs-2.13.14/slf4j-reload4j-1.7.36

[jira] [Commented] (KAFKA-16731) Support for share-group-metrics in the broker

2024-07-05 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863158#comment-17863158
 ] 

Andrew Schofield commented on KAFKA-16731:
--

https://github.com/apache/kafka/pull/16488

> Support for share-group-metrics in the broker
> -
>
> Key: KAFKA-16731
> URL: https://issues.apache.org/jira/browse/KAFKA-16731
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>






[jira] [Resolved] (KAFKA-4374) Improve Response Errors Logging

2024-07-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-4374.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Improve Response Errors Logging
> ---
>
> Key: KAFKA-4374
> URL: https://issues.apache.org/jira/browse/KAFKA-4374
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Jesse Anderson
>Assignee: Ksolves
>Priority: Minor
> Fix For: 3.9.0
>
>
> When NetworkClient.java gets a response error, it runs:
> {code}
> if (response.errors().size() > 0) {
> log.warn("Error while fetching metadata with correlation id 
> {} : {}", header.correlationId(), response.errors());
> }
> {code}
> Logging that at warn level and saying there is an error, is confusing to new 
> people. They don't see it was a warn and not error level. They just see that 
> it says "Error while...".
> Maybe it should be something like "The metadata response from the cluster 
> reported a recoverable issue..."





[jira] [Commented] (KAFKA-17062) RemoteLogManager - RemoteStorageException causes data loss

2024-07-04 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863117#comment-17863117
 ] 

Satish Duggana commented on KAFKA-17062:


[~guillaumemallet] Thanks for filing the issue.

>I was thinking of how to fix this and thought of not accounting for segments 
>in a different state than copy_finished and/or delete_started as you said but 
>this means we could potentially breach the retention.bytes we initially 
>planned for. 

Right, that is why this approach was not considered earlier.


>Good question: should we create a new RemoteLogSegmentId when a failed 
>segment is retried?

Any retried operation for copying a segment will have a unique id, and it will 
create a new file/object in the remote store. This makes sure we will not 
reference partially copied data in later reads; these semantics may vary 
across different remote storages, such as object stores. The safe way is to 
generate a new uuid and try copying the failed segment again.

One possible tradeoff that could be explored is not deleting the data as 
eagerly, but in a few scenarios we may then occupy more than the targeted 
storage until it is eventually cleaned up.
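The retry semantics described above can be sketched in miniature. The `SegmentCopyRetry` class and in-memory store below are hypothetical, not the actual RemoteLogManager API: each copy attempt writes under a fresh uuid, so a partially copied object is never referenced again, and the failed object simply occupies space until cleanup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Minimal sketch of retry-with-a-new-id (hypothetical names, not the actual
// RemoteLogManager API): every copy attempt writes under a fresh segment id,
// so a partially copied object is never referenced by later reads.
public class SegmentCopyRetry {
    static final Map<UUID, String> remoteStore = new HashMap<>();

    static UUID copySegment(boolean failThisAttempt) {
        UUID segmentId = UUID.randomUUID(); // fresh id per attempt
        if (failThisAttempt) {
            // the partial object keeps its own id; it occupies space until
            // it is eventually cleaned up, but is never read again
            remoteStore.put(segmentId, "partial");
            throw new RuntimeException("simulated RemoteStorageException");
        }
        remoteStore.put(segmentId, "complete");
        return segmentId;
    }

    public static void main(String[] args) {
        UUID finished = null;
        try {
            copySegment(true);             // first attempt fails mid-copy
        } catch (RuntimeException e) {
            finished = copySegment(false); // retry under a brand-new id
        }
        System.out.println(remoteStore.get(finished)); // prints "complete"
        System.out.println(remoteStore.size());        // prints 2 (partial object awaits cleanup)
    }
}
```

This mirrors the tradeoff mentioned above: correctness of reads is preserved at the cost of temporarily holding more than the targeted storage.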

> RemoteLogManager - RemoteStorageException causes data loss
> --
>
> Key: KAFKA-17062
>     URL: https://issues.apache.org/jira/browse/KAFKA-17062
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.8.0, 3.7.1, 3.9.0
>Reporter: Guillaume Mallet
>Assignee: Guillaume Mallet
>Priority: Major
>  Labels: tiered-storage
>
> When Tiered Storage is configured, retention.bytes defines the limit for the 
> amount of data stored in the filesystem and in remote storage. However a 
> failure while offloading to remote storage can cause segments to be dropped 
> before the retention limit is met.
> What happens
> Assuming a topic configured with {{retention.bytes=4294967296}} (4GB) and a 
> {{local.retention.bytes=1073741824}} (1GB, equal to segment.bytes) we would 
> expect Kafka to keep up to 3 segments (3GB) in the remote store and 1 segment 
> locally (the local segment) and possibly more if the remote storage is 
> offline. i.e. segments in the following RemoteLogSegmentStates in the 
> RemoteLogMetadataManager (RLMM) :
>  * Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}})
>  * Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
> Let's assume the RLMM starts failing when segment 4 rolls. At the first 
> iteration of an RLMTask we will have -
>  * 
> [{{copyLogSegmentsToRemote}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L773]
>  : is called first
>  ** RLMM becomes aware of Segment 4 and adds it to the metadata:
>  *** Segment 4 ({{{}COPY_SEGMENT_STARTED{}}}),
>  *** Segment 3 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 2 ({{{}COPY_SEGMENT_FINISHED{}}}),
>  *** Segment 1 ({{{}COPY_SEGMENT_FINISHED{}}})
>  ** An exception is raised during the copy operation 
> ([{{copyLogSegmentData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java#L93]
>  in RemoteStorageManager) which is caught with the error message “{{Error 
> occurred while copying log segments of partition}}” and no further copy will 
> be attempted for the duration of this RLMTask.
>  ** At that point the Segment will never move to {{COPY_SEGMENT_FINISHED}} 
> but will transition to {{DELETE_SEGMENT_STARTED}} eventually before being 
> cleaned up when the associated segment is deleted.
>  * 
> [{{cleanupExpiredRemoteLogSegments}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1122]
>  is then called
>  ** Retention size is computed in 
> [{{buildRetentionSizeData}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1296]
>  as the sum of all the segments' sizes regardless of their state, so the 
> computed size of the topic is 1 (local) + 4 (remote)
>  ** Segment 1, being the oldest, will be dropped.
> At the second iteration after 
> [{{remote.log.manager.task.interval.ms}}|https://github.com/apache/kafka/blob/d0dfefbe6394276eb329b6ca998842a984add506/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java#L395]
>  (default: 30s), the same will happen. The RLMM will now have 2 x

[jira] [Commented] (KAFKA-17025) KAFKA-17025: Producer throws uncaught exception in the io thread.

2024-07-04 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863104#comment-17863104
 ] 

Hongshun Wang commented on KAFKA-17025:
---

[~gharris1727], would you like to help review this PR?

> KAFKA-17025: Producer throws uncaught exception in the io thread.
> -
>
> Key: KAFKA-17025
> URL: https://issues.apache.org/jira/browse/KAFKA-17025
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.6.2
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
>
> When I use KafkaProducer and an OOM occurs, the KafkaProducer only logs it and 
> does nothing else:
>  
> {code:java}
> ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 
> 'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: 
> Direct buffer memory
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>     at org.apache.kafka.clients.producer.internals.Sender.run
>     at java.lang.Thread.run
> {code}
>  
>  
> I try to find what happens:
> 1. It seems that OutOfMemoryError, being an Error, is not caught when 
> org.apache.kafka.clients.producer.internals.Sender#run tries to catch an 
> Exception: 
> {code:java}
> @Override
> public void run() {
>     log.debug("Starting Kafka producer I/O thread.");
>     // main loop, runs until close is called
>     while (running) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
>     // okay we stopped accepting requests but there may still be
>     // requests in the transaction manager, accumulator or waiting for acknowledgment,
>     // wait until these are completed.
>     while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
>     while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
>         if (!transactionManager.isCompleting()) {
>             log.info("Aborting incomplete transaction due to shutdown");
>             transactionManager.beginAbort();
>         }
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     if (forceClose) {
>         // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
>         // the futures.
>         if (transactionManager != null) {
>             log.debug("Aborting incomplete transactional requests due to forced shutdown");
>             transactionManager.close();
>         }
>         log.debug("Aborting incomplete batches due to forced shutdown");
>         this.accumulator.abortIncompleteBatches();
>     }
>     try {
>         this.client.close();
>     } catch (Exception e) {
>         log.error("Failed to close network client", e);
>     }
>     log.debug("Shutdown of Kafka producer I/O thread has completed.");
> }
> {code}
>  
> 2. Then KafkaThread catches the uncaught exception and just logs it:
> {code:java}
> public KafkaThread(final String name, Runnable runnable, boolean daemon) {
>     super(runnable, name);
>     configureThread(name, daemon);
> }
>
> private void configureThread(final String name, boolean daemon) {
>     setDaemon(daemon);
>     setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
> }
> {code}
>  
> To be honest, I don't understand why KafkaThread does nothing but log when an 
> uncaught exception occurs. Why not expose a method to set the 
> UncaughtExceptionHandler on KafkaThread or KafkaProducer, so that the user can 
> decide what to do with an uncaught exception, whether rethrowing it or just 
> ignoring it?
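What the reporter is asking for can be sketched as follows. The `ConfigurableKafkaThread` class and its constructor are illustrative only, not the actual Kafka API: a KafkaThread-like wrapper that accepts a caller-supplied handler instead of hard-coding log-and-continue.

```java
// Illustrative sketch only (not the actual Kafka API): a KafkaThread-like
// class that lets the caller supply an UncaughtExceptionHandler instead of
// hard-coding log-and-continue.
public class ConfigurableKafkaThread extends Thread {
    public ConfigurableKafkaThread(String name, Runnable runnable, boolean daemon,
                                   UncaughtExceptionHandler handler) {
        super(runnable, name);
        setDaemon(daemon);
        // fall back to log-and-continue when no handler is supplied
        setUncaughtExceptionHandler(handler != null
                ? handler
                : (t, e) -> System.err.printf("Uncaught exception in thread '%s': %s%n", t.getName(), e));
    }

    public static void main(String[] args) throws InterruptedException {
        // e.g. the caller could escalate an Error (such as OutOfMemoryError)
        // instead of only logging it
        ConfigurableKafkaThread t = new ConfigurableKafkaThread(
                "io-thread",
                () -> { throw new RuntimeException("boom"); },
                true,
                (thread, e) -> System.out.println("handler saw: " + e.getMessage()));
        t.start();
        t.join(); // prints "handler saw: boom"
    }
}
```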





[jira] [Commented] (KAFKA-16684) FetchResponse#responseData could return incorrect data

2024-07-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863101#comment-17863101
 ] 

黃竣陽 commented on KAFKA-16684:
-

Yes, I will take care of that comment.

> FetchResponse#responseData could return incorrect data
> --
>
> Key: KAFKA-16684
> URL: https://issues.apache.org/jira/browse/KAFKA-16684
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Minor
>
> [https://github.com/apache/kafka/commit/2b8aff58b575c199ee8372e5689420c9d77357a5]
>  made it accept an input to return "partial" data. The content of the output is 
> based on the input, but we cache the output, so it returns the same output 
> even though we pass a different input. That is a potential bug.
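The bug can be reproduced in miniature. The `CachedResponse` class below is hypothetical, not the actual FetchResponse code: the first call's output is memoized and handed back for every later call, even when the input differs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal illustration of the caching bug described above (hypothetical class,
// not the actual FetchResponse code): the first call's result is cached and
// returned for every later call, even when the input differs.
public class CachedResponse {
    private Map<String, Integer> cached; // memoized on first call

    Map<String, Integer> responseData(boolean includeErrors) {
        if (cached == null) {
            cached = new LinkedHashMap<>();
            cached.put("records", 10);
            if (includeErrors) {
                cached.put("errors", 1);
            }
        }
        return cached; // same object regardless of the argument
    }

    public static void main(String[] args) {
        CachedResponse r = new CachedResponse();
        Map<String, Integer> first = r.responseData(true);
        Map<String, Integer> second = r.responseData(false); // different input...
        System.out.println(first == second);                 // prints "true": stale cached output
        System.out.println(second.containsKey("errors"));    // prints "true" despite includeErrors=false
    }
}
```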





[jira] [Resolved] (KAFKA-17059) Remove `dynamicConfigOverride` from KafkaConfig

2024-07-04 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17059.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Remove `dynamicConfigOverride` from KafkaConfig
> ---
>
> Key: KAFKA-17059
> URL: https://issues.apache.org/jira/browse/KAFKA-17059
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
> Fix For: 3.9.0
>
>
> It seems the field is never defined now, so we can remove it to simplify the 
> constructor of KafkaConfig.





[jira] [Assigned] (KAFKA-17082) Rewrite `LogCaptureAppender` by java

2024-07-04 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17082:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> Rewrite `LogCaptureAppender` by java
> 
>
> Key: KAFKA-17082
> URL: https://issues.apache.org/jira/browse/KAFKA-17082
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> `LogCaptureAppender` can be used to verify logger output, and it will be 
> useful for testing Kafka tools in the future. Hence, we should rewrite it in 
> Java to make tools happy :)





  1   2   3   4   5   6   7   8   9   10   >