[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Manikumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831631#comment-17831631
 ] 

Manikumar commented on KAFKA-16310:
---

Thanks, I have reverted the KAFKA-16341 and KAFKA-16342 commits from the 3.6 branch.

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset reported for the max timestamp will 
> be wrong (e.g. the timestamp t0 should map to offset 10, but offset 12 is 
> returned instead). This causes the time index to store the wrong offset, so 
> the results will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and earlier versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is made.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> The ProduceRequest and ListOffsets were sent to the same broker (2), and the 
> leader didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}
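
For reference, a minimal Java AdminClient equivalent of the failing librdkafka 
check (a sketch only; the topic name and bootstrap address are assumptions, 
not values from the test):

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class MaxTimestampCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("test-topic", 0);
            // KIP-734 spec: offset of the record with the largest timestamp.
            ListOffsetsResultInfo info = admin
                    .listOffsets(Collections.singletonMap(tp, OffsetSpec.maxTimestamp()))
                    .partitionResult(tp)
                    .get();
            // With timestamps t0+100, t0+400, t0+250 at offsets 0, 1, 2 this
            // should print 1; on 3.7.0 it prints 2 (the last offset) instead.
            System.out.println(info.offset());
        }
    }
}
{code}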



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16342) Fix compressed records

2024-03-27 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16342:
--
Fix Version/s: (was: 3.6.2)

> Fix compressed records
> --
>
> Key: KAFKA-16342
> URL: https://issues.apache.org/jira/browse/KAFKA-16342
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16341) Fix un-compressed records

2024-03-27 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16341:
--
Fix Version/s: (was: 3.6.2)

> Fix un-compressed records
> -
>
> Key: KAFKA-16341
> URL: https://issues.apache.org/jira/browse/KAFKA-16341
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Luke Chen
>Assignee: Johnny Hsu
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar reopened KAFKA-16310:
---

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset reported for the max timestamp will 
> be wrong (e.g. the timestamp t0 should map to offset 10, but offset 12 is 
> returned instead). This causes the time index to store the wrong offset, so 
> the results will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and earlier versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is made.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> The ProduceRequest and ListOffsets were sent to the same broker (2), and the 
> leader didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-16310:
--
Fix Version/s: (was: 3.6.2)

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset reported for the max timestamp will 
> be wrong (e.g. the timestamp t0 should map to offset 10, but offset 12 is 
> returned instead). This causes the time index to store the wrong offset, so 
> the results will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and earlier versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is made.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> The ProduceRequest and ListOffsets were sent to the same broker (2), and the 
> leader didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831629#comment-17831629
 ] 

Chia-Ping Tsai commented on KAFKA-16310:


{quote}
Can we revert the relevant changes from the 3.6 branch, as this is not a 
regression in the 3.6.x releases and it looks like the complete fix requires a 
few more changes? This is to unblock the 3.6.2 release. 
{quote}

Sure, please feel free to revert KAFKA-16341 and KAFKA-16342. Sorry that the 
incomplete fix blocks the 3.6.2 release :(

I will update KIP-734 to remind users that the behavior of the maxTimestamp 
offset could change in a future release.



> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset reported for the max timestamp will 
> be wrong (e.g. the timestamp t0 should map to offset 10, but offset 12 is 
> returned instead). This causes the time index to store the wrong offset, so 
> the results will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and earlier versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is made.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> The ProduceRequest and ListOffsets were sent to the same broker (2), and the 
> leader didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16211) Inconsistent config values in CreateTopicsResult and DescribeConfigsResult

2024-03-27 Thread Dung Ha (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dung Ha reassigned KAFKA-16211:
---

Assignee: Dung Ha

> Inconsistent config values in CreateTopicsResult and DescribeConfigsResult
> --
>
> Key: KAFKA-16211
> URL: https://issues.apache.org/jira/browse/KAFKA-16211
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: Gantigmaa Selenge
>Assignee: Dung Ha
>Priority: Minor
>
> When creating a topic in a KRaft cluster, the config value returned in 
> CreateTopicsResult is different from what you get from describing the topic 
> configs, if the config was set in broker.properties or controller.properties 
> or in both but with different values. 
>  
> For example, start a broker with `segment.bytes` set to 573741824 in the 
> properties file and then create a topic; the CreateTopicsResult contains:
> ConfigEntry(name=segment.bytes, value=1073741824, source=DEFAULT_CONFIG, 
> isSensitive=false, isReadOnly=false, synonyms=[], type=INT, 
> documentation=null)
> because the controller was started without setting this config. 
> However, when you describe the configurations for the same topic, the config 
> value set by the broker is returned:
> ConfigEntry(name=segment.bytes, value=573741824, 
> source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, 
> synonyms=[], type=null, documentation=null)
>  
> Vice versa, if the controller is started with this config set to a different 
> value, the create topic request returns the value set by the controller, and 
> when you describe the config for the same topic, you get the value set by 
> the broker. This makes it confusing to understand which value is being used.
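
For illustration, a minimal sketch that surfaces the inconsistency described 
above via the Java Admin client (the bootstrap address and topic name are 
assumptions):

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;

public class ConfigSourceCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            CreateTopicsResult created = admin.createTopics(
                    Collections.singleton(new NewTopic("demo-topic", 1, (short) 1)));
            // Config as reported in the create-topics response.
            ConfigEntry atCreate = created.config("demo-topic").get().get("segment.bytes");

            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic");
            // Config as reported by a subsequent describe-configs call.
            ConfigEntry described = admin.describeConfigs(Collections.singleton(topic))
                    .all().get().get(topic).get("segment.bytes");

            // With segment.bytes set only in broker.properties, these differ:
            // atCreate reports DEFAULT_CONFIG, described reports STATIC_BROKER_CONFIG.
            System.out.printf("create: %s (%s), describe: %s (%s)%n",
                    atCreate.value(), atCreate.source(),
                    described.value(), described.source());
        }
    }
}
{code}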



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Manikumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831624#comment-17831624
 ] 

Manikumar commented on KAFKA-16310:
---

[~chia7712] [~showuon] Can we revert the relevant changes from the 3.6 branch, 
as this is not a regression in the 3.6.x releases and it looks like the fix 
requires a few more changes? This is to unblock the 3.6.2 release. 

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset reported for the max timestamp will 
> be wrong (e.g. the timestamp t0 should map to offset 10, but offset 12 is 
> returned instead). This causes the time index to store the wrong offset, so 
> the results will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and earlier versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is made.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> The ProduceRequest and ListOffsets were sent to the same broker (2), and the 
> leader didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Manikumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831624#comment-17831624
 ] 

Manikumar edited comment on KAFKA-16310 at 3/28/24 5:15 AM:


[~chia7712] [~showuon] Can we revert the relevant changes from the 3.6 branch, 
as this is not a regression in the 3.6.x releases and it looks like the 
complete fix requires a few more changes? This is to unblock the 3.6.2 release. 


was (Author: omkreddy):
[~chia7712] [~showuon] Can we revert the relevant changes from the 3.6 branch, 
as this is not a regression in the 3.6.x releases and it looks like the fix 
requires a few more changes? This is to unblock the 3.6.2 release. 

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset reported for the max timestamp will 
> be wrong (e.g. the timestamp t0 should map to offset 10, but offset 12 is 
> returned instead). This causes the time index to store the wrong offset, so 
> the results will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and earlier versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is made.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> The ProduceRequest and ListOffsets were sent to the same broker (2), and the 
> leader didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-27 Thread via GitHub


kamalcph commented on PR #15463:
URL: https://github.com/apache/kafka/pull/15463#issuecomment-2024392191

   @johnnychhsu 
   
   Can you update the PR summary if it is ready for review? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16161) Avoid creating remote log metadata snapshot file in partition data directory.

2024-03-27 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831619#comment-17831619
 ] 

Kamal Chandraprakash commented on KAFKA-16161:
--

Not yet started. I will work on this task this week.

> Avoid creating remote log metadata snapshot file in partition data directory.
> -
>
> Key: KAFKA-16161
> URL: https://issues.apache.org/jira/browse/KAFKA-16161
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Satish Duggana
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: KIP-405
>
> Avoid creating remote log metadata snapshot file in a partition data 
> directory. This can be added when the snapshots implementation related 
> functionality is enabled end to end. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.

2024-03-27 Thread Abhijeet Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831617#comment-17831617
 ] 

Abhijeet Kumar commented on KAFKA-15265:


I plan to raise the PRs in the next couple of weeks. I may need help in getting 
the changes reviewed once the PRs are ready.

> Remote copy/fetch quotas for tiered storage.
> 
>
> Key: KAFKA-15265
> URL: https://issues.apache.org/jira/browse/KAFKA-15265
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: Abhijeet Kumar
>Priority: Major
>
> Related KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [MINOR] Renaming the `Abortable_Transaction` error to `Transaction_Abortable` [kafka]

2024-03-27 Thread via GitHub


sjhajharia commented on PR #15609:
URL: https://github.com/apache/kafka/pull/15609#issuecomment-2024377686

   Thank you @chia7712 @jolshan @soarez for the review and merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16346: Fix flaky MetricsTest.testMetrics [kafka]

2024-03-27 Thread via GitHub


showuon commented on PR #15502:
URL: https://github.com/apache/kafka/pull/15502#issuecomment-2024338012

   @FrankYang0529, do we have any updates for this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16435) Add test for KAFKA-16428

2024-03-27 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831609#comment-17831609
 ] 

Luke Chen commented on KAFKA-16435:
---

Thanks, [~brandboat]!

> Add test for KAFKA-16428
> 
>
> Key: KAFKA-16435
> URL: https://issues.apache.org/jira/browse/KAFKA-16435
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Add a test for KAFKA-16428: Fix bug where config change notification znode 
> may not get created during migration #15608



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [MINOR] fix: Align LogContext setup for core components [kafka]

2024-03-27 Thread via GitHub


github-actions[bot] commented on PR #14348:
URL: https://github.com/apache/kafka/pull/14348#issuecomment-2024335297

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16232: kafka hangs forever in the starting process if the authorizer future is not returned [kafka]

2024-03-27 Thread via GitHub


showuon merged PR #15549:
URL: https://github.com/apache/kafka/pull/15549


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16232: kafka hangs forever in the starting process if the authorizer future is not returned [kafka]

2024-03-27 Thread via GitHub


showuon commented on PR #15549:
URL: https://github.com/apache/kafka/pull/15549#issuecomment-2024290120

   Failed tests are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-27 Thread via GitHub


showuon commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1542235961


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+  val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  assertEquals(curExpiresPerSec + 1, 
DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
   Looks like it didn't appear in the rebuild. Let me re-trigger the CI again.
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15463/13/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-03-27 Thread via GitHub


Owen-CH-Leung commented on PR #15489:
URL: https://github.com/apache/kafka/pull/15489#issuecomment-2024287098

   > printUsageAndExit
   
   Agreed. Setting a dumb exit procedure solves the failed build. Let me revise 
that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16435) Add test for KAFKA-16428

2024-03-27 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831605#comment-17831605
 ] 

Kuan Po Tseng commented on KAFKA-16435:
---

I'm willing to take this over! Let me add some tests. Huge thanks!

> Add test for KAFKA-16428
> 
>
> Key: KAFKA-16435
> URL: https://issues.apache.org/jira/browse/KAFKA-16435
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Major
>
> Add a test for KAFKA-16428: Fix bug where config change notification znode 
> may not get created during migration #15608



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16435) Add test for KAFKA-16428

2024-03-27 Thread Kuan Po Tseng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kuan Po Tseng reassigned KAFKA-16435:
-

Assignee: Kuan Po Tseng

> Add test for KAFKA-16428
> 
>
> Key: KAFKA-16435
> URL: https://issues.apache.org/jira/browse/KAFKA-16435
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Add a test for KAFKA-16428: Fix bug where config change notification znode 
> may not get created during migration #15608



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: enhance kafka-reassign-partitions command output [kafka]

2024-03-27 Thread via GitHub


KevinZTW commented on code in PR #15610:
URL: https://github.com/apache/kafka/pull/15610#discussion_r1542216359


##
tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:
##
@@ -806,12 +806,10 @@ private static void executeMoves(Admin adminClient,
 do {
 Set completed = 
alterReplicaLogDirs(adminClient, pendingReplicas);
 if (!completed.isEmpty()) {
-System.out.printf("Successfully started log directory move%s 
for: %s%n",
-completed.size() == 1 ? "" : "s",
-completed.stream()
-
.sorted(ReassignPartitionsCommand::compareTopicPartitionReplicas)
-.map(Object::toString)
-.collect(Collectors.joining(",")));
+
completed.stream().sorted(ReassignPartitionsCommand::compareTopicPartitionReplicas).forEach(replica
 -> {
+System.out.printf("Successfully started moving log 
directory to %s for replica %s-%s with broker id: %s %n",

Review Comment:
   Thank you for the suggestion! Let me revise this.



##
tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:
##
@@ -1485,6 +1483,7 @@ static Set 
alterReplicaLogDirs(Admin adminClient,
 for (Entry> e : 
values.entrySet()) {
 TopicPartitionReplica replica = e.getKey();
 KafkaFuture future = e.getValue();
+

Review Comment:
   Thanks!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: enhance kafka-reassign-partitions command output [kafka]

2024-03-27 Thread via GitHub


showuon commented on code in PR #15610:
URL: https://github.com/apache/kafka/pull/15610#discussion_r1542188367


##
tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:
##
@@ -1485,6 +1483,7 @@ static Set 
alterReplicaLogDirs(Admin adminClient,
 for (Entry> e : 
values.entrySet()) {
 TopicPartitionReplica replica = e.getKey();
 KafkaFuture future = e.getValue();
+

Review Comment:
   nit: additional line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]

2024-03-27 Thread via GitHub


apoorvmittal10 commented on code in PR #15532:
URL: https://github.com/apache/kafka/pull/15532#discussion_r1542173327


##
build.gradle:
##
@@ -1435,10 +1454,10 @@ project(':clients') {
 implementation libs.opentelemetryProto
 
 // libraries which should be added as runtime dependencies in generated 
pom.xml should be defined here:
-shadow libs.zstd
-shadow libs.lz4
-shadow libs.snappy
-shadow libs.slf4jApi
+shadowed libs.zstd
+shadowed libs.lz4
+shadowed libs.snappy
+shadowed libs.slf4jApi

Review Comment:
   Thanks for the suggestion, I'll give it a try next week.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16437) Upgrade to Jakarta and JavaEE 9 in Kafka 4.0 (KIP-1032)

2024-03-27 Thread Christopher L. Shannon (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher L. Shannon updated KAFKA-16437:
---
Description: 
Jira to track 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1032%3A+Upgrade+to+Jakarta+and+JavaEE+9+in+Kafka+4.0]

 

There is an old PR that could be the basis of the work: 
https://github.com/apache/kafka/pull/10176

  was:
Jira to track 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1032%3A+Upgrade+to+Jakarta+and+JavaEE+9+in+Kafka+4.0]

 

 


> Upgrade to Jakarta and JavaEE 9 in Kafka 4.0 (KIP-1032)
> ---
>
> Key: KAFKA-16437
> URL: https://issues.apache.org/jira/browse/KAFKA-16437
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Christopher L. Shannon
>Priority: Major
>
> Jira to track 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1032%3A+Upgrade+to+Jakarta+and+JavaEE+9+in+Kafka+4.0]
>  
> There is an old PR that could be the basis of the work: 
> https://github.com/apache/kafka/pull/10176



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16437) Upgrade to Jakarta and JavaEE 9 in Kafka 4.0 (KIP-1032)

2024-03-27 Thread Christopher L. Shannon (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher L. Shannon updated KAFKA-16437:
---
Description: 
Jira to track 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1032%3A+Upgrade+to+Jakarta+and+JavaEE+9+in+Kafka+4.0]

 

 

> Upgrade to Jakarta and JavaEE 9 in Kafka 4.0 (KIP-1032)
> ---
>
> Key: KAFKA-16437
> URL: https://issues.apache.org/jira/browse/KAFKA-16437
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Christopher L. Shannon
>Priority: Major
>
> Jira to track 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1032%3A+Upgrade+to+Jakarta+and+JavaEE+9+in+Kafka+4.0]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16437) Upgrade to Jakarta and JavaEE 9 in Kafka 4.0 (KIP-1032)

2024-03-27 Thread Christopher L. Shannon (Jira)
Christopher L. Shannon created KAFKA-16437:
--

 Summary: Upgrade to Jakarta and JavaEE 9 in Kafka 4.0 (KIP-1032)
 Key: KAFKA-16437
 URL: https://issues.apache.org/jira/browse/KAFKA-16437
 Project: Kafka
  Issue Type: Improvement
Reporter: Christopher L. Shannon






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [WIP] KAFKA-16192: Introduce usage of flexible records to coordinators [kafka]

2024-03-27 Thread via GitHub


jolshan commented on PR #15303:
URL: https://github.com/apache/kafka/pull/15303#issuecomment-2024062457

   Closing since I will repurpose this for the Transaction Record change and we 
have https://issues.apache.org/jira/browse/KAFKA-16308 for the general changes 
to support new feature commands.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] KAFKA-16192: Introduce usage of flexible records to coordinators [kafka]

2024-03-27 Thread via GitHub


jolshan closed pull request #15303: [WIP] KAFKA-16192: Introduce usage of 
flexible records to coordinators
URL: https://github.com/apache/kafka/pull/15303


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: enhance kafka-reassign-partitions command output [kafka]

2024-03-27 Thread via GitHub


AndrewJSchofield commented on code in PR #15610:
URL: https://github.com/apache/kafka/pull/15610#discussion_r1542090972


##
tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:
##
@@ -806,12 +806,10 @@ private static void executeMoves(Admin adminClient,
 do {
 Set completed = 
alterReplicaLogDirs(adminClient, pendingReplicas);
 if (!completed.isEmpty()) {
-System.out.printf("Successfully started log directory move%s 
for: %s%n",
-completed.size() == 1 ? "" : "s",
-completed.stream()
-
.sorted(ReassignPartitionsCommand::compareTopicPartitionReplicas)
-.map(Object::toString)
-.collect(Collectors.joining(",")));
+
completed.stream().sorted(ReassignPartitionsCommand::compareTopicPartitionReplicas).forEach(replica
 -> {
+System.out.printf("Successfully started moving log 
directory to %s for replica %s-%s with broker id: %s %n",

Review Comment:
   This seems like a much nicer way to format the output. I suggest that `... 
with broker %s%n` would be a bit neater.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove redundant ApiVersionsResponse#filterApis [kafka]

2024-03-27 Thread via GitHub


AndrewJSchofield commented on PR #15611:
URL: https://github.com/apache/kafka/pull/15611#issuecomment-2024044460

   I'm not sure this is really an improvement. It's just an overloaded method 
with one existing caller.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-27 Thread via GitHub


AndrewJSchofield commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1542082131


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##
@@ -204,16 +202,14 @@ public static byte[] compress(byte[] raw, CompressionType 
compressionType) throw
 public static ByteBuffer decompress(byte[] metrics, CompressionType 
compressionType) {
 ByteBuffer data = ByteBuffer.wrap(metrics);
 try (InputStream in = compressionType.wrapForInput(data, 
RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
-ByteArrayOutputStream out = new ByteArrayOutputStream()) {
-
+ ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {

Review Comment:
   One tiny, tiny comment. The indentation of this line is out by 1 space. 
Apart from that, lgtm.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]

2024-03-27 Thread via GitHub


dongnuo123 commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1542076157


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupProtocolMigrationPolicy.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupProtocolMigrationPolicy {
+/** Both upgrade and downgrade are enabled.*/
+BOTH("both"),

Review Comment:
   Hmm I can't think of a better name either... Maybe `bidirectional`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831553#comment-17831553
 ] 

Jun Rao commented on KAFKA-16310:
-

[~johnnyhsu], [~showuon] and [~chia7712]: Sorry, but I just realized one 
issue with the fix. The problem is that we only fixed offsetForMaxTimestamp 
during the leader append. The follower append still uses the lastOffset of 
the batch. 

 
{code:java}
UnifiedLog.analyzeAndValidateRecords()

lastOffset = batch.lastOffset

...

if (batch.maxTimestamp > maxTimestamp) {
  maxTimestamp = batch.maxTimestamp
  offsetOfMaxTimestamp = lastOffset
} {code}
We optimized the follower code to avoid decompressing batches, so it's hard 
to get the exact record offset for maxTimestamp within a batch.

 

I think the easiest way to fix the listMaxTimestamp issue is probably to still 
maintain offsetOfMaxTimestamp at the record batch level so that it can be 
derived consistently at both the leader and the follower. When serving the 
listMaxTimestamp request, we iterate the batch containing the maxTimestamp to 
find the exact record offset with maxTimestamp. Since this is a rare operation, 
paying the decompression overhead is fine. What do you think?

 

If we want to do the above, we probably need to revert the changes in 3.6.2, 
which is being voted now. cc [~omkreddy] 
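
For illustration, a rough sketch of the serve-time scan proposed above 
(hypothetical code, not the actual patch; it assumes the batch containing the 
maxTimestamp has already been located via the batch-level metadata):

{code:java}
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

final class MaxTimestampLookup {
    // Scan the one batch known to hold the maxTimestamp; iterating the batch
    // decompresses it, which is acceptable because listMaxTimestamp is rare.
    static long offsetOfMaxTimestamp(RecordBatch batch) {
        long maxTimestamp = RecordBatch.NO_TIMESTAMP;
        long offset = -1L;
        for (Record record : batch) {
            if (record.timestamp() > maxTimestamp) {
                maxTimestamp = record.timestamp();
                offset = record.offset();
            }
        }
        return offset;
    }
}
{code}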

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset reported for the max timestamp will 
> be wrong (e.g. the timestamp t0 should map to offset 10, but offset 12 is 
> returned instead). This causes the time index to store the wrong offset, so 
> the results will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and earlier versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2. 
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is made.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> The ProduceRequest and ListOffsets were sent to the same broker (2), and the 
> leader didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16436) Online upgrade triggering and group type conversion

2024-03-27 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-16436:
---

 Summary: Online upgrade triggering and group type conversion 
 Key: KAFKA-16436
 URL: https://issues.apache.org/jira/browse/KAFKA-16436
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [Draft] Online migration triggering [kafka]

2024-03-27 Thread via GitHub


dongnuo123 commented on code in PR #15593:
URL: https://github.com/apache/kafka/pull/15593#discussion_r1542011267


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -760,6 +776,28 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+return 
GroupProtocolMigrationPolicy.isUpgradeEnabled(groupProtocolMigrationPolicy) &&
+!classicGroup.isInState(DEAD) &&
+
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) 
&&
+classicGroup.size() <= consumerGroupMaxSize;
+}
+
+ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, 
List records) {
+classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+createGroupTombstoneRecords(classicGroup, records);
+ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
classicGroup.groupId(), metrics);
+classicGroup.convertToConsumerGroup(consumerGroup, records, 
metadataImage.topics());
+
+// Manually trigger a rebalance.

Review Comment:
   Maybe it's not necessary to trigger the rebalance here. If the consumer 
restarts and joins the group as a new member, the group epoch will always be 
bumped in consumerGroupHeartbeat.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Adding new coordinator related changes for connect_distributed.py [kafka]

2024-03-27 Thread via GitHub


philipnee commented on code in PR #15594:
URL: https://github.com/apache/kafka/pull/15594#discussion_r1541985614


##
tests/kafkatest/services/connect.py:
##
@@ -534,33 +535,40 @@ def received_messages(self):
 
 def start(self):
 self.logger.info("Creating connector VerifiableSinkConnector %s", 
self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.VerifiableSinkConnector',
 'tasks.max': self.tasks,
 'topics': ",".join(self.topics)
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol
+self.cc.create_connector(connector_config)
 
 class MockSink(object):
 
-def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink"):
+def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-sink", 
consumer_group_protocol=None):
 self.cc = cc
 self.logger = self.cc.logger
 self.name = name
 self.mode = mode
 self.delay_sec = delay_sec
 self.topics = topics
+self.consumer_group_protocol = consumer_group_protocol
 
 def start(self):
 self.logger.info("Creating connector MockSinkConnector %s", self.name)
-self.cc.create_connector({
+connector_config = {
 'name': self.name,
 'connector.class': 
'org.apache.kafka.connect.tools.MockSinkConnector',
 'tasks.max': 1,
 'topics': ",".join(self.topics),
 'mock_mode': self.mode,
 'delay_ms': self.delay_sec * 1000
-})
+}
+if self.consumer_group_protocol is not None:
+connector_config["consumer.override.group.protocol"] = 
self.consumer_group_protocol

Review Comment:
   thanks for the explanation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-27 Thread via GitHub


gharris1727 commented on PR #15597:
URL: https://github.com/apache/kafka/pull/15597#issuecomment-2023838245

   Hi @mimaison @C0urante PTAL at the latest WorkerConfig change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16411) Correctly migrate default client quota entities in KRaft migration

2024-03-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-16411.
--
Resolution: Fixed

> Correctly migrate default client quota entities in KRaft migration
> --
>
> Key: KAFKA-16411
> URL: https://issues.apache.org/jira/browse/KAFKA-16411
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16428) Fix bug where config change notification znode may not get created during migration

2024-03-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-16428.
--
Resolution: Fixed

> Fix bug where config change notification znode may not get created during 
> migration
> ---
>
> Key: KAFKA-16428
> URL: https://issues.apache.org/jira/browse/KAFKA-16428
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16435) Add test for KAFKA-16428

2024-03-27 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-16435:


 Summary: Add test for KAFKA-16428
 Key: KAFKA-16435
 URL: https://issues.apache.org/jira/browse/KAFKA-16435
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe


Add a test for KAFKA-16428: Fix bug where config change notification znode may 
not get created during migration #15608



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16428: Fix bug where config change notification znode may not get created during migration [kafka]

2024-03-27 Thread via GitHub


cmccabe merged PR #15608:
URL: https://github.com/apache/kafka/pull/15608


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16428: Fix bug where config change notification znode may not get created during migration [kafka]

2024-03-27 Thread via GitHub


cmccabe commented on PR #15608:
URL: https://github.com/apache/kafka/pull/15608#issuecomment-2023799788

   Yeah, I need to add a test. But I really want this fix to make 3.6.2. I 
filed a follow-up here: https://issues.apache.org/jira/browse/KAFKA-16435


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-27 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16434:

Description: 
We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field; the 
resulting join value is the value in right-topic (same topology example as in 
KAFKA-16407).

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
{code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+} where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non-existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]

 
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 

  was:
We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field; the 
resulting join value is the value in right-topic (same topology example as in 
KAFKA-16407).

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
{code}
 

*+Actual result+*

 
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*

 
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+} where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non-existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36

 
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 


> ForeignKey INNER join does not unset join result when FK becomes null
> -
>
> Key: KAFKA-16434
> URL: https://issues.apache.org/jira/browse/KAFKA-16434
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.7.0
>Reporter: Ayoub Omari
>Priority: Major
>
> We have two topics: _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_:
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> We do a simple *INNER* foreign key join on left-topic's foreignKey field; the 
> resulting join value is the value in right-topic (same topology example as 
> in KAFKA-16407).
>  
> *Scenario: Unset foreign key of a primary key*
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 3) {code}
>  
> *+Expected result+*
> {code:java}
> KeyValue(pk1, 3)
> KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
>  
> However, in {+}other cases{+} where the join result should be unset (e.g. 
> the primary key is deleted, or the foreign key changes to a non-existing 
> FK), that record is {+}correctly emitted{+}.
>  
> Also, the importance of unsetting the join result is mentioned in the code: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
>  
> {code:java}
> //[...] Additionally, propagate null if no FK is found there,
> // since we must "unset" any output set by the previous FK-join. This is true 
> for both INNER and LEFT join. {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Updated] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-27 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16434:

Description: 
We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field; the 
resulting join value is the value in right-topic (same topology example as in 
KAFKA-16407).

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
{code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+} where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non-existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 

  was:
We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field; the 
resulting join value is the value in right-topic (same topology example as in 
KAFKA-16407).

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
{code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+} where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non-existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]

 
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 


> ForeignKey INNER join does not unset join result when FK becomes null
> -
>
> Key: KAFKA-16434
> URL: https://issues.apache.org/jira/browse/KAFKA-16434
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.7.0
>Reporter: Ayoub Omari
>Priority: Major
>
> We have two topics: _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_:
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> We do a simple *INNER* foreign key join on left-topic's foreignKey field; the 
> resulting join value is the value in right-topic (same topology example as 
> in KAFKA-16407).
>  
> *Scenario: Unset foreign key of a primary key*
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 3) {code}
>  
> *+Expected result+*
> {code:java}
> KeyValue(pk1, 3)
> KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
>  
> However, in {+}other cases{+} where the join result should be unset (e.g. 
> the primary key is deleted, or the foreign key changes to a non-existing 
> FK), that record is {+}correctly emitted{+}.
>  
> Also, the importance of unsetting the join result is mentioned in the code: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
> {code:java}
> //[...] Additionally, propagate null if no FK is found there,
> // since we must "unset" any output set by the previous FK-join. This is true 
> for both INNER and LEFT join. {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Created] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-27 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16434:
---

 Summary: ForeignKey INNER join does not unset join result when FK 
becomes null
 Key: KAFKA-16434
 URL: https://issues.apache.org/jira/browse/KAFKA-16434
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0, 2.8.2
Reporter: Ayoub Omari


We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field; the 
resulting join value is the value in right-topic (same topology example as in 
KAFKA-16407).

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
{code}
 

*+Actual result+*

 
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*

 
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+} where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non-existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36

 
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [MINOR] Renaming the `Abortable_Transaction` error to `Transaction_Abortable` [kafka]

2024-03-27 Thread via GitHub


chia7712 merged PR #15609:
URL: https://github.com/apache/kafka/pull/15609


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Renaming the `Abortable_Transaction` error to `Transaction_Abortable` [kafka]

2024-03-27 Thread via GitHub


chia7712 commented on PR #15609:
URL: https://github.com/apache/kafka/pull/15609#issuecomment-2023772318

   the failed tests pass on my machine, so I'm going to merge it.
   ```sh
   ./gradlew cleanTest \
     :streams:test --tests EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore \
     :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful \
       --tests MetadataQuorumCommandTest.testDescribeQuorumStatusSuccessful \
     :storage:test --tests TransactionsWithTieredStoreTest.testBumpTransactionalEpoch \
       --tests TransactionsWithTieredStoreTest.testAbortTransactionTimeout \
     :metadata:test --tests QuorumControllerTest.testFenceMultipleBrokers \
       --tests QuorumControllerTest.testBalancePartitionLeaders \
     :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated \
     :connect:mirror:test --tests MirrorConnectorsIntegrationSSLTest.testReplicateSourceDefault \
       --tests MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigs \
       --tests MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs \
     :core:test --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testNoConsumeWithDescribeAclViaSubscribe \
       --tests PlaintextAdminIntegrationTest.testAlterReplicaLogDirs \
       --tests DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails \
       --tests ConsumerBounceTest.testConsumptionWithBrokerFailures \
       --tests ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric \
       --tests LogDirFailureTest.testIOExceptionDuringLogRoll \
       --tests LogDirFailureTest.testIOExceptionDuringCheckpoint \
       --tests DynamicBrokerReconfigurationTest.testTrustStoreAlter \
     :clients:test --tests KafkaConsumerTest.testWakeupWithFetchDataAvailable
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-27 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831501#comment-17831501
 ] 

Ayoub Omari commented on KAFKA-16407:
-

Actually, this behavior is not covered by that KIP; it's a condition that has 
existed since 2.8 and wrongly assumes that if the old foreign key is null, the 
current record should be skipped. From what I saw, the implementation of the 
KIP didn't change that code.
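
For illustration, a minimal sketch of the suspect condition; the names are 
simplified and the helper methods are hypothetical, not the actual Kafka 
Streams source:

{code:java}
if (oldForeignKey == null) {
    // Wrong for this scenario: the record is skipped even when the NEW foreign
    // key is non-null, so the null -> fk transition never produces a join result.
    logSkippedRecord(record);
} else if (!oldForeignKey.equals(newForeignKey)) {
    unsubscribe(oldForeignKey);   // unsets the previous join result
    subscribe(newForeignKey);     // registers the new foreign key
}
{code}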

> ForeignKey INNER join ignores FK change when its previous value is null
> ---
>
> Key: KAFKA-16407
> URL: https://issues.apache.org/jira/browse/KAFKA-16407
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, 
> 3.6.1
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics: _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_:
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> We do a simple *INNER* foreign key join on left-topic's foreignKey field; the 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15713) KRaft support in SaslClientsWithInvalidCredentialsTest

2024-03-27 Thread Pavel Pozdeev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831494#comment-17831494
 ] 

Pavel Pozdeev commented on KAFKA-15713:
---

[~johndoe]  now I've got access to assign tickets, assigned it to myself =)

> KRaft support in SaslClientsWithInvalidCredentialsTest
> --
>
> Key: KAFKA-15713
> URL: https://issues.apache.org/jira/browse/KAFKA-15713
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Pavel Pozdeev
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in SaslClientsWithInvalidCredentialsTest in 
> core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
>  need to be updated to support KRaft
> 125 : def testAclCliWithAuthorizer(): Unit = {
> 130 : def testAclCliWithAdminAPI(): Unit = {
> 186 : def testProducerConsumerCliWithAuthorizer(): Unit = {
> 191 : def testProducerConsumerCliWithAdminAPI(): Unit = {
> 197 : def testAclCliWithClientId(): Unit = {
> 236 : def testAclsOnPrefixedResourcesWithAuthorizer(): Unit = {
> 241 : def testAclsOnPrefixedResourcesWithAdminAPI(): Unit = {
> 268 : def testInvalidAuthorizerProperty(): Unit = {
> 276 : def testPatternTypes(): Unit = {
> Scanned 336 lines. Found 0 KRaft tests out of 9 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15713) KRaft support in SaslClientsWithInvalidCredentialsTest

2024-03-27 Thread Pavel Pozdeev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pavel Pozdeev reassigned KAFKA-15713:
-

Assignee: Pavel Pozdeev

> KRaft support in SaslClientsWithInvalidCredentialsTest
> --
>
> Key: KAFKA-15713
> URL: https://issues.apache.org/jira/browse/KAFKA-15713
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Pavel Pozdeev
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in SaslClientsWithInvalidCredentialsTest in 
> core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
>  need to be updated to support KRaft
> 125 : def testAclCliWithAuthorizer(): Unit = {
> 130 : def testAclCliWithAdminAPI(): Unit = {
> 186 : def testProducerConsumerCliWithAuthorizer(): Unit = {
> 191 : def testProducerConsumerCliWithAdminAPI(): Unit = {
> 197 : def testAclCliWithClientId(): Unit = {
> 236 : def testAclsOnPrefixedResourcesWithAuthorizer(): Unit = {
> 241 : def testAclsOnPrefixedResourcesWithAdminAPI(): Unit = {
> 268 : def testInvalidAuthorizerProperty(): Unit = {
> 276 : def testPatternTypes(): Unit = {
> Scanned 336 lines. Found 0 KRaft tests out of 9 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15561:
--
Priority: Blocker  (was: Major)

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, regex
> Fix For: 4.0.0
>
>
> The new consumer should support subscribing with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the heartbeat (HB) request's 
> SubscribedTopicRegex field, delegating the resolution to the server.
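
For illustration, a hedged sketch of the intended client usage, assuming the 
API shape proposed in KIP-848 (class and method names may change before 
release):

{code:java}
// The regex string is sent to the broker in the ConsumerGroupHeartbeat
// request's SubscribedTopicRegex field; the broker resolves matching topics.
consumer.subscribe(new SubscriptionPattern("inventory\\..*"));
{code}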



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15561) Client support for new SubscriptionPattern based subscription

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15561:
--
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Client support for new SubscriptionPattern based subscription
> -
>
> Key: KAFKA-15561
> URL: https://issues.apache.org/jira/browse/KAFKA-15561
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: kip-848-client-support, regex
> Fix For: 4.0.0
>
>
> The new consumer should support subscribing with the new SubscriptionPattern 
> introduced in the new consumer group protocol. When subscribing with this 
> regex, the client should provide the regex in the heartbeat (HB) request's 
> SubscribedTopicRegex field, delegating the resolution to the server.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15697) Add local assignor and ensure it cannot be used with server side assignor

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15697:
--
Fix Version/s: (was: 4.0.0)

> Add local assignor and ensure it cannot be used with server side assignor
> -
>
> Key: KAFKA-15697
> URL: https://issues.apache.org/jira/browse/KAFKA-15697
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Major
>  Labels: kip-848-client-support
>
> When we start supporting a local/client-side assignor, we should:
>  # Add the config to ConsumerConfig
>  # Examine where we should implement the logic to ensure it is not used 
> alongside the server-side assignor, i.e. you can only specify a local or a 
> remote assignor, or neither.
>  ## If both assignors are specified: throw an IllegalArgumentException (a 
> sketch follows below)
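
A minimal sketch of that validation, assuming hypothetical config keys (the 
local key below is illustrative; only the server-side {{group.remote.assignor}} 
config exists today):

{code:java}
String localAssignor = config.getString("local.assignor");         // hypothetical key
String remoteAssignor = config.getString("group.remote.assignor");
if (localAssignor != null && remoteAssignor != null) {
    throw new IllegalArgumentException(
        "Only one of the local and server-side assignors may be configured");
}
{code}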



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16109) Write system tests cover the "simple consumer + commit" use case

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16109:
--
Summary: Write system tests cover the "simple consumer + commit" use case  
(was: Ensure system tests cover the "simple consumer + commit" use case)

> Write system tests cover the "simple consumer + commit" use case
> 
>
> Key: KAFKA-16109
> URL: https://issues.apache.org/jira/browse/KAFKA-16109
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, system-tests
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15283:
--
Priority: Critical  (was: Major)

> Client support for OffsetFetch and OffsetCommit with topic ID
> -
>
> Key: KAFKA-15283
> URL: https://issues.apache.org/jira/browse/KAFKA-15283
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support, newbie, offset
> Fix For: 4.0.0
>
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory 
> {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and 
> {{METADATA}} RPC calls.
> With KIP-848, the OffsetFetch and OffsetCommit RPCs will start using topic 
> IDs in the same way, so the new client implementation will provide them when 
> issuing those requests. Topic names should continue to be supported as 
> needed by the {{AdminClient}}.
> We should also review/clean-up the support for topic names in requests such 
> as the {{METADATA}} request (currently supporting topic names as well as 
> topic IDs on the client side).
> Tasks include:
>  * Introduce Topic ID in existing OffsetFetch and OffsetCommit API that will 
> be upgraded on the server to support topic ID
>  * Check topic ID propagation internally in the client based on RPCs 
> including it.
>  * Review existing support for topic names for potential clean-up if not 
> needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15279) Implement client support for KIP-848 client-side assigner RPCs

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15279:
--
Fix Version/s: (was: 4.0.0)

> Implement client support for KIP-848 client-side assigner RPCs
> --
>
> Key: KAFKA-15279
> URL: https://issues.apache.org/jira/browse/KAFKA-15279
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
>
> The protocol introduces three new RPCs that the client uses to communicate 
> with the broker:
>  # 
> [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]
>  # 
> [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI]
>  # 
> [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI]
> Support for ConsumerGroupHeartbeat is handled by KAFKA-15278. This task is to 
> implement the ConsumerGroupAssignmentRequestManager to handle the second and 
> third RPCs on the above list.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15284:
--
Fix Version/s: (was: 4.0.0)

> Implement ConsumerGroupProtocolVersionResolver to determine consumer group 
> protocol
> ---
>
> Key: KAFKA-15284
> URL: https://issues.apache.org/jira/browse/KAFKA-15284
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
>
> At client initialization, we need to determine which of the 
> {{ConsumerDelegate}} implementations to use:
>  # {{LegacyKafkaConsumerDelegate}}
>  # {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to 
> use the new protocol. This will be modeled by the (deep breath) 
> {{ConsumerGroupProtocolVersionResolver}}.
> Known tasks:
>  * Determine at what point in the {{Consumer}} initialization the network 
> communication should happen
>  * Determine what RPCs to invoke in order to determine eligibility (API 
> versions, IBP version, etc.)
>  * Implement the network client lifecycle (startup, communication, shutdown, 
> etc.)
>  * Determine the fallback path in case the client is not eligible to use the 
> protocol
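
A hedged sketch of how the resolver might be consumed; the class names come 
from this ticket, but the resolver API itself is hypothetical:

{code:java}
ConsumerGroupProtocolVersionResolver resolver =
    new ConsumerGroupProtocolVersionResolver();
ConsumerDelegate<K, V> delegate = resolver.supportsNewProtocol(config)
    ? new AsyncKafkaConsumerDelegate<>(config)   // eligible for the new KIP-848 protocol
    : new LegacyKafkaConsumerDelegate<>(config); // fallback to the old protocol
{code}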



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16110) Implement consumer performance tests

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16110:
--
Priority: Blocker  (was: Major)

> Implement consumer performance tests
> 
>
> Key: KAFKA-16110
> URL: https://issues.apache.org/jira/browse/KAFKA-16110
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, performance-benchmark
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15282) Implement client support for KIP-848 client-side assignors

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15282:
--
Fix Version/s: (was: 4.0.0)

> Implement client support for KIP-848 client-side assignors
> --
>
> Key: KAFKA-15282
> URL: https://issues.apache.org/jira/browse/KAFKA-15282
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
>
> The client-side assignor provides the logic for the partition assignments 
> instead of on the server. Client-side assignment is the main approach used by 
> the “old protocol” for divvying up partitions. While the “new protocol” 
> favors server-side assignors, the client-side assignor will continue to be 
> used for backward compatibility, including KSQL, Connect, etc.
> Note: I _*think*_ that the client-side assignor logic and the reconciliation 
> logic can remain separate from each other. We should strive to keep the two 
> pieces unencumbered, unless it’s unavoidable.
> This task includes:
>  * Validate the client’s configuration for assignor selection
>  * Integrate with the new {{PartitionAssignor}} interface to invoke the logic 
> from the user-provided assignor implementation
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupPrepareAssignment}} RPC call using the information from the 
> {{PartitionAssignor}} above
>  * Implement the necessary logic around the request/response from the 
> {{ConsumerGroupInstallAssignment}} RPC call, again using the information 
> calculated by the {{PartitionAssignor}}
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16143) New metrics for KIP-848 protocol

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16143:
--
Priority: Minor  (was: Major)

> New metrics for KIP-848 protocol
> 
>
> Key: KAFKA-16143
> URL: https://issues.apache.org/jira/browse/KAFKA-16143
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, metrics
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Minor
>  Labels: kip-848-client-support, metrics, needs-kip
> Fix For: 4.0.0
>
>
> This task is to consider what _new_ metrics we need from the KIP-848 protocol 
> that aren't already exposed by the current set of metrics. This will require 
> a KIP to introduce the new metrics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16397 - Use ByteBufferOutputStream to avoid array copy [kafka]

2024-03-27 Thread via GitHub


apoorvmittal10 commented on code in PR #15589:
URL: https://github.com/apache/kafka/pull/15589#discussion_r1541573651


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtilsTest.java:
##
@@ -132,9 +133,9 @@ public void testCompressDecompress(CompressionType 
compressionType) throws IOExc
 } else {
 assertArrayEquals(testString, compressed);
 }
-
 ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, 
compressionType);
+byte[] actualResult = Utils.toArray(decompressed);

Review Comment:
   Can we please move the conversion after the `assertNotNull(decompressed);` 
check, since `decompressed` is used inside the `Utils.toArray` method?
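
   A minimal sketch of the suggested ordering (using the existing test variables):
   ```java
   ByteBuffer decompressed = ClientTelemetryUtils.decompress(compressed, compressionType);
   assertNotNull(decompressed);
   byte[] actualResult = Utils.toArray(decompressed); // convert only after the null check
   ```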



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16310.

Resolution: Fixed

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: this is a confirmed regression in v3.7.0.
> The impact of this issue is that when a batch contains records whose 
> timestamps are out of order, the offset reported for the max timestamp will 
> be wrong (e.g. timestamp t0 should map to offset 10, but offset 12 is 
> returned). It also causes the time index to record the wrong offset, so the 
> results will be unexpected.
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing: it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2.
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is 
> issued. It should return offset 1, but 3.7.0 and trunk return offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2); the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16359: Corrected manifest file for kafka-clients [kafka]

2024-03-27 Thread via GitHub


mimaison commented on code in PR #15532:
URL: https://github.com/apache/kafka/pull/15532#discussion_r1541600741


##
build.gradle:
##
@@ -1435,10 +1454,10 @@ project(':clients') {
 implementation libs.opentelemetryProto
 
 // libraries which should be added as runtime dependencies in generated 
pom.xml should be defined here:
-shadow libs.zstd
-shadow libs.lz4
-shadow libs.snappy
-shadow libs.slf4jApi
+shadowed libs.zstd
+shadowed libs.lz4
+shadowed libs.snappy
+shadowed libs.slf4jApi

Review Comment:
   I wonder if below in `shadowJar`, we should do something like:
   ```
   // dependencies excluded from the final jar, since they are declared as 
runtime dependencies
   dependencies {
 project.configurations.shadowed.allDependencies.each {
   exclude(dependency(it.group + ':' + it.name))
 }
 // exclude proto files from the jar
 exclude "**/opentelemetry/proto/**/*.proto"
 exclude "**/google/protobuf/*.proto"
   }
   ```
   
   Instead of what we currently have where we again list these dependencies:
   ```
   exclude(dependency(libs.snappy))
   exclude(dependency(libs.zstd))
   exclude(dependency(libs.lz4))
   exclude(dependency(libs.slf4jApi))
   ```
   This would avoid having these diverge in the future. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16433:
--
Labels: consumer-threading-refactor timeout  (was: 
consumer-threading-refactor)

> beginningOffsets and offsetsForTimes don't behave consistently when providing 
> a zero timeout
> 
>
> Key: KAFKA-16433
> URL: https://issues.apache.org/jira/browse/KAFKA-16433
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> As documented here: [https://github.com/apache/kafka/pull/15525]
>  
> Both APIs should at least send out a request when a zero timeout is provided.
>  
> This is corrected in the PR above. However, we still need to fix the 
> implementation of the offsetsForTimes API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16433:
--
Issue Type: Bug  (was: Task)

> beginningOffsets and offsetsForTimes don't behave consistently when providing 
> a zero timeout
> 
>
> Key: KAFKA-16433
> URL: https://issues.apache.org/jira/browse/KAFKA-16433
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> As documented here: [https://github.com/apache/kafka/pull/15525]
>  
> Both APIs should at least send out a request when a zero timeout is provided.
>  
> This is corrected in the PR above. However, we still need to fix the 
> implementation of the offsetsForTimes API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16433:
--
Component/s: clients

> beginningOffsets and offsetsForTimes don't behave consistently when providing 
> a zero timeout
> 
>
> Key: KAFKA-16433
> URL: https://issues.apache.org/jira/browse/KAFKA-16433
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> As documented here: [https://github.com/apache/kafka/pull/15525]
>  
> Both APIs should at least send out a request when a zero timeout is provided.
>  
> This is corrected in the PR above. However, we still need to fix the 
> implementation of the offsetsForTimes API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout

2024-03-27 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16433:
--
Fix Version/s: 3.8.0

> beginningOffsets and offsetsForTimes don't behave consistently when providing 
> a zero timeout
> 
>
> Key: KAFKA-16433
> URL: https://issues.apache.org/jira/browse/KAFKA-16433
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.8.0
>
>
> As documented here: [https://github.com/apache/kafka/pull/15525]
>  
> Both APIs should at least send out a request when a zero timeout is provided.
>  
> This is corrected in the PR above. However, we still need to fix the 
> implementation of the offsetsForTimes API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831468#comment-17831468
 ] 

Matthias J. Sax commented on KAFKA-16407:
-

Did 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
 not fix this?

> ForeignKey INNER join ignores FK change when its previous value is null
> ---
>
> Key: KAFKA-16407
> URL: https://issues.apache.org/jira/browse/KAFKA-16407
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, 
> 3.6.1
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics: _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_:
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> We do a simple *INNER* foreign key join on left-topic's foreignKey field; the 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


ijuma commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541513692


##
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala:
##
@@ -183,12 +184,13 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
 
 //with includeAuthorizedOperations flag
 topicResult = getTopicMetadata(client, topic, new 
DescribeTopicsOptions().includeAuthorizedOperations(true))
-expectedOperations = 
AclEntry.supportedOperations(ResourceType.TOPIC).asJava
+expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
 assertEquals(expectedOperations, topicResult.authorizedOperations)
   }
 
+  @nowarn("cat=deprecation")
   def configuredClusterPermissions: Set[AclOperation] =
-AclEntry.supportedOperations(ResourceType.CLUSTER)
+
JavaConverters.asScalaSet(AclEntry.supportedOperations(ResourceType.CLUSTER)).toSet

Review Comment:
   You can then call `toSet`. But `CollectionConverters` means you don't have 
to use deprecated APIs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541508048


##
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala:
##
@@ -183,12 +184,13 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
 
 //with includeAuthorizedOperations flag
 topicResult = getTopicMetadata(client, topic, new 
DescribeTopicsOptions().includeAuthorizedOperations(true))
-expectedOperations = 
AclEntry.supportedOperations(ResourceType.TOPIC).asJava
+expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
 assertEquals(expectedOperations, topicResult.authorizedOperations)
   }
 
+  @nowarn("cat=deprecation")
   def configuredClusterPermissions: Set[AclOperation] =
-AclEntry.supportedOperations(ResourceType.CLUSTER)
+
JavaConverters.asScalaSet(AclEntry.supportedOperations(ResourceType.CLUSTER)).toSet

Review Comment:
   Hello @ijuma 
   
   Sorry, I didn't find a way to use `CollectionConverters` to convert a Java 
set to an immutable Scala set. Can you give me an example?
   
   `AclEntry.supportedOperations(ResourceType.CLUSTER).asScala`, which uses 
`CollectionConverters`, returns a `scala.collection.mutable.Set`, while here we 
use `scala.collection.immutable.Set`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831464#comment-17831464
 ] 

Johnny Hsu commented on KAFKA-16310:


{quote}[~chia7712] thanks for the help!

This returns the offset and timestamp corresponding to the record with the 
highest timestamp on the partition. Note that we should choose the offset of 
the earliest record if the timestamps of the records are the same.

This sounds good to me, thanks! 


{quote}

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: this is a confirmed regression in v3.7.0.
> The impact of this issue is that when a batch contains records whose 
> timestamps are out of order, the offset reported for the max timestamp will 
> be wrong (e.g. timestamp t0 should map to offset 10, but offset 12 is 
> returned). It also causes the time index to record the wrong offset, so the 
> results will be unexpected.
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing: it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2.
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is 
> issued. It should return offset 1, but 3.7.0 and trunk return offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2); the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831464#comment-17831464
 ] 

Johnny Hsu edited comment on KAFKA-16310 at 3/27/24 5:01 PM:
-

{quote}[~chia7712] thanks for the help!

This returns the offset and timestamp corresponding to the record with the 
highest timestamp on the partition. Note that we should choose the offset of 
the earliest record if the timestamps of the records are the same.

This sounds good to me, thanks! {quote}


was (Author: JIRAUSER304478):
{quote}[~chia7712] thanks for the help!

This returns the offset and timestamp corresponding to the record with the 
highest timestamp on the partition. Note that we should choose the offset of 
the earliest record if the timestamps of the records are the same.

This sounds good to me, thanks! 


{quote}

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: this is a confirmed regression in v3.7.0.
> The impact of this issue is that when a batch contains records whose 
> timestamps are out of order, the offset reported for the max timestamp will 
> be wrong (e.g. timestamp t0 should map to offset 10, but offset 12 is 
> returned). It also causes the time index to record the wrong offset, so the 
> results will be unexpected.
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing: it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2.
> Then a ListOffsets request with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is 
> issued. It should return offset 1, but 3.7.0 and trunk return offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2); the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout

2024-03-27 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16433:
---
Labels: consumer-threading-refactor  (was: )

> beginningOffsets and offsetsForTimes don't behave consistently when providing 
> a zero timeout
> 
>
> Key: KAFKA-16433
> URL: https://issues.apache.org/jira/browse/KAFKA-16433
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor
>
> As documented here: [https://github.com/apache/kafka/pull/15525]
>  
> Both APIs should at least send out a request when a zero timeout is provided.
>  
> This is corrected in the PR above. However, we still need to fix the 
> implementation of the offsetsForTimes API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout

2024-03-27 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16433:
--

 Summary: beginningOffsets and offsetsForTimes don't behave 
consistently when providing a zero timeout
 Key: KAFKA-16433
 URL: https://issues.apache.org/jira/browse/KAFKA-16433
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


As documented here: [https://github.com/apache/kafka/pull/15525]

Both APIs should at least send out a request when a zero timeout is provided.

This is corrected in the PR above. However, we still need to fix the 
implementation of the offsetsForTimes API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831463#comment-17831463
 ] 

Chia-Ping Tsai commented on KAFKA-16310:


@johnnyhsu thanks for offering the description. I added the following 
statement to KIP-734 based on your comments.

{quote}

This returns the offset and timestamp corresponding to the record with the 
highest timestamp on the partition. Note that we should choose the offset of 
the earliest record if the timestamps of the records are the same.

{quote}

WDYT?
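
For illustration, here is a minimal, self-contained Java sketch of that 
tie-breaking rule. The Rec type and its field names are hypothetical 
stand-ins, not Kafka's internal record classes; the point is only the strict 
'>' comparison, which keeps the earliest offset on equal timestamps.

{code:java}
import java.util.List;

final class MaxTimestampExample {
    record Rec(long offset, long timestamp) {}

    // Offset of the record with the highest timestamp; the strict '>'
    // comparison keeps the earliest offset when timestamps are equal.
    static long offsetOfMaxTimestamp(List<Rec> records) {
        long maxTs = Long.MIN_VALUE;
        long offset = -1L;
        for (Rec r : records) {
            if (r.timestamp() > maxTs) {
                maxTs = r.timestamp();
                offset = r.offset();
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        // Timestamps t0+100, t0+400, t0+250 at offsets 0, 1, 2 -> prints 1.
        System.out.println(offsetOfMaxTimestamp(List.of(
            new Rec(0, 100), new Rec(1, 400), new Rec(2, 250))));
    }
}
{code}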

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset reported for a timestamp will be 
> wrong (e.g. the timestamp t0 should map to offset 10, but offset 12 is 
> returned). This causes the time index to store the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> there are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0,1,2. 
> then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> it should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it is still returning 2 as the offset with 
> the max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), and the 
> leader didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541492785


##
server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+
+public class AclEntry extends AccessControlEntry {
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+public static final KafkaPrincipal WILDCARD_PRINCIPAL = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
+public static final String WILDCARD_PRINCIPAL_STRING = 
WILDCARD_PRINCIPAL.toString();
+public static final String WILDCARD_HOST = "*";
+public static final String WILDCARD_RESOURCE = 
ResourcePattern.WILDCARD_RESOURCE;
+public static final String RESOURCE_SEPARATOR = ":";
+public static final Set<ResourceType> RESOURCE_TYPES = 
Arrays.stream(ResourceType.values())
+.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
+.collect(Collectors.toSet());
+public static final Set<AclOperation> ACL_OPERATIONS = 
Arrays.stream(AclOperation.values())
+.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
+.collect(Collectors.toSet());
+
+private static final String PRINCIPAL_KEY = "principal";
+private static final String PERMISSION_TYPE_KEY = "permissionType";
+private static final String OPERATION_KEY = "operation";
+private static final String HOSTS_KEY = "host";
+public static final String VERSION_KEY = "version";
+public static final int CURRENT_VERSION = 1;
+private static final String ACLS_KEY = "acls";
+
+public final AccessControlEntry ace;
+public final KafkaPrincipal kafkaPrincipal;
+
+public AclEntry(AccessControlEntry ace) {
+super(ace.principal(), ace.host(), ace.operation(), 
ace.permissionType());
+this.ace = ace;
+
+kafkaPrincipal = ace.principal() == null
+? null
+: SecurityUtils.parseKafkaPrincipal(ace.principal());
+}
+
+public static AclEntry apply(KafkaPrincipal principal,
+ AclPermissionType permissionType,
+ String host,
+ AclOperation operation) {
+return new AclEntry(new AccessControlEntry(principal == null ? null 

Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-27 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2023293507

   @lucasbru - If I'm not mistaken, the current implementations of both 
beginningOrEndOffsets and offsetsForTimes need to send out a request upon 
getting a ZERO duration.  Both code paths seem to invoke this logic:
   ```
   // if timeout is set to zero, do not try to poll the network 
client at all
   // and return empty immediately; otherwise try to get the 
results synchronously
   // and throw timeout exception if it cannot complete in time
   if (timer.timeoutMs() == 0L)
   return result;
   ```
   
   But offsetsForTimes seems to short-circuit it here:
   ```
   // If timeout is set to zero return empty immediately; otherwise 
try to get the results
   // and throw timeout exception if it cannot complete in time.
   if (timeout.toMillis() == 0L)
   return listOffsetsEvent.emptyResult();
   
   return applicationEventHandler.addAndGet(listOffsetsEvent, 
timer);
   ```
   
   I'll create a ticket to align the behavior of these two APIs in the new 
consumer.
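   
   To make the intended alignment concrete, here is a hedged, self-contained 
sketch. The names (`ZeroTimeoutExample`, `fetchOffsets`, `requestPool`) are 
illustrative, not the consumer's real internals; the point is that even with 
a zero timeout the request is still sent, we just don't block on the result.
   
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Map;
   import java.util.concurrent.*;
   
   final class ZeroTimeoutExample {
       private static final ExecutorService requestPool =
           Executors.newSingleThreadExecutor();
   
       static Map<String, Long> fetchOffsets(Duration timeout) throws Exception {
           // The "request" is submitted regardless of the timeout value.
           Future<Map<String, Long>> pending =
               requestPool.submit(() -> Map.of("topic-0", 42L));
           if (timeout.isZero())
               return Collections.emptyMap(); // fire, but don't wait for the reply
           return pending.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
       }
   
       public static void main(String[] args) throws Exception {
           System.out.println(fetchOffsets(Duration.ZERO));         // {}
           System.out.println(fetchOffsets(Duration.ofSeconds(1))); // {topic-0=42}
           requestPool.shutdown();
       }
   }
   ```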


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


ijuma commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541483681


##
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala:
##
@@ -183,12 +184,13 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
 
 //with includeAuthorizedOperations flag
 topicResult = getTopicMetadata(client, topic, new 
DescribeTopicsOptions().includeAuthorizedOperations(true))
-expectedOperations = 
AclEntry.supportedOperations(ResourceType.TOPIC).asJava
+expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
 assertEquals(expectedOperations, topicResult.authorizedOperations)
   }
 
+  @nowarn("cat=deprecation")
   def configuredClusterPermissions: Set[AclOperation] =
-AclEntry.supportedOperations(ResourceType.CLUSTER)
+
JavaConverters.asScalaSet(AclEntry.supportedOperations(ResourceType.CLUSTER)).toSet

Review Comment:
   You can use CollectionConverters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831460#comment-17831460
 ] 

Johnny Hsu commented on KAFKA-16310:


The update is in the section below:

 

## When the TimestampType is LOG_APPEND_TIME

When the TimestampType is LOG_APPEND_TIME, the timestamps of the records are 
the same. In this case, we should choose the offset of the first record. [This 
path|https://github.com/apache/kafka/blob/6f38fe5e0a6e2fe85fec7cb9adc379061d35ce45/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L294]
 in LogValidator was added to handle this case for the non-compressed type, 
while [this 
path|https://github.com/apache/kafka/blob/6f38fe5e0a6e2fe85fec7cb9adc379061d35ce45/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L421]
 in LogValidator was added to handle it for the compressed type.  

I don't have a Confluence account yet. [~chia7712], would you please help 
update the KIP in the wiki? I will send this update to the dev thread for 
visibility. Thanks! 
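
As a small illustration of the LOG_APPEND_TIME case (plain Java with made-up 
names, not Kafka's LogValidator code):

{code:java}
final class LogAppendTimeExample {
    public static void main(String[] args) {
        // The broker stamps every record in a batch with the same append time,
        // so all timestamps tie and the batch's first offset must be chosen.
        long logAppendTime = System.currentTimeMillis();
        long[] offsets = {10, 11, 12};
        long offsetOfMaxTimestamp = offsets[0]; // earliest record wins the tie
        System.out.println(offsetOfMaxTimestamp + " @ " + logAppendTime);
    }
}
{code}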

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset reported for a timestamp will be 
> wrong (e.g. the timestamp t0 should map to offset 10, but offset 12 is 
> returned). This causes the time index to store the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing; it checks 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> there are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0,1,2. 
> then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> it should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it is still returning 2 as the offset with 
> the max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), and the 
> leader didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541475816


##
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala:
##
@@ -183,12 +184,13 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
 
 //with includeAuthorizedOperations flag
 topicResult = getTopicMetadata(client, topic, new 
DescribeTopicsOptions().includeAuthorizedOperations(true))
-expectedOperations = 
AclEntry.supportedOperations(ResourceType.TOPIC).asJava
+expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
 assertEquals(expectedOperations, topicResult.authorizedOperations)
   }
 
+  @nowarn("cat=deprecation")
   def configuredClusterPermissions: Set[AclOperation] =
-AclEntry.supportedOperations(ResourceType.CLUSTER)
+
JavaConverters.asScalaSet(AclEntry.supportedOperations(ResourceType.CLUSTER)).toSet

Review Comment:
   AFAIK `asScala` not exists in scala-2.12 which we use to build kafka.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541472627


##
server/src/main/java/org/apache/kafka/security/CredentialProvider.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+
+import java.util.Collection;
+import java.util.Properties;
+
+public class CredentialProvider {
+private final Collection<String> scramMechanisms;

Review Comment:
   Nice catch. Thanks. Field removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541471761


##
server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+
+public class AclEntry extends AccessControlEntry {
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+public static final KafkaPrincipal WILDCARD_PRINCIPAL = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
+public static final String WILDCARD_PRINCIPAL_STRING = 
WILDCARD_PRINCIPAL.toString();
+public static final String WILDCARD_HOST = "*";
+public static final String WILDCARD_RESOURCE = 
ResourcePattern.WILDCARD_RESOURCE;
+public static final String RESOURCE_SEPARATOR = ":";
+public static final Set<ResourceType> RESOURCE_TYPES = 
Arrays.stream(ResourceType.values())
+.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
+.collect(Collectors.toSet());
+public static final Set<AclOperation> ACL_OPERATIONS = 
Arrays.stream(AclOperation.values())
+.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
+.collect(Collectors.toSet());
+
+private static final String PRINCIPAL_KEY = "principal";
+private static final String PERMISSION_TYPE_KEY = "permissionType";
+private static final String OPERATION_KEY = "operation";
+private static final String HOSTS_KEY = "host";
+public static final String VERSION_KEY = "version";
+public static final int CURRENT_VERSION = 1;
+private static final String ACLS_KEY = "acls";
+
+public final AccessControlEntry ace;
+public final KafkaPrincipal kafkaPrincipal;
+
+public AclEntry(AccessControlEntry ace) {
+super(ace.principal(), ace.host(), ace.operation(), 
ace.permissionType());
+this.ace = ace;
+
+kafkaPrincipal = ace.principal() == null
+? null
+: SecurityUtils.parseKafkaPrincipal(ace.principal());
+}
+
+public static AclEntry apply(KafkaPrincipal principal,
+ AclPermissionType permissionType,
+ String host,
+ AclOperation operation) {
+return new AclEntry(new AccessControlEntry(principal == null ? null 

Re: [PR] KAFKA-16406 [2] : Split consumer commit tests [kafka]

2024-03-27 Thread via GitHub


lianetm commented on code in PR #15612:
URL: https://github.com/apache/kafka/pull/15612#discussion_r1541452355


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -0,0 +1,320 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.test.MockConsumerInterceptor
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.time.Duration
+import java.util
+import java.util.Optional
+import java.util.stream.Stream
+import scala.jdk.CollectionConverters._
+
+/**
+ * Integration tests for the consumer that covers the logic related to 
committing offsets.
+ */
+@Timeout(600)
+class PlaintextConsumerCommitTest extends AbstractConsumerTest {
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAutoCommitOnClose(quorum: String, groupProtocol: String): Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+val consumer = createConsumer()
+
+val numRecords = 1
+val producer = createProducer()
+sendRecords(producer, numRecords, tp)
+
+consumer.subscribe(List(topic).asJava)
+awaitAssignment(consumer, Set(tp, tp2))
+
+// should auto-commit sought positions before closing
+consumer.seek(tp, 300)
+consumer.seek(tp2, 500)
+consumer.close()
+
+// now we should see the committed positions from another consumer
+val anotherConsumer = createConsumer()
+assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAutoCommitOnCloseAfterWakeup(quorum: String, groupProtocol: String): 
Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+val consumer = createConsumer()
+
+val numRecords = 1
+val producer = createProducer()
+sendRecords(producer, numRecords, tp)
+
+consumer.subscribe(List(topic).asJava)
+awaitAssignment(consumer, Set(tp, tp2))
+
+// should auto-commit sought positions before closing
+consumer.seek(tp, 300)
+consumer.seek(tp2, 500)
+
+// wakeup the consumer before closing to simulate trying to break a poll
+// loop from another thread
+consumer.wakeup()
+consumer.close()
+
+// now we should see the committed positions from another consumer
+val anotherConsumer = createConsumer()
+assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitMetadata(quorum: String, groupProtocol: String): Unit = {
+val consumer = createConsumer()
+consumer.assign(List(tp).asJava)
+
+// sync commit
+val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
+consumer.commitSync(Map((tp, syncMetadata)).asJava)
+assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+// async commit
+val asyncMetadata = new OffsetAndMetadata(10, "bar")
+sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
+assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+// handle null metadata
+val nullMetadata = new OffsetAndMetadata(5, null)
+consumer.commitSync(Map(tp -> nullMetadata).asJava)
+

Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


mimaison commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2023212508

   Now that https://github.com/apache/kafka/pull/15075 got merged, this needs 
rebasing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-03-27 Thread via GitHub


clolov commented on PR #14716:
URL: https://github.com/apache/kafka/pull/14716#issuecomment-2023192543

   Hello @cadonna! This ought to be one of the last PRs for the migration. I 
will circle back tomorrow morning to rebase it since it has been out for a long 
time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]

2024-03-27 Thread via GitHub


CalvinConfluent commented on PR #15541:
URL: https://github.com/apache/kafka/pull/15541#issuecomment-2023173614

   @kirktrue @jolshan Anything else we need to address for this ticket?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.

2024-03-27 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831438#comment-17831438
 ] 

Henry Cai commented on KAFKA-15265:
---

Thanks.  [~abhijeetkumar] Do you have a rough timeline for when the PR will be 
ready?  Do you need other people to contribute to the work?

> Remote copy/fetch quotas for tiered storage.
> 
>
> Key: KAFKA-15265
> URL: https://issues.apache.org/jira/browse/KAFKA-15265
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: Abhijeet Kumar
>Priority: Major
>
> Related KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-27 Thread via GitHub


gharris1727 commented on code in PR #15597:
URL: https://github.com/apache/kafka/pull/15597#discussion_r1541420383


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -580,8 +601,15 @@ private Map<String, ConfigProvider> 
instantiateConfigProviders(Map

Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-27 Thread via GitHub


johnnychhsu commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1541419703


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+  val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  assertEquals(curExpiresPerSec + 1, 
DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
   ah i see. thanks for the comment @showuon ! 
   let me address that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541382798


##
server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+
+public class AclEntry extends AccessControlEntry {
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+public static final KafkaPrincipal WILDCARD_PRINCIPAL = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
+public static final String WILDCARD_PRINCIPAL_STRING = 
WILDCARD_PRINCIPAL.toString();
+public static final String WILDCARD_HOST = "*";
+public static final String WILDCARD_RESOURCE = 
ResourcePattern.WILDCARD_RESOURCE;
+public static final String RESOURCE_SEPARATOR = ":";
+public static final Set<ResourceType> RESOURCE_TYPES = 
Arrays.stream(ResourceType.values())
+.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
+.collect(Collectors.toSet());
+public static final Set<AclOperation> ACL_OPERATIONS = 
Arrays.stream(AclOperation.values())
+.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
+.collect(Collectors.toSet());
+
+private static final String PRINCIPAL_KEY = "principal";
+private static final String PERMISSION_TYPE_KEY = "permissionType";
+private static final String OPERATION_KEY = "operation";
+private static final String HOSTS_KEY = "host";
+public static final String VERSION_KEY = "version";
+public static final int CURRENT_VERSION = 1;
+private static final String ACLS_KEY = "acls";
+
+public final AccessControlEntry ace;
+public final KafkaPrincipal kafkaPrincipal;
+
+public AclEntry(AccessControlEntry ace) {
+super(ace.principal(), ace.host(), ace.operation(), 
ace.permissionType());
+this.ace = ace;
+
+kafkaPrincipal = ace.principal() == null
+? null
+: SecurityUtils.parseKafkaPrincipal(ace.principal());
+}
+
+public static AclEntry apply(KafkaPrincipal principal,
+ AclPermissionType permissionType,
+ String host,
+ AclOperation operation) {
+return new AclEntry(new AccessControlEntry(principal == null ? null 

Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541384909


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+def once(error: Errors): ErrorCount = {
+  ErrorCount(error, 1)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  while (queue.nonEmpty && queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error

Review Comment:
   I just removed the maybeRequestNextBlock override, which required peeking 
into the queue; since it's gone, there's no need to have the counters either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541384909


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+def once(error: Errors): ErrorCount = {
+  ErrorCount(error, 1)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  while (queue.nonEmpty && queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error

Review Comment:
   I just removed the maybeRequestNextBlock override, which required peeking 
into the queue; since it's gone, there's no need to have the counters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-27 Thread via GitHub


lucasbru opened a new pull request, #15613:
URL: https://github.com/apache/kafka/pull/15613

   The javadoc for `KafkaConsumer.commitSync` says:
   
   > Note that asynchronous offset commits sent previously with the {@link 
#commitAsync(OffsetCommitCallback)}
   > (or similar) are guaranteed to have their callbacks invoked prior to 
completion of this method.
   
   This is not always true, neither for the legacy consumer nor for the async
   consumer. This change proposes a number of fixes related to this
   guarantee:
   
 - In the legacy consumer, we're also awaiting async commits that are
   "pending" instead of "in-flight", because we do not know the
   coordinator yet.
  - In the new consumer, we keep track of the incomplete async commit
    futures and wait for them to complete before returning from 
    `commitSync` (see the sketch below).
 - Since we need to block to make sure that our previous commits are 
   completed, we allow the consumer to wake up. This is implemented
   but we are leaving it unresolved in the legacy consumer.
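   
   For intuition, a minimal sketch of the future-tracking idea, with
   illustrative names (`CommitTracker`, `pendingAsyncCommits`) rather than the
   consumer's real internals:
   
   ```java
   import java.time.Duration;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   
   final class CommitTracker {
       private final List<CompletableFuture<Void>> pendingAsyncCommits = new ArrayList<>();
   
       CompletableFuture<Void> commitAsync() {
           CompletableFuture<Void> f = new CompletableFuture<>();
           pendingAsyncCommits.add(f); // completed later, e.g. by a network thread
           return f;
       }
   
       void commitSync(Duration timeout) throws Exception {
           // Await previously issued async commits first, so their callbacks
           // run before this method returns (a real implementation would also
           // honor `timeout` and wakeup()).
           CompletableFuture
               .allOf(pendingAsyncCommits.toArray(new CompletableFuture[0]))
               .get();
           pendingAsyncCommits.clear();
           // ... then perform the synchronous commit itself.
       }
   
       public static void main(String[] args) throws Exception {
           CommitTracker t = new CommitTracker();
           CompletableFuture<Void> f = t.commitAsync();
           f.complete(null);                    // pretend the broker acked
           t.commitSync(Duration.ofSeconds(1)); // returns only after f completed
       }
   }
   ```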
   
   ## Testing
   
   A new integration test
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541378249


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+def once(error: Errors): ErrorCount = {
+  ErrorCount(error, 1)
+}
+  }

Review Comment:
   as I'm simplifying, this whole thing can be simplified as well :D thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-03-27 Thread via GitHub


chia7712 commented on PR #15489:
URL: https://github.com/apache/kafka/pull/15489#issuecomment-2023109755

   @Owen-CH-Leung The `printUsageAndExit` calls `exit(1)`, so `KRAFT` and 
`CO_KRAFT` will stop the JVM. `ZK` can capture the exit code and throw an 
exception instead, so it does not terminate the JVM.
   
   Hence, a simple solution is - set dumb exit procedure for `testPrintHelp`. 
For example:
   ```java
   @ClusterTest
   public void testPrintHelp() {
   Exit.setExitProcedure((statusCode, message) -> { });
   try {
   String out = ToolsTestUtils.captureStandardErr(() -> 
GetOffsetShell.mainNoExit("--help"));
   assertTrue(out.startsWith(GetOffsetShell.USAGE_TEXT));
   } finally {
   Exit.resetExitProcedure();
   }
   }
   ``` 
   WDYT?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


soarez commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541375369


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+def once(error: Errors): ErrorCount = {
+  ErrorCount(error, 1)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  while (queue.nonEmpty && queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error

Review Comment:
   Should the error be immediately dequeued once it's taken?
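   
   In case it helps, a hedged Java rendering of that dequeue-on-take variant 
(the `Errors` enum here is a stand-in, not 
org.apache.kafka.common.protocol.Errors, and the class name is made up):
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   
   final class ErrorQueueSketch {
       enum Errors { NONE, NOT_COORDINATOR, COORDINATOR_LOAD_IN_PROGRESS }
   
       private final Queue<Errors> queue = new ArrayDeque<>();
   
       synchronized void inject(Errors error) {
           queue.add(error);
       }
   
       // Each injected error is served exactly once and removed immediately,
       // so no mutable repeat counter is needed.
       synchronized Errors takeError() {
           Errors next = queue.poll();
           return next == null ? Errors.NONE : next;
       }
   }
   ```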



##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+def once(error: Errors): ErrorCount = {
+  ErrorCount(error, 1)
+}
+  }

Review Comment:
   Do we need an error count / `repeat`? It seems the tests don't make use of 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16406 [2] : Split consumer commit tests [kafka]

2024-03-27 Thread via GitHub


lianetm commented on PR #15612:
URL: https://github.com/apache/kafka/pull/15612#issuecomment-2023088189

   @lucasbru this is one last split that I think makes sense. With this, all 
the tests left in `PlainTextConsumer` do not seem to belong to any sensible 
group other than the generic/misc one represented by the PlainText suite 
itself. Could you take a look when you have a chance? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541357369


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,19 +38,57 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+val INDEFINITE: Int = -1
+
+def indefinitely(error: Errors): ErrorCount = {
+  ErrorCount(error, INDEFINITE)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  while (queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error
+}
+
+def peekError(): Errors = queue.synchronized {
+  queue.head.error
+}
+
+def clearProcessedError(): Unit = {
+  TestUtils.waitUntilTrue(() =>
+queue.synchronized {
+  queue.head.repeat == 0
+}, "error wasn't processed")
+  queue.synchronized {
+queue.dequeue()

Review Comment:
   no, but it doesn't matter any more. I was able to get rid of the 
maybeRequestNextBlock override completely and the whole "mocking" became much 
easier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


