[jira] [Resolved] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-06-06 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-16541.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

Merged the PR to trunk.

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
> Fix For: 4.0.0
>
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] removed 
> the fsync of the leader-epoch checkpoint file on some paths for performance 
> reasons.
> However, since the checkpoint file is now flushed to the device asynchronously 
> by the OS, its content could become corrupted if the OS crashes suddenly (e.g. 
> power failure, kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.
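
For context, the usual crash-safe way to persist a small checkpoint file is to 
write a temp file, fsync it, and atomically rename it into place; skipping the 
fsync is what opens the corruption window described above. A generic 
illustration of that pattern (not the actual Kafka checkpoint code):
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class CheckpointWriteSketch {
    // Write the content to a temp file, force it to disk, then atomically swap it in.
    static void writeCheckpoint(Path target, String content) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileChannel ch = FileChannel.open(tmp,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ch.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)));
            ch.force(true); // without this, a crash can leave a partially written file
        }
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}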



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-05-01 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842783#comment-17842783
 ] 

Jun Rao commented on KAFKA-16541:
-

[~ocadaruma] : Will you be able to work on this soon? The 3.8.0 code freeze is 
getting close. Thanks.

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] removed 
> the fsync of the leader-epoch checkpoint file on some paths for performance 
> reasons.
> However, since the checkpoint file is now flushed to the device asynchronously 
> by the OS, its content could become corrupted if the OS crashes suddenly (e.g. 
> power failure, kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-04-12 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836717#comment-17836717
 ] 

Jun Rao edited comment on KAFKA-16541 at 4/12/24 5:43 PM:
--

Thanks for filing the jira, [~ocadaruma] !  Since this is a regression, it 
would be useful to have this fixed in 3.8.0 and 3.7.1.

One way to fix it is to (1) change LeaderEpochFileCache.truncateFromEnd and 
LeaderEpochFileCache.truncateFromStart to only write to memory without writing 
to the checkpoint file, and (2) change the implementation of 
[renameDir|https://github.com/apache/kafka/blob/3.6.0/core/src/main/scala/kafka/log/UnifiedLog.scala#L681]
 so that it doesn't reinitialize from the file and just changes the Path of the 
backing CheckpointFile.
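
A minimal self-contained sketch of that idea (hypothetical class and method 
names, not the actual LeaderEpochFileCache/UnifiedLog code): truncation mutates 
only the in-memory map, and a directory rename merely re-points the backing 
checkpoint path instead of re-reading the file.
{code:java}
import java.nio.file.Path;
import java.util.TreeMap;

// Hypothetical sketch: epoch -> start offset kept in memory; the checkpoint
// file is only rewritten (and fsynced) on an explicit flush, not on truncation.
class EpochCacheSketch {
    private final TreeMap<Integer, Long> epochs = new TreeMap<>();
    private Path checkpointPath;

    EpochCacheSketch(Path checkpointPath) {
        this.checkpointPath = checkpointPath;
    }

    // (1) Truncation touches memory only; no write to the checkpoint file here.
    synchronized void truncateFromEnd(long endOffset) {
        epochs.values().removeIf(startOffset -> startOffset >= endOffset);
    }

    // (2) A rename just swaps the backing path; it does not re-initialize from
    // the (possibly partially flushed) file on disk.
    synchronized void updateCheckpointPath(Path newPath) {
        this.checkpointPath = newPath;
    }
}
{code}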


was (Author: junrao):
Thanks for filing the jira, [~ocadaruma] !  Since this is a regression, it 
would be useful to have this fixed in 3.8.0 and 3.7.1.

One way to fix it is to (1) change
LeaderEpochFileCache.truncateFromEnd and LeaderEpochFileCache.truncateFromStart 
to only write to memory without writing to the checkpoint file, and (2) change 
the implementation of 
[renameDir|https://github.com/apache/kafka/blob/3.6.0/core/src/main/scala/kafka/log/UnifiedLog.scala#L681]
 so that it doesn't reinitialize from the file and just changes the Path of the 
backing CheckpointFile.

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] removed 
> the fsync of the leader-epoch checkpoint file on some paths for performance 
> reasons.
> However, since the checkpoint file is now flushed to the device asynchronously 
> by the OS, its content could become corrupted if the OS crashes suddenly (e.g. 
> power failure, kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-04-12 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836717#comment-17836717
 ] 

Jun Rao commented on KAFKA-16541:
-

Thanks for filing the jira, [~ocadaruma] !  Since this is a regression, it 
would be useful to have this fixed in 3.8.0 and 3.7.1.

One way to fix it is to (1) change
LeaderEpochFileCache.truncateFromEnd and LeaderEpochFileCache.truncateFromStart 
to only write to memory without writing to the checkpoint file, and (2) change 
the implementation of 
[renameDir|https://github.com/apache/kafka/blob/3.6.0/core/src/main/scala/kafka/log/UnifiedLog.scala#L681]
 so that it doesn't reinitialize from the file and just changes the Path of the 
backing CheckpointFile.

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] removed 
> the fsync of the leader-epoch checkpoint file on some paths for performance 
> reasons.
> However, since the checkpoint file is now flushed to the device asynchronously 
> by the OS, its content could become corrupted if the OS crashes suddenly (e.g. 
> power failure, kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16541) Potential leader epoch checkpoint file corruption on OS crash

2024-04-12 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-16541:

Affects Version/s: 3.7.0

> Potential leader epoch checkpoint file corruption on OS crash
> -
>
> Key: KAFKA-16541
> URL: https://issues.apache.org/jira/browse/KAFKA-16541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Minor
>
> Pointed out by [~junrao] on 
> [GitHub|https://github.com/apache/kafka/pull/14242#discussion_r1556161125]
> [A patch for KAFKA-15046|https://github.com/apache/kafka/pull/14242] removed 
> the fsync of the leader-epoch checkpoint file on some paths for performance 
> reasons.
> However, since the checkpoint file is now flushed to the device asynchronously 
> by the OS, its content could become corrupted if the OS crashes suddenly (e.g. 
> power failure, kernel panic) in the middle of a flush.
> A corrupted checkpoint file could prevent the Kafka broker from starting up.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16485) Fix broker metrics to follow kebab/hyphen case

2024-04-09 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-16485.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

Merged the PR to trunk.

> Fix broker metrics to follow kebab/hyphen case
> --
>
> Key: KAFKA-16485
> URL: https://issues.apache.org/jira/browse/KAFKA-16485
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-28 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831959#comment-17831959
 ] 

Jun Rao commented on KAFKA-16310:
-

Since the follower only maintains offsetForMaxTimestamp at the batch level, the 
listMaxTimestamp API was never implemented correctly. So, technically, there 
was no regression for listMaxTimestamp. We could just fix this issue in trunk 
without backporting to the old branch.

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Updated: this is confirmed to be a regression introduced in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset returned for a timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 is 
> returned). This also causes the time index to record the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing: it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> there are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0,1,2. 
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-28 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831834#comment-17831834
 ] 

Jun Rao edited comment on KAFKA-16310 at 3/28/24 3:04 PM:
--

[~chia7712] :
{quote}Did you mean that we should change the schema to have a new field to 
keep `offsetOfMaxTimestamp`? or just add a new method to Batch to iterate all 
records to find out offsetOfMaxTimestamp?
{quote}
Adding a new field in the batch requires a record format change, which is a much 
bigger effort. For now, the easiest thing is to add a method in Batch to find 
out offsetOfMaxTimestamp by iterating all records.

Regarding the optimization on the leader side by caching offsetOfMaxTimestamp, 
we could do it. However, my understanding is that listMaxTimestamp is rare and 
I am not sure if it's worth the additional complexity.

[~showuon] : Regarding the fix, we could fix forward if we could provide the 
fix soon. Otherwise, we probably want to revert the fixes to avoid new code 
depending on the new semantics, since we changed shallowOffsetOfMaxTimestamp to 
offsetOfMaxTimestamp.


was (Author: junrao):
[~chia7712] :
{quote}Did you mean that we should change the schema to have a new field to 
keep `offsetOfMaxTimestamp`? or just add a new method to Batch to iterate all 
records to find out offsetOfMaxTimestamp?
{quote}
Adding a new field in the batch requires a record format change, which is a much 
bigger effort. For now, the easiest thing is to add a method in Batch to find 
out offsetOfMaxTimestamp by iterating all records.

Regarding the optimization on the leader side by caching offsetOfMaxTimestamp, 
we could do it. However, my understanding is that listMaxTimestamp is rare and 
I am not sure if it's worth the additional complexity.

[~showuon] : Regarding the fix, we could fix forward if we could provide the 
fix soon. Otherwise, we probably want to revert the fixes to avoid new code 
depending on the new semantics.

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Updated: this is confirmed to be a regression introduced in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset returned for a timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 is 
> returned). This also causes the time index to record the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing: it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> there are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0,1,2. 
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-28 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831834#comment-17831834
 ] 

Jun Rao commented on KAFKA-16310:
-

[~chia7712] :
{quote}Did you mean that we should change the schema to have a new field to 
keep `offsetOfMaxTimestamp`? or just add a new method to Batch to iterate all 
records to find out offsetOfMaxTimestamp?
{quote}
Adding a new field in the batch requires a record format change, which is a much 
bigger effort. For now, the easiest thing is to add a method in Batch to find 
out offsetOfMaxTimestamp by iterating all records.

Regarding the optimization on the leader side by caching offsetOfMaxTimestamp, 
we could do it. However, my understanding is that listMaxTimestamp is rare and 
I am not sure if it's worth the additional complexity.

[~showuon] : Regarding the fix, we could fix forward if we could provide the 
fix soon. Otherwise, we probably want to revert the fixes to avoid new code 
depending on the new semantics.

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Updated: this is confirmed to be a regression introduced in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset returned for a timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 is 
> returned). This also causes the time index to record the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing: it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> there are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0,1,2. 
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831553#comment-17831553
 ] 

Jun Rao commented on KAFKA-16310:
-

[~johnnyhsu] , [~showuon] and [~chia7712] : Sorry, but I just realized one 
issue with the fix. The problem is that we only fixed offsetForMaxTimestamp 
during leader append. The follower append still uses the lastOffset in the 
batch. 

 
{code:java}
UnifiedLog.analyzeAndValidateRecords()

lastOffset = batch.lastOffset

...

if (batch.maxTimestamp > maxTimestamp) {
  maxTimestamp = batch.maxTimestamp
  offsetOfMaxTimestamp = lastOffset
} {code}
We optimized the follower code to avoid decompressing batches, so it's kind of 
hard to get the exact record offset for maxTimestamp in the batch.

 

I think the easiest way to fix the listMaxTimestamp issue is probably to still 
maintain offsetOfMaxTimestamp at the record batch level so that it can be 
derived consistently at both the leader and the follower. When serving the 
listMaxTimestamp request, we iterate the batch containing the maxTimestamp to 
find the exact record offset with maxTimestamp. Since this is a rare operation, 
paying the decompression overhead is fine. What do you think?

 

If we want to do the above, we probably need to revert the changes in 3.6.2, 
which is being voted now. cc [~omkreddy] 
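
A rough sketch of that record-level lookup, using plain standalone types rather 
than Kafka's internal batch/record classes (so the names below are illustrative, 
not the real API): keep only the batch-level max timestamp on append, and scan 
the records of the winning batch lazily when a MAX_TIMESTAMP ListOffsets 
request actually arrives.
{code:java}
import java.util.List;

// Hypothetical stand-in for a record inside an already-decompressed batch.
record Rec(long offset, long timestamp) {}

final class MaxTimestampLookupSketch {
    // Scan the batch that holds the max timestamp and return the exact record
    // offset; this runs only when serving a (rare) MAX_TIMESTAMP ListOffsets
    // request, so paying the decompression/iteration cost here is acceptable.
    static long offsetOfMaxTimestamp(List<Rec> batchRecords) {
        long bestOffset = -1L;
        long maxTs = Long.MIN_VALUE;
        for (Rec r : batchRecords) {
            if (r.timestamp() > maxTs) { // strictly greater keeps the earliest record on ties
                maxTs = r.timestamp();
                bestOffset = r.offset();
            }
        }
        return bestOffset;
    }
}
{code}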

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: this is confirmed to be a regression introduced in v3.7.0. 
> The impact of this issue is that when a batch contains records whose 
> timestamps are not in order, the offset returned for a timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 is 
> returned). This also causes the time index to record the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing: it checks that 
> the offset with the max timestamp is the middle one and not the last one. 
> The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> there are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0,1,2. 
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15950) Serialize broker heartbeat requests

2024-03-25 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15950.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

merged the PR to trunk.

> Serialize broker heartbeat requests
> ---
>
> Key: KAFKA-15950
> URL: https://issues.apache.org/jira/browse/KAFKA-15950
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Jun Rao
>Assignee: Igor Soarez
>Priority: Major
> Fix For: 3.8.0
>
>
> This is a followup issue from the discussion in 
> [https://github.com/apache/kafka/pull/14836#discussion_r1409739363].
> {{KafkaEventQueue}} does de-duping and only allows one outstanding 
> {{CommunicationEvent}} in the queue. But it seems that duplicated 
> {{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
> calls {{sendBrokerHeartbeat}} that calls the following.
> {code:java}
> _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
> handler){code}
> The problem is that we have another queue in 
> {{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
> {{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
> {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
> {{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When 
> it's processed, another {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}.
> This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
> practice since {{CommunicationEvent}} is typically queued in 
> {{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
> {{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
> {{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
> reason about tests.
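
One generic way to serialize such requests, sketched under the assumption that 
"serialize" means at most one heartbeat in flight at a time (this is not the 
actual Kafka implementation): only hand a new request to the lower-level 
channel once the response to the previous one has been handled, so duplicates 
from the upper queue collapse into a single outstanding request.
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

// Generic sketch: a sender that refuses to emit a new heartbeat while one
// is still outstanding.
final class SerializedHeartbeatSender {
    private final AtomicBoolean inFlight = new AtomicBoolean(false);

    /** Returns true if the request was actually sent. */
    boolean maybeSendHeartbeat(Runnable send) {
        if (!inFlight.compareAndSet(false, true)) {
            return false;             // previous heartbeat still outstanding: skip this one
        }
        send.run();                   // hand the request to the channel
        return true;
    }

    /** Called from the response handler (success or failure) to allow the next send. */
    void onHeartbeatResponse() {
        inFlight.set(false);
    }
}
{code}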



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16385) Segment is rolled before segment.ms or segment.bytes breached

2024-03-19 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17828506#comment-17828506
 ] 

Jun Rao commented on KAFKA-16385:
-

[~showuon] : Yes, the retention by time is supposed to cover the active segment 
too. As you observed, the current implementation is a bit weird since it 
depends on whether there are new records or not. One potential way to improve 
this is to use the timestamp index to find the cutoff offset in the active 
segment and move the logStartOffset to that point. We need to understand if 
there is any additional I/O impact because of this.
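
A rough sketch of that idea, assuming a sorted in-memory list of (timestamp, 
offset) entries rather than Kafka's actual time index class (the names below 
are illustrative): binary-search for the first entry newer than 
now - retention.ms and advance logStartOffset to that entry's offset.
{code:java}
import java.util.List;

// Hypothetical index entry; the real time index stores comparable pairs.
record TimeIndexEntry(long timestamp, long offset) {}

final class ActiveSegmentRetentionSketch {
    // Return the offset logStartOffset could be advanced to so that every
    // remaining record's timestamp falls within the retention window,
    // or -1 if no index entry is within the window.
    static long cutoffOffset(List<TimeIndexEntry> entries, long nowMs, long retentionMs) {
        long threshold = nowMs - retentionMs;
        int lo = 0, hi = entries.size();          // entries are sorted by timestamp
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (entries.get(mid).timestamp() <= threshold) lo = mid + 1; else hi = mid;
        }
        return lo < entries.size() ? entries.get(lo).offset() : -1L;
    }
}
{code}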

> Segment is rolled before segment.ms or segment.bytes breached
> -
>
> Key: KAFKA-16385
> URL: https://issues.apache.org/jira/browse/KAFKA-16385
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.1, 3.7.0
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> Steps to reproduce:
> 0. Startup a broker with `log.retention.check.interval.ms=1000` to speed up 
> the test.
> 1. Creating a topic with the config: segment.ms=7days , segment.bytes=1GB, 
> retention.ms=1sec .
> 2. Send a record "aaa" to the topic
> 3. Wait for 1 second
> Will this segment be rolled? I thought not.
> But in my test, it does roll:
> {code:java}
> [2024-03-19 15:23:13,924] INFO [LocalLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Rolled new log segment at offset 1 in 3 ms. 
> (kafka.log.LocalLog)
> [2024-03-19 15:23:13,925] INFO [ProducerStateManager partition=t2-1] Wrote 
> producer snapshot at offset 1 with 1 producer ids in 1 ms. 
> (org.apache.kafka.storage.internals.log.ProducerStateManager)
> [2024-03-19 15:23:13,925] INFO [UnifiedLog partition=t2-1, 
> dir=/tmp/kafka-logs_jbod] Deleting segment LogSegment(baseOffset=0, size=71, 
> lastModifiedTime=1710832993131, largestRecordTimestamp=1710832992125) due to 
> log retention time 1000ms breach based on the largest record timestamp in the 
> segment (kafka.log.UnifiedLog)
> {code}
> The segment is rolled because the log retention time of 1000ms was breached, 
> which is unexpected.
> Tested in v3.5.1; it has the same issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16283) RoundRobinPartitioner will only send to half of the partitions in a topic

2024-02-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819322#comment-17819322
 ] 

Jun Rao commented on KAFKA-16283:
-

[~showuon] : Is this the same as 
https://issues.apache.org/jira/browse/KAFKA-9965?  There is a pending PR 
[https://github.com/apache/kafka/pull/12462] for addressing the issue.

> RoundRobinPartitioner will only send to half of the partitions in a topic
> -
>
> Key: KAFKA-16283
> URL: https://issues.apache.org/jira/browse/KAFKA-16283
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0, 3.0.0, 3.6.1
>Reporter: Luke Chen
>Priority: Major
>
> When using `org.apache.kafka.clients.producer.RoundRobinPartitioner`, we 
> expect data to be sent to all partitions in a round-robin manner. But we found 
> that only half of the partitions got the data. This causes half of the 
> resources (storage, consumers, ...) to be wasted.
> {code:java}
> > bin/kafka-topics.sh --create --topic quickstart-events4 --bootstrap-server 
> > localhost:9092 --partitions 2 
> Created topic quickstart-events4.
> # send 1000 records to the topic, expecting 500 records in partition0, and 
> 500 records in partition1
> > bin/kafka-producer-perf-test.sh --topic quickstart-events4 --num-records 
> > 1000 --record-size 1024 --throughput -1 --producer-props 
> > bootstrap.servers=localhost:9092 
> > partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
> 1000 records sent, 6535.947712 records/sec (6.38 MB/sec), 2.88 ms avg 
> latency, 121.00 ms max latency, 2 ms 50th, 7 ms 95th, 10 ms 99th, 121 ms 
> 99.9th.
> > ls -al /tmp/kafka-logs/quickstart-events4-1
> total 24
> drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
> drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
> -rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
> -rw-r--r--   1 lukchen  wheel   1037819  2 20 19:53 .log
> -rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
> .timeindex
> -rw-r--r--   1 lukchen  wheel 8  2 20 19:53 leader-epoch-checkpoint
> -rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
> # No records in partition 1
> > ls -al /tmp/kafka-logs/quickstart-events4-0
> total 8
> drwxr-xr-x   7 lukchen  wheel   224  2 20 19:53 .
> drwxr-xr-x  70 lukchen  wheel  2240  2 20 19:53 ..
> -rw-r--r--   1 lukchen  wheel  10485760  2 20 19:53 .index
> -rw-r--r--   1 lukchen  wheel 0  2 20 19:53 .log
> -rw-r--r--   1 lukchen  wheel  10485756  2 20 19:53 
> .timeindex
> -rw-r--r--   1 lukchen  wheel 0  2 20 19:53 leader-epoch-checkpoint
> -rw-r--r--   1 lukchen  wheel43  2 20 19:53 partition.metadata
> {code}
> Tested in Kafka 3.0.0, 3.2.3, and the latest trunk; they all have the same 
> issue. It has probably existed for a long time.
>  
> Had a quick look: it's because abortOnNewBatch is triggered each time a new 
> batch is created.
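
A small standalone simulation of that effect (deliberately simplified, and not 
the actual producer code): it assumes every send lands on a fresh batch, so a 
counter-based round-robin partitioner has partition() called twice per record, 
once for the aborted attempt and once after onNewBatch, and only every other 
partition ever receives data.
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSkipSketch {
    public static void main(String[] args) {
        int numPartitions = 2;
        AtomicInteger counter = new AtomicInteger();

        for (int i = 0; i < 6; i++) {
            // First call: picks a partition, but the batch is new, so the
            // producer aborts and asks again (abortOnNewBatch behaviour).
            int aborted = counter.getAndIncrement() % numPartitions;
            // Second call after onNewBatch: this is the partition actually used.
            int used = counter.getAndIncrement() % numPartitions;
            System.out.printf("record %d: aborted=%d used=%d%n", i, aborted, used);
        }
        // With 2 partitions, "used" is always 1: partition 0 never gets any data.
    }
}
{code}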



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16186) Implement broker metrics for client telemetry usage

2024-01-30 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-16186.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

merged the PR to trunk

> Implement broker metrics for client telemetry usage
> ---
>
> Key: KAFKA-16186
> URL: https://issues.apache.org/jira/browse/KAFKA-16186
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>
> KIP-714 lists new broker metrics which record the usage of client telemetry 
> instances and plugins. Implement the broker metrics as defined in KIP-714.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15813) Improve implementation of client instance cache

2024-01-23 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15813.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

Merged the PR to trunk.

> Improve implementation of client instance cache
> ---
>
> Key: KAFKA-15813
> URL: https://issues.apache.org/jira/browse/KAFKA-15813
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>
> In the current implementation the ClientMetricsManager uses an LRU cache, but 
> we should also support expiring stale clients, i.e. clients which haven't 
> reported metrics for a while.
>  
> The KIP mentions: This client instance specific state is maintained in broker 
> memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds and is used to 
> enforce the push interval rate-limiting. There is no persistence of client 
> instance metrics state across broker restarts or between brokers 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16137) ListClientMetricsResourcesResponse definition is missing field descriptions

2024-01-22 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-16137.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

merged the PR to trunk.

> ListClientMetricsResourcesResponse definition is missing field descriptions
> ---
>
> Key: KAFKA-16137
> URL: https://issues.apache.org/jira/browse/KAFKA-16137
> Project: Kafka
>  Issue Type: Task
>  Components: admin
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Trivial
> Fix For: 3.8.0
>
>
> This is purely improving the readability of the Kafka protocol documentation 
> by adding missing description information for the fields of the 
> `ListClientMetricsResources` response.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15811) implement capturing client port information from Socket Server

2024-01-19 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15811.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

merged the PR to trunk.

> implement capturing client port information from Socket Server
> --
>
> Key: KAFKA-15811
> URL: https://issues.apache.org/jira/browse/KAFKA-15811
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15863) Handle push telemetry throttling with quota manager

2024-01-16 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17807374#comment-17807374
 ] 

Jun Rao commented on KAFKA-15863:
-

[~apoorvmittal10] : If we only hit the push interval based throttling, 
currently we don't mute the channel. This doesn't completely prevent a rogue 
client since it may not implement the handling of THROTTLING_QUOTA_EXCEEDED 
properly. My comment in the PR was that other KIPs (e.g. 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-599]) that introduced 
specific throttling have added the logic to mute the channel. It seems that we 
should be consistent here.
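
A generic sketch of per-connection muting on a quota violation (hypothetical 
types and names; the real broker does this inside its quota managers and socket 
server): record a mute deadline per connection and stop reading further 
requests from it until the deadline passes, so even a client that ignores 
THROTTLING_QUOTA_EXCEEDED is slowed down.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of per-connection muting, keyed by a connection id.
final class ChannelMuteSketch {
    private final Map<String, Long> mutedUntilMs = new ConcurrentHashMap<>();

    void muteFor(String connectionId, long throttleTimeMs, long nowMs) {
        mutedUntilMs.put(connectionId, nowMs + throttleTimeMs);
    }

    // The processor skips reading from muted connections until the deadline passes.
    boolean isMuted(String connectionId, long nowMs) {
        Long deadline = mutedUntilMs.get(connectionId);
        if (deadline == null) return false;
        if (nowMs >= deadline) {
            mutedUntilMs.remove(connectionId, deadline);
            return false;
        }
        return true;
    }
}
{code}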

> Handle push telemetry throttling with quota manager
> ---
>
> Key: KAFKA-15863
> URL: https://issues.apache.org/jira/browse/KAFKA-15863
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>
> Details: https://github.com/apache/kafka/pull/14699#discussion_r1399714279



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15950) Serialize broker heartbeat requests

2023-12-11 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-15950:

Description: 
This is a followup issue from the discussion in 
[https://github.com/apache/kafka/pull/14836#discussion_r1409739363].

{{KafkaEventQueue}} does de-duping and only allows one outstanding 
{{CommunicationEvent}} in the queue. But it seems that duplicated 
{{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
calls {{sendBrokerHeartbeat}} that calls the following.
{code:java}
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
handler){code}
The problem is that we have another queue in 
{{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
{{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
{{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
{{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When it's 
processed, another {{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}.

This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
practice since {{CommunicationEvent}} is typically queued in 
{{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
{{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
{{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
reason about tests.

  was:
This is a follow up issue from the discussion in 
https://github.com/apache/kafka/pull/14836#discussion_r1409739363.

{{KafkaEventQueue}} does de-duping and only allows one outstanding 
{{CommunicationEvent}} in the queue. But it seems that duplicated 
{{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
calls {{sendBrokerHeartbeat}} that calls the following.
{code:java}
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
handler){code}
The problem is that we have another queue in 
{{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
{{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
{{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
{{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When it's 
processed, another {{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}.

This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
practice since {{CommunicationEvent}} is typically queued in 
{{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
{{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
{{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
reason about tests.


> Serialize broker heartbeat requests
> ---
>
> Key: KAFKA-15950
> URL: https://issues.apache.org/jira/browse/KAFKA-15950
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Jun Rao
>Assignee: Igor Soarez
>Priority: Major
>
> This is a followup issue from the discussion in 
> [https://github.com/apache/kafka/pull/14836#discussion_r1409739363].
> {{KafkaEventQueue}} does de-duping and only allows one outstanding 
> {{CommunicationEvent}} in the queue. But it seems that duplicated 
> {{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
> calls {{sendBrokerHeartbeat}} that calls the following.
> {code:java}
> _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
> handler){code}
> The problem is that we have another queue in 
> {{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
> {{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
> {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
> {{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When 
> it's processed, another {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}.
> This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
> practice since {{CommunicationEvent}} is typically queued in 
> {{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
> {{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
> {{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
> reason about tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15950) Serialize broker heartbeat requests

2023-12-11 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-15950:

Description: 
This is a follow up issue from the discussion in 
https://github.com/apache/kafka/pull/14836#discussion_r1409739363.

{{KafkaEventQueue}} does de-duping and only allows one outstanding 
{{CommunicationEvent}} in the queue. But it seems that duplicated 
{{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
calls {{sendBrokerHeartbeat}} that calls the following.
{code:java}
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
handler){code}
The problem is that we have another queue in 
{{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
{{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
{{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
{{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When it's 
processed, another {{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}.

This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
practice since {{CommunicationEvent}} is typically queued in 
{{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
{{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
{{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
reason about tests.

  was:
{{KafkaEventQueue}} does de-duping and only allows one outstanding 
{{CommunicationEvent}} in the queue. But it seems that duplicated 
{{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
calls {{sendBrokerHeartbeat}} that calls the following.
{code:java}
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
handler){code}
The problem is that we have another queue in 
{{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
{{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
{{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
{{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When it's 
processed, another {{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}.

This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
practice since {{CommunicationEvent}} is typically queued in 
{{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
{{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
{{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
reason about tests.


> Serialize broker heartbeat requests
> ---
>
> Key: KAFKA-15950
> URL: https://issues.apache.org/jira/browse/KAFKA-15950
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Jun Rao
>Assignee: Igor Soarez
>Priority: Major
>
> This is a follow up issue from the discussion in 
> https://github.com/apache/kafka/pull/14836#discussion_r1409739363.
> {{KafkaEventQueue}} does de-duping and only allows one outstanding 
> {{CommunicationEvent}} in the queue. But it seems that duplicated 
> {{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
> calls {{sendBrokerHeartbeat}} that calls the following.
> {code:java}
> _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
> handler){code}
> The problem is that we have another queue in 
> {{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
> {{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
> {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
> {{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When 
> it's processed, another {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}.
> This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
> practice since {{CommunicationEvent}} is typically queued in 
> {{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
> {{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
> {{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
> reason about tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15950) Serialize broker heartbeat requests

2023-12-11 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-15950:

Description: 
{{KafkaEventQueue}} does de-duping and only allows one outstanding 
{{CommunicationEvent}} in the queue. But it seems that duplicated 
{{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
calls {{sendBrokerHeartbeat}} that calls the following.
{code:java}
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
handler){code}
The problem is that we have another queue in 
{{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
{{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
{{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
{{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When it's 
processed, another {{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}.

This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
practice since {{CommunicationEvent}} is typically queued in 
{{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
{{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
{{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
reason about tests.

  was:
{{KafkaEventQueue}} does de-duping and only allows one outstanding 
{{CommunicationEvent}} in the queue. But it seems that duplicated 
{{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
calls {{sendBrokerHeartbeat}} that calls the following.

{{}}
{code:java}
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
handler){code}
{{}}

The problem is that we have another queue in 
{{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
{{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
{{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
{{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When it's 
processed, another {{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}.

This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
practice since {{CommunicationEvent}} is typically queued in 
{{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
{{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
{{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
reason about tests.


> Serialize broker heartbeat requests
> ---
>
> Key: KAFKA-15950
> URL: https://issues.apache.org/jira/browse/KAFKA-15950
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Jun Rao
>Assignee: Igor Soarez
>Priority: Major
>
> {{KafkaEventQueue}} does de-duping and only allows one outstanding 
> {{CommunicationEvent}} in the queue. But it seems that duplicated 
> {{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
> calls {{sendBrokerHeartbeat}} that calls the following.
> {code:java}
> _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
> handler){code}
> The problem is that we have another queue in 
> {{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
> {{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
> {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
> {{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When 
> it's processed, another {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}.
> This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
> practice since {{CommunicationEvent}} is typically queued in 
> {{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
> {{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
> {{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
> reason about tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15950) Serialize broker heartbeat requests

2023-12-11 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-15950:

Description: 
{{KafkaEventQueue}} does de-duping and only allows one outstanding 
{{CommunicationEvent}} in the queue. But it seems that duplicated 
{{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
calls {{sendBrokerHeartbeat}} that calls the following.

{{}}
{code:java}
_channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
handler){code}
{{}}

The problem is that we have another queue in 
{{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
{{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
{{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
{{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When it's 
processed, another {{HeartbeatRequest}} will be queued in 
{{{}NodeToControllerChannelManagerImpl{}}}.

This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
practice since {{CommunicationEvent}} is typically queued in 
{{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
{{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
{{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
reason about tests.

  was:
Currently, CommunicationEvent is scheduled with DeadlineFunction, which ignores 
the schedule time for an existing event. This wasn't an issue when 
CommunicationEvent was always periodic. However, with KAFKA-15360, a 
CommunicationEvent could be scheduled immediately for offline dirs. If a 
periodic CommunicationEvent is scheduled after the immediate CommunicationEvent 
in KafkaEventQueue, the former will cancel the latter but leave the schedule 
time as the periodic one. This will unnecessarily delay the communication of 
the failed dir to the controller. 
 
Using EarliestDeadlineFunction will fix this issue.


> Serialize broker heartbeat requests
> ---
>
> Key: KAFKA-15950
> URL: https://issues.apache.org/jira/browse/KAFKA-15950
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Jun Rao
>Assignee: Igor Soarez
>Priority: Major
>
> {{KafkaEventQueue}} does de-duping and only allows one outstanding 
> {{CommunicationEvent}} in the queue. But it seems that duplicated 
> {{{}HeartbeatRequest{}}}s could still be generated. {{CommunicationEvent}} 
> calls {{sendBrokerHeartbeat}} that calls the following.
> {{}}
> {code:java}
> _channelManager.sendRequest(new BrokerHeartbeatRequest.Builder(data), 
> handler){code}
> {{}}
> The problem is that we have another queue in 
> {{NodeToControllerChannelManagerImpl}} that doesn't do the de-duping. Once a 
> {{CommunicationEvent}} is dequeued from {{{}KafkaEventQueue{}}}, a 
> {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}. At this point, another 
> {{CommunicationEvent}} could be enqueued in {{{}KafkaEventQueue{}}}. When 
> it's processed, another {{HeartbeatRequest}} will be queued in 
> {{{}NodeToControllerChannelManagerImpl{}}}.
> This probably won't introduce long lasting duplicated {{HeartbeatRequest}} in 
> practice since {{CommunicationEvent}} is typically queued in 
> {{KafkaEventQueue}} for heartbeat interval. By that time, other pending 
> {{{}HeartbeatRequest{}}}s will be processed and de-duped when enqueuing to 
> {{{}KafkaEventQueue{}}}. However, duplicated requests could make it hard to 
> reason about tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15950) Serialize broker heartbeat requests

2023-12-11 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-15950:

Summary: Serialize broker heartbeat requests  (was: CommunicationEvent 
should be scheduled with EarliestDeadlineFunction)

> Serialize broker heartbeat requests
> ---
>
> Key: KAFKA-15950
> URL: https://issues.apache.org/jira/browse/KAFKA-15950
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Jun Rao
>Assignee: Igor Soarez
>Priority: Major
>
> Currently, CommunicationEvent is scheduled with DeadlineFunction, which 
> ignores the schedule time for an existing event. This wasn't an issue when 
> CommunicationEvent was always periodic. However, with KAFKA-15360, a 
> CommunicationEvent could be scheduled immediately for offline dirs. If a 
> periodic CommunicationEvent is scheduled after the immediate 
> CommunicationEvent in KafkaEventQueue, the former will cancel the latter but 
> leave the schedule time as the periodic one. This will unnecessarily delay the 
> communication of the failed dir to the controller. 
>  
> Using EarliestDeadlineFunction will fix this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15684) Add support to describe all subscriptions through utility

2023-12-06 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15684.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk.

> Add support to describe all subscriptions through utility
> -
>
> Key: KAFKA-15684
> URL: https://issues.apache.org/jira/browse/KAFKA-15684
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> The open PR to support client-metrics through kafka-configs.sh doesn't list 
> all subscriptions. The functionality is missing because there is no support 
> for listing client subscriptions in the config repository and the admin 
> client. This task should provide a workaround to fetch all subscriptions from 
> the config repository by adding a method in KRaftMetadataCache. Later, a KIP 
> might be needed to add support in AdminClient.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15871) Implement kafka-client-metrics.sh tool

2023-12-06 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15871.
-
Resolution: Fixed

merged the PR to trunk

> Implement kafka-client-metrics.sh tool
> --
>
> Key: KAFKA-15871
> URL: https://issues.apache.org/jira/browse/KAFKA-15871
> Project: Kafka
>  Issue Type: Sub-task
>  Components: admin
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
> Fix For: 3.7.0
>
>
> Implement the `kafka-client-metrics.sh` tool which is introduced in KIP-714 
> and enhanced in KIP-1000.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15831) List Client Metrics Configuration Resources

2023-12-05 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15831.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk.

> List Client Metrics Configuration Resources
> ---
>
> Key: KAFKA-15831
> URL: https://issues.apache.org/jira/browse/KAFKA-15831
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> This JIRA tracks the development of KIP-1000 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1000%3A+List+Client+Metrics+Configuration+Resources).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15965) Test failure: org.apache.kafka.common.requests.BrokerRegistrationRequestTest

2023-12-04 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15965.
-
Fix Version/s: 3.7.0
 Assignee: Colin McCabe
   Resolution: Fixed

This is fixed by https://github.com/apache/kafka/pull/14887.

> Test failure: org.apache.kafka.common.requests.BrokerRegistrationRequestTest
> 
>
> Key: KAFKA-15965
> URL: https://issues.apache.org/jira/browse/KAFKA-15965
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apoorv Mittal
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.7.0
>
>
> 2 tests, for versions 0 and 1, fail consistently.
> Build: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14767/15/tests/
>  
> {code:java}
> org.opentest4j.AssertionFailedError: 
> BrokerRegistrationRequestData(brokerId=0, clusterId='test', 
> incarnationId=Xil73H5bSZ2vYTWUVlf07Q, listeners=[], features=[], rack='a', 
> isMigratingZkBroker=false, logDirs=[], previousBrokerEpoch=1) ==> expected: 
> <-1> but was: <1>Stacktraceorg.opentest4j.AssertionFailedError: 
> BrokerRegistrationRequestData(brokerId=0, clusterId='test', 
> incarnationId=Xil73H5bSZ2vYTWUVlf07Q, listeners=[], features=[], rack='a', 
> isMigratingZkBroker=false, logDirs=[], previousBrokerEpoch=1) ==> expected: 
> <-1> but was: <1>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>at 
> app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)  
> at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166)  
> at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:660)
>   at 
> app//org.apache.kafka.common.requests.BrokerRegistrationRequestTest.testBasicBuild(BrokerRegistrationRequestTest.java:57)
> at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
> at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
>at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>  at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15950) CommunicationEvent should be scheduled with EarliestDeadlineFunction

2023-11-29 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791367#comment-17791367
 ] 

Jun Rao commented on KAFKA-15950:
-

cc [~soarez] 

> CommunicationEvent should be scheduled with EarliestDeadlineFunction
> 
>
> Key: KAFKA-15950
> URL: https://issues.apache.org/jira/browse/KAFKA-15950
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Jun Rao
>Priority: Major
>
> Currently, CommunicationEvent is scheduled with DeadlineFunction, which 
> ignores the schedule time of an existing event. This wasn't an issue when 
> CommunicationEvent was always periodic. However, with KAFKA-15360, a 
> CommunicationEvent could be scheduled immediately for offline dirs. If a 
> periodic CommunicationEvent is scheduled after the immediate 
> CommunicationEvent in KafkaEventQueue, the former will cancel the latter but 
> keep the periodic schedule time. This will unnecessarily delay the 
> communication of the failed dir to the controller. 
>  
> Using EarliestDeadlineFunction will fix this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15950) CommunicationEvent should be scheduled with EarliestDeadlineFunction

2023-11-29 Thread Jun Rao (Jira)
Jun Rao created KAFKA-15950:
---

 Summary: CommunicationEvent should be scheduled with 
EarliestDeadlineFunction
 Key: KAFKA-15950
 URL: https://issues.apache.org/jira/browse/KAFKA-15950
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.7.0
Reporter: Jun Rao


Currently, CommunicationEvent is scheduled with DeadlineFunction, which ignores 
the schedule time of an existing event. This wasn't an issue when 
CommunicationEvent was always periodic. However, with KAFKA-15360, a 
CommunicationEvent could be scheduled immediately for offline dirs. If a 
periodic CommunicationEvent is scheduled after the immediate CommunicationEvent 
in KafkaEventQueue, the former will cancel the latter but keep the periodic 
schedule time. This will unnecessarily delay the communication of the 
failed dir to the controller. 
 
Using EarliestDeadlineFunction will fix this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15046) Produce performance issue under high disk load

2023-11-29 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15046.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk.

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
>  Labels: performance
> Fix For: 3.7.0
>
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png, image-2023-08-18-19-23-36-597.png, 
> image-2023-08-18-19-29-56-377.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile got quite bad when we performed replica 
> reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Request-time throttling also happened at the time of the incident. This caused 
> producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load for 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** From taking jstack several times, we found that in most of them a 
> request-handler was doing an fsync to flush ProducerState while other 
> request-handlers were waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 
> cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 
> runnable  [0x7ef9a12e2000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native 
> Method)
> at 
> sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
> at 
> sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
> at 
> kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
> at 
> kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
> at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
> at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
> at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
> at 
> kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
> at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
> at scala.collection.mutable.HashMap.map(HashMap.scala:35)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
> at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
> at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code}
>  * 
>  ** Also there were a bunch of logs showing that writing producer snapshots took 
> hundreds of milliseconds.
>  *** 
> {code:java}
> ...
> [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote 
> producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. 
> (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:37,319] INFO [ProducerStateManager partition=yyy-34] Wrote 
> producer snapshot at offset 247996937813 with 0 producer ids in 547 ms. 
> (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:38,887] INFO [ProducerStateManager partition=zzz-9] Wrote 
> producer snapshot at offset 226222355404 with 0 producer ids in 576 

[jira] [Commented] (KAFKA-15674) Consider making RequestLocal thread safe

2023-10-23 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778835#comment-17778835
 ] 

Jun Rao commented on KAFKA-15674:
-

For context, [https://github.com/apache/kafka/pull/9220] was the original 
thread-safe alternative implementation to 
[https://github.com/apache/kafka/pull/9229].

> Consider making RequestLocal thread safe
> 
>
> Key: KAFKA-15674
> URL: https://issues.apache.org/jira/browse/KAFKA-15674
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Priority: Major
>
> KAFKA-15653 found an issue with using a RequestLocal on multiple 
> threads. The RequestLocal object was originally designed in a non-thread-safe 
> manner for performance.
> It is passed around to methods that write to the log, and KAFKA-15653 showed 
> that it is not too hard to accidentally share it between different threads.
> Given all this, and new changes and dependencies in the project compared to 
> when it was first introduced, we may want to reconsider the thread safety of 
> RequestLocal.
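
As a rough illustration of the trade-off (the class below is a hypothetical sketch, not Kafka's RequestLocal/BufferSupplier API): a ThreadLocal-backed cache stays correct even if the same object is accidentally shared across threads, at the cost of a ThreadLocal lookup on every use.

{code:java}
import java.nio.ByteBuffer;

// Hypothetical sketch: a per-thread buffer cache that is safe to share by accident.
final class ThreadSafeBufferCache {
    private final ThreadLocal<ByteBuffer> cached =
        ThreadLocal.withInitial(() -> ByteBuffer.allocate(16 * 1024));

    // Each thread sees its own buffer, so concurrent callers never corrupt each other's state.
    ByteBuffer get(int minCapacity) {
        ByteBuffer buffer = cached.get();
        if (buffer.capacity() < minCapacity) {
            buffer = ByteBuffer.allocate(minCapacity);
            cached.set(buffer);
        }
        buffer.clear();
        return buffer;
    }
}
{code}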



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15401) Segment with corrupted index should not be uploaded to remote storage

2023-10-23 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778807#comment-17778807
 ] 

Jun Rao commented on KAFKA-15401:
-

[~nickstery] : Thanks for reporting this issue. I am wondering how a disk full 
event will lead to corrupted indexes. When a disk is full, the flushing of the 
index will fail with an IOException. This will prevent the recovery point from 
advancing. When the broker is restarted, it seems that we should rebuild the 
index by recovering from the recovery point.

> Segment with corrupted index should not be uploaded to remote storage
> -
>
> Key: KAFKA-15401
> URL: https://issues.apache.org/jira/browse/KAFKA-15401
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Viktor Nikitash
>Assignee: Viktor Nikitash
>Priority: Minor
>  Labels: KIP-405
> Fix For: 3.7.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> During a data-disk-full event, there could be a situation with index 
> corruption. There are existing functions which perform sanity checks on 
> TimeIndex [1], TxnIndex [2], and OffsetIndex [3]. The idea is to perform the same 
> checks in RemoteLogManager before we upload a segment to remote storage 
> [4].
> Resources:
> [1][TimeIndex::sanityCheck()|https://github.com/apache/kafka/blob/88d2c4460a1c8c8cf5dbcc9edb43f42fe898ca00/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java#L73]
> [2][TransationIndex::sanityCheck()|https://github.com/apache/kafka/blob/88d2c4460a1c8c8cf5dbcc9edb43f42fe898ca00/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java#L187]
> [3][OffsetIndex::sanityCheck()|#L78]]
> [4][RemoteLogManager::copyLogSegmentsToRemote()|https://github.com/apache/kafka/blob/88d2c4460a1c8c8cf5dbcc9edb43f42fe898ca00/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L649]
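
A minimal sketch of that idea, assuming a hypothetical Index interface rather than the actual storage classes: run the same sanity checks used at start-up on every index of the segment and skip the upload if any of them reports corruption.

{code:java}
import java.util.List;

// Hypothetical sketch: refuse to upload a segment whose indexes fail their sanity checks.
final class SegmentUploadGuard {
    interface Index {
        void sanityCheck(); // throws if the index is corrupted, mirroring *Index.sanityCheck()
    }

    static boolean safeToUpload(List<Index> segmentIndexes) {
        for (Index index : segmentIndexes) {
            try {
                index.sanityCheck();
            } catch (RuntimeException corruption) {
                // Leave the segment local so the corruption never reaches remote storage.
                return false;
            }
        }
        return true;
    }
}
{code}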



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15651) Investigate auto commit guarantees during Consumer.assign()

2023-10-20 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15651.
-
Resolution: Not A Problem

> Investigate auto commit guarantees during Consumer.assign()
> ---
>
> Key: KAFKA-15651
> URL: https://issues.apache.org/jira/browse/KAFKA-15651
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> In the {{assign()}} method implementation, both {{KafkaConsumer}} and 
> {{PrototypeAsyncConsumer}} commit offsets asynchronously. Is this 
> intentional? [~junrao] asks in a [recent PR 
> review|https://github.com/apache/kafka/pull/14406/files/193af8230d0c61853d764cbbe29bca2fc6361af9#r1349023459]:
> {quote}Do we guarantee that the new owner of the unsubscribed partitions 
> could pick up the latest committed offset?
> {quote}
> Let's confirm whether the asynchronous approach is acceptable and correct. If 
> it is, great, let's enhance the documentation to briefly explain why. If it 
> is not, let's correct the behavior if it's within the API semantic 
> expectations.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15582) Clean shutdown detection, broker side

2023-10-19 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15582.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk

> Clean shutdown detection, broker side
> -
>
> Key: KAFKA-15582
> URL: https://issues.apache.org/jira/browse/KAFKA-15582
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Calvin Liu
>Assignee: Calvin Liu
>Priority: Major
> Fix For: 3.7.0
>
>
> The clean shutdown file can now include the broker epoch before shutdown. 
> During the broker start-up process, the broker should extract the broker epochs 
> from the clean shutdown files. If successful, it should send the broker epoch through 
> the broker registration.
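
A minimal sketch of the flow, with an assumed file name and format (the actual on-disk layout may differ): the epoch is written on clean shutdown and read back on start-up so it can be attached to the broker registration.

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;

// Hypothetical sketch: persist the broker epoch on clean shutdown, recover it on start-up.
final class CleanShutdownEpoch {
    private final Path file;

    CleanShutdownEpoch(Path logDir) {
        this.file = logDir.resolve("clean-shutdown-epoch"); // assumed file name
    }

    void writeOnCleanShutdown(long brokerEpoch) throws IOException {
        Files.write(file, Long.toString(brokerEpoch).getBytes(StandardCharsets.UTF_8));
    }

    // Empty if the previous shutdown was not clean (file missing or unreadable).
    OptionalLong readOnStartup() {
        try {
            String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return OptionalLong.of(Long.parseLong(content));
        } catch (IOException | NumberFormatException e) {
            return OptionalLong.empty();
        }
    }
}
{code}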



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14960) Metadata Request Manager and listTopics/partitionsFor API

2023-09-21 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14960.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk.

> Metadata Request Manager and listTopics/partitionsFor API
> -
>
> Key: KAFKA-14960
> URL: https://issues.apache.org/jira/browse/KAFKA-14960
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Implement listTopics and partitionsFor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions

2023-09-18 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15306.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk

> Integrate committed offsets logic when updating fetching positions
> --
>
> Key: KAFKA-15306
> URL: https://issues.apache.org/jira/browse/KAFKA-15306
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Integrate refreshCommittedOffsets logic, currently performed by the 
> coordinator, into the update fetch positions performed on every iteration of 
> the async consumer poll loop. This should rely on the CommitRequestManager to 
> perform the request based on the refactored model, but it should reuse the 
> logic for processing the committed offsets and updating the positions. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer

2023-09-13 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15163.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

This is covered in https://github.com/apache/kafka/pull/14346.

> Implement validatePositions functionality for new KafkaConsumer
> ---
>
> Key: KAFKA-15163
> URL: https://issues.apache.org/jira/browse/KAFKA-15163
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Introduce support for validating positions in the new OffsetsRequestManager. 
> This task will include a new event for the validatePositions calls performed 
> from the new consumer, and the logic for handling such events in the 
> OffsetRequestManager.
> The validate positions implementation will keep the same behaviour as the one 
> in the old consumer, but adapted to the new threading model. So it is based 
> on a VALIDATE_POSITIONS event that is submitted to the background thread 
> and then processed by the ApplicationEventProcessor. The processing itself is 
> done by the OffsetRequestManager given that this will require an 
> OFFSET_FOR_LEADER_EPOCH request. This task will introduce support for such 
> requests in the OffsetRequestManager, responsible for offset-related requests.
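
A minimal sketch of that hand-off, with simplified types that are not the real ApplicationEvent classes: the application thread submits a VALIDATE_POSITIONS event carrying a future, and the background thread completes the future once the OFFSET_FOR_LEADER_EPOCH round trip has been handled.

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the application-thread / background-thread event hand-off.
final class ValidatePositionsSketch {
    enum EventType { VALIDATE_POSITIONS }

    static final class ApplicationEvent {
        final EventType type;
        final CompletableFuture<Void> result = new CompletableFuture<>();
        ApplicationEvent(EventType type) { this.type = type; }
    }

    private final BlockingQueue<ApplicationEvent> eventQueue = new LinkedBlockingQueue<>();

    // Called from the application (consumer poll) thread.
    CompletableFuture<Void> submitValidatePositions() {
        ApplicationEvent event = new ApplicationEvent(EventType.VALIDATE_POSITIONS);
        eventQueue.add(event);
        return event.result;
    }

    // Called from the background thread's run loop.
    void processOne() throws InterruptedException {
        ApplicationEvent event = eventQueue.take();
        if (event.type == EventType.VALIDATE_POSITIONS) {
            // ... build and send the OFFSET_FOR_LEADER_EPOCH requests, then:
            event.result.complete(null);
        }
    }
}
{code}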



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15115) Implement resetPositions functionality in OffsetsRequestManager

2023-09-13 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15115.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk

> Implement resetPositions functionality in OffsetsRequestManager
> ---
>
> Key: KAFKA-15115
> URL: https://issues.apache.org/jira/browse/KAFKA-15115
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Introduce support for resetting positions in the new OffsetsRequestManager. 
> This task will include a new event for the resetPositions calls performed 
> from the new consumer, and the logic for handling such events in the 
> OffsetRequestManager.
> The reset positions implementation will keep the same behaviour as the one in 
> the old consumer, but adapted to the new threading model. So it is based on a 
> RESET_POSITIONS event that is submitted to the background thread, and then 
> processed by the ApplicationEventProcessor. The processing itself is done by 
> the OffsetRequestManager given that this will require a LIST_OFFSETS request 
> for the partitions awaiting reset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2023-09-05 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-9800.

Fix Version/s: 3.7.0
   Resolution: Fixed

Merged [https://github.com/apache/kafka/pull/14111] to trunk.

> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: KIP-580, client
> Fix For: 3.7.0
>
>
> Design:
> The main idea is to bookkeep the failed attempts. Currently, the retry backoff 
> has two main usage patterns:
>  # Synchronous retries in a blocking loop. The thread will sleep in each 
> iteration for the retry backoff ms.
>  # Async retries. In each poll, retries that have not yet met the backoff will 
> be filtered out. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated. (i.e. a set contains only one 
> initial request and only its retries.)
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For case 2, I already wrapped the exponential backoff/timeout util class in 
> my KIP-601 
> [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
>  which takes the number of attempts and returns the backoff/timeout value at 
> the corresponding level. Thus, we can add a new class property to those 
> classes containing retriable data in order to record the number of failed 
> attempts.
>  
> Changes:
> KafkaProducer:
>  # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
> ProducerBatch in Accumulator, which already has an attribute attempts 
> recording the number of failed attempts. So we can let the Accumulator 
> calculate the new retry backoff for each batch when it enqueues them, to avoid 
> instantiating the util class multiple times.
>  # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new 
> class property of type `Long` to record the number of attempts.
> KafkaConsumer:
>  # Some synchronous retry use cases. Record the failed attempts in the 
> blocking loop.
>  # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
> Though the actual requests are packed for each node, the current 
> implementation is applying backoff to each topic partition, where the backoff 
> value is kept by TopicPartitionState. Thus, TopicPartitionState will have the 
> new property recording the number of attempts.
> Metadata:
>  #  Metadata lives as a singleton in many clients. Add a new property 
> recording the number of attempts
>  AdminClient:
>  # AdminClient has its own request abstraction Call. The failed attempts are 
> already kept by the abstraction. So probably clean the Call class logic a bit.
> Existing tests:
>  # If the tests are testing the retry backoff, add a delta to the assertion, 
> considering the existence of the jitter.
>  # If the tests are testing other functionality, we can specify the same 
> value for both `retry.backoff.ms` and `retry.backoff.max.ms` in order to make 
> the retry backoff static. We can use this trick to make the existing tests 
> compatible with the changes.
> There are other common usages that look like client.poll(timeout), where the 
> timeout passed in is the retry backoff value. We won't change these usages 
> since their underlying logic is nioSelector.select(timeout) and 
> nioSelector.selectNow(), which means if no interested op exists, the client 
> will block retry backoff milliseconds. This is an optimization when there's 
> no request that needs to be sent but the client is waiting for responses. 
> Specifically, if the client fails the inflight requests before the retry 
> backoff milliseconds passed, it still needs to wait until that amount of time 
> passed, unless there's a new request that needs to be sent.
>  
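
A minimal sketch of the attempt-based backoff, with illustrative constants (KIP-580 defines the exact formula, jitter, and defaults): the backoff grows exponentially with the number of failed attempts, is capped at retry.backoff.max.ms, and a random jitter spreads retries out.

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of exponential backoff with jitter keyed on the number of failed attempts.
final class ExponentialBackoffSketch {
    private final long initialMs;
    private final long maxMs;
    private final double multiplier;
    private final double jitter;

    ExponentialBackoffSketch(long initialMs, long maxMs, double multiplier, double jitter) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.multiplier = multiplier;
        this.jitter = jitter;
    }

    long backoffMs(int failedAttempts) {
        double expBackoff = initialMs * Math.pow(multiplier, failedAttempts);
        double capped = Math.min(expBackoff, maxMs); // never exceed retry.backoff.max.ms
        double randomFactor = 1.0 + jitter * (ThreadLocalRandom.current().nextDouble() * 2 - 1);
        return Math.round(capped * randomFactor);    // +/- jitter around the capped value
    }

    public static void main(String[] args) {
        ExponentialBackoffSketch backoff = new ExponentialBackoffSketch(100, 1000, 2.0, 0.2);
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> " + backoff.backoffMs(attempt) + " ms");
        }
    }
}
{code}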



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15081) Implement new consumer offsetsForTimes

2023-09-01 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15081.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk

> Implement new consumer offsetsForTimes
> --
>
> Key: KAFKA-15081
> URL: https://issues.apache.org/jira/browse/KAFKA-15081
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Implement offsetsForTimes for the Kafka consumer based on the new threading 
> model, using the OffsetsRequestManager. No changes at the consumer API level.
> A call to the offsetsForTimes consumer functionality should generate a 
> ListOffsetsApplicationEvent with the timestamps to search provided as 
> parameters. The ListOffsetsApplicationEvent is then handled by the 
> OffsetsRequestManager, which builds the ListOffsets requests and processes the 
> response when received.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14965) Introduce OffsetsRequestManager to integrate ListOffsets requests into new consumer threading refactor

2023-09-01 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14965.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk

> Introduce OffsetsRequestManager to integrate ListOffsets requests into new 
> consumer threading refactor
> --
>
> Key: KAFKA-14965
> URL: https://issues.apache.org/jira/browse/KAFKA-14965
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> This task introduces new functionality for handling ListOffsets requests for 
> the new consumer implementation, as part of the ongoing work on the 
> consumer threading model refactor.
> This task introduces a new class named {{OffsetsRequestManager}}, 
> responsible for:
>  * building ListOffsets requests
>  * processing their responses
> Consumer API functionality that requires ListOffsets requests is implemented 
> using this manager: beginningOffsets, endOffsets and offsetsForTimes.
> These consumer API functions will generate a ListOffsetsApplicationEvent with 
> parameters. This event is then handled by the OffsetsRequestManager, which will 
> build the ListOffsets request and process its responses, to provide a result 
> back to the API via the ListOffsetsApplicationEvent completion.
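
A minimal sketch of that request-manager pattern, with illustrative names rather than the actual OffsetsRequestManager API: each API call registers the timestamps it needs together with a future, and the manager completes the future once the corresponding ListOffsets response has been processed.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a request manager that tracks pending ListOffsets lookups.
final class OffsetsRequestManagerSketch {
    static final class PendingListOffsets {
        final Map<String, Long> timestampsToSearch; // partition -> target timestamp
        final CompletableFuture<Map<String, Long>> result = new CompletableFuture<>();
        PendingListOffsets(Map<String, Long> timestampsToSearch) {
            this.timestampsToSearch = timestampsToSearch;
        }
    }

    private final List<PendingListOffsets> pending = new ArrayList<>();

    // Called when a ListOffsetsApplicationEvent is processed in the background thread.
    CompletableFuture<Map<String, Long>> fetchOffsets(Map<String, Long> timestampsToSearch) {
        PendingListOffsets request = new PendingListOffsets(timestampsToSearch);
        pending.add(request);
        return request.result;
    }

    // Called once the (simulated) ListOffsets response is available; in the real manager
    // the requests are built per node and the responses come from the network client.
    void onResponse(PendingListOffsets request, Map<String, Long> offsetsByPartition) {
        pending.remove(request);
        request.result.complete(offsetsByPartition);
    }
}
{code}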



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14875) Implement Wakeup()

2023-08-29 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14875.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

merged the PR to trunk

> Implement Wakeup()
> --
>
> Key: KAFKA-14875
> URL: https://issues.apache.org/jira/browse/KAFKA-14875
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0
>
>
> Implement wakeup() and WakeupException. This would be different from the 
> current implementation because I think we just need to interrupt the blocking 
> futures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14950) Implement assign() and assignment()

2023-07-21 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14950.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

merged the PR to trunk.

> Implement assign() and assignment()
> ---
>
> Key: KAFKA-14950
> URL: https://issues.apache.org/jira/browse/KAFKA-14950
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-945
> Fix For: 3.6.0
>
>
> Implement assign() and assignment()



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-11 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731375#comment-17731375
 ] 

Jun Rao commented on KAFKA-15046:
-

The potential issue with moving the LeaderEpochFile flushing to an arbitrary 
background thread is that it may not be synchronized with the log flushing and 
could lead to a situation where the flushed LeaderEpochFile doesn't reflect all 
the log data before the recovery point.

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
>  Labels: performance
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile got quite bad when we performed replica 
> reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Request-time throttling also happened at the time of the incident. This caused 
> producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load for 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** From taking jstack several times, we found that in most of them a 
> request-handler was doing an fsync to flush ProducerState while other 
> request-handlers were waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 
> cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 
> runnable  [0x7ef9a12e2000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native 
> Method)
> at 
> sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
> at 
> sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
> at 
> kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
> at 
> kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
> at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
> at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
> at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
> at 
> kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
> at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
> at scala.collection.mutable.HashMap.map(HashMap.scala:35)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
> at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
> at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code}
>  * 
>  ** Also there were a bunch of logs showing that writing producer snapshots took 
> hundreds of milliseconds.
>  *** 
> {code:java}
> ...
> [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote 
> producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. 
> (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:37,319] INFO [ProducerStateManager partition=yyy-34] Wrote 
> producer snapshot at offset 247996937813 with 0 producer ids in 547 ms. 
> (kafka.log.ProducerStateManager)
> 

[jira] [Commented] (KAFKA-15046) Produce performance issue under high disk load

2023-06-09 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17731119#comment-17731119
 ] 

Jun Rao commented on KAFKA-15046:
-

[~ocadaruma] : Thanks for identifying the problem. Another way to improve this 
is to move the LeaderEpochFile flushing logic to be part of the flushing of 
rolled segments. Currently, we flush each rolled segment in the background 
thread already. We could just make sure that we also flush the LeaderEpochFile 
there before advancing the recovery point. This way, we don't need to introduce 
a separate background task.
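
A minimal sketch of that ordering, with hypothetical interfaces rather than UnifiedLog's real methods: the rolled segment and the leader epoch checkpoint are both flushed before the recovery point is advanced, so the on-disk checkpoint always covers the data below the recovery point.

{code:java}
// Hypothetical sketch: flush the rolled segment and the leader epoch checkpoint together,
// and only then advance the recovery point.
final class SegmentFlushSketch {
    interface Segment { void flushToDisk(); }
    interface Checkpoint { void flushToDisk(); }

    private volatile long recoveryPoint;

    void flushRolledSegment(Segment rolledSegment, Checkpoint leaderEpochCheckpoint, long newRecoveryPoint) {
        rolledSegment.flushToDisk();         // fsync the rolled segment's data and indexes
        leaderEpochCheckpoint.flushToDisk(); // fsync the leader epoch checkpoint as well
        recoveryPoint = newRecoveryPoint;    // only now is it safe to advance the recovery point
    }
}
{code}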

> Produce performance issue under high disk load
> --
>
> Key: KAFKA-15046
> URL: https://issues.apache.org/jira/browse/KAFKA-15046
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 3.3.2
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
>  Labels: performance
> Attachments: image-2023-06-01-12-46-30-058.png, 
> image-2023-06-01-12-52-40-959.png, image-2023-06-01-12-54-04-211.png, 
> image-2023-06-01-12-56-19-108.png
>
>
> * Phenomenon:
>  ** !image-2023-06-01-12-46-30-058.png|width=259,height=236!
>  ** Producer response time 99%ile got quite bad when we performed replica 
> reassignment on the cluster
>  *** RequestQueue scope was significant
>  ** Request-time throttling also happened at the time of the incident. This caused 
> producers to delay sending messages in the meantime.
>  ** The disk I/O latency was higher than usual due to the high load for 
> replica reassignment.
>  *** !image-2023-06-01-12-56-19-108.png|width=255,height=128!
>  * Analysis:
>  ** The request-handler utilization was much higher than usual.
>  *** !image-2023-06-01-12-52-40-959.png|width=278,height=113!
>  ** Also, thread time utilization was much higher than usual on almost all 
> users
>  *** !image-2023-06-01-12-54-04-211.png|width=276,height=110!
>  ** From taking jstack several times, we found that in most of them a 
> request-handler was doing an fsync to flush ProducerState while other 
> request-handlers were waiting on Log#lock to append messages.
>  * 
>  ** 
>  *** 
> {code:java}
> "data-plane-kafka-request-handler-14" #166 daemon prio=5 os_prio=0 
> cpu=51264789.27ms elapsed=599242.76s tid=0x7efdaeba7770 nid=0x1e704 
> runnable  [0x7ef9a12e2000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.FileDispatcherImpl.force0(java.base@11.0.17/Native 
> Method)
> at 
> sun.nio.ch.FileDispatcherImpl.force(java.base@11.0.17/FileDispatcherImpl.java:82)
> at 
> sun.nio.ch.FileChannelImpl.force(java.base@11.0.17/FileChannelImpl.java:461)
> at 
> kafka.log.ProducerStateManager$.kafka$log$ProducerStateManager$$writeSnapshot(ProducerStateManager.scala:451)
> at 
> kafka.log.ProducerStateManager.takeSnapshot(ProducerStateManager.scala:754)
> at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1544)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.append(UnifiedLog.scala:919)
> - locked <0x00060d75d820> (a java.lang.Object)
> at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:760)
> at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1170)
> at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1158)
> at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:956)
> at 
> kafka.server.ReplicaManager$$Lambda$2379/0x000800b7c040.apply(Unknown 
> Source)
> at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
> at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
> at scala.collection.mutable.HashMap.map(HashMap.scala:35)
> at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:944)
> at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:602)
> at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:666)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:175)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
> at java.lang.Thread.run(java.base@11.0.17/Thread.java:829) {code}
>  * 
>  ** Also there were a bunch of logs showing that writing producer snapshots took 
> hundreds of milliseconds.
>  *** 
> {code:java}
> ...
> [2023-05-01 11:08:36,689] INFO [ProducerStateManager partition=xxx-4] Wrote 
> producer snapshot at offset 1748817854 with 8 producer ids in 809 ms. 
> (kafka.log.ProducerStateManager)
> [2023-05-01 11:08:37,319] INFO [ProducerStateManager 

[jira] [Resolved] (KAFKA-14966) Extract reusable common logic from OffsetFetcher

2023-06-08 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14966.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

Merged the PR to trunk.

> Extract reusable common logic from OffsetFetcher
> 
>
> Key: KAFKA-14966
> URL: https://issues.apache.org/jira/browse/KAFKA-14966
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
> Fix For: 3.6.0
>
>
> The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets, 
> validate and reset positions. 
> For the new consumer based on a refactored threading model, similar 
> functionality will be needed by the ListOffsetsRequestManager component. 
> This task aims at identifying and extracting the OffsetFetcher functionality 
> that can be reused by the new consumer implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15058) Improve the accuracy of Histogram in client metric

2023-06-05 Thread Jun Rao (Jira)
Jun Rao created KAFKA-15058:
---

 Summary: Improve the accuracy of Histogram in client metric
 Key: KAFKA-15058
 URL: https://issues.apache.org/jira/browse/KAFKA-15058
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Jun Rao


The Histogram type (org.apache.kafka.common.metrics.stats) in KafkaMetrics in 
the client module statically divides the value space into a fixed number of 
buckets and only returns values on the bucket boundaries. So, the returned 
histogram value may never match an actually recorded value. Yammer Histogram, on the 
other hand, uses reservoir sampling. The reported value is always one of the 
recorded values, and is likely more accurate. Because of this, the Histogram 
type in client metrics hasn't been used widely. It would be useful to improve 
the Histogram in the client metrics to be more accurate. 
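
A minimal sketch of uniform reservoir sampling, the approach contrasted above with fixed buckets (reservoir size and quantile selection are simplified here): every recorded value has an equal chance of remaining in the reservoir, so reported quantiles are always values that were actually recorded.

{code:java}
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

// Simplified sketch of a histogram backed by a uniform reservoir sample.
final class ReservoirHistogram {
    private final long[] reservoir;
    private long count;

    ReservoirHistogram(int size) {
        this.reservoir = new long[size];
    }

    synchronized void record(long value) {
        count++;
        if (count <= reservoir.length) {
            reservoir[(int) (count - 1)] = value;
        } else {
            long index = ThreadLocalRandom.current().nextLong(count);
            if (index < reservoir.length) {
                reservoir[(int) index] = value; // keep each value with probability size/count
            }
        }
    }

    synchronized long quantile(double q) {
        int filled = (int) Math.min(count, reservoir.length);
        if (filled == 0) {
            throw new IllegalStateException("no values recorded yet");
        }
        long[] snapshot = Arrays.copyOf(reservoir, filled);
        Arrays.sort(snapshot);
        int index = Math.min(filled - 1, (int) Math.floor(q * filled));
        return snapshot[index]; // always one of the recorded values
    }
}
{code}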



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction

2023-05-08 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17720717#comment-17720717
 ] 

Jun Rao edited comment on KAFKA-14561 at 5/9/23 12:24 AM:
--

The PR was reverted in the 3.5 branch. Updated the fix version.


was (Author: junrao):
The PR was reverted in 3.5 branch. Updated the fix version.

> Improve transactions experience for older clients by ensuring ongoing 
> transaction
> -
>
> Key: KAFKA-14561
> URL: https://issues.apache.org/jira/browse/KAFKA-14561
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.6.0
>
>
> This is part 3 of KIP-890:
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
> See KIP-890 for more details: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction

2023-05-08 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-14561:

Fix Version/s: 3.6.0
   (was: 3.5.0)

The PR was reverted in 3.5 branch. Updated the fix version.

> Improve transactions experience for older clients by ensuring ongoing 
> transaction
> -
>
> Key: KAFKA-14561
> URL: https://issues.apache.org/jira/browse/KAFKA-14561
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.6.0
>
>
> This is part 3 of KIP-890:
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
> See KIP-890 for more details: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14892) Harmonize package names in storage module

2023-04-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715193#comment-17715193
 ] 

Jun Rao commented on KAFKA-14892:
-

Sounds good to me. Including storage in the package name probably helps avoid 
the split package issue.

> Harmonize package names in storage module
> -
>
> Key: KAFKA-14892
> URL: https://issues.apache.org/jira/browse/KAFKA-14892
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Priority: Major
>
> We currently have:
>  # org.apache.kafka.server.log.remote.storage: public api in storage-api 
> module
>  # org.apache.kafka.server.log.remote: private api in storage module
>  # org.apache.kafka.storage.internals.log: private api in storage module
> A way to make this consistent could be:
>  # org.apache.kafka.storage.* or org.apache.kafka.storage.api.*: public api 
> in storage-api module
>  # org.apache.kafka.storage.internals.log.remote: private api in storage 
> module
>  # org.apache.kafka.storage.internals.log: private api in storage module 
> (stays the same)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-12 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14561.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

merged the PR to trunk.

> Improve transactions experience for older clients by ensuring ongoing 
> transaction
> -
>
> Key: KAFKA-14561
> URL: https://issues.apache.org/jira/browse/KAFKA-14561
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.5.0
>
>
> This is part 3 of KIP-890:
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
> See KIP-890 for more details: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14617) Replicas with stale broker epoch should not be allowed to join the ISR

2023-04-07 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14617.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

merged all PRs to trunk.

> Replicas with stale broker epoch should not be allowed to join the ISR
> --
>
> Key: KAFKA-14617
> URL: https://issues.apache.org/jira/browse/KAFKA-14617
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Calvin Liu
>Assignee: Calvin Liu
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14685) TierStateMachine interface for building remote aux log

2023-02-24 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14685.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

merged the PR to trunk.

> TierStateMachine interface for building remote aux log
> --
>
> Key: KAFKA-14685
> URL: https://issues.apache.org/jira/browse/KAFKA-14685
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Matthew Wong
>Assignee: Matthew Wong
>Priority: Major
> Fix For: 3.5.0
>
>
> To help with https://issues.apache.org/jira/browse/KAFKA-13560 , we can 
> introduce an interface to manage state transitions of building the remote aux 
> log asynchronously



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14685) TierStateMachine interface for building remote aux log

2023-02-24 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao reassigned KAFKA-14685:
---

Assignee: Matthew Wong

> TierStateMachine interface for building remote aux log
> --
>
> Key: KAFKA-14685
> URL: https://issues.apache.org/jira/browse/KAFKA-14685
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Matthew Wong
>Assignee: Matthew Wong
>Priority: Major
>
> To help with https://issues.apache.org/jira/browse/KAFKA-13560 , we can 
> introduce an interface to manage state transitions of building the remote aux 
> log asynchronously



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica

2023-01-05 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17655153#comment-17655153
 ] 

Jun Rao commented on KAFKA-9087:


[~chia7712] : Thanks for the explanation. Great find! I agree that this is a 
bug and the fix that you suggested makes sense. In alterReplicaLogDirs(), we 
initialize the initial offset for ReplicaAlterLogDirsThread with 
futureLog.highWatermark. We should do the same thing when handling the 
LeaderAndIsrRequest. It seems that the bug was introduced in this PR 
[https://github.com/apache/kafka/pull/6841]. Do you plan to submit a PR?

> ReplicaAlterLogDirs stuck and restart fails with 
> java.lang.IllegalStateException: Offset mismatch for the future replica
> 
>
> Key: KAFKA-9087
> URL: https://issues.apache.org/jira/browse/KAFKA-9087
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0
>Reporter: Gregory Koshelev
>Priority: Major
>
> I've started multiple replica movements between log directories and some 
> partitions were stuck. After the restart of the broker I've got an exception in 
> server.log:
> {noformat}
> [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to 
> (kafka.server.ReplicaAlterLogDirsThread)
>  org.apache.kafka.common.KafkaException: Error processing data for partition 
> metrics_timers-35 offset 4224887
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Caused by: java.lang.IllegalStateException: Offset mismatch for the future 
> replica metrics_timers-35: fetched offset = 4224887, log end offset = 0.
>  at 
> kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311)
>  ... 16 more
>  [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped 
> (kafka.server.ReplicaAlterLogDirsThread)
> {noformat}
> Also, ReplicaAlterLogDirsThread has been stopped. Further restarts do not fix 
> the problem. To fix it I've stopped the broker and removed all the stuck 
> future partitions.
> Detailed log below
> {noformat}
> [2019-06-11 12:09:52,833] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest 
> offset in the log is 4224886 (kafka.log.Log)
> [2019-06-11 12:21:34,979] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Loading producer state till offset 4224887 with 
> message format version 2 (kafka.log.Log)
> [2019-06-11 12:21:34,980] INFO [ProducerStateManager 
> partition=metrics_timers-35] Loading producer state from snapshot file 
> '/storage2/kafka/data/metrics_timers-35/04224887.snapshot' 
> (kafka.log.ProducerStateManager)
> [2019-06-11 12:21:34,980] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Completed load of log with 1 segments, log start 
> offset 4120720 and log end offset 4224887 in 70 ms (kafka.log.Log)
> [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 
> with initial high watermark 0 (kafka.cluster.Replica)
> [2019-06-11 12:21:45,307] INFO Replica loaded for partition metrics_timers-35 
> with initial high watermark 0 (kafka.cluster.Replica)
> [2019-06-11 12:21:45,307] INFO Replica loaded for 

[jira] [Commented] (KAFKA-9087) ReplicaAlterLogDirs stuck and restart fails with java.lang.IllegalStateException: Offset mismatch for the future replica

2022-12-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17651012#comment-17651012
 ] 

Jun Rao commented on KAFKA-9087:


[~chia7712] : Thanks for the update. About the race condition, I am still 
wondering how we got into that state. Let's say ReplicaAlterLogDirsThread is 
about to append the fetched data to a future log when the log's dir is 
being changed. In this case, we will first remove the partition from the 
partitionState in ReplicaAlterLogDirsThread, recreate the future log and add 
the partition to ReplicaAlterLogDirsThread again. If ReplicaAlterLogDirsThread 
tries to append old fetched data, it should fail the following check since 
the fetch offset in the fetch request and the currentFetchState will be 
different.

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L347]

 

So, ReplicaAlterLogDirsThread is supposed to ignore the old fetched data and 
fetch again using the new fetch offset. I am wondering why that didn't happen.
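
A minimal sketch of that guard, simplified from the check linked above (field and type names here are illustrative): fetched data is appended only if the offset it was fetched at still matches the partition's current fetch state, so a stale response that raced with the future-log re-creation is dropped and re-fetched.

{code:java}
// Illustrative sketch: only append fetched data whose fetch offset still matches the
// partition's current fetch state; otherwise drop it and let the next fetch use the new offset.
final class FetchStateGuard {
    static final class PartitionFetchState {
        volatile long fetchOffset;
        PartitionFetchState(long fetchOffset) { this.fetchOffset = fetchOffset; }
    }

    interface PartitionData { long fetchedAtOffset(); }

    boolean maybeProcess(PartitionFetchState currentState, PartitionData data, Runnable append) {
        if (currentState == null || currentState.fetchOffset != data.fetchedAtOffset()) {
            return false; // stale response: the partition was removed or re-added with a new offset
        }
        append.run();     // offsets agree, so it is safe to append the fetched data to the future log
        return true;
    }
}
{code}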

> ReplicaAlterLogDirs stuck and restart fails with 
> java.lang.IllegalStateException: Offset mismatch for the future replica
> 
>
> Key: KAFKA-9087
> URL: https://issues.apache.org/jira/browse/KAFKA-9087
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0
>Reporter: Gregory Koshelev
>Priority: Major
>
> I've started multiple replica movements between log directories and some 
> partitions were stuck. After the restart of the broker I've got an exception in 
> server.log:
> {noformat}
> [2019-06-11 17:58:46,304] ERROR [ReplicaAlterLogDirsThread-1]: Error due to 
> (kafka.server.ReplicaAlterLogDirsThread)
>  org.apache.kafka.common.KafkaException: Error processing data for partition 
> metrics_timers-35 offset 4224887
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:342)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:300)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:299)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:299)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:299)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>  at scala.Option.foreach(Option.scala:274)
>  at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Caused by: java.lang.IllegalStateException: Offset mismatch for the future 
> replica metrics_timers-35: fetched offset = 4224887, log end offset = 0.
>  at 
> kafka.server.ReplicaAlterLogDirsThread.processPartitionData(ReplicaAlterLogDirsThread.scala:107)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:311)
>  ... 16 more
>  [2019-06-11 17:58:46,305] INFO [ReplicaAlterLogDirsThread-1]: Stopped 
> (kafka.server.ReplicaAlterLogDirsThread)
> {noformat}
> Also, ReplicaAlterLogDirsThread has been stopped. Further restarts do not fix 
> the problem. To fix it I've stopped the broker and removed all the stuck 
> future partitions.
> Detailed log below
> {noformat}
> [2019-06-11 12:09:52,833] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Truncating to 4224887 has no effect as the largest 
> offset in the log is 4224886 (kafka.log.Log)
> [2019-06-11 12:21:34,979] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Loading producer state till offset 4224887 with 
> message format version 2 (kafka.log.Log)
> [2019-06-11 12:21:34,980] INFO [ProducerStateManager 
> partition=metrics_timers-35] Loading producer state from snapshot file 
> '/storage2/kafka/data/metrics_timers-35/04224887.snapshot' 
> (kafka.log.ProducerStateManager)
> [2019-06-11 12:21:34,980] INFO [Log partition=metrics_timers-35, 
> dir=/storage2/kafka/data] Completed load of log with 1 

[jira] [Assigned] (KAFKA-10432) LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0

2022-12-19 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao reassigned KAFKA-10432:
---

Assignee: Lucas Bradstreet

> LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0
> -
>
> Key: KAFKA-10432
> URL: https://issues.apache.org/jira/browse/KAFKA-10432
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Lucas Bradstreet
>Assignee: Lucas Bradstreet
>Priority: Major
> Fix For: 2.6.1
>
>
> I added some functionality to the system tests to compare epoch cache 
> lineages ([https://github.com/apache/kafka/pull/9213]), and I found a bug in 
> leader epoch cache recovery.
> The test hard kills a broker and the cache hasn't been flushed yet, and then 
> it starts up and goes through log recovery. After recovery there is 
> divergence in the epoch caches for epoch 0:
> {noformat}
> AssertionError: leader epochs for output-topic-1 didn't match
>  [{0: 9393L, 2: 9441L, 4: 42656L},
>  {0: 0L, 2: 9441L, 4: 42656L}, 
>  {0: 0L, 2: 9441L, 4: 42656L}]
>   
>   
> {noformat}
> The cache is supposed to include the offset for epoch 0 but in recovery it 
> skips it 
> [https://github.com/apache/kafka/blob/487b3682ebe0eefde3445b37ee72956451a9d15e/core/src/main/scala/kafka/log/LogSegment.scala#L364]
>  due to 
> [https://github.com/apache/kafka/commit/d152989f26f51b9004b881397db818ad6eaf0392].
>  Then it stamps the epoch with a later offset when fetching from the leader.
> I'm not sure why the recovery code includes the condition 
> `batch.partitionLeaderEpoch > 0`. I discussed this with Jason Gustafson and 
> he believes it may have been intended to avoid assigning negative epochs but 
> is not sure why it was added. None of the tests fail with this check removed.
> {noformat}
>   leaderEpochCache.foreach { cache =>
> if (batch.partitionLeaderEpoch > 0 && 
> cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
>   cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
>   }
> {noformat}
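For illustration only, here is a hedged sketch (not the committed patch) of how the 
guard could be relaxed so that epoch 0 is restored during recovery while negative, 
i.e. unassigned, epochs from older message formats are still skipped. The EpochCache 
interface below is a stand-in, not the real LeaderEpochFileCache API.

{code:java}
import java.util.Optional;

final class EpochRecoverySketch {

    interface EpochCache {
        Optional<Integer> latestEpoch();
        void assign(int epoch, long startOffset);
    }

    static void maybeAssign(EpochCache cache, int batchEpoch, long batchBaseOffset) {
        // ">= 0" instead of "> 0": epoch 0 is a legitimate first epoch and should be
        // recorded during recovery, otherwise the caches diverge across replicas.
        if (batchEpoch >= 0
                && cache.latestEpoch().map(latest -> batchEpoch > latest).orElse(true)) {
            cache.assign(batchEpoch, batchBaseOffset);
        }
    }
}
{code}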



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12736) KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completes

2022-12-19 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-12736.
-
Fix Version/s: 3.0.0
 Assignee: Lucas Bradstreet
   Resolution: Fixed

> KafkaProducer.flush holds onto completed ProducerBatch(s) until flush 
> completes
> ---
>
> Key: KAFKA-12736
> URL: https://issues.apache.org/jira/browse/KAFKA-12736
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Bradstreet
>Assignee: Lucas Bradstreet
>Priority: Minor
> Fix For: 3.0.0
>
>
> When flush is called a copy of the incomplete batches is made. This means 
> that the full ProducerBatch(s) are held in memory until the flush has 
> completed. For batches where the existing memory pool is used this is not as 
> wasteful as the memory will already be returned to the pool, but for non-pool 
> memory it can only be GC'd after the flush has completed. Rather than use 
> copyAll we can make a new array with only the produceFuture(s) and await on 
> those.
>  
> {code:java}
> /**
>  * Mark all partitions as ready to send and block until the send is complete
>  */
> public void awaitFlushCompletion() throws InterruptedException {
>  try {
>  for (ProducerBatch batch : this.incomplete.copyAll())
>  batch.produceFuture.await();
>  } finally {
>  this.flushesInProgress.decrementAndGet();
>  }
> }
> {code}
> This may help in cases where the application is already memory constrained 
> and memory usage is slowing progress on completion of the incomplete batches.
>  
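A minimal sketch of the idea above, assuming a hypothetical ProducerBatch shape (the 
real produceFuture is a ProduceRequestResult, modelled here as a CountDownLatch): 
snapshot only the per-batch futures so the completed ProducerBatch objects and their 
buffers become garbage-collectable while flush is still waiting.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class FlushSketch {

    interface ProducerBatch {
        CountDownLatch produceFuture(); // stand-in for the real ProduceRequestResult
    }

    static void awaitFlushCompletion(List<ProducerBatch> incompleteBatches)
            throws InterruptedException {
        // Copy only the futures, not the batches, so the batches can be GC'd early.
        List<CountDownLatch> futures = new ArrayList<>(incompleteBatches.size());
        for (ProducerBatch batch : incompleteBatches)
            futures.add(batch.produceFuture());
        for (CountDownLatch future : futures)
            future.await();
    }
}
{code}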



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13369) Follower fetch protocol enhancements for tiered storage.

2022-12-17 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13369.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

merged the PR to trunk.

> Follower fetch protocol enhancements for tiered storage.
> 
>
> Key: KAFKA-13369
> URL: https://issues.apache.org/jira/browse/KAFKA-13369
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14303) Producer.send without record key and batch.size=0 goes into infinite loop

2022-11-28 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14303.
-
Fix Version/s: 3.4.0
   3.3.2
   Resolution: Fixed

Merged the PR to 3.3 and trunk.

> Producer.send without record key and batch.size=0 goes into infinite loop
> -
>
> Key: KAFKA-14303
> URL: https://issues.apache.org/jira/browse/KAFKA-14303
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.3.0, 3.3.1
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>  Labels: Partitioner, bug, client, producer, regresion
> Fix For: 3.4.0, 3.3.2
>
>
> 3.3 has broken previous producer behavior.
> A call to {{producer.send(record)}} with a record without a key and 
> configured with {{batch.size=0}} never returns.
> Reproducer:
> {code:java}
> class ProducerIssueTest extends IntegrationTestHarness {
>   override protected def brokerCount = 1
>   @Test
>   def testProduceWithBatchSizeZeroAndNoRecordKey(): Unit = {
> val topicName = "foo"
> createTopic(topicName)
> val overrides = new Properties
> overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 0)
> val producer = createProducer(keySerializer = new StringSerializer, 
> valueSerializer = new StringSerializer, overrides)
> val record = new ProducerRecord[String, String](topicName, null, "hello")
> val future = producer.send(record) // goes into infinite loop here
> future.get(10, TimeUnit.SECONDS)
>   }
> } {code}
>  
> [Documentation for producer 
> configuration|https://kafka.apache.org/documentation/#producerconfigs_batch.size]
>  states {{batch.size=0}} as a valid value:
> {quote}Valid Values: [0,...]
> {quote}
> and recommends its use directly:
> {quote}A small batch size will make batching less common and may reduce 
> throughput (a batch size of zero will disable batching entirely).
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10760) In compacted topic with max.compaction.lag.ms, the segments are not rolled until new messages arrive

2022-10-18 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17619908#comment-17619908
 ] 

Jun Rao commented on KAFKA-10760:
-

A potential solution is to implement time-based segment rolling based on 
the difference between the current time and the segment creation time. Since 
Java 7, we could use 
[https://docs.oracle.com/javase/8/docs/api/java/nio/file/attribute/BasicFileAttributes.html#creationTime--]
 to get the segment creation time.
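A hedged sketch of that suggestion (the file path and method names are illustrative; 
this is not broker code): read the segment file's creation time via java.nio and roll 
when the segment has existed for longer than the configured limit, independent of 
whether new records are arriving.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;

final class SegmentAgeSketch {

    // True if the segment file was created more than maxAgeMs ago.
    static boolean shouldRoll(Path segmentFile, long maxAgeMs) throws IOException {
        BasicFileAttributes attrs = Files.readAttributes(segmentFile, BasicFileAttributes.class);
        long createdMs = attrs.creationTime().toMillis();
        return System.currentTimeMillis() - createdMs > maxAgeMs;
    }

    public static void main(String[] args) throws IOException {
        Path segment = Paths.get("/tmp/kafka-logs/foo-0/00000000000000000000.log");
        System.out.println(shouldRoll(segment, 24 * 60 * 60 * 1000L));
    }
}
{code}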

> In compacted topic with max.compaction.lag.ms, the segments are not rolled 
> until new messages arrive
> 
>
> Key: KAFKA-10760
> URL: https://issues.apache.org/jira/browse/KAFKA-10760
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Sarwar Bhuiyan
>Assignee:  Brajesh Kumar
>Priority: Major
>
> Currently, if a compacted topic has min.cleanable.dirty.ratio set to 
> something low and max.compaction.lag.ms set to a small time, according to KIP 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-354] the expectation 
> is that the active segment will be rolled regardless of segment.ms or whether 
> new data has come in to "advance" the time. However, in practice, the current 
> implementation only rolls the segment when new data arrives, which means that 
> there are situations where the topic is not fully compacted until new data 
> arrives, which may not be until a while later. The implementation can be 
> improved by rolling the segment purely based on the max.compaction.lag.ms setting. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14321) max.compaction.lag.ms is not enforced accurately

2022-10-18 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14321.
-
Resolution: Duplicate

This actually duplicates KAFKA-10760. Closing this one.

> max.compaction.lag.ms is not enforced accurately
> 
>
> Key: KAFKA-14321
> URL: https://issues.apache.org/jira/browse/KAFKA-14321
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Major
>
> Compaction only cleans data in non-active segments. When 
> max.compaction.lag.ms is set, we use it to set segment.ms to force segment 
> rolling by time. However, the current implementation of time-based segment 
> roll is not precise. It only rolls a segment if the new record's timestamp 
> differs from the timestamp of the first record in the segment by more than 
> segment.ms. If we have a bunch of records appended within segment.ms and then 
> stop producing new records, all those records could remain in the active 
> segments forever, which prevents the records from being cleaned.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14321) max.compaction.lag.ms is not enforced accurately

2022-10-18 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17619904#comment-17619904
 ] 

Jun Rao commented on KAFKA-14321:
-

A potential solution is to implement time-based segment rolling based on 
the segment creation time.

> max.compaction.lag.ms is not enforced accurately
> 
>
> Key: KAFKA-14321
> URL: https://issues.apache.org/jira/browse/KAFKA-14321
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Major
>
> Compaction only cleans data in non-active segments. When 
> max.compaction.lag.ms is set, we use it to set segment.ms to force segment 
> rolling by time. However, the current implementation of time-based segment 
> roll is not precise. It only rolls a segment if the new record's timestamp 
> differs from the timestamp of the first record in the segment by more than 
> segment.ms. If we have a bunch of records appended within segment.ms and then 
> stop producing new records, all those records could remain in the active 
> segments forever, which prevents the records from being cleaned.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14321) max.compaction.lag.ms is not enforced accurately

2022-10-18 Thread Jun Rao (Jira)
Jun Rao created KAFKA-14321:
---

 Summary: max.compaction.lag.ms is not enforced accurately
 Key: KAFKA-14321
 URL: https://issues.apache.org/jira/browse/KAFKA-14321
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao


Compaction only cleans data in non-active segments. When max.compaction.lag.ms 
is set, we use it to set segment.ms to force segment rolling by time. However, 
the current implementation of time-based segment roll is not precise. It only 
rolls a segment if the new record's timestamp differs from the timestamp of the 
first record in the segment by more than segment.ms. If we have a bunch of 
records appended within segment.ms and then stop producing new records, all 
those records could remain in the active segments forever, which prevents the 
records from being cleaned.
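For illustration, a hedged sketch (not the actual log-roll code) of the roll 
condition described above; because it is only evaluated when a new record is 
appended, an idle partition never triggers it:

{code:java}
final class TimeBasedRollSketch {

    // Evaluated only on append: if producers stop sending, this check never runs
    // again, so the active segment is never rolled and never becomes cleanable.
    static boolean shouldRollOnAppend(long firstRecordTimestampMs,
                                      long newRecordTimestampMs,
                                      long segmentMs) {
        return newRecordTimestampMs - firstRecordTimestampMs > segmentMs;
    }
}
{code}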



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-4545) tombstone needs to be removed after delete.retention.ms has passed after it has been cleaned

2022-10-18 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-4545.

Fix Version/s: 3.1.0
   Resolution: Fixed

This is fixed in KAFKA-8522.

> tombstone needs to be removed after delete.retention.ms has passed after it 
> has been cleaned
> 
>
> Key: KAFKA-4545
> URL: https://issues.apache.org/jira/browse/KAFKA-4545
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Jun Rao
>Assignee: Richard Yu
>Priority: Minor
>  Labels: needs-kip
> Fix For: 3.1.0
>
>
> The algorithm for removing the tombstone in a compacted topic is supposed to be the 
> following.
> 1. Tombstone is never removed when it's still in the dirty portion of the log.
> 2. After the tombstone is in the cleaned portion of the log, we further delay 
> the removal of the tombstone by delete.retention.ms since the time the 
> tombstone is in the cleaned portion.
> Once the tombstone is in the cleaned portion, we know there can't be any 
> message with the same key before the tombstone. Therefore, for any consumer, 
> if it reads a non-tombstone message before the tombstone, but can read to the 
> end of the log within delete.retention.ms, it's guaranteed to see the 
> tombstone.
> However, the current implementation doesn't seem correct. We delay the 
> removal of the tombstone by delete.retention.ms since the last modified time 
> of the last cleaned segment. However, the last modified time is inherited 
> from the original segment, which could be arbitrarily old. So, the tombstone 
> may not be preserved as long as it needs to be.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14156) Built-in partitioner may create suboptimal batches with large linger.ms

2022-09-14 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14156.
-
  Assignee: Artem Livshits
Resolution: Fixed

Merged the PR to 3.3 and trunk.

> Built-in partitioner may create suboptimal batches with large linger.ms
> ---
>
> Key: KAFKA-14156
> URL: https://issues.apache.org/jira/browse/KAFKA-14156
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.3.0
>Reporter: Artem Livshits
>Assignee: Artem Livshits
>Priority: Blocker
> Fix For: 3.3.0
>
>
> The new built-in "sticky" partitioner switches partitions based on the amount 
> of bytes produced to a partition.  It doesn't use batch creation as a switch 
> trigger.  The previous "sticky" DefaultPartitioner switched partition when a 
> new batch was created and with small linger.ms (default is 0) could result in 
> sending larger batches to slower brokers potentially overloading them.  See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
>  for more detail.
> However, with large linger.ms, the new built-in partitioner may create 
> suboptimal batches.  Let's consider an example, suppose linger.ms=500, 
> batch.size=16KB (default) and we produce 24KB / sec, i.e. every 500ms we 
> produce 12KB worth of data.  The new built-in partitioner would switch 
> partition on every 16KB, so we could get into the following batching pattern:
>  * produce 12KB to one partition in 500ms, hit linger, send 12KB batch
>  * produce 4KB more to the same partition, now we've produced 16KB of data, 
> switch partition
>  * produce 12KB to the second partition in 500ms, hit linger, send 12KB batch
>  * in the meantime, the 4KB produced to the first partition would hit linger 
> as well, sending 4KB batch
>  * produce 4KB more to the second partition, now we've produced 16KB of data 
> to the second partition, switch to 3rd partition
> so in this scenario the new built-in partitioner produces a mix of 12KB and 
> 4KB batches, while the previous DefaultPartitioner would produce only 12KB 
> batches -- it switches on new batch creation, so there is no "mid-linger" 
> leftover batches.
> To avoid creating fragmented batches on a partition switch, we can wait 
> until the batch is ready before switching the partition, i.e. the condition 
> to switch to a new partition would be "produced batch.size bytes" AND "batch 
> is not lingering".  This may potentially introduce some non-uniformity into 
> data distribution, but unlike the previous DefaultPartitioner, the 
> non-uniformity would not be based on broker performance and won't 
> re-introduce the bad pattern of sending more data to slower brokers.
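A minimal sketch of the proposed switch rule (illustrative names, not the actual 
BuiltInPartitioner code): only switch to a new partition once at least batch.size 
bytes have been produced to the current one AND the current batch is no longer 
lingering, so the switch cannot strand a small "mid-linger" leftover batch.

{code:java}
final class StickySwitchSketch {

    // bytesProducedToPartition: bytes sent to the currently "sticky" partition.
    // currentBatchLingering: true while the in-progress batch is still within linger.ms.
    static boolean shouldSwitchPartition(int bytesProducedToPartition,
                                         int batchSize,
                                         boolean currentBatchLingering) {
        return bytesProducedToPartition >= batchSize && !currentBatchLingering;
    }
}
{code}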



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-25 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17570964#comment-17570964
 ] 

Jun Rao commented on KAFKA-13868:
-

Thanks, [~mimaison]. I see it now.

> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-13700) Kafka reporting CorruptRecordException exception

2022-07-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569563#comment-17569563
 ] 

Jun Rao edited comment on KAFKA-13700 at 7/21/22 5:04 PM:
--

Interesting. So the CRC validation passes with DumpLogSegments, but fails in 
ReplicaFetcherThread? This is a bit weird since the validation is the same in 
both places. One possibility is that there is some issue in the network. Is the 
error in ReplicaFetcherThread persistent or transient? As Divij asked earlier, 
does this error occur in other topic partitions?


was (Author: junrao):
Interesting. So the CRC validation passes with DumpLogSegments, but fails in 
ReplicaFetcherThread? This is a bit weird since the validation is the same in 
both places. Is the error in ReplicaFetcherThread persistent or transient?

> Kafka reporting CorruptRecordException exception
> 
>
> Key: KAFKA-13700
> URL: https://issues.apache.org/jira/browse/KAFKA-13700
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
> Environment: ubuntu 16.04
> kafka 2.4
>Reporter: Uday Bhaskar
>Priority: Critical
>
> In our Kafka cluster, a couple of partitions in __consumer_offsets and 1 
> regular topic are getting a data corruption issue while replicas try to read 
> from the leader. Similar messages appear for other partitions as well. 
>  
> [2022-02-28 21:57:29,941] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=2] Found invalid messages during fetch for partition 
> __consumer_offsets-10 offset 108845487 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 1524235439) in topic partition __consumer_offsets-10
>  
> another topic partitions with same errors
> [2022-02-28 22:17:00,235] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=0] Found invalid messages during fetch for partition 
> px-11351-xx-a56c642-0 offset 11746872 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 475179617) in topic partition px-11351-xx-a56c642-0.
>  
> I have verified all infrastructure, disk, network and system for any errors 
> and found nothing. I am not sure why it is happening or how to 
> troubleshoot.  
>  
> Below is the output of the message from DumpLogSegments, 
>  
> $ /opt/ns/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
> --verify-index-only --deep-iteration --files ./11324034.log | 
> grep 11746872
> baseOffset: 11746872 lastOffset: 11746872 count: 1 baseSequence: 50278 
> lastSequence: 50278 producerId: 17035 producerEpoch: 0 partitionLeaderEpoch: 
> 8 isTransactional: false isControl: false position: 252530345 CreateTime: 
> 1645886348240 size: 647 magic: 2 compresscodec: SNAPPY crc: 475179617 
> isvalid: true
> | offset: 11746872 CreateTime: 1645886348240 keysize: 54 valuesize: 637 
> sequence: 50278 headerKeys: []



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13700) Kafka reporting CorruptRecordException exception

2022-07-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569563#comment-17569563
 ] 

Jun Rao commented on KAFKA-13700:
-

Interesting. So the CRC validation passes with DumpLogSegments, but fails in 
ReplicaFetcherThread? This is a bit weird since the validation is the same in 
both places. Is the error in ReplicaFetcherThread persistent or transient?

> Kafka reporting CorruptRecordException exception
> 
>
> Key: KAFKA-13700
> URL: https://issues.apache.org/jira/browse/KAFKA-13700
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
> Environment: ubuntu 16.04
> kafka 2.4
>Reporter: Uday Bhaskar
>Priority: Critical
>
> In our Kafka cluster, a couple of partitions in __consumer_offsets and 1 
> regular topic are getting a data corruption issue while replicas try to read 
> from the leader. Similar messages appear for other partitions as well. 
>  
> [2022-02-28 21:57:29,941] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=2] Found invalid messages during fetch for partition 
> __consumer_offsets-10 offset 108845487 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 1524235439) in topic partition __consumer_offsets-10
>  
> another topic partitions with same errors
> [2022-02-28 22:17:00,235] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=0] Found invalid messages during fetch for partition 
> px-11351-xx-a56c642-0 offset 11746872 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 475179617) in topic partition px-11351-xx-a56c642-0.
>  
> I have verified all infrastructure, disk, network and system for any errors 
> and found nothing. I am not sure why it is happening or how to 
> troubleshoot.  
>  
> Below is the output of the message from DumpLogSegments, 
>  
> $ /opt/ns/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
> --verify-index-only --deep-iteration --files ./11324034.log | 
> grep 11746872
> baseOffset: 11746872 lastOffset: 11746872 count: 1 baseSequence: 50278 
> lastSequence: 50278 producerId: 17035 producerEpoch: 0 partitionLeaderEpoch: 
> 8 isTransactional: false isControl: false position: 252530345 CreateTime: 
> 1645886348240 size: 647 magic: 2 compresscodec: SNAPPY crc: 475179617 
> isvalid: true
> | offset: 11746872 CreateTime: 1645886348240 keysize: 54 valuesize: 637 
> sequence: 50278 headerKeys: []



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569525#comment-17569525
 ] 

Jun Rao commented on KAFKA-13868:
-

[~mimaison] : You mentioned "As per the email sent to the PMC, all updates have 
to be done by July 22." Do you know when the email related to this was sent to 
the Kafka PMC? I can't seem to find it. Thanks.

> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14020) Performance regression in Producer

2022-07-20 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14020.
-
Resolution: Fixed

merged the PR to 3.3.

> Performance regression in Producer
> --
>
> Key: KAFKA-14020
> URL: https://issues.apache.org/jira/browse/KAFKA-14020
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.3.0
>Reporter: John Roesler
>Assignee: Artem Livshits
>Priority: Blocker
> Fix For: 3.3.0
>
>
> [https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a]
>  introduced a 10% performance regression in the KafkaProducer under a default 
> config.
>  
> The context for this result is a benchmark that we run for Kafka Streams. The 
> benchmark provisions 5 independent AWS clusters, including one broker node on 
> an i3.large and one client node on an i3.large. During a benchmark run, we 
> first run the Producer for 10 minutes to generate test data, and then we run 
> Kafka Streams under a number of configurations to measure its performance.
> Our observation was a 10% regression in throughput under the simplest 
> configuration, in which Streams simply consumes from a topic and does nothing 
> else. That benchmark actually runs faster than the producer that generates 
> the test data, so its thoughput is bounded by the data generator's 
> throughput. After investigation, we realized that the regression was in the 
> data generator, not the consumer or Streams.
> We have numerous benchmark runs leading up to the commit in question, and 
> they all show a throughput in the neighborhood of 115,000 records per second. 
> We also have 40 runs including and after that commit, and they all show a 
> throughput in the neighborhood of 105,000 records per second. A test on 
> [trunk with the commit reverted |https://github.com/apache/kafka/pull/12342] 
> shows a return to around 115,000 records per second.
> Config:
> {code:java}
> final Properties properties = new Properties();
> properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
> properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> {code}
> Here's the producer code in the data generator. Our tests were running with 
> three produceThreads.
> {code:java}
>  for (int t = 0; t < produceThreads; t++) {
> futures.add(executorService.submit(() -> {
> int threadTotal = 0;
> long lastPrint = start;
> final long printInterval = Duration.ofSeconds(10).toMillis();
> long now;
> try (final org.apache.kafka.clients.producer.Producer 
> producer = new KafkaProducer<>(producerConfig(broker))) {
> while (limit > (now = System.currentTimeMillis()) - start) {
> for (int i = 0; i < 1000; i++) {
> final String key = keys.next();
> final String data = dataGen.generate();
> producer.send(new ProducerRecord<>(topic, key, 
> valueBuilder.apply(key, data)));
> threadTotal++;
> }
> if ((now - lastPrint) > printInterval) {
> System.out.println(Thread.currentThread().getName() + " 
> produced " + numberFormat.format(threadTotal) + " to " + topic + " in " + 
> Duration.ofMillis(now - start));
> lastPrint = now;
> }
> }
> }
> total.addAndGet(threadTotal);
> System.out.println(Thread.currentThread().getName() + " finished (" + 
> numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
> }));
> }{code}
> As you can see, this is a very basic usage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12901) Metadata not updated after broker restart.

2022-07-13 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566418#comment-17566418
 ] 

Jun Rao commented on KAFKA-12901:
-

[~suriyav] : Thanks for the reply. We recently fixed 
https://issues.apache.org/jira/browse/KAFKA-13461, where we mishandled the ZK 
auth failure event. That could stop the controller and affect the metadata 
propagation. Not sure if that's causing the exact issue here. But you may want 
to upgrade to a version including that fix.

> Metadata not updated after broker restart.
> --
>
> Key: KAFKA-12901
> URL: https://issues.apache.org/jira/browse/KAFKA-12901
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Suriya Vijayaraghavan
>Priority: Major
>
> We upgraded to version 2.8 from 2.7. After monitoring for a few weeks we 
> upgraded our production setup (as we didn't enable KRaft we went ahead), and 
> after a few weeks we faced TimeoutExceptions in our clients in production. 
> We tried to list all active brokers using the admin client API, and all 
> brokers were listed properly. So we logged into that broker and tried to do a 
> describe topic with localhost as the bootstrap-server, but we got a timeout 
> there.
> When checking the logs, we noticed a shutdown print from the kafka-shutdown-hook
> thread (the zookeeper session timed out and we had three retry failures). But 
> the controlled shutdown failed (we got an unknown server error response from 
> the controller) and proceeded to an unclean shutdown. Still, the process did 
> not quit, but it did not process any other operation either. This did not 
> remove the broker from alive status for hours (we were able to see this broker 
> in the list of brokers), and our clients kept trying to contact this broker and 
> failing with timeout exceptions. So we tried restarting the problematic 
> broker, but we faced unknown topic or partition issues in our clients after 
> the restart, which caused timeouts as well. We noticed that metadata was not 
> loaded. So we had to restart our controller. And after restarting the 
> controller everything got back to normal.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException

2022-07-13 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566412#comment-17566412
 ] 

Jun Rao commented on KAFKA-13953:
-

[~doguscan] : It's possible that a Kafka bug caused the corruption.

The CorruptRecordException in the description comes from 
[https://github.com/apache/kafka/blob/2.5.1/clients/src/main/java/org/apache/kafka/common/record/ByteBufferLogInputStream.java#L73].
 This indicates a 0 size of a record batch, which is different from the 
negative record size in your analysis. So, we need to verify the first byte 
where the corruption happens.

As for next steps, could you

(1) run DumpLogSegment tool on the corrupted file to double check the CRC is 
valid?

(2) attach the section of the log file including bytes from the beginning of a 
record batch to the corruption point? 

> kafka Console consumer fails with CorruptRecordException 
> -
>
> Key: KAFKA-13953
> URL: https://issues.apache.org/jira/browse/KAFKA-13953
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, controller, core
>Affects Versions: 2.7.0
>Reporter: Aldan Brito
>Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: 
> --topic BQR-PULL-DEFAULT --from-beginning > 
> /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record 
> to continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException

2022-07-11 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565138#comment-17565138
 ] 

Jun Rao commented on KAFKA-13953:
-

[~doguscan] : The broker verifies the batch level CRC before appending the 
batch to the log. So, it's more likely that the corruption happened at the 
storage level. Byte wise, does the corrupted area have any patterns (e.g. 
consecutive 0s)? Next time if this happens again, it would be useful to compare 
the bytes across replicas to see if the corrupted bytes are identical across 
replicas.

> kafka Console consumer fails with CorruptRecordException 
> -
>
> Key: KAFKA-13953
> URL: https://issues.apache.org/jira/browse/KAFKA-13953
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, controller, core
>Affects Versions: 2.7.0
>Reporter: Aldan Brito
>Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: 
> --topic BQR-PULL-DEFAULT --from-beginning > 
> /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record 
> to continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException

2022-07-08 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17564386#comment-17564386
 ] 

Jun Rao commented on KAFKA-13953:
-

[~doguscan] : Interesting. Do all replicas have the identical corrupted bytes? 
Would it be possible for you to attach the corrupted log segment for further 
analysis? If not, our record batch format is documented in DefaultRecordBatch. 
It would be great if you could help identify where the corrupted field is (e.g. 
at batch or record level). Thanks,

> kafka Console consumer fails with CorruptRecordException 
> -
>
> Key: KAFKA-13953
> URL: https://issues.apache.org/jira/browse/KAFKA-13953
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, controller, core
>Affects Versions: 2.7.0
>Reporter: Aldan Brito
>Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: 
> --topic BQR-PULL-DEFAULT --from-beginning > 
> /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record 
> to continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException

2022-06-03 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17546196#comment-17546196
 ] 

Jun Rao commented on KAFKA-13953:
-

[~abasilbr] : If you have multiple replicas, it's unlikely all replicas are 
corrupted. You can just delete the corrupted replica without losing data. Also, 
is this transient? The corruption could also be caused during network transfer.

> kafka Console consumer fails with CorruptRecordException 
> -
>
> Key: KAFKA-13953
> URL: https://issues.apache.org/jira/browse/KAFKA-13953
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, controller, core
>Affects Versions: 2.7.0
>Reporter: Aldan Brito
>Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: 
> --topic BQR-PULL-DEFAULT --from-beginning > 
> /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record 
> to continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13803) Refactor Leader API Access

2022-06-03 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13803.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

merged the PR to trunk

> Refactor Leader API Access
> --
>
> Key: KAFKA-13803
> URL: https://issues.apache.org/jira/browse/KAFKA-13803
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Rittika Adhikari
>Assignee: Rittika Adhikari
>Priority: Major
> Fix For: 3.3.0
>
>
> Currently, AbstractFetcherThread has a series of protected APIs which control 
> access to the Leader. ReplicaFetcherThread and ReplicaAlterLogDirsThread 
> respectively override these protected APIs and handle access to the Leader in 
> a remote broker leader and a local leader context.
> We propose to move these protected APIs to a LeaderEndPoint interface, which 
> will serve all fetches from the Leader. We will implement a 
> RemoteLeaderEndPoint and a LocalLeaderEndPoint accordingly. This change will 
> greatly simplify our existing follower fetch code.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException

2022-06-02 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545575#comment-17545575
 ] 

Jun Rao commented on KAFKA-13953:
-

[~abasilbr] : This could be caused by data corruption on the broker. You might 
want to use the DumpLogSegments tool on the log files in the broker to see if 
there is any data corruption.

> kafka Console consumer fails with CorruptRecordException 
> -
>
> Key: KAFKA-13953
> URL: https://issues.apache.org/jira/browse/KAFKA-13953
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, controller, core
>Affects Versions: 2.7.0
>Reporter: Aldan Brito
>Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: 
> --topic BQR-PULL-DEFAULT --from-beginning > 
> /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record 
> to continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-10888) Sticky partition leads to uneven product msg, resulting in abnormal delays in some partitions

2022-05-06 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-10888.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

merged the PR to trunk. Thanks [~alivshits] for the design, implementation and 
the testing.

>  Sticky partition leads to uneven product msg, resulting in abnormal delays 
> in some partitions
> --
>
> Key: KAFKA-10888
> URL: https://issues.apache.org/jira/browse/KAFKA-10888
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.4.1
>Reporter: jr
>Assignee: Artem Livshits
>Priority: Major
> Fix For: 3.3.0
>
> Attachments: image-2020-12-24-21-05-02-800.png, 
> image-2020-12-24-21-09-47-692.png, image-2020-12-24-21-10-24-407.png
>
>
>   110 producers, 550 partitions, 550 consumers, 5-node Kafka cluster.
>   The producers use the null-key + sticky partitioner; the total production 
> rate is about 1,000,000 (100w) tps.
>   The observed partition delay is abnormal and the message distribution is 
> uneven, which leads to abnormal maximum production and consumption delay on 
> the partitions with more messages.
>   I cannot find a reason why the sticky partitioner would make the message 
> distribution uneven at this production rate.
>   I can't switch to the round-robin partitioner, which would increase the 
> delay and CPU cost. Does the sticky partitioner design cause uneven message 
> distribution, or is this abnormal? How can it be solved?
>   !image-2020-12-24-21-09-47-692.png!
> As shown in the picture, the uneven distribution is concentrated on some 
> partitions and some brokers; there seem to be some patterns.
> This problem does not occur in only one cluster, but in many high-tps 
> clusters.
> The problem is more obvious on the test cluster we built.
> !image-2020-12-24-21-10-24-407.png!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13815) Avoid reinitialization for a replica that is being deleted

2022-05-04 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13815.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

merged the PR to trunk

> Avoid reinitialization for a replica that is being deleted
> --
>
> Key: KAFKA-13815
> URL: https://issues.apache.org/jira/browse/KAFKA-13815
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Major
> Fix For: 3.3.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-10002
> identified that deletion of replicas can be slow when a StopReplica request 
> is being
> processed, and has implemented a change to improve the efficiency.
> We found that the efficiency can be further improved by avoiding the 
> reinitialization of the
> leader epoch cache and partition metadata for a replica that needs to be 
> deleted.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (KAFKA-12841) NPE from the provided metadata in client callback in case of ApiException

2022-04-26 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao reassigned KAFKA-12841:
---

Fix Version/s: 3.2.0
 Assignee: Philip Nee  (was: Kirk True)
   Resolution: Fixed

merged [https://github.com/apache/kafka/pull/11689] and a followup fix 
[https://github.com/apache/kafka/pull/12064] to trunk and 3.2 branch.

> NPE from the provided metadata in client callback in case of ApiException
> -
>
> Key: KAFKA-12841
> URL: https://issues.apache.org/jira/browse/KAFKA-12841
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.6.0
> Environment: Prod
>Reporter: Avi Youkhananov
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: NPE.production
>
>
> 1.
> The org.apache.kafka.clients.producer.Callback interface has the method 
> onCompletion(...),
> which says as part of the documentation:
> *The metadata for the record that was sent (i.e. the partition and offset). 
> *An empty metadata with -1 value for all fields* except for topicPartition 
> will be returned if an error occurred.
> We got an NPE from the doSend(...) method in 
> org.apache.kafka.clients.producer.KafkaProducer, 
> which can occur in case an ApiException was thrown ...
> In the case of an ApiException it uses the regular callback instead of the 
> InterceptorCallback, which may also cover the NPE.
> 2. Moreover, RecordMetadata has a method partition() which returns int but can 
> also throw an NPE because the TopicPartition might be null.
> Stack trace attached.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13448) Kafka Producer Client Callback behaviour does not align with Javadoc

2022-04-26 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13448.
-
Fix Version/s: 3.2.0
 Assignee: Philip Nee
   Resolution: Fixed

merged [https://github.com/apache/kafka/pull/11689] and a followup fix 
[https://github.com/apache/kafka/pull/12064] to trunk and 3.2 branch.

> Kafka Producer Client Callback behaviour does not align with Javadoc
> 
>
> Key: KAFKA-13448
> URL: https://issues.apache.org/jira/browse/KAFKA-13448
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0
>Reporter: Seamus O Ceanainn
>Assignee: Philip Nee
>Priority: Minor
> Fix For: 3.2.0
>
>
> In PR [#4188|https://github.com/apache/kafka/pull/4188] as part of 
> KAFKA-6180, a breaking change was accidentally introduced in the behaviour of 
> Callbacks for the producer client.
> Previously, whenever an exception was thrown when producing an event, the 
> value for 'metadata' passed to the Callback.onCompletion method was always 
> null. In PR [#4188|https://github.com/apache/kafka/pull/4188], in one part of 
> the code where Callback.onCompletion is called, the behaviour was changed so 
> that instead of passing a null value for metadata, a 'placeholder' value was 
> provided instead (see 
> [here|https://github.com/apache/kafka/pull/4188/files#diff-42d8f5166459ee28f201ff9cec0080fc7845544a0089ac9e8f3e16864cc1193eR1196]
>  and 
> [here|https://github.com/apache/kafka/pull/4188/files#diff-42d8f5166459ee28f201ff9cec0080fc7845544a0089ac9e8f3e16864cc1193eR1199]).
>   This placeholder contained only topic and partition information, and with 
> all other fields set to '-1'.
> This change only impacted a subset of exceptions, so that in the case of 
> ApiExceptions metadata would still be null (see 
> [here|https://github.com/apache/kafka/commit/aa42a11dfd99ee9ab24d2e9a7521ef1c97ae1ff4#diff-42d8f5166459ee28f201ff9cec0080fc7845544a0089ac9e8f3e16864cc1193eR843]),
>  but for all other exceptions the placeholder value would be used. The 
> behaviour at the time of writing remains the same.
> This issue was first reported in KAFKA-7412 when a user first noticed the 
> difference between the documented behaviour of Callback.onCompletion and the 
> implemented behaviour.
> At the time it was assumed that the behaviour when errors occur was to always 
> provide a placeholder metadata value to Callback.onCompletion, and the 
> documentation was updated at that time to reflect this assumption in [PR 
> #5798|https://github.com/apache/kafka/pull/5798]. The documentation now 
> states that when an exception occurs that the method will be called with an 
> empty metadata value (see 
> [here|https://github.com/apache/kafka/blob/3.1/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java#L30-L31]).
>  However, there is still one case where Callback.onCompletion is called with 
> a null value for metadata (see 
> [here|https://github.com/apache/kafka/blob/3.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1002]),
>  so there is still a discrepancy between the documented behaviour and the 
> implementation of Callback.onCompletion.
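
Given that discrepancy, application callbacks are safest when they only read 
the metadata on success: on failure it may be either null or a placeholder 
whose numeric fields are -1, depending on the client version and error path. 
A minimal Scala sketch against the public producer API; the bootstrap address 
and topic name are illustrative:
{code:java}
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object SafeCallbackExample extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // illustrative address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val record = new ProducerRecord[String, String]("example-topic", "key", "value")

  producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      if (exception == null) {
        // Success path: metadata is fully populated.
        println(s"sent to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}")
      } else {
        // Failure path: metadata may be null or a placeholder with -1 fields,
        // so do not dereference it here.
        System.err.println(s"send failed: ${exception.getMessage}")
      }
    }
  })

  producer.flush()
  producer.close()
}
{code}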



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-9296) Correlation id for response () does not match request ()

2022-04-07 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17519194#comment-17519194
 ] 

Jun Rao commented on KAFKA-9296:


[~vongosling] : It would be useful to provide a bit more detail on when the 
issue occurred. For example, did it happen on a plaintext or an SSL socket? 
Were there any disconnects on either the server side or the client side? 

> Correlation id for response () does not match request ()
> 
>
> Key: KAFKA-9296
> URL: https://issues.apache.org/jira/browse/KAFKA-9296
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.2
> Environment: Flink on  k8s
>Reporter: Enhon Bryant
>Priority: Blocker
>  Labels: kafka, producer
>
> The Kafka client and broker I use are both version 0.11.0.2. I use Kafka's 
> producer to write data to the broker. I encountered the following exception.
> 2019-12-12 18:12:46,821 ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: Correlation id for response (11715816) does 
> not match request (11715804), request header: 
> \{api_key=0,api_version=3,correlation_id=11715804,client_id=producer-3}
>  at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
>  at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
>  at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>  at java.lang.Thread.run(Thread.java:748)
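
For context on what this exception asserts: the client assigns each request a 
monotonically increasing correlation id and expects responses on a connection 
to arrive in request order, so the id carried by a response must match the 
oldest in-flight request. A simplified sketch of that invariant, not the actual 
NetworkClient code:
{code:java}
import scala.collection.mutable

// Simplified per-connection model; the real client tracks in-flight requests
// per broker connection and carries full request headers.
final case class InFlightRequest(correlationId: Int, apiKey: Short)

final class CorrelationChecker {
  private val inFlight = mutable.Queue.empty[InFlightRequest]
  private var nextCorrelationId = 0

  def send(apiKey: Short): Int = {
    val id = nextCorrelationId
    nextCorrelationId += 1
    inFlight.enqueue(InFlightRequest(id, apiKey))
    id
  }

  // A mismatch here means a response was matched against the wrong request,
  // e.g. after a corrupted or out-of-order read on the connection.
  def onResponse(responseCorrelationId: Int): InFlightRequest = {
    val oldest = inFlight.dequeue()
    if (oldest.correlationId != responseCorrelationId)
      throw new IllegalStateException(
        s"Correlation id for response ($responseCorrelationId) does not match request (${oldest.correlationId})")
    oldest
  }
}
{code}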



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13687) Limit number of batches when using kafka-dump-log.sh

2022-04-06 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13687.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

Merged the PR to trunk.

> Limit number of batches when using kafka-dump-log.sh
> 
>
> Key: KAFKA-13687
> URL: https://issues.apache.org/jira/browse/KAFKA-13687
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 2.8.1
>Reporter: Sergio Troiano
>Assignee: Sergio Troiano
>Priority: Minor
>  Labels: easyfix, features
> Fix For: 3.3.0
>
>   Original Estimate: 96h
>  Remaining Estimate: 96h
>
> Currently kafka-dump-log.sh reads the whole file(s) and dumps the 
> results of the segment file(s).
> As we know, the savings from combining compression and batching while 
> producing (if the payloads are good candidates) are huge. 
>  
> We would like to have a way to "monitor" how the producers produce their 
> batches, as we do not always have access to the producer metrics.
> We have multitenant producers, so it is hard to "detect" when the usage is 
> not optimal.
>  
> The problem with the way the dump-log tool currently works is that it reads 
> the whole file. In a scenario with thousands of topics and different segment 
> sizes (the default is 1 GB), we could end up affecting the cluster balance, 
> as we are evicting useful pages from the page cache and replacing them with 
> what we read from files. 
>  
> As we only need to take a few samples from the segments to see the producing 
> usage pattern, we would like to add a new parameter called maxBatches.
>  
> Based on the current script the change is quite small, as it only needs a 
> parameter and a counter (a sketch follows below).
>  
> After adding this change we could, for example, periodically take smaller 
> samples and analyze the batch headers (looking at compresscodec and the 
> batch count).
>  
> Doing this we could automate a tool that reads all the topics. Going even 
> further, when we see a client is neither using compression nor batching, we 
> could take the payloads of those samples and simulate compressing and 
> batching them; with those numbers we could then approach the client with a 
> cost-saving proposal. 
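
To illustrate the "parameter and a counter" idea outside the tool itself, the 
record batches of a segment can be iterated with the public FileRecords API 
and the loop stopped after a fixed number of batches, which is enough to 
sample the producer's batching and compression pattern. A hedged sketch; 
maxBatches and the file path are illustrative, and this is not necessarily how 
the tool implements the option:
{code:java}
import java.io.File
import org.apache.kafka.common.record.FileRecords

// Standalone sampling sketch: read only the first `maxBatches` batch headers
// of a segment file instead of dumping the whole thing.
object SampleBatches {
  def main(args: Array[String]): Unit = {
    val segment = new File("/var/lib/kafka/data-0/some-topic-0/00000000000000000000.log") // illustrative
    val maxBatches = 100
    val records = FileRecords.open(segment)
    try {
      var seen = 0
      val it = records.batches().iterator()
      while (it.hasNext && seen < maxBatches) {
        val batch = it.next()
        println(s"baseOffset=${batch.baseOffset} count=${batch.countOrNull} " +
          s"compression=${batch.compressionType} sizeInBytes=${batch.sizeInBytes}")
        seen += 1
      }
    } finally {
      records.close()
    }
  }
}
{code}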



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-31 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17515532#comment-17515532
 ] 

Jun Rao edited comment on KAFKA-13773 at 3/31/22 7:25 PM:
--

[~Timelad] : Thanks for the log. This does seem to be a real issue. What 
happened is the following. The broker hit IOException in log loading during the 
initial startup.

 
{code:java}
2022-03-31 09:28:43,407 ERROR Error while writing to checkpoint file 
/var/lib/kafka/data-0/kafka-log0/__consumer_offsets-12/leader-epoch-checkpoint 
(kafka.server.LogDirFailureChannel) 
[log-recovery-/var/lib/kafka/data-0/kafka-log0]
java.io.IOException: No space left on device
    at java.base/java.io.FileOutputStream.writeBytes(Native Method)
    at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
    at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
    at 
java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
    at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
    at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
    at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251)
    at java.base/java.io.BufferedWriter.flush(BufferedWriter.java:257)
    at 
org.apache.kafka.server.common.CheckpointFile.write(CheckpointFile.java:94)
    at 
kafka.server.checkpoints.CheckpointFileWithFailureHandler.write(CheckpointFileWithFailureHandler.scala:37)
    at 
kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:71)
    at 
kafka.server.epoch.LeaderEpochFileCache.flush(LeaderEpochFileCache.scala:291)
    at 
kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromEnd$1(LeaderEpochFileCache.scala:237)
    at 
kafka.server.epoch.LeaderEpochFileCache.truncateFromEnd(LeaderEpochFileCache.scala:234)
    at kafka.log.LogLoader$.$anonfun$load$12(LogLoader.scala:188)
    at kafka.log.LogLoader$.$anonfun$load$12$adapted(LogLoader.scala:188)
    at scala.Option.foreach(Option.scala:437)
    at kafka.log.LogLoader$.load(LogLoader.scala:188)
    at kafka.log.UnifiedLog$.apply(UnifiedLog.scala:1785)
    at kafka.log.LogManager.loadLog(LogManager.scala:282)
    at kafka.log.LogManager.$anonfun$loadLogs$13(LogManager.scala:368)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829){code}
 

Normally, such an IOException would cause the broker to do a hard shutdown, but 
that path only exists after the ReplicaManager is started, which happens after 
log loading. Here the IOException is instead propagated to KafkaServer, which 
causes it to exit normally. As part of the normal exit, a clean shutdown file is 
written, which causes the next broker restart to skip log recovery.

 
{code:java}
2022-03-31 09:28:43,411 ERROR There was an error in one of the threads during 
logs loading: org.apache.kafka.common.errors.KafkaStorageException: Error while 
writing to checkpoint file 
/var/lib/kafka/data-0/kafka-log0/__consumer_offsets-12/leader-epoch-checkpoint 
(kafka.log.LogManager) [main]
2022-03-31 09:28:43,414 ERROR [KafkaServer id=0] Fatal error during KafkaServer 
startup. Prepare to shutdown (kafka.server.KafkaServer) [main]
2022-03-31 09:28:43,415 INFO [KafkaServer id=0] shutting down 
(kafka.server.KafkaServer) [main]
2022-03-31 09:28:43,418 INFO Shutting down. (kafka.log.LogManager) 
[main]2022-03-31 09:28:43,466 INFO Shutdown complete. (kafka.log.LogManager) 
[main]{code}
 

The issue with skipping recovery is that some of the preallocated timeindex 
files won't be shrunk to the right size and we will pick up garbage as the 
timestamp.
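
To make the timeindex part concrete: each time index entry is 12 bytes (an 
8-byte timestamp followed by a 4-byte relative offset), and a preallocated 
index file is zero-filled. If recovery is skipped and the file is never trimmed 
to its real size, the entry read from the end of the file is all zeros, i.e. 
timestamp 0 (1970-01-01), which is what retention then compares against. A 
small sketch of that read, with an illustrative path:
{code:java}
import java.io.RandomAccessFile

// Reads the final 12-byte time index entry, the way a "last entry at the end
// of the file" lookup would see it; the path is illustrative.
object LastTimeIndexEntry {
  def main(args: Array[String]): Unit = {
    val raf = new RandomAccessFile("/tmp/00000000000000000000.timeindex", "r")
    try {
      val entrySize = 12 // 8-byte timestamp + 4-byte relative offset
      if (raf.length() >= entrySize) {
        raf.seek(raf.length() - entrySize)
        val timestamp = raf.readLong()     // 0L if the tail is untrimmed, zero-filled preallocation
        val relativeOffset = raf.readInt() // 0 as well
        println(s"last entry: timestamp=$timestamp relativeOffset=$relativeOffset")
      }
    } finally {
      raf.close()
    }
  }
}
{code}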


was (Author: junrao):
[~Timelad] : Thanks for the log. This does seem to be a real issue. What 
happened is the following. The broker hit IOException in log loading during the 
initial startup.

 
{code:java}
2022-03-31 09:28:43,407 ERROR Error while writing to checkpoint file 
/var/lib/kafka/data-0/kafka-log0/__consumer_offsets-12/leader-epoch-checkpoint 
(kafka.server.LogDirFailureChannel) 
[log-recovery-/var/lib/kafka/data-0/kafka-log0]
java.io.IOException: No space left on device
    at java.base/java.io.FileOutputStream.writeBytes(Native Method)
    at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
    at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
    at 
java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
    at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
    at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
    at 

[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-31 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17515532#comment-17515532
 ] 

Jun Rao commented on KAFKA-13773:
-

[~Timelad] : Thanks for the log. This does seem to be a real issue. What 
happened is the following. The broker hit IOException in log loading during the 
initial startup.

 
{code:java}
2022-03-31 09:28:43,407 ERROR Error while writing to checkpoint file 
/var/lib/kafka/data-0/kafka-log0/__consumer_offsets-12/leader-epoch-checkpoint 
(kafka.server.LogDirFailureChannel) 
[log-recovery-/var/lib/kafka/data-0/kafka-log0]
java.io.IOException: No space left on device
    at java.base/java.io.FileOutputStream.writeBytes(Native Method)
    at java.base/java.io.FileOutputStream.write(FileOutputStream.java:354)
    at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
    at 
java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
    at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
    at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
    at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:251)
    at java.base/java.io.BufferedWriter.flush(BufferedWriter.java:257)
    at 
org.apache.kafka.server.common.CheckpointFile.write(CheckpointFile.java:94)
    at 
kafka.server.checkpoints.CheckpointFileWithFailureHandler.write(CheckpointFileWithFailureHandler.scala:37)
    at 
kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:71)
    at 
kafka.server.epoch.LeaderEpochFileCache.flush(LeaderEpochFileCache.scala:291)
    at 
kafka.server.epoch.LeaderEpochFileCache.$anonfun$truncateFromEnd$1(LeaderEpochFileCache.scala:237)
    at 
kafka.server.epoch.LeaderEpochFileCache.truncateFromEnd(LeaderEpochFileCache.scala:234)
    at kafka.log.LogLoader$.$anonfun$load$12(LogLoader.scala:188)
    at kafka.log.LogLoader$.$anonfun$load$12$adapted(LogLoader.scala:188)
    at scala.Option.foreach(Option.scala:437)
    at kafka.log.LogLoader$.load(LogLoader.scala:188)
    at kafka.log.UnifiedLog$.apply(UnifiedLog.scala:1785)
    at kafka.log.LogManager.loadLog(LogManager.scala:282)
    at kafka.log.LogManager.$anonfun$loadLogs$13(LogManager.scala:368)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829){code}
 

Normally, such an IOException would cause the broker to do a hard shutdown, but 
that path only exists after the ReplicaManager is started, which happens after 
log loading. Here the IOException is instead propagated to KafkaServer, which 
causes it to exit normally. As part of the normal exit, a clean shutdown file is 
written, which causes the next broker restart to skip log recovery.

 
{code:java}
2022-03-31 09:28:43,411 ERROR There was an error in one of the threads during 
logs loading: org.apache.kafka.common.errors.KafkaStorageException: Error while 
writing to checkpoint file 
/var/lib/kafka/data-0/kafka-log0/__consumer_offsets-12/leader-epoch-checkpoint 
(kafka.log.LogManager) [main]
2022-03-31 09:28:43,414 ERROR [KafkaServer id=0] Fatal error during KafkaServer 
startup. Prepare to shutdown (kafka.server.KafkaServer) [main]
2022-03-31 09:28:43,415 INFO [KafkaServer id=0] shutting down 
(kafka.server.KafkaServer) [main]
2022-03-31 09:28:43,418 INFO Shutting down. (kafka.log.LogManager) 
[main]2022-03-31 09:28:43,466 INFO Shutdown complete. (kafka.log.LogManager) 
[main]{code}

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.8.0, 3.1.0, 2.8.1
>Reporter: Tm Alkemade
>Priority: Major
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip, kafka-start-to-finish.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for kafka 
> filled up, which led to all 3 nodes crashing. I increased the disk size for 
> all three nodes and started up kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older 
> than the 2 week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 

[jira] [Resolved] (KAFKA-12875) Change Log layer segment map mutations to avoid absence of active segment

2022-03-31 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-12875.
-
Fix Version/s: 3.3.0
 Assignee: Yu Yang
   Resolution: Fixed

Merged to trunk.

> Change Log layer segment map mutations to avoid absence of active segment
> -
>
> Key: KAFKA-12875
> URL: https://issues.apache.org/jira/browse/KAFKA-12875
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Kowshik Prakasam
>Assignee: Yu Yang
>Priority: Major
> Fix For: 3.3.0
>
>
> [https://github.com/apache/kafka/pull/10650] showed a case where active 
> segment was absent when Log layer segments were iterated. We should 
> investigate Log layer code to see if we can change Log layer segment map 
> mutations to avoid absence of active segment at any given point. For example, 
> if we are clearing all segments and creating a new one, maybe we can reverse 
> the order to create a new segment first and then clear the old ones later.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-30 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514888#comment-17514888
 ] 

Jun Rao commented on KAFKA-13773:
-

[~Timelad] : I checked kafka-0-2.8.0-before-fail.log and 
kafka-1-2.8.0-before-fail.log in kafka-2.8.0-crash.zip. They both seem to have 
timestamps after 13:23:00,077, which is the time when the recovery was skipped. 

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.8.0, 3.1.0, 2.8.1
>Reporter: Tm Alkemade
>Priority: Major
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for kafka 
> filled up, which led to all 3 nodes crashing. I increased the disk size for 
> all three nodes and started up kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older 
> than the 2 week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 ( 2022-03-28, 
> 09:48:08 UTC, which is today). However since this segment was the 
> 'latest/current' segment much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile)  doesn't seem to know this 
> and just read the last position in the file, the file being mostly empty 
> causes it to read 0 for that position.
> The cleaner code seems to take this into account since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc, however in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3Gb in size when unzipped)
>  
> I've encountered this problem with both kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2: The issue still occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-30 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514808#comment-17514808
 ] 

Jun Rao commented on KAFKA-13773:
-

[~Timelad] : Thanks for the additional logs. I took a quick look at 
kafka-2-2.8.0-before-resize.log. The broker did skip recovery, but it's not 
clear if the previous shutdown was clean or not. Do you have the log before 
that? During loading, the broker shut down abruptly due to the no-space issue. 
After that, if the broker is restarted again, it should go through log 
recovery; did that happen?

 
{code:java}
2022-03-30 13:23:00,077 INFO Skipping recovery for all logs in 
/var/lib/kafka/data-0/kafka-log2 since clean shutdown file was found 
(kafka.log.LogManager) [main]
{code}
 

 

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 3.1.0, 2.8.1
>Reporter: Tm Alkemade
>Priority: Major
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for kafka 
> filled up, which led to all 3 nodes crashing. I increased the disk size for 
> all three nodes and started up kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older 
> than the 2 week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 ( 2022-03-28, 
> 09:48:08 UTC, which is today). However since this segment was the 
> 'latest/current' segment much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile)  doesn't seem to know this 
> and just read the last position in the file, the file being mostly empty 
> causes it to read 0 for that position.
> The cleaner code seems to take this into account since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc, however in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3Gb in size when unzipped)
>  
> I've encountered this problem with both kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2: The issue still occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-03-29 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17514245#comment-17514245
 ] 

Jun Rao commented on KAFKA-13773:
-

[~Timelad] : Thanks for filing the jira. I ran the following tool on one of the 
log segments. The log file seems corrupted.  

 
{code:java}
bin/kafka-dump-log.sh  --files 
kafka-2-retention/audit-trail-0/.log
baseOffset: 30432 lastOffset: 30432 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false 
isControl: false position: 250051313 CreateTime: 1648460938666 size: 8216 
magic: 2 compresscodec: none crc: 2462031276 isvalid: true
baseOffset: 30433 lastOffset: 30433 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false 
isControl: false position: 250059529 CreateTime: 1648460938942 size: 8219 
magic: 2 compresscodec: none crc: 563640287 isvalid: true
Found 5340 invalid bytes at the end of .log
{code}
 

If the file is used as is to determine the max timestamp, it could lead to an 
invalid timestamp. 

Normally, if the broker dies because of no disk space, on restarting, the 
broker will go through log recovery to check the validity of the data. However, 
from the log4j file, it doesn't seem there was log recovery. So, I am wondering 
if the broker crashed before that log segment was rolled and flushed, or after.

 

 

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 3.1.0, 2.8.1
>Reporter: Tm Alkemade
>Priority: Major
> Attachments: kafka-.zip, kafka-logfiles.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for kafka 
> filled up, which led to all 3 nodes crashing. I increased the disk size for 
> all three nodes and started up kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older 
> than the 2 week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 ( 2022-03-28, 
> 09:48:08 UTC, which is today). However since this segment was the 
> 'latest/current' segment much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile)  doesn't seem to know this 
> and just read the last position in the file, the file being mostly empty 
> causes it to read 0 for that position.
> The cleaner code seems to take this into account since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc, however in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3Gb in size when unzipped)
>  
> I've encountered this problem with both kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2: The issue still occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13744) Quota metric tags are inconsistent

2022-03-15 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17507142#comment-17507142
 ] 

Jun Rao commented on KAFKA-13744:
-

[~jeqo] : Thanks for reporting this issue. It seems that the tags are added as 
designed in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-55%3A+Secure+Quotas+for+Authenticated+Users]
 . The idea is that if a client is not authenticated with the brokers, we want 
to tag the clients with the clientId. If a client is authenticated with the 
brokers, we typically can tag the clients with user, while giving the client the 
option to be tagged by both user and clientId.
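
A condensed model of that tagging rule, just to illustrate the intent; the 
actual logic in ClientQuotaManager also depends on which quota overrides are 
configured, and everything apart from the tag names below is made up for the 
example:
{code:java}
object QuotaTagModel {
  final case class ClientInfo(userPrincipal: Option[String], clientId: String)

  // Simplified illustration of the tag selection described above.
  def metricTags(client: ClientInfo, tagBothWhenAuthenticated: Boolean): Map[String, String] =
    client.userPrincipal match {
      case None =>
        // Unauthenticated client: the clientId is the only identity available.
        Map("client-id" -> client.clientId)
      case Some(user) if tagBothWhenAuthenticated =>
        // Authenticated client that opts into being tagged by both.
        Map("user" -> user, "client-id" -> client.clientId)
      case Some(user) =>
        // Authenticated client tagged by user only.
        Map("user" -> user)
    }
}
{code}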

> Quota metric tags are inconsistent
> --
>
> Key: KAFKA-13744
> URL: https://issues.apache.org/jira/browse/KAFKA-13744
> Project: Kafka
>  Issue Type: Bug
>  Components: core, metrics
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: quotas
> Attachments: image-2022-03-15-16-57-12-583.png
>
>
> When enabling metrics for quotas the metrics apply to _all_ clients (see 
> https://issues.apache.org/jira/browse/KAFKA-13742).
> Though, the tags are calculated depending on the quotas registered and 
> applied to all clients: 
> [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L649-L694]
> This causes different metric tags result depending on which quota is 
> registered first.
> For instance, if a quota is registered with userId and clientId, then metrics 
> are tagged with both, though if then a quota is registered with only tagged 
> with clientId, then all metrics are only tagged by clientId — even though 
> user principal is available.
> !image-2022-03-15-16-57-12-583.png|width=1034,height=415!
> I managed to reproduce this behavior here:
>  * From 10:30 to 10:45, there was a quota with both client-id and user-id
>  * It was removed by 10:45, so no metrics were exposed.
>  * After, a quota with client id was created, and metrics were collected only 
> with client id, even though the user was available.
> I'd expect metrics to always contain both, if available — and simplify the 
> logic here 
> [https://github.com/apache/kafka/blob/0b9a8bac36f16b5397e9ec3a0441758e4b60a384/core/src/main/scala/kafka/server/ClientQuotaManager.scala#L649-L694].



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly

2022-03-09 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503854#comment-17503854
 ] 

Jun Rao commented on KAFKA-13723:
-

[~xiongqiwu] : Thanks for the explanation. Makes sense. So, this is not an issue.

> max.compaction.lag.ms implemented incorrectly
> -
>
> Key: KAFKA-13723
> URL: https://issues.apache.org/jira/browse/KAFKA-13723
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Jun Rao
>Priority: Major
>
> In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced 
> max.compaction.lag.ms to guarantee that a record be cleaned before a certain 
> time. 
>  
> The implementation in LogCleanerManager has the following code. The path for 
> earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, 
> it seems that we should set the delay to 0 so that we could trigger cleaning 
> immediately since the segment has been dirty for longer than 
> max.compaction.lag.ms. 
>  
>  
> {code:java}
> def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : 
> Long = {
> ...
> val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
> val cleanUntilTime = now - maxCompactionLagMs
> if (earliestDirtySegmentTimestamp < cleanUntilTime)
> cleanUntilTime - earliestDirtySegmentTimestamp
> else
> 0L
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly

2022-03-09 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13723.
-
Resolution: Not A Problem

> max.compaction.lag.ms implemented incorrectly
> -
>
> Key: KAFKA-13723
> URL: https://issues.apache.org/jira/browse/KAFKA-13723
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Jun Rao
>Priority: Major
>
> In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced 
> max.compaction.lag.ms to guarantee that a record be cleaned before a certain 
> time. 
>  
> The implementation in LogCleanerManager has the following code. The path for 
> earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, 
> it seems that we should set the delay to 0 so that we could trigger cleaning 
> immediately since the segment has been dirty for longer than 
> max.compaction.lag.ms. 
>  
>  
> {code:java}
> def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : 
> Long = {
> ...
> val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
> val cleanUntilTime = now - maxCompactionLagMs
> if (earliestDirtySegmentTimestamp < cleanUntilTime)
> cleanUntilTime - earliestDirtySegmentTimestamp
> else
> 0L
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly

2022-03-09 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503813#comment-17503813
 ] 

Jun Rao commented on KAFKA-13723:
-

[~xiongqiwu] and [~jjkoshy]  : Could you check if this is a real issue? Thanks.

> max.compaction.lag.ms implemented incorrectly
> -
>
> Key: KAFKA-13723
> URL: https://issues.apache.org/jira/browse/KAFKA-13723
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Jun Rao
>Priority: Major
>
> In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced 
> max.compaction.lag.ms to guarantee that a record be cleaned before a certain 
> time. 
>  
> The implementation in LogCleanerManager has the following code. The path for 
> earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, 
> it seems that we should set the delay to 0 so that we could trigger cleaning 
> immediately since the segment has been dirty for longer than 
> max.compaction.lag.ms. 
>  
>  
> {code:java}
> def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : 
> Long = {
> ...
> val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
> val cleanUntilTime = now - maxCompactionLagMs
> if (earliestDirtySegmentTimestamp < cleanUntilTime)
> cleanUntilTime - earliestDirtySegmentTimestamp
> else
> 0L
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly

2022-03-09 Thread Jun Rao (Jira)
Jun Rao created KAFKA-13723:
---

 Summary: max.compaction.lag.ms implemented incorrectly
 Key: KAFKA-13723
 URL: https://issues.apache.org/jira/browse/KAFKA-13723
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.3.0
Reporter: Jun Rao


In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced 
max.compaction.lag.ms to guarantee that a record be cleaned before a certain 
time. 

 

The implementation in LogCleanerManager has the following code. The path for 
earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, 
it seems that we should set the delay to 0 so that we could trigger cleaning 
immediately since the segment has been dirty for longer than 
max.compaction.lag.ms. 

 

 
{code:java}
def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : 
Long = {

...

val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
val cleanUntilTime = now - maxCompactionLagMs

if (earliestDirtySegmentTimestamp < cleanUntilTime)
cleanUntilTime - earliestDirtySegmentTimestamp
else
0L
}{code}
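
A small REPL-style numeric trace of the snippet above, with made-up values, 
just to show what each branch returns: a positive result means the earliest 
dirty segment's timestamp is already more than max.compaction.lag.ms in the 
past, while newer segments fall into the 0L branch.
{code:java}
// Trace of the quoted calculation with illustrative numbers.
val maxCompactionLagMs = 60000L             // max.compaction.lag.ms
val now = 1000000L
val earliestDirtySegmentTimestamp = 900000L

val cleanUntilTime = now - maxCompactionLagMs  // 940000
val delay =
  if (earliestDirtySegmentTimestamp < cleanUntilTime)
    cleanUntilTime - earliestDirtySegmentTimestamp  // 40000 ms past the lag bound
  else
    0L
println(delay)  // prints 40000
{code}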
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one

2022-03-09 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503785#comment-17503785
 ] 

Jun Rao commented on KAFKA-10690:
-

[~ocadaruma] : Thanks for filing the jira. Have you tried enabling replication 
throttling? This will help prevent the out-of-sync replicas from pulling data 
too aggressively. 

> Produce-response delay caused by lagging replica fetch which affects in-sync 
> one
> 
>
> Key: KAFKA-10690
> URL: https://issues.apache.org/jira/browse/KAFKA-10690
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Haruki Okada
>Priority: Major
> Attachments: image-2020-11-06-11-15-21-781.png, 
> image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png
>
>
> h2. Our environment
>  * Kafka version: 2.4.1
> h2. Phenomenon
>  * Produce response time 99th (remote scope) degrades to 500ms, which is 20 
> times worse than usual
>  ** Meanwhile, the cluster was running replica reassignment to service-in new 
> machine to recover replicas which held by failed (Hardware issue) broker 
> machine
> !image-2020-11-06-11-15-21-781.png|width=292,height=166!
> h2. Analysis
> Let's say
>  * broker-X: The broker we observed produce latency degradation
>  * broker-Y: The broker under servicing-in
> broker-Y was catching up replicas of partitions:
>  * partition-A: has relatively small log size
>  * partition-B: has large log size
> (actually, broker-Y was catching-up many other partitions. I noted only two 
> partitions here to make explanation simple)
> broker-X was the leader for both partition-A and partition-B.
> We found that both partition-A and partition-B are assigned to same 
> ReplicaFetcherThread of broker-Y, and produce latency started to degrade 
> right after broker-Y finished catching up partition-A.
> !image-2020-11-06-11-17-09-910.png|width=476,height=174!
> Besides, we observed disk reads on broker-X during service-in. (This is 
> natural since old segments are likely not in page cache)
> !image-2020-11-06-11-15-38-390.png|width=292,height=193!
> So we suspected that:
>  * The in-sync replica fetch (partition-A) was held up by the lagging replica 
> fetch (partition-B), which is expected to be slow because it causes actual 
> disk reads
>  ** Since the ReplicaFetcherThread sends fetch requests in a blocking manner, 
> the next fetch request can't be sent until the previous one completes
>  ** => This delays the in-sync replica fetch for partitions assigned to the 
> same replica fetcher thread
>  ** => This causes the remote-scope produce latency degradation
> h2. Possible fix
> We think this issue can be addressed by designating part of the 
> ReplicaFetcherThreads (or creating another thread pool) for lagging replicas 
> to catch up, but we are not sure this is the appropriate way.
> Please give your opinions about this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-12901) Metadata not updated after broker restart.

2022-02-28 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17499138#comment-17499138
 ] 

Jun Rao commented on KAFKA-12901:
-

[~suriyav] : Did you see the same log error as in KAFKA-7987? If not, it may 
not be related. 2.8 is quite old though. You probably want to upgrade to a 
newer version.

> Metadata not updated after broker restart.
> --
>
> Key: KAFKA-12901
> URL: https://issues.apache.org/jira/browse/KAFKA-12901
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Suriya Vijayaraghavan
>Priority: Major
>
> We upgraded to version 2.8 from 2.7. After monitoring for few weeks we 
> upgraded in our production setup (as we didn't enable Kraft we went ahead), 
> we faced TimeoutException in our clients after few weeks in our production 
> setup. We tried to list all active brokers using admin client API, all 
> brokers were listed properly. So we logged into that broker and tried to do a 
> describe topic with localhost as bootstrap-server, but we got timeout as 
> there.
> When checking the logs, we noticed a shutdown print from the 
> kafka-shutdown-hook thread (the zookeeper session timed out and we had three 
> retry failures). But the controlled shutdown failed (we got an unknown server 
> error response from the controller), and it proceeded to an unclean shutdown. 
> Still, the process didn't quit, but it didn't process any other operation 
> either. And this did not remove the broker from alive status for hours (we 
> were able to see this broker in the list of brokers), and our clients were 
> still trying to contact this broker and failing with a timeout exception. So 
> we tried restarting the problematic broker, but we faced an unknown topic or 
> partition issue in our client after the restart, which caused timeouts as 
> well. We noticed that metadata was not loaded. So we had to restart our 
> controller. And after restarting the controller everything got back to normal.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13461) KafkaController stops functioning as active controller after ZooKeeperClient auth failure

2022-02-24 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497763#comment-17497763
 ] 

Jun Rao commented on KAFKA-13461:
-

Basically, when there is no JAAS configured for the ZK client and the ZK client 
tries to establish a new connection, the client will first receive an AUTH_FAIL 
event. However, this doesn't mean that the ZK client's session is gone, since 
the client will retry the connection without auth, which typically succeeds. 
Previously, we mistakenly tried to reinitialize the controller on the AUTH_FAIL 
event, which caused the controller to resign but not regain the controllership, 
since the underlying session was still valid.
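
A condensed sketch of that distinction; this is not the actual ZooKeeperClient 
change, just a model of the rule that only a real session expiration should 
force the controller-side reinitialization, while AUTH_FAIL without a 
configured SASL client is benign:
{code:java}
import org.apache.zookeeper.Watcher.Event.KeeperState

// Simplified model of the state handling described above: AUTH_FAIL without a
// configured SASL client is expected (the client retries unauthenticated), so
// only a genuine session expiration should schedule reinitialization.
object ZkStateModel {
  def shouldScheduleReinitialization(state: KeeperState, saslClientConfigured: Boolean): Boolean =
    state match {
      case KeeperState.Expired    => true                  // session really gone; re-election needed
      case KeeperState.AuthFailed => saslClientConfigured  // benign when SASL is not configured
      case _                      => false
    }
}
{code}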

> KafkaController stops functioning as active controller after ZooKeeperClient 
> auth failure
> -
>
> Key: KAFKA-13461
> URL: https://issues.apache.org/jira/browse/KAFKA-13461
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Reporter: Vincent Jiang
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.1.0, 3.0.1
>
>
> When java.security.auth.login.config is present, but there is no "Client" 
> section, ZookeeperSaslClient creation fails and raises a LoginException, 
> resulting in a warning log:
> {code:java}
> WARN SASL configuration failed: javax.security.auth.login.LoginException: No 
> JAAS configuration section named 'Client' was found in specified JAAS 
> configuration file: '***'. Will continue connection to Zookeeper server 
> without SASL authentication, if Zookeeper server allows it.{code}
> When this happens after initial startup, ClientCnxn enqueues an AuthFailed 
> event which will trigger following sequence:
>  # zkclient reinitialization is triggered
>  # the controller resigns.
>  # Before the controller's ZK session expires, the controller successfully 
> connect to ZK and maintains the current session
>  # In KafkaController.elect(), the controller sets activeControllerId to 
> itself and short-circuits the rest of the elect. Since the controller 
> resigned earlier and also skips the call to onControllerFailover(), the 
> controller is not actually functioning as the active controller (e.g. the 
> necessary ZK watchers haven't been registered).
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13407) Kafka controller out of service after ZK leader restart

2022-02-24 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13407.
-
Resolution: Fixed

> Kafka controller out of service after ZK leader restart
> ---
>
> Key: KAFKA-13407
> URL: https://issues.apache.org/jira/browse/KAFKA-13407
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 2.8.1
> Environment: Ubuntu 20.04
>Reporter: Daniel
>Priority: Critical
>
> When the Zookeeper leader disappears, a new instance becomes the leader, the 
> instances need to reconnect to Zookeeper, but the Kafka "Controller" gets 
> lost in limbo state after re-establishing connection.
> See below for how I manage to reproduce this over and over.
> *Prerequisites*
> Have a Kafka cluster with 3 instances running version 2.8.1. Figure out which 
> one is the Controller. I'm using Kafkacat 1.5.0 and get this info using the 
> `-L` flag.
> Zookeeper runs with 3 instances on version 3.5.9. Figure out which one is 
> leader by checking
>  
> {code:java}
> echo stat | nc -v localhost 2181
> {code}
>  
>  
> *Reproduce*
> 1. Stop the leader Zookeeper service.
> 2. Watch the logs of the Kafka Controller and ensure that it reconnects and 
> registers again.
>  
> {code:java}
> Oct 27 09:13:08 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:08,882] INFO 
> Unable to read additional data from server sessionid 0x1f2a12870003, likely 
> server has closed socket, closing socket connection and attempting reconnect 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] WARN 
> SASL configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/opt/kafka/config/kafka_server_jaas.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it. (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] INFO 
> Opening socket connection to server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] ERROR 
> [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,549] INFO 
> Socket connection established, initiating session, client: 
> /10.10.85.215:39338, server: 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,569] INFO 
> Session establishment complete on server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181, 
> sessionid = 0x1f2a12870003, negotiated timeout = 18000 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,548] INFO 
> [ZooKeeperClient Kafka server] Reinitializing due to auth failure. 
> (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,550] INFO 
> [PartitionStateMachine controllerId=1003] Stopped partition state machine 
> (kafka.controller.ZkPartitionStateMachine)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,550] INFO 
> [ReplicaStateMachine controllerId=1003] Stopped replica state machine 
> (kafka.controller.ZkReplicaStateMachine)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Shutting down 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Stopped 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Shutdown completed 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,552] INFO 
> [RequestSendThread controllerId=1003] Shutting down 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,552] INFO 
> [RequestSendThread controllerId=1003] Stopped 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,552] INFO 
> [RequestSendThread controllerId=1003] Shutdown completed 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,554] INFO 
> [RequestSendThread controllerId=1003] Shutting down 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,554] 

[jira] [Commented] (KAFKA-13407) Kafka controller out of service after ZK leader restart

2022-02-24 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497760#comment-17497760
 ] 

Jun Rao commented on KAFKA-13407:
-

[~Olsson] : Thanks for the reply. I think we fixed this issue in 
https://issues.apache.org/jira/browse/KAFKA-13461. Basically, when there is no 
JAAS configured for the ZK client and the ZK client tries to establish a new 
connection, the client will first receive an AUTH_FAIL event. However, this 
doesn't mean that the ZK client's session is gone, since the client will retry 
the connection without auth, which typically succeeds. Previously, we 
mistakenly tried to reinitialize the controller on the AUTH_FAIL event, which 
caused the controller to resign but not regain the controllership, since the 
underlying session was still valid. 
https://issues.apache.org/jira/browse/KAFKA-13461 fixes that issue.

 

I am closing the issue for now. Feel free to reopen it if that doesn't fix the 
issue.

> Kafka controller out of service after ZK leader restart
> ---
>
> Key: KAFKA-13407
> URL: https://issues.apache.org/jira/browse/KAFKA-13407
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 2.8.1
> Environment: Ubuntu 20.04
>Reporter: Daniel
>Priority: Critical
>
> When the Zookeeper leader disappears, a new instance becomes the leader, the 
> instances need to reconnect to Zookeeper, but the Kafka "Controller" gets 
> lost in limbo state after re-establishing connection.
> See below for how I manage to reproduce this over and over.
> *Prerequisites*
> Have a Kafka cluster with 3 instances running version 2.8.1. Figure out which 
> one is the Controller. I'm using Kafkacat 1.5.0 and get this info using the 
> `-L` flag.
> Zookeeper runs with 3 instances on version 3.5.9. Figure out which one is 
> leader by checking
>  
> {code:java}
> echo stat | nc -v localhost 2181
> {code}
>  
>  
> *Reproduce*
> 1. Stop the leader Zookeeper service.
> 2. Watch the logs of the Kafka Controller and ensure that it reconnects and 
> registers again.
>  
> {code:java}
> Oct 27 09:13:08 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:08,882] INFO 
> Unable to read additional data from server sessionid 0x1f2a12870003, likely 
> server has closed socket, closing socket connection and attempting reconnect 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] WARN 
> SASL configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/opt/kafka/config/kafka_server_jaas.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it. (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] INFO 
> Opening socket connection to server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,548] ERROR 
> [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,549] INFO 
> Socket connection established, initiating session, client: 
> /10.10.85.215:39338, server: 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:10 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:10,569] INFO 
> Session establishment complete on server 
> zookeeper-kafka.service.consul.lab.aws.blue.example.net/10.10.84.12:2181, 
> sessionid = 0x1f2a12870003, negotiated timeout = 18000 
> (org.apache.zookeeper.ClientCnxn)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,548] INFO 
> [ZooKeeperClient Kafka server] Reinitializing due to auth failure. 
> (kafka.zookeeper.ZooKeeperClient)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,550] INFO 
> [PartitionStateMachine controllerId=1003] Stopped partition state machine 
> (kafka.controller.ZkPartitionStateMachine)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,550] INFO 
> [ReplicaStateMachine controllerId=1003] Stopped replica state machine 
> (kafka.controller.ZkReplicaStateMachine)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Shutting down 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Stopped 
> (kafka.controller.RequestSendThread)
> Oct 27 09:13:11 ip-10-10-85-215 kafka[62961]: [2021-10-27 09:13:11,551] INFO 
> [RequestSendThread controllerId=1003] Shutdown 
