[jira] [Commented] (KAFKA-14070) Improve documentation for queryMetadataForKey

2022-07-13 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566414#comment-17566414
 ] 

Guozhang Wang commented on KAFKA-14070:
---

Hello [~balajirrao], thanks for filing this ticket. Could you provide a 
specific example of key types that would break `queryMetadataForKey`? Note that 
the function still takes in a partitioner, so that users can provide one when 
possible to determine which partition contains the specific key.
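
For reference, a minimal sketch of the two overloads being discussed here, one taking a key serializer and one taking an explicit StreamPartitioner; the store name, key type and custom partitioner are placeholder assumptions, not from the ticket:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class QueryMetadataExample {
    public static KeyQueryMetadata lookup(final KafkaStreams streams, final Long key) {
        // Default form: the partition is derived from the default partitioning logic
        // and the provided serializer, based on the store's source topics.
        final KeyQueryMetadata byDefault =
            streams.queryMetadataForKey("my-store", key, Serdes.Long().serializer());
        System.out.println("default metadata host: " + byDefault.activeHost());

        // Explicit form: if records were routed with custom partitioning logic,
        // the same logic must be supplied here, otherwise the returned metadata
        // may point at the wrong host/partition.
        final StreamPartitioner<Long, Object> customPartitioner =
            (topic, k, value, numPartitions) -> (int) (k % numPartitions);
        return streams.queryMetadataForKey("my-store", key, customPartitioner);
    }
}
{code}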

> Improve documentation for queryMetadataForKey
> -
>
> Key: KAFKA-14070
> URL: https://issues.apache.org/jira/browse/KAFKA-14070
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Balaji Rao
>Priority: Minor
>
> When using key-value state stores with the Processor API, one can add key-value 
> state stores of arbitrary key types to a topology. This could lead to the 
> method `queryMetadataForKey` in `KafkaStreams` being used with incorrect 
> expectations.
> In my understanding, `queryMetadataForKey` uses the source topics of the 
> processor connected to the store to return the `KeyQueryMetadata`. This means 
> that it could provide "incorrect" answers when used with key-value stores of 
> arbitrary key types. The description of the method should be improved to make 
> users aware of this pitfall.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13386) Foreign Key Join filtering out valid records after a code change / schema evolved

2022-06-29 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17560812#comment-17560812
 ] 

Guozhang Wang commented on KAFKA-13386:
---

Hi [~nikuis], I agree that with the same FK1 it will be sent to the same node and 
hence won't have out-of-order data, but you'd still end up with two 
duplicated join results since the second subscription would come back and join 
with the current value again (assuming it's still the same).

I think by the time we have multi-versioned joins, we can relax the ordering 
of the emits since differently versioned join results would not rely on the emit 
ordering any more.

> Foreign Key Join filtering out valid records after a code change / schema 
> evolved
> -
>
> Key: KAFKA-13386
> URL: https://issues.apache.org/jira/browse/KAFKA-13386
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Sergio Duran Vegas
>Priority: Major
>
> The join optimization assumes the serializer is deterministic and invariant 
> across upgrades. So in case of changes this optimization will drop 
> invalid/intermediate records. In other situations we have relied on the same 
> property, for example when computing whether an update is a duplicate result 
> or not.
>  
> The problem is that some serializers are sadly not deterministic.
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]
>  
> {code:java}
> //If this value doesn't match the current value from the original table, it 
> is stale and should be discarded.
>  if (java.util.Arrays.equals(messageHash, currentHash)) {{code}
>  
> A solution for this problem would be for the comparison to use the foreign-key 
> reference itself instead of the whole message hash.
>  
> The bug fix proposal is to allow the user to choose between the two methods of 
> comparison (whole-message hash or FK reference). This would fix the problem of 
> dropping valid records in certain cases, while still allowing the user to 
> choose the current optimized way of checking valid records and dropping 
> intermediate results.
>  
>  
>  
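
As an editorial illustration of the failure mode described above (hypothetical types and values, not from the ticket): if the serializer can produce different bytes for the same logical value, e.g. after an upgrade that reorders fields, a hash-of-bytes check treats the subscription response as stale, while comparing the extracted foreign key itself would still match.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FkJoinHashSketch {

    // Stand-in for the hash used in the FK-join code path; identity copy is
    // enough to show the byte-level mismatch.
    static byte[] hash(final byte[] serializedValue) {
        return serializedValue == null ? null : Arrays.copyOf(serializedValue, serializedValue.length);
    }

    public static void main(final String[] args) {
        // Same logical value, serialized before and after an upgrade that
        // reorders fields -- the bytes differ even though nothing changed.
        final byte[] oldBytes = "{\"fk\":\"A\",\"name\":\"x\"}".getBytes(StandardCharsets.UTF_8);
        final byte[] newBytes = "{\"name\":\"x\",\"fk\":\"A\"}".getBytes(StandardCharsets.UTF_8);

        // Whole-message hash comparison: fails, the record would be dropped as "stale".
        System.out.println("hash equal: " + Arrays.equals(hash(oldBytes), hash(newBytes)));

        // Foreign-key comparison (the proposal above): still matches.
        final String oldFk = "A";
        final String currentFk = "A";
        System.out.println("fk equal:   " + oldFk.equals(currentFk));
    }
}
{code}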



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2022-06-22 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17557576#comment-17557576
 ] 

Guozhang Wang commented on KAFKA-12478:
---

Thanks [~hudeqi], I will take a look at the KIP.

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.1.1
>Reporter: hudeqi
>Priority: Blocker
>  Labels: kip-842
> Attachments: safe-console-consumer.png, safe-consume.png, 
> safe-produce.png, trunk-console-consumer.png, trunk-consume.png, 
> trunk-produce.png
>
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem was exposed in our production environment: a topic is used to 
> produce monitoring data. *After expanding partitions, the consumer side of 
> the business reported that data was lost.*
>   After a preliminary investigation, the lost data was all concentrated in the 
> newly expanded partitions. The reason is: when the server expands the 
> partitions, the producer perceives the expansion first, and some data is 
> written to the newly expanded partitions. But the consumer group perceives the 
> expansion later; after the rebalance is completed, the newly expanded 
> partitions will be consumed from the latest offset if the group is set to 
> consume from the latest. For a period of time, the data in the newly expanded 
> partitions is therefore skipped and lost by the consumer.
>   Setting such a group to consume from the earliest is not a good option for a 
> huge-data-flow topic at startup, since it would make the group heavily consume 
> historical data from the brokers, which affects the performance of the brokers 
> to a certain extent. Therefore, *it is necessary to consume these newly 
> expanded partitions from the earliest separately.*
>  
> I did a test and the results are in the attached screenshots. First, the 
> producer's and consumer's "metadata.max.age.ms" were set to 500ms and 3ms 
> respectively.
> _trunk-console-consumer.png_ shows starting the consumer with the community 
> version and setting "latest". 
> _trunk-produce.png_ shows the data produced: "partition_count" is the 
> number of partitions of the current topic, "message" is the numeric 
> content of the corresponding message, and "send_to_partition_index" indicates 
> the index of the partition to which the corresponding message is sent. It can 
> be seen that at 11:32:10, the producer perceives the expansion of the total 
> partitions from 2 to 3, and writes the numbers 38, 41, and 44 into the newly 
> expanded partition 2.
> _trunk-consume.png_ shows all the numeric content consumed by the 
> community version. You can see that 38 and 41, sent to partition 2, were not 
> consumed at the beginning. Even after partition 2 was perceived, 38 and 
> 41 were still not consumed; instead, consumption started from the latest 
> record 44, so the two records 38 and 41 were discarded.
>  
> _safe-console-consumer.png_ shows starting the consumer with the fixed 
> version and setting "safe_latest". 
> _safe-produce.png_ shows the data produced. It can be seen that at 12:12:09, 
> the producer perceives the expansion of the total partitions from 4 to 5, and 
> writes the numbers 109 and 114 into the newly expanded partition 4.
> _safe-consume.png_ shows all the numeric content consumed by the fixed 
> version. You can see that 109, sent to partition 4, was not consumed at the 
> beginning. After partition 4 was perceived, 109 was consumed as the 
> first record of partition 4. So the fixed version does not lose data under 
> this condition.
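
As an editorial aside, a rough client-side sketch (not the KIP-842 change itself, and purely an assumption for illustration): with "auto.offset.reset=latest", an application can avoid skipping records in newly created partitions by seeking to the beginning of partitions it has not owned before in the current session. The topic name and bookkeeping are hypothetical; a real implementation would also consult committed offsets.

{code:java}
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExpandedPartitionConsumer {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "monitoring-group");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final Set<TopicPartition> seen = new HashSet<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("monitoring-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { }

                @Override
                public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
                    // For this sketch, any partition not owned before in this session is
                    // treated as potentially newly expanded and read from the beginning.
                    final Set<TopicPartition> fresh = partitions.stream()
                        .filter(tp -> !seen.contains(tp))
                        .collect(Collectors.toSet());
                    if (!fresh.isEmpty()) {
                        consumer.seekToBeginning(fresh);
                        seen.addAll(fresh);
                    }
                }
            });
            while (true) {
                consumer.poll(Duration.ofMillis(500)).forEach(r -> System.out.println(r.value()));
            }
        }
    }
}
{code}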



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13840) KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception

2022-06-22 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17557572#comment-17557572
 ] 

Guozhang Wang commented on KAFKA-13840:
---

[~showuon] I read through the discussions here and I feel it's probably because 
in [~kyle.stehbens]'s test there's no `poll` call triggered during the time 
`commitAsync` is called and the future is being waited on. In your fix, the 
future is expected to be cleared only via the `poll` call, which would trigger 
`ensureCoordinatorReady` since the HB thread is not created for manual `assign` 
mode (note that `ensureCoordinatorReady` was not triggered in `commitAsync`). 
However, if the user never calls `poll` again after `commitAsync`, then that 
future would not be cleared.

The follow-up hotfix (https://github.com/apache/kafka/pull/12259/files) would 
be resilient to such a pattern since it triggers `ensureCoordinatorReady` 
inside the `commitAsync` call itself. WDYT?

Unfortunately that fix has not been included in any released versions yet. 
[~kyle.stehbens], would you be willing to try testing it on top of trunk to 
see if it works then?
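
A minimal sketch of the usage pattern discussed above (manual assignment, then `commitAsync` with no further `poll`), with placeholder topic/group values; on the affected versions a retriable failure during this commit can leave the coordinator lookup future uncleared, so later commits keep failing with "coordinator unavailable":

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CommitWithoutPollPattern {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "manual-assign-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(List.of(new TopicPartition("my-topic", 0)));
            consumer.poll(Duration.ofSeconds(1));

            // Commit in the background; note there is no poll() after this call,
            // so nothing drives coordinator rediscovery on the affected versions.
            consumer.commitAsync((offsets, exception) -> {
                if (exception != null) {
                    System.err.println("commit failed: " + exception);
                }
            });

            // ... application continues without polling ...
        }
    }
}
{code}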

> KafkaConsumer is unable to recover connection to group coordinator after 
> commitOffsetsAsync exception
> -
>
> Key: KAFKA-13840
> URL: https://issues.apache.org/jira/browse/KAFKA-13840
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0
>Reporter: Kyle R Stehbens
>Assignee: Luke Chen
>Priority: Major
>
> Hi, I've discovered an issue with the Java Kafka client (consumer) whereby a 
> timeout or any other retry-able exception triggered during an async offset 
> commit renders the client unable to recover its group coordinator and 
> leaves the client in a broken state.
>  
> I first encountered this using v2.8.1 of the java client, and after going 
> through the code base for all versions of the client, have found it affects 
> all versions of the client from 2.6.1 onward.
> I also confirmed that by rolling back to 2.5.1, the issue is not present.
>  
> The issue stems from changes to how the FindCoordinatorResponseHandler in 
> 2.5.1 used to call clearFindCoordinatorFuture(); on both success and failure 
> here:
> [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783]
>  
> In all future versions of the client this call is not made:
> [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838]
>  
> What this results in, is when the KafkaConsumer makes a call to 
> coordinator.commitOffsetsAsync(...), if an error occurs such that the 
> coordinator is unavailable here:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007]
>  
> then the client will try to call:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017]
> However this will never be able to succeed as it perpetually returns a 
> reference to a failed future: findCoordinatorFuture that is never cleared out.
>  
> This manifests in all future calls to commitOffsetsAsync() throwing a 
> "coordinator unavailable" exception forever going forward after any 
> retry-able exception causes the coordinator to close. 
> Note we discovered this when we upgraded the kafka client in our Flink 
> consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the 
> client. We noticed this occurring in our non-flink java consumers too running 
> 3.x client versions.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13880) DefaultStreamPartitioner may get "stuck" to one partition for unkeyed messages

2022-06-17 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13880.
---
Fix Version/s: 3.3.0
 Assignee: Guozhang Wang
   Resolution: Fixed

> DefaultStreamPartitioner may get "stuck" to one partition for unkeyed messages
> --
>
> Key: KAFKA-13880
> URL: https://issues.apache.org/jira/browse/KAFKA-13880
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Artem Livshits
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.3.0
>
>
> While working on KIP-794, I noticed that DefaultStreamPartitioner does not 
> call .onNewBatch.  The "sticky" DefaultStreamPartitioner introduced as a 
> result of https://issues.apache.org/jira/browse/KAFKA-8601 requires the 
> .onNewBatch call in order to switch to a new partition for unkeyed messages; 
> just calling .partition would return the same "sticky" partition chosen 
> during the first call to .partition.  The partition doesn't change even if 
> the partition leader is unavailable.
> Ideally, for unkeyed messages the DefaultStreamPartitioner should take 
> advantage of the new built-in partitioning logic introduced in 
> [https://github.com/apache/kafka/pull/12049].  Perhaps it could return a null 
> partition for unkeyed messages, so that KafkaProducer could run the built-in 
> partitioning logic.
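
An editorial sketch of the idea in the last paragraph, not the actual fix: a StreamPartitioner that returns null for unkeyed records, which lets the KafkaProducer fall back to its built-in partitioning logic, while keyed records keep the usual hash-based placement. Whether DefaultStreamPartitioner itself should adopt this is exactly the open question of this ticket; such a partitioner could otherwise be supplied via Produced#streamPartitioner.

{code:java}
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class NullForUnkeyedPartitioner<K, V> implements StreamPartitioner<K, V> {
    private final Serializer<K> keySerializer;

    public NullForUnkeyedPartitioner(final Serializer<K> keySerializer) {
        this.keySerializer = keySerializer;
    }

    @Override
    public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
        if (key == null) {
            // Defer to the producer's built-in partitioner for unkeyed records.
            return null;
        }
        // Keyed records: the usual murmur2-hash placement.
        final byte[] keyBytes = keySerializer.serialize(topic, key);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
{code}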



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-12639) AbstractCoordinator ignores backoff timeout when joining the consumer group

2022-06-13 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553811#comment-17553811
 ] 

Guozhang Wang commented on KAFKA-12639:
---

This is a separate issue, as `joinGroupIfNeeded` is not called by the heartbeat 
thread at all.

I checked the code in trunk and compared it with 2.7, and I can confirm the 
situation still persists there. But this may not be a real issue, just an 
artifact of [~matiss.gutmanis]'s testing environment.

The key is that inside the while loop we only check `while 
(rejoinNeededOrPending()) {`, and we only exit due to a timeout if the 
response cannot be received in time. This relies on the fact that if the 
timer has already elapsed, then after sending the join-group request we would 
effectively be triggering `select(0)`, in which case we should rarely receive 
the response immediately, and hence at that point we would exit the while 
loop. I suspect that in this testing the cluster failure is shortcut-mocked by 
returning the response immediately (again, which should not be a common case in 
practice), which causes the calling loop to never exit.

All that being said, we can still augment the condition to `while 
(rejoinNeededOrPending() && timer.notElapsed()) {`, but doing so has a 
potential caveat that we would not send out the request before exiting the 
while loop --- and it is also the intention of the current code to always 
make sure we have an inflight request before exiting, to slightly optimize the 
pipelining latency --- cc  [~pnee] to chime in here.
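
For a self-contained illustration of the timer behavior underlying this report (using the internal org.apache.kafka.common.utils.Timer utility from the clients jar, with illustrative values; this is not coordinator code): once the timer has elapsed, Timer#sleep() is capped by the remaining time and becomes a 0 ms sleep, so a retry loop keyed only on the rejoin condition can spin without any real backoff.

{code:java}
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

public class BackoffTimerSketch {
    public static void main(final String[] args) throws InterruptedException {
        final long retryBackoffMs = 100L;
        final Timer timer = Time.SYSTEM.timer(50L);   // caller-supplied timeout

        Thread.sleep(60L);                             // simulate waiting on a response
        timer.update();

        System.out.println("expired: " + timer.isExpired());      // true
        System.out.println("remaining: " + timer.remainingMs());  // 0

        final long before = System.currentTimeMillis();
        timer.sleep(retryBackoffMs);                   // sleeps min(backoff, remaining) = 0 ms
        System.out.println("slept ~" + (System.currentTimeMillis() - before) + " ms");
    }
}
{code}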

> AbstractCoordinator ignores backoff timeout when joining the consumer group
> ---
>
> Key: KAFKA-12639
> URL: https://issues.apache.org/jira/browse/KAFKA-12639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.7.0
>Reporter: Matiss Gutmanis
>Assignee: Philip Nee
>Priority: Major
>
> We observed heavy logging while trying to join the consumer group during partial 
> unavailability of the Kafka cluster (it's part of our testing process). It seems 
> that {{rebalanceConfig.retryBackoffMs}} used in 
> {{org.apache.kafka.clients.consumer.internals.AbstractCoordinator#joinGroupIfNeeded}}
> is not respected. Debugging revealed that the {{Timer}} instance is technically 
> expired, thus a sleep of 0 milliseconds is used, which defeats the purpose of 
> the backoff timeout.
> A minimal backoff timeout should be respected.
>  
> {code:java}
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,488 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,489 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coordinator 
> 127.0.0.1:9092 (id: 2147483634 rack: null) is loading the group.
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] Rebalance failed.
> org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
> coordinator is loading and hence can't process requests.
> 2021-03-30 08:30:24,490 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] (Re-)joining group
> 2021-03-30 08:30:24,491 INFO 
> [fs2-kafka-consumer-41][o.a.k.c.c.i.AbstractCoordinator] [Consumer 
> clientId=app_clientid, groupId=consumer-group] JoinGroup failed: Coord

[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2022-06-13 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17553806#comment-17553806
 ] 

Guozhang Wang commented on KAFKA-10575:
---

I took another look at this ticket while working on KAFKA-10199 in parallel, 
and here are some updates:

1. I could confirm that today we only call `onRestoreEnd` for case 1 above; for 
cases 2/3 from [~ableegoldman] we do not (in fact case 3 is just a special 
case of case 2, since we would first transit to CLOSED anyways).
2. On second thought, there may be different groups of users anticipating 
different semantics from such callbacks, for example:

a) The original complaint that drives this ticket is based on the anticipation 
that each `onRestoreStart` would always be paired with an `onRestoreEnd`. This 
is not actually the case because of cases 2/3 above.
b) Others may anticipate that `onRestoreEnd` is only triggered when the 
restoration actually completes. In fact, this is what we explicitly state in 
the javadocs.

So, if we just call `onRestoreEnd` for cases 2/3 above, we may make users in a) 
happier, but we would break compatibility for users in b). In addition, since 
for the same topic-partition and store name there might be multiple 
restoration processes happening at the same time, e.g. when there are standby 
replicas, it's not very straightforward to try to pair each `onRestoreStart` 
with a unique `onRestoreEnd`.

With those thoughts, I'm now leaning towards not just calling `onRestoreEnd` 
for cases 2/3, but instead introducing a new API, e.g. `onRestorePaused`, for 
cases 2/3, plus also documenting clearly that not every `onRestoreStart` would 
be paired exactly with an `onRestoreEnd/Paused`, to reduce users' unrealistic 
anticipations.

Thoughts?
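
To make the pairing assumption above concrete, a minimal StateRestoreListener sketch (names and bookkeeping are illustrative) that assumes every `onRestoreStart` is eventually matched by an `onRestoreEnd`, which is exactly what this ticket says is not guaranteed today when a restoring task is closed or migrated. Such a listener would be registered via KafkaStreams#setGlobalStateRestoreListener.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class PairedRestoreListener implements StateRestoreListener {
    private final Map<TopicPartition, Long> inFlight = new ConcurrentHashMap<>();

    @Override
    public void onRestoreStart(final TopicPartition tp, final String storeName,
                               final long startingOffset, final long endingOffset) {
        inFlight.put(tp, System.currentTimeMillis());
    }

    @Override
    public void onBatchRestored(final TopicPartition tp, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        // no-op for this sketch
    }

    @Override
    public void onRestoreEnd(final TopicPartition tp, final String storeName,
                             final long totalRestored) {
        final Long startedAt = inFlight.remove(tp);
        if (startedAt != null) {
            System.out.println(storeName + " " + tp + " restored in "
                + (System.currentTimeMillis() - startedAt) + " ms");
        }
        // If the task is closed mid-restoration, entries stay in `inFlight`
        // forever -- exactly the surprise this ticket is about.
    }
}
{code}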

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: highluck
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration is 
> stopped in any scenario.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2022-06-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10575:
--
Labels:   (was: new-streams-runtime-should-fix)

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: highluck
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration is 
> stopped in any scenario.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-10199) Separate state restoration into separate threads

2022-06-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10199:
--
Labels:   (was: new-streams-runtime-should-fix)

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads so that:
> 1. Stream threads are not restricted by the main consumer `poll` 
> frequency required to stay part of the group.
> 2. We can allow larger batches of data to be written during restoration.
> Besides this, we'd also like to fix the known issue that for source topics 
> piggy-backed as changelog topics, the serde exception / extra processing 
> logic would be skipped.
> We would also clean up the global update tasks as part of this effort, 
> consolidating them into the separate restoration threads, and gear them up 
> with corresponding monitoring metrics (KIPs in progress).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-10199) Separate state restoration into separate threads

2022-06-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10199:
--
Labels: new-streams-runtime-should-fix  (was: )

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: new-streams-runtime-should-fix
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads so that:
> 1. Stream threads are not restricted by the main consumer `poll` 
> frequency required to stay part of the group.
> 2. We can allow larger batches of data to be written during restoration.
> Besides this, we'd also like to fix the known issue that for source topics 
> piggy-backed as changelog topics, the serde exception / extra processing 
> logic would be skipped.
> We would also clean up the global update tasks as part of this effort, 
> consolidating them into the separate restoration threads, and gear them up 
> with corresponding monitoring metrics (KIPs in progress).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

2022-06-07 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551235#comment-17551235
 ] 

Guozhang Wang commented on KAFKA-13939:
---

Once the PR is merged, we can cherry-pick the commit to old branches. But 
whether the fix would be released depends on whether we have a bug-fix 
release (e.g. say 3.2.1) planned in the future.

> Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
> -
>
> Key: KAFKA-13939
> URL: https://issues.apache.org/jira/browse/KAFKA-13939
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jackson Newhouse
>Priority: Blocker
>
> If `loggingEnabled` is false, the `dirtyKeys` Set is not cleared within 
> `flush()`, see 
> [https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java#L262.]
>  However, dirtyKeys is still written to in the loop within `evictWhile`. This 
> causes dirtyKeys to continuously grow for the life of the buffer. 
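
An editorial sketch (simplified and hypothetical, not the actual class) of the shape of the fix described above: `dirtyKeys` should be cleared on every `flush()`, not only when changelogging is enabled, otherwise it grows for the lifetime of the buffer.

{code:java}
import java.util.HashSet;
import java.util.Set;

class BufferFlushSketch<K> {
    private final boolean loggingEnabled;
    private final Set<K> dirtyKeys = new HashSet<>();

    BufferFlushSketch(final boolean loggingEnabled) {
        this.loggingEnabled = loggingEnabled;
    }

    void markDirty(final K key) {
        dirtyKeys.add(key);           // written to from evictWhile()-like paths
    }

    void flush() {
        if (loggingEnabled) {
            // write dirty entries to the changelog topic here
        }
        // Clear unconditionally so the set cannot grow without bound when
        // logging is disabled.
        dirtyKeys.clear();
    }
}
{code}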



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13214) Consumer should not reset group state after disconnect

2022-06-06 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13214:
--
Labels: new-consumer-threading-should-fix  (was: )

> Consumer should not reset group state after disconnect
> --
>
> Key: KAFKA-13214
> URL: https://issues.apache.org/jira/browse/KAFKA-13214
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0, 2.8.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
>  Labels: new-consumer-threading-should-fix
> Fix For: 2.7.2, 2.8.1, 3.0.0
>
>
> When the consumer disconnects from the coordinator while a rebalance is in 
> progress, we currently reset the memberId and generation. The coordinator 
> then must await the session timeout in order to expire the old memberId. This 
> was apparently a regression from 
> https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478.
>  It would be better to keep the memberId/generation.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-12983) onJoinPrepare is not always invoked before joining the group

2022-06-06 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12983:
--
Labels: new-consumer-threading-should-fix  (was: )

> onJoinPrepare is not always invoked before joining the group
> 
>
> Key: KAFKA-12983
> URL: https://issues.apache.org/jira/browse/KAFKA-12983
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.7.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
>  Labels: new-consumer-threading-should-fix
> Fix For: 2.7.2, 2.8.1, 3.0.0
>
>
> As the title suggests, the #onJoinPrepare callback is not always invoked 
> before a member (re)joins the group, but only once when it first enters the 
> rebalance. This means that any updates or events that occur during the join 
> phase can be lost in the internal state: for example, clearing the 
> SubscriptionState (and thus the "ownedPartitions" that are used for 
> cooperative rebalancing) after losing its memberId during a rebalance.
> We should reset the `needsJoinPrepare` flag inside the resetStateAndRejoin() 
> method



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

2022-06-01 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17545102#comment-17545102
 ] 

Guozhang Wang commented on KAFKA-13939:
---

Thanks [~jnewhouse], I looked at the code you pointed out and I agree it's 
indeed a bug that should be fixed asap. Please let us know if you'd like to open 
a PR to fix it.

> Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
> -
>
> Key: KAFKA-13939
> URL: https://issues.apache.org/jira/browse/KAFKA-13939
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jackson Newhouse
>Priority: Blocker
>
> If `loggingEnabled` is false, the `dirtyKeys` Set is not cleared within 
> `flush()`, see 
> [https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java#L262.]
>  However, dirtyKeys is still written to in the loop within `evictWhile`. This 
> causes dirtyKeys to continuously grow for the life of the buffer. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13934) Consider consolidating TimeWindow / SessionWindow / SlidingWindow

2022-05-24 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13934:
-

 Summary: Consider consolidating TimeWindow / SessionWindow / 
SlidingWindow
 Key: KAFKA-13934
 URL: https://issues.apache.org/jira/browse/KAFKA-13934
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


In Streams windowing operations we have several classes inherited from 
`Window`, as listed in the title of the ticket. They differ in:

1) Serialization of the window as part of the windowed key.
2) Window operations, which are based on the inclusiveness/exclusiveness of the 
window start/end.

As a result, we have ended up with lots of duplicated code to handle those 
different windows in windowed aggregations.

We can consider whether it's worth serializing those window types differently 
(especially if we can get rid of the sequence id for time windows used for 
joins) and whether we can just have a single class with booleans indicating 
the inclusiveness/exclusiveness of the start/end, and hence largely reduce our 
code duplication around the serdes and common window operations inside the 
stateful operators.  
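
A purely hypothetical sketch of the consolidation idea, not an actual Kafka Streams class: a single window type carrying inclusiveness flags for start/end, instead of separate subclasses encoding that behavior (TimeWindow is [start, end), SessionWindow is [start, end]).

{code:java}
final class GenericWindowSketch {
    private final long startMs;
    private final long endMs;
    private final boolean startInclusive;
    private final boolean endInclusive;

    GenericWindowSketch(final long startMs, final long endMs,
                        final boolean startInclusive, final boolean endInclusive) {
        this.startMs = startMs;
        this.endMs = endMs;
        this.startInclusive = startInclusive;
        this.endInclusive = endInclusive;
    }

    /** TimeWindow semantics: [start, end). */
    static GenericWindowSketch timeWindow(final long start, final long end) {
        return new GenericWindowSketch(start, end, true, false);
    }

    /** SessionWindow semantics: [start, end]. */
    static GenericWindowSketch sessionWindow(final long start, final long end) {
        return new GenericWindowSketch(start, end, true, true);
    }

    boolean contains(final long timestampMs) {
        final boolean afterStart = startInclusive ? timestampMs >= startMs : timestampMs > startMs;
        final boolean beforeEnd = endInclusive ? timestampMs <= endMs : timestampMs < endMs;
        return afterStart && beforeEnd;
    }
}
{code}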



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13621) Resign leader on network partition

2022-05-21 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17540486#comment-17540486
 ] 

Guozhang Wang commented on KAFKA-13621:
---

 [~jagsancio] Yes, I think with KIP-835 we can augment this ticket from "resign 
after not receiving fetch requests from the majority within a timeout" to "resign 
after not receiving fetch requests AND not advancing the committed watermark 
after a timeout". To answer your questions:

1) I think it could live on the controller side for now: although it sounds to 
me like a general optimization that is not controller/metadata specific, it does 
rely on the fact that we always have new appends periodically. We can consider 
moving it into raft after we've experimented for a while, and even use the 
protocol for data partitions in the future.
2) If we make the resign condition as above (i.e. both need to be true), then I 
think this is fine?

> Resign leader on network partition
> --
>
> Key: KAFKA-13621
> URL: https://issues.apache.org/jira/browse/KAFKA-13621
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> h1. Motivation
> If the current leader A at epoch X gets partitioned from the rest of the 
> quorum, quorum voter A will stay leader at epoch X. This happens because 
> voter A will never receive a request from the rest of the voters increasing 
> the epoch. The requests that typically increase the epoch of past leaders 
> are BeginQuorumEpoch and Vote.
> In addition, if voter A (the leader at epoch X) doesn't get partitioned from the 
> rest of the brokers (observers in the KRaft protocol), the brokers will never 
> learn about the new quorum leader. This happens because 1. observers learn 
> about the leader from the Fetch response, and 2. observers send a Fetch request 
> to a random leader if the Fetch request times out.
> Neither of these two scenarios will cause the broker to send a request to a 
> different voter, because the leader at epoch X will never send a different 
> leader in the response, and the broker will never send a Fetch request to a 
> different voter because the Fetch request will never time out.
> h1. Proposed Changes
> In this scenario A, the leader at epoch X, will stop receiving Fetch 
> requests from the majority of the voters. Voter A should resign as leader if 
> the Fetch requests from the majority of the voters are old enough. A reasonable 
> value for "old enough" is the Fetch timeout value.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone

2022-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13745.
---
Resolution: Fixed

> Flaky 
> kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
> -
>
> Key: KAFKA-13745
> URL: https://issues.apache.org/jira/browse/KAFKA-13745
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Example: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/
> {code}
> org.opentest4j.AssertionFailedError: expected:  but was: 
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35)
>   at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227)
>   at 
> kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone

2022-05-13 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17536959#comment-17536959
 ] 

Guozhang Wang commented on KAFKA-13745:
---

Hi [~sagarrao] I'm happy to resolve the ticket now, and if we see it again we 
can re-create.

> Flaky 
> kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
> -
>
> Key: KAFKA-13745
> URL: https://issues.apache.org/jira/browse/KAFKA-13745
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Example: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/
> {code}
> org.opentest4j.AssertionFailedError: expected:  but was: 
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40)
>   at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35)
>   at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227)
>   at 
> kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13800) Remove force cast of TimeWindowKStreamImpl in tests of https://github.com/apache/kafka/pull/11896

2022-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13800.
---
Resolution: Fixed

> Remove force cast of TimeWindowKStreamImpl in tests of 
> https://github.com/apache/kafka/pull/11896
> -
>
> Key: KAFKA-13800
> URL: https://issues.apache.org/jira/browse/KAFKA-13800
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>
> We can remove the cast after `emitStrategy` is added to public api



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13746) Flaky kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed

2022-05-13 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13746.
---
Resolution: Fixed

> Flaky 
> kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed
> 
>
> Key: KAFKA-13746
> URL: https://issues.apache.org/jira/browse/KAFKA-13746
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Example: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/
> {code}
> java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
>   at 
> kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:686)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (KAFKA-13879) Exponential backoff for reconnect does not work

2022-05-05 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-13879:
-

Assignee: Chern Yih Cheah

> Exponential backoff for reconnect does not work
> ---
>
> Key: KAFKA-13879
> URL: https://issues.apache.org/jira/browse/KAFKA-13879
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 2.7.0
>Reporter: Chern Yih Cheah
>Assignee: Chern Yih Cheah
>Priority: Minor
>
> When a client connects to an SSL listener using the PLAINTEXT security protocol, 
> the client considers the channel setup complete once the TCP connection is 
> established (in reality the channel setup is not complete yet). The client 
> then issues an API version request. When issuing the API version request, the 
> reconnection exponential backoff is reset. Since the broker expects an SSL 
> handshake, the client's API version request will cause the connection to be 
> disconnected. Reconnects then happen without exponential backoff since it has 
> been reset.
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L249.]
>   
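
For context, a minimal editorial sketch of the client configs governing the reconnect backoff discussed here (values and listener address are illustrative placeholders). With the bug, the backoff is reset when the premature API version request is sent, so reconnects to the mismatched listener never actually back off toward reconnect.backoff.max.ms.

{code:java}
import java.util.Properties;

public class ReconnectBackoffConfigSketch {
    public static Properties clientProps() {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9093");     // SSL listener (placeholder)
        props.put("security.protocol", "PLAINTEXT");        // mismatched protocol, as in the report
        props.put("reconnect.backoff.ms", "50");             // initial reconnect backoff
        props.put("reconnect.backoff.max.ms", "1000");       // upper bound for exponential backoff
        return props;
    }
}
{code}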



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-05-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17532480#comment-17532480
 ] 

Guozhang Wang commented on KAFKA-13877:
---

Thanks [~lkokhreidze]!

> Flaky 
> RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
> 
>
> Key: KAFKA-13877
> URL: https://issues.apache.org/jira/browse/KAFKA-13877
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Levani Kokhreidze
>Priority: Major
>  Labels: newbie
>
> The following test fails on local testbeds about once per 10-15 runs:
> {code}
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:87)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at org.junit.Assert.assertTrue(Assert.java:53)
>   at 
> org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-05-05 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13877:
--
Labels: newbie  (was: )

> Flaky 
> RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
> 
>
> Key: KAFKA-13877
> URL: https://issues.apache.org/jira/browse/KAFKA-13877
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>
> The following test fails on local testbeds about once per 10-15 runs:
> {code}
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:87)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at org.junit.Assert.assertTrue(Assert.java:53)
>   at 
> org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
>   at 
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>   at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags

2022-05-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13877:
-

 Summary: Flaky 
RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
 Key: KAFKA-13877
 URL: https://issues.apache.org/jira/browse/KAFKA-13877
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: Guozhang Wang


The following test fails on local testbeds about once per 10-15 runs:

{code}
java.lang.AssertionError
at org.junit.Assert.fail(Assert.java:87)
at org.junit.Assert.assertTrue(Assert.java:42)
at org.junit.Assert.assertTrue(Assert.java:53)
at 
org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)


{code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition

2022-05-01 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17530569#comment-17530569
 ] 

Guozhang Wang commented on KAFKA-13857:
---

Hi [~RivenSun], I think it may be valuable to allow admin clients to get LEOs. 
Since this involves changing the broker's API handling with a bump to the list 
offsets protocol, like you said, I'd just want to know whether there's an existing 
urgent use case that can support the KIP proposing it.

If you feel strongly about adding the feature now rather than in the future, please 
do not hesitate to propose a KIP and ask for others' opinions.

> The listOffsets method of KafkaAdminClient should support returning 
> logEndOffset of topicPartition
> --
>
> Key: KAFKA-13857
> URL: https://issues.apache.org/jira/browse/KAFKA-13857
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: RivenSun
>Priority: Major
>
> The server side currently handles the LIST_OFFSETS request process as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>  
> In the last method above, it is obvious that when the client side does not 
> pass the isolationLevel value, the server side supports returning 
> localLog.logEndOffset.
> {code:java}
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> } 
> {code}
>  
>  
> KafkaAdminClient is an operations and maintenance management tool, which 
> *should be different from the listOffsets-related methods (offsetsForTimes, 
> beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not 
> be limited by the value of isolationLevel in the 
> ListOffsetsOptions parameter.*
> In the current KafkaAdminClient.listOffsets() method, both the AdminClient 
> and the server consider isolationLevel as a required parameter:
> 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
> will be thrown when AdminClient executes listOffsets() method.
> {code:java}
> ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
> 2) The current logic for converting isolationLevel on the server side has not 
> yet handled the case where the user passes in a value that is neither 
> READ_UNCOMMITTED nor READ_COMMITTED :
> {code:java}
> val isolationLevelOpt = if (isClientRequest)
>   Some(offsetRequest.isolationLevel)
> else
>   None {code}
> {code:java}
> public IsolationLevel isolationLevel() {
> return IsolationLevel.forId(data.isolationLevel());
> } {code}
> h2. Suggestion:
> Add a new enum `NONE` in IsolationLevel, dedicated only to the 
> AdminClient.listOffsets() method.
> This change may cause the highestSupportedVersion of 
> ApiMessageType.LIST_OFFSETS to increase by one.
>  
>  
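
For reference, a minimal sketch of the current Admin#listOffsets usage (placeholder topic and bootstrap values): the "latest" offset returned is bounded by the isolation level (high watermark or last stable offset); there is no option today for the true log end offset, which is what this ticket proposes.

{code:java}
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;

public class ListOffsetsSketch {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            final TopicPartition tp = new TopicPartition("my-topic", 0);
            final ListOffsetsResult result = admin.listOffsets(
                Map.of(tp, OffsetSpec.latest()),
                new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED));
            // Under READ_UNCOMMITTED this is the high watermark, not the log end offset.
            System.out.println("latest offset: " + result.partitionResult(tp).get().offset());
        }
    }
}
{code}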



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition

2022-04-27 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528894#comment-17528894
 ] 

Guozhang Wang commented on KAFKA-13857:
---

Hi [~RivenSun] do you have a concrete use case in your production or somewhere 
else that requires this feature?

Generally speaking, since records appended after the HWM may still be 
truncated later, brokers tend to never expose those records to any clients 
(including admin clients). In other words, the abstraction that brokers want to 
provide to clients covers only "persisted" records (here "persisted" does not 
mean flushed to disk, but replicated and hence not likely to be lost).

If you have a concrete example backing the suggestion that we should treat 
admin client differently, let's discuss that.

> The listOffsets method of KafkaAdminClient should support returning 
> logEndOffset of topicPartition
> --
>
> Key: KAFKA-13857
> URL: https://issues.apache.org/jira/browse/KAFKA-13857
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: RivenSun
>Priority: Major
>
> The server side currently handles the LIST_OFFSETS request process as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>  
> In the last method above, it is obvious that when the client side does not 
> pass the isolationLevel value, the server side supports returning 
> localLog.logEndOffset.
> {code:java}
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> } 
> {code}
>  
>  
> KafkaAdminClient is an operations and maintenance management tool, so it 
> *should be different from the listOffsets-related methods (offsetsForTimes, 
> beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not 
> be limited by the value of isolationLevel in the 
> ListOffsetsOptions parameter.*
> In the current KafkaAdminClient.listOffsets() method, both the AdminClient 
> and the server consider isolationLevel as a required parameter:
> 1) If the AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
> will be thrown when the AdminClient executes the listOffsets() method.
> {code:java}
> ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
> 2) The current logic for converting isolationLevel on the server side does not 
> yet handle the case where the user passes in a value that is neither 
> READ_UNCOMMITTED nor READ_COMMITTED:
> {code:java}
> val isolationLevelOpt = if (isClientRequest)
>   Some(offsetRequest.isolationLevel)
> else
>   None {code}
> {code:java}
> public IsolationLevel isolationLevel() {
> return IsolationLevel.forId(data.isolationLevel());
> } {code}
> h2. Suggestion:
> Add a new enum value `NONE` in IsolationLevel, dedicated only to the 
> AdminClient.listOffsets() method.
> This change may cause the highestSupportedVersion of 
> ApiMessageType.LIST_OFFSETS to increase by one.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13824) Pass time object from constructor so that we can mock it if needed

2022-04-26 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13824:
--
Description: 
We pass the {{Time}} object into the StreamThread, where it is used to measure 
various metrics, so that we can mock the time in tests. We should extend this to 
lower-level metrics (a small sketch of the injection pattern follows the list), 
more specifically:

* KafkaStreams (instance-level): already done.
* StreamThread (thread-level): already done, in runtime passed through from 
instance.
* ProcessingContext (task-level).
* MeteredStore (store-level).
* StreamTask (task-level).
* ProcessorNode (processor-node-level): only a few processors that need to 
expose this level latency would need to have the {{Time}} object passed through.
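
For illustration only, a minimal sketch of the constructor-injection pattern 
being proposed; the class and method names below are hypothetical, not the 
actual Streams internals.

{code:java}
import org.apache.kafka.common.utils.Time;

// Hypothetical component; the real classes would thread Time through analogously.
public class MeteredOperation {
    private final Time time;

    // Production code passes Time.SYSTEM; tests can pass a mock Time implementation.
    public MeteredOperation(final Time time) {
        this.time = time;
    }

    // Measures how long the given work takes, using the injected clock.
    public long doWorkAndMeasureNanos(final Runnable work) {
        final long start = time.nanoseconds();
        work.run();
        return time.nanoseconds() - start;
    }
}
{code}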

> Pass time object from constructor so that we can mock it if needed
> --
>
> Key: KAFKA-13824
> URL: https://issues.apache.org/jira/browse/KAFKA-13824
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Priority: Minor
>  Labels: test
>
> We pass in the {{Time}} object in the StreamThread, which is used to measure 
> various metrics so that we can mock the time in tests. We should extend this 
> to lower-level metrics, more specifically:
> * KafkaStreams (instance-level): already done.
> * StreamThread (thread-level): already done, in runtime passed through from 
> instance.
> * ProcessingContext (task-level).
> * MeteredStore (store-level).
> * StreamTask (task-level).
> * ProcessorNode (processor-node-level): only a few processors that need to 
> expose this level latency would need to have the {{Time}} object passed 
> through.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13824) Pass time object from constructor so that we can mock it if needed

2022-04-26 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13824:
--
Labels: test  (was: )

> Pass time object from constructor so that we can mock it if needed
> --
>
> Key: KAFKA-13824
> URL: https://issues.apache.org/jira/browse/KAFKA-13824
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Priority: Minor
>  Labels: test
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13824) Pass time object from constructor so that we can mock it if needed

2022-04-26 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13824:
--
Description: (was: In [https://github.com/apache/kafka/pull/11896,] for 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java)

> Pass time object from constructor so that we can mock it if needed
> --
>
> Key: KAFKA-13824
> URL: https://issues.apache.org/jira/browse/KAFKA-13824
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13647) RocksDb metrics 'number-open-files' is not correct

2022-04-26 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13647.
---
Resolution: Incomplete

I'm resolving the ticket as incomplete for now, since the streams code alone 
cannot fix the issue; the fix has to happen on the RocksDB side.

> RocksDb metrics 'number-open-files' is not correct
> --
>
> Key: KAFKA-13647
> URL: https://issues.apache.org/jira/browse/KAFKA-13647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Sylvain Le Gouellec
>Priority: Major
> Attachments: image-2022-02-07-16-06-25-304.png, 
> image-2022-02-07-16-06-39-821.png, image-2022-02-07-16-06-53-164.png
>
>
> We were looking at RocksDB metrics and noticed that the {{number-open-files}} 
> metric behaves like a counter, rather than a gauge. 
> Looking at the code, we think there is a small error in the type of metric 
> for that specific mbean (should be a value metric rather than a sum metric).
> See [ 
> https://github.com/apache/kafka/blob/ca5d6f9229c170beb23809159113037f05a1120f/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482]
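> For illustration, a hedged sketch of the distinction using the common metrics 
> classes (this is not the actual RocksDBMetrics code, and the metric names are 
> illustrative): a gauge-like metric should be registered with a {{Value}} stat, 
> whereas a {{CumulativeSum}} keeps adding up every recorded value.
> {code:java}
> import org.apache.kafka.common.metrics.Metrics;
> import org.apache.kafka.common.metrics.Sensor;
> import org.apache.kafka.common.metrics.stats.CumulativeSum;
> import org.apache.kafka.common.metrics.stats.Value;
> 
> public class OpenFilesMetricSketch {
>     public static void main(String[] args) {
>         Metrics metrics = new Metrics();
>         Sensor sensor = metrics.sensor("number-open-files");
> 
>         // Gauge-like behaviour: the metric reports the last recorded value.
>         sensor.add(metrics.metricName("number-open-files", "rocksdb-metrics"), new Value());
> 
>         // Counter-like behaviour (what the ticket observes today): recordings are summed.
>         // sensor.add(metrics.metricName("number-open-files", "rocksdb-metrics"), new CumulativeSum());
> 
>         sensor.record(42.0);
>         sensor.record(42.0);
>         // With Value the metric now reads 42.0; with CumulativeSum it would read 84.0.
>         metrics.close();
>     }
> }
> {code}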



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13846) Add an overloaded metricOrElseCreate function in Metrics

2022-04-21 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13846:
-

 Summary: Add an overloaded metricOrElseCreate function in Metrics
 Key: KAFKA-13846
 URL: https://issues.apache.org/jira/browse/KAFKA-13846
 Project: Kafka
  Issue Type: Improvement
  Components: metrics
Reporter: Guozhang Wang


The `Metrics` registry is often used by concurrent threads; however, its 
get/create APIs are not well suited for that. A common pattern from users 
today is:

{code}
metric = metrics.metric(metricName);

if (metric == null) {
  try {
metrics.createMetric(..)
  } catch (IllegalArgumentException e){
// another thread may create the metric at the mean time
  }
} 
{code}

Otherwise the caller would need to synchronize the whole block trying to get 
the metric. However, the `createMetric` call itself already synchronizes 
internally when updating the metric map.

So we could consider adding a metricOrElseCreate function which is similar to 
createMetric, but instead of throwing an illegal argument exception within the 
internal synchronization block, it would just return the already existing 
metric.
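
For illustration, a caller-side stand-in built only on the existing public API 
(the helper name mirrors the proposal but is hypothetical; the proposed method 
would fold this logic into `Metrics` itself):

{code:java}
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;

public final class MetricsUtil {

    // Returns the existing metric if present, otherwise registers the gauge.
    // The try/catch is still needed because another thread can win the race between
    // the null check and addMetric; an in-registry metricOrElseCreate would remove it.
    public static KafkaMetric metricOrElseCreate(final Metrics metrics,
                                                 final MetricName name,
                                                 final Gauge<?> gauge) {
        final KafkaMetric existing = metrics.metric(name);
        if (existing != null) {
            return existing;
        }
        try {
            metrics.addMetric(name, gauge);
        } catch (IllegalArgumentException e) {
            // another thread registered the metric concurrently; fall through and return it
        }
        return metrics.metric(name);
    }
}
{code}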



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13838) Improve the poll method of ConsumerNetworkClient

2022-04-21 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13838:
--
Labels: new-consumer-threading-should-fix  (was: )

> Improve the poll method of ConsumerNetworkClient
> 
>
> Key: KAFKA-13838
> URL: https://issues.apache.org/jira/browse/KAFKA-13838
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
>  Labels: new-consumer-threading-should-fix
>
> Briefly describe the process of sending clientRequest on the Kafka Client 
> side, which is divided into two steps.
> 1.Selector.send(send) method
> Kafka's underlying tcp connection channel ensures that data is sent to the 
> network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at 
> a time{*}. And the next InFlightRequest is allowed to be added only if the 
> queue.peekFirst().send.completed() condition is met.
> {code:java}
> NetworkClient.isReady(node) ->
> NetworkClient.canSendRequest(node) -> 
> InFlightRequests.canSendMore(node){code}
> 2. Selector.poll(timeout)
> After KafkaChannel sets a send each time, there should be a 
> Selector.poll(timeout) call {*}subsequently{*}. Please refer to the comments 
> on the Selector.send(send) method.
> {code:java}
> /**
>  * Queue the given request for sending in the subsequent {@link #poll(long)} 
> calls
>  * @param send The request to send
>  */
> public void send(NetworkSend send) { {code}
> Send may become *completed* *only after* the Selector.poll(timeout) method is 
> executed; for more detail, see the Selector.write(channel) method.
>  
> Let's go back and look at this method: ConsumerNetworkClient.poll(Timer 
> timer, PollCondition pollCondition, boolean disableWakeup) method.
> There are three places involved in sending data in this method:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
>  client.poll(...)
> ->
> trySend(timer.currentTimeMs());
> {code}
> There are two problems with this process:
> 1. After calling the trySend(...) method for the second time, we should 
> immediately call client.poll(0, timer.currentTimeMs()); , to ensure that the 
> send generated each time can be consumed by the next Selector.poll() method.
> 2. The while loop in trySend(...) method can be removed
> After a node executes client.send(request, now) for the first time, because 
> the first request will never be *completed* here, the subsequent requests 
> will never satisfy the client.ready(node, now) condition.
> Although the current code will break directly on the second execution of the 
> loop, there will be {*}an additional execution of the loop{*}.
> Modify the code as follows:
> {code:java}
> long trySend(long now) {
> long pollDelayMs = maxPollTimeoutMs;
> // send any requests that can be sent now
> for (Node node : unsent.nodes()) {
> Iterator iterator = unsent.requestIterator(node);
> if (iterator.hasNext()) {
> pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, 
> now));
> if (client.ready(node, now)) {
> client.send(iterator.next(), now);
> iterator.remove();
> }
> }
> }
> return pollDelayMs;
> }{code}
> 3. By the way, the unsent.clean() method that is executed last can also be 
> optimized, to make the code easier to read.
> {code:java}
> public void clean() {
> // the lock protects removal from a concurrent put which could otherwise 
> mutate the
> // queue after it has been removed from the map
> synchronized (unsent) {
> unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
> }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13838) Improve the poll method of ConsumerNetworkClient

2022-04-21 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17526071#comment-17526071
 ] 

Guozhang Wang commented on KAFKA-13838:
---

Also bringing to [~kirktrue]'s radar.

> Improve the poll method of ConsumerNetworkClient
> 
>
> Key: KAFKA-13838
> URL: https://issues.apache.org/jira/browse/KAFKA-13838
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
>
> Briefly describe the process of sending clientRequest on the Kafka Client 
> side, which is divided into two steps.
> 1.Selector.send(send) method
> Kafka's underlying tcp connection channel ensures that data is sent to the 
> network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at 
> a time{*}. And the next InFlightRequest is allowed to be added only if the 
> queue.peekFirst().send.completed() condition is met.
> {code:java}
> NetworkClient.isReady(node) ->
> NetworkClient.canSendRequest(node) -> 
> InFlightRequests.canSendMore(node){code}
> 2. Selector.poll(timeout)
> After KafkaChannel sets a send each time, there should be a 
> Selector.poll(timeout) call {*}subsequently{*}. Please refer to the comments 
> on the Selector.send(send) method.
> {code:java}
> /**
>  * Queue the given request for sending in the subsequent {@link #poll(long)} 
> calls
>  * @param send The request to send
>  */
> public void send(NetworkSend send) { {code}
> Send may become *completed* *only after* the Selector.poll(timeout) method is 
> executed; for more detail, see the Selector.write(channel) method.
>  
> Let's go back and look at this method: ConsumerNetworkClient.poll(Timer 
> timer, PollCondition pollCondition, boolean disableWakeup) method.
> There are three places involved in sending data in this method:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
>  client.poll(...)
> ->
> trySend(timer.currentTimeMs());
> {code}
> There are two problems with this process:
> 1. After calling the trySend(...) method for the second time, we should 
> immediately call client.poll(0, timer.currentTimeMs()); , to ensure that the 
> send generated each time can be consumed by the next Selector.poll() method.
> 2. The while loop in trySend(...) method can be removed
> After a node executes client.send(request, now) for the first time, because 
> the first request will never be *completed* here, the subsequent requests 
> will never satisfy the client.ready(node, now) condition.
> Although the current code will break directly on the second execution of the 
> loop, there will be {*}an additional execution of the loop{*}.
> Modify the code as follows:
> {code:java}
> long trySend(long now) {
> long pollDelayMs = maxPollTimeoutMs;
> // send any requests that can be sent now
> for (Node node : unsent.nodes()) {
> Iterator iterator = unsent.requestIterator(node);
> if (iterator.hasNext()) {
> pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, 
> now));
> if (client.ready(node, now)) {
> client.send(iterator.next(), now);
> iterator.remove();
> }
> }
> }
> return pollDelayMs;
> }{code}
> 3. By the way, the unsent.clean() method that is executed last can also be 
> optimized, to make the code easier to read.
> {code:java}
> public void clean() {
> // the lock protects removal from a concurrent put which could otherwise 
> mutate the
> // queue after it has been removed from the map
> synchronized (unsent) {
> unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
> }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (KAFKA-13588) We should consolidate `changelogFor` methods to simplify the generation of internal topic names

2022-04-20 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-13588:
-

Assignee: Guozhang Wang  (was: Sayantanu Dey)

> We should consolidate `changelogFor` methods to simplify the generation of 
> internal topic names
> ---
>
> Key: KAFKA-13588
> URL: https://issues.apache.org/jira/browse/KAFKA-13588
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: newbie
> Fix For: 3.3.0
>
>
> [https://github.com/apache/kafka/pull/11611#discussion_r772625486]
> we should use `ProcessorContextUtils#changelogFor` after we remove 
> `init(final ProcessorContext context, final StateStore root)` in 
> `CachingWindowStore#initInternal` --- this will happen around Dec. 2022, 
> which is around 3.3.0
>  
> Or any other place that we generate an internal topic name.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13588) We should consolidate `changelogFor` methods to simplify the generation of internal topic names

2022-04-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525203#comment-17525203
 ] 

Guozhang Wang commented on KAFKA-13588:
---

Thanks [~dsayan] I've merged your PR.

We will keep this JIRA open until we remove the deprecated `init(final 
ProcessorContext context, final StateStore root)` in 
`CachingWindowStore#initInternal` later.
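
For context, the naming that is being consolidated boils down to something like 
the following simplified sketch (not the actual helper, which also has to deal 
with things like named topologies and per-store configuration):

{code:java}
public final class ChangelogNames {
    // Simplified sketch of how a changelog topic name is derived from the
    // application id and the store name.
    public static String changelogFor(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }
}
{code}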

> We should consolidate `changelogFor` methods to simplify the generation of 
> internal topic names
> ---
>
> Key: KAFKA-13588
> URL: https://issues.apache.org/jira/browse/KAFKA-13588
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Sayantanu Dey
>Priority: Minor
>  Labels: newbie
> Fix For: 3.3.0
>
>
> [https://github.com/apache/kafka/pull/11611#discussion_r772625486]
> we should use `ProcessorContextUtils#changelogFor` after we remove 
> `init(final ProcessorContext context, final StateStore root)` in 
> `CachingWindowStore#initInternal` --- this will happen around Dec. 2022, 
> which is around 3.3.0
>  
> Or any other place that we generate an internal topic name.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13834) batch drain for nodes might have starving issue

2022-04-20 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13834:
--
Reviewer: Luke Chen  (was: Guozhang Wang)

> batch drain for nodes might have starving issue
> ---
>
> Key: KAFKA-13834
> URL: https://issues.apache.org/jira/browse/KAFKA-13834
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 
> 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1
>Reporter: shizhenzhen
>Assignee: ruanliang
>Priority: Trivial
>  Labels: producer
> Attachments: image-2022-04-18-17-36-47-393.png
>
>
> h3. Problem code
> RecordAccumulator#drainBatchesForOneNode
> !https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png|width=786,height=266!
> The problem lies in this field: private int drainIndex;
> h3. Code expectations
> The logic of this code is to compute the ProducerBatches to send to each Node; 
> they are sent in batches. Because the amount of data sent in one request is 
> limited (max.request.size), only a few ProducerBatches may be sent at a time. 
> So after each send we need to record which batch the traversal has reached, so 
> that the next traversal can continue from where the previous one left off.
> Simply put, it works as shown below.
> !image-2022-04-18-17-36-47-393.png|width=798,height=526!
> h3. The actual situation
> However, the drainIndex above is a single field shared by the whole 
> RecordAccumulator. There are usually many Nodes to traverse, and the index 
> left behind by one Node is then reused by the second and third Nodes, so it is 
> impossible to traverse every TopicPartition in a balanced and fair way.
> Under normal circumstances this is not a problem; as long as no extreme case 
> occurs, everything is eventually traversed.
> The worry is the extreme case, in which many TopicPartitions are never 
> reached, so some messages can never be sent out.
> h3. Impact
> Some messages can never be sent out, or take a very long time to be sent out.
> h3. A case that triggers the issue
> The scenario is as follows:
>  # The producer sends messages to 3 Nodes.
>  # Each Node has 3 TopicPartitions.
>  # Every TopicPartition queue is continuously being written to.
>  # max.request.size can hold exactly one ProducerBatch.
> Under these conditions, each Node only ever receives ProducerBatch messages 
> from a single TopicPartition queue.
> At the beginning drainIndex=0 and we start traversing the first Node-0. Node-0 
> traverses the ProducerBatches in the queues below it, incrementing drainIndex 
> by one per queue; after traversing one queue, the request for this batch is 
> already full.
> Then we start traversing Node-1; at this point drainIndex=1, so the second 
> TopicPartition is visited first, and after one Batch the request is full again.
> Then we start traversing Node-2; at this point drainIndex=2, so the third 
> TopicPartition is visited first, and after one Batch the request is full again.
> After this round of Node traversal is over and the messages have been sent, 
> the same request flow repeats, and at this point drainIndex=3.
> Traversing Node-0, which TopicPartition does the modulo calculation select 
> this time? Still the first one. The subsequent flow is exactly the same as 
> above.
> As a result, the ProducerBatches in the second and third TopicPartition queues 
> of each Node are never traversed, and those messages can never

[jira] [Commented] (KAFKA-13834) batch drain for nodes might have starving issue

2022-04-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525181#comment-17525181
 ] 

Guozhang Wang commented on KAFKA-13834:
---

[~ruanliang] I've added you to the contributor list and assigned the ticket to 
you.

> batch drain for nodes might have starving issue
> ---
>
> Key: KAFKA-13834
> URL: https://issues.apache.org/jira/browse/KAFKA-13834
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 
> 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1
>Reporter: shizhenzhen
>Assignee: ruanliang
>Priority: Trivial
>  Labels: producer
> Attachments: image-2022-04-18-17-36-47-393.png
>
>
> h3. Problem code
> RecordAccumulator#drainBatchesForOneNode
> !https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png|width=786,height=266!
> The problem lies in this field: private int drainIndex;
> h3. Code expectations
> The logic of this code is to compute the ProducerBatches to send to each Node; 
> they are sent in batches. Because the amount of data sent in one request is 
> limited (max.request.size), only a few ProducerBatches may be sent at a time. 
> So after each send we need to record which batch the traversal has reached, so 
> that the next traversal can continue from where the previous one left off.
> Simply put, it works as shown below.
> !image-2022-04-18-17-36-47-393.png|width=798,height=526!
> h3. The actual situation
> However, the drainIndex above is a single field shared by the whole 
> RecordAccumulator. There are usually many Nodes to traverse, and the index 
> left behind by one Node is then reused by the second and third Nodes, so it is 
> impossible to traverse every TopicPartition in a balanced and fair way.
> Under normal circumstances this is not a problem; as long as no extreme case 
> occurs, everything is eventually traversed.
> The worry is the extreme case, in which many TopicPartitions are never 
> reached, so some messages can never be sent out.
> h3. Impact
> Some messages can never be sent out, or take a very long time to be sent out.
> h3. A case that triggers the issue
> The scenario is as follows:
>  # The producer sends messages to 3 Nodes.
>  # Each Node has 3 TopicPartitions.
>  # Every TopicPartition queue is continuously being written to.
>  # max.request.size can hold exactly one ProducerBatch.
> Under these conditions, each Node only ever receives ProducerBatch messages 
> from a single TopicPartition queue.
> At the beginning drainIndex=0 and we start traversing the first Node-0. Node-0 
> traverses the ProducerBatches in the queues below it, incrementing drainIndex 
> by one per queue; after traversing one queue, the request for this batch is 
> already full.
> Then we start traversing Node-1; at this point drainIndex=1, so the second 
> TopicPartition is visited first, and after one Batch the request is full again.
> Then we start traversing Node-2; at this point drainIndex=2, so the third 
> TopicPartition is visited first, and after one Batch the request is full again.
> After this round of Node traversal is over and the messages have been sent, 
> the same request flow repeats, and at this point drainIndex=3.
> Traversing Node-0, which TopicPartition does the modulo calculation select 
> this time? Still the first one. The subsequent flow is exactly the same as 
> above.
> As a result, the ProducerBatches in the second and third TopicPartition queues 
> of each Node are never traversed, and those messages can never be sent out.

[jira] [Assigned] (KAFKA-13834) batch drain for nodes might have starving issue

2022-04-20 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-13834:
-

Assignee: ruanliang

> batch drain for nodes might have starving issue
> ---
>
> Key: KAFKA-13834
> URL: https://issues.apache.org/jira/browse/KAFKA-13834
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 
> 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1
>Reporter: shizhenzhen
>Assignee: ruanliang
>Priority: Trivial
>  Labels: producer
> Attachments: image-2022-04-18-17-36-47-393.png
>
>
> h3. Problem code
> RecordAccumulator#drainBatchesForOneNode
> !https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png|width=786,height=266!
> The problem lies in this field: private int drainIndex;
> h3. Code expectations
> The logic of this code is to compute the ProducerBatches to send to each Node; 
> they are sent in batches. Because the amount of data sent in one request is 
> limited (max.request.size), only a few ProducerBatches may be sent at a time. 
> So after each send we need to record which batch the traversal has reached, so 
> that the next traversal can continue from where the previous one left off.
> Simply put, it works as shown below.
> !image-2022-04-18-17-36-47-393.png|width=798,height=526!
> h3. The actual situation
> However, the drainIndex above is a single field shared by the whole 
> RecordAccumulator. There are usually many Nodes to traverse, and the index 
> left behind by one Node is then reused by the second and third Nodes, so it is 
> impossible to traverse every TopicPartition in a balanced and fair way.
> Under normal circumstances this is not a problem; as long as no extreme case 
> occurs, everything is eventually traversed.
> The worry is the extreme case, in which many TopicPartitions are never 
> reached, so some messages can never be sent out.
> h3. Impact
> Some messages can never be sent out, or take a very long time to be sent out.
> h3. A case that triggers the issue
> The scenario is as follows:
>  # The producer sends messages to 3 Nodes.
>  # Each Node has 3 TopicPartitions.
>  # Every TopicPartition queue is continuously being written to.
>  # max.request.size can hold exactly one ProducerBatch.
> Under these conditions, each Node only ever receives ProducerBatch messages 
> from a single TopicPartition queue.
> At the beginning drainIndex=0 and we start traversing the first Node-0. Node-0 
> traverses the ProducerBatches in the queues below it, incrementing drainIndex 
> by one per queue; after traversing one queue, the request for this batch is 
> already full.
> Then we start traversing Node-1; at this point drainIndex=1, so the second 
> TopicPartition is visited first, and after one Batch the request is full again.
> Then we start traversing Node-2; at this point drainIndex=2, so the third 
> TopicPartition is visited first, and after one Batch the request is full again.
> After this round of Node traversal is over and the messages have been sent, 
> the same request flow repeats, and at this point drainIndex=3.
> Traversing Node-0, which TopicPartition does the modulo calculation select 
> this time? Still the first one. The subsequent flow is exactly the same as 
> above.
> As a result, the ProducerBatches in the second and third TopicPartition queues 
> of each Node are never traversed, and those messages can never be sent out.
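> For illustration, a minimal sketch of one possible direction (illustrative 
> only, not an actual patch to RecordAccumulator): keep one drain position per 
> node instead of a single shared field, so one node's traversal does not shift 
> another node's starting point.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> 
> // Illustrative sketch only: track the drain position per node rather than sharing one index.
> public final class PerNodeDrainIndex {
>     private final Map<String, Integer> drainIndexPerNode = new HashMap<>();
> 
>     // Returns the partition position to start draining from for this node, then advances it.
>     public int nextStartIndex(final String nodeId, final int partitionCount) {
>         final int index = drainIndexPerNode.getOrDefault(nodeId, 0) % partitionCount;
>         drainIndexPerNode.put(nodeId, index + 1);
>         return index;
>     }
> }
> {code}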

[jira] [Commented] (KAFKA-13838) Improve the poll method of ConsumerNetworkClient

2022-04-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525174#comment-17525174
 ] 

Guozhang Wang commented on KAFKA-13838:
---

I did a quick look, and I think I agree with [~RivenSun] that the while loop in 
{{trySend}} can be removed and that {{clean}} can be cleaned up as suggested. 
I'm not totally sure about the first suggestion, though, since the current flow 
was originally designed only as a best-effort way to get pipelining effects. 
Someone more familiar with the consumer network client could chime in here.

> Improve the poll method of ConsumerNetworkClient
> 
>
> Key: KAFKA-13838
> URL: https://issues.apache.org/jira/browse/KAFKA-13838
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
>
> Briefly describe the process of sending clientRequest on the Kafka Client 
> side, which is divided into two steps.
> 1.Selector.send(send) method
> Kafka's underlying tcp connection channel ensures that data is sent to the 
> network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at 
> a time{*}. And the next InFlightRequest is allowed to be added only if the 
> queue.peekFirst().send.completed() condition is met.
> {code:java}
> NetworkClient.isReady(node) ->
> NetworkClient.canSendRequest(node) -> 
> InFlightRequests.canSendMore(node){code}
> 2. Selector.poll(timeout)
> After KafkaChannel sets a send each time, there should be a 
> Selector.poll(timeout) call {*}subsequently{*}. Please refer to the comments 
> on the Selector.send(send) method.
> {code:java}
> /**
>  * Queue the given request for sending in the subsequent {@link #poll(long)} 
> calls
>  * @param send The request to send
>  */
> public void send(NetworkSend send) { {code}
> Send may become *completed* *only after* the Selector.poll(timeout) method is 
> executed; for more detail, see the Selector.write(channel) method.
>  
> Let's go back and look at this method: ConsumerNetworkClient.poll(Timer 
> timer, PollCondition pollCondition, boolean disableWakeup) method.
> There are three places involved in sending data in this method:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
>  client.poll(...)
> ->
> trySend(timer.currentTimeMs());
> {code}
> There are two problems with this process:
> 1. After calling the trySend(...) method for the second time, we should 
> immediately call client.poll(0, timer.currentTimeMs()); , to ensure that the 
> send generated each time can be consumed by the next Selector.poll() method.
> 2. The while loop in trySend(...) method can be removed
> After a node executes client.send(request, now) for the first time, because 
> the first request will never be *completed* here, the subsequent requests 
> will never satisfy the client.ready(node, now) condition.
> Although the current code will break directly on the second execution of the 
> loop, there will be {*}an additional execution of the loop{*}.
> Modify the code as follows:
> {code:java}
> long trySend(long now) {
> long pollDelayMs = maxPollTimeoutMs;
> // send any requests that can be sent now
> for (Node node : unsent.nodes()) {
> Iterator iterator = unsent.requestIterator(node);
> if (iterator.hasNext()) {
> pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, 
> now));
> if (client.ready(node, now)) {
> client.send(iterator.next(), now);
> iterator.remove();
> }
> }
> }
> return pollDelayMs;
> }{code}
> 3. By the way, the unsent.clean() method that is executed last can also be 
> optimized, to make the code easier to read.
> {code:java}
> public void clean() {
> // the lock protects removal from a concurrent put which could otherwise 
> mutate the
> // queue after it has been removed from the map
> synchronized (unsent) {
> unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
> }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13838) Improve the poll method of ConsumerNetworkClient

2022-04-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525168#comment-17525168
 ] 

Guozhang Wang commented on KAFKA-13838:
---

cc [~hachikuji] as you are working on the threading refactoring of the 
consumer, and may be able to piggy-back on this for the consumer network 
polling mechanism.

> Improve the poll method of ConsumerNetworkClient
> 
>
> Key: KAFKA-13838
> URL: https://issues.apache.org/jira/browse/KAFKA-13838
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
>
> Briefly describe the process of sending clientRequest on the Kafka Client 
> side, which is divided into two steps.
> 1.Selector.send(send) method
> Kafka's underlying tcp connection channel ensures that data is sent to the 
> network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at 
> a time{*}. And the next InFlightRequest is allowed to be added only if the 
> queue.peekFirst().send.completed() condition is met.
> {code:java}
> NetworkClient.isReady(node) ->
> NetworkClient.canSendRequest(node) -> 
> InFlightRequests.canSendMore(node){code}
> 2. Selector.poll(timeout)
> After KafkaChannel sets a send each time, there should be a 
> Selector.poll(timeout) call {*}subsequently{*}. Please refer to the comments 
> on the Selector.send(send) method.
> {code:java}
> /**
>  * Queue the given request for sending in the subsequent {@link #poll(long)} 
> calls
>  * @param send The request to send
>  */
> public void send(NetworkSend send) { {code}
> Send may become *completed* *only after* the Selector.poll(timeout) method is 
> executed; for more detail, see the Selector.write(channel) method.
>  
> Let's go back and look at this method: ConsumerNetworkClient.poll(Timer 
> timer, PollCondition pollCondition, boolean disableWakeup) method.
> There are three places involved in sending data in this method:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
>  client.poll(...)
> ->
> trySend(timer.currentTimeMs());
> {code}
> There are two problems with this process:
> 1. After calling the trySend(...) method for the second time, we should 
> immediately call client.poll(0, timer.currentTimeMs()); , to ensure that the 
> send generated each time can be consumed by the next Selector.poll() method.
> 2. The while loop in trySend(...) method can be removed
> After a node executes client.send(request, now) for the first time, because 
> the first request will never be *completed* here, the subsequent requests 
> will never satisfy the client.ready(node, now) condition.
> Although the current code will break directly on the second execution of the 
> loop, there will be {*}an additional execution of the loop{*}.
> Modify the code as follows:
> {code:java}
> long trySend(long now) {
> long pollDelayMs = maxPollTimeoutMs;
> // send any requests that can be sent now
> for (Node node : unsent.nodes()) {
> Iterator iterator = unsent.requestIterator(node);
> if (iterator.hasNext()) {
> pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, 
> now));
> if (client.ready(node, now)) {
> client.send(iterator.next(), now);
> iterator.remove();
> }
> }
> }
> return pollDelayMs;
> }{code}
> 3. By the way, the unsent.clean() method that is executed last can also be 
> optimized, to make the code easier to read.
> {code:java}
> public void clean() {
> // the lock protects removal from a concurrent put which could otherwise 
> mutate the
> // queue after it has been removed from the map
> synchronized (unsent) {
> unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
> }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13811) Investigate sliding windows performance

2022-04-20 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13811:
--
Labels: perfomance  (was: )

> Investigate sliding windows performance
> ---
>
> Key: KAFKA-13811
> URL: https://issues.apache.org/jira/browse/KAFKA-13811
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Leah Thomas
>Priority: Major
>  Labels: perfomance
>
> We recently fixed a bug in sliding windows so that a grace period of 0ms is 
> properly calculated, see https://issues.apache.org/jira/browse/KAFKA-13739. 
> Before this patch, sliding windows with a grace period of 0ms would just skip 
> all records so nothing would get put into the store.
> When we ran benchmarks for the 3.2 release we saw a significant drop in 
> performance for sliding windows on both the 3.2 and trunk branches, see the 
> `sliding windows` results 
> [here|http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/summaries/process-cumulative-rate/graph.html].
>  These benchmarks use a sliding window with a 0ms grace period, which means 
> until now we weren't sending any values to the state store when running these 
> benchmarks.
> I ran benchmarks on the 
> [commit|https://github.com/apache/kafka/commit/430f9c99012d1585aa544d4dadf449963296c1fd]
>  before KAFKA-13739 and changed the grace period to 2 seconds to see if the 
> bug fix changed anything. The performance was still low for [this 
> run|http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/sliding1min-3-5-3-3-0-430f9c9901-leah-20220408084241-streamsbench/].
> Given this, it seems like the performance for sliding windows has always been 
> low but we didn't realize it because the bug fixed in KAFKA-13739 was present 
> in the benchmarks we were running. We should investigate why the current 
> algorithm is slow and see if improvements can be made



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13768) Transactional producer exits because of expiration in RecordAccumulator

2022-04-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525161#comment-17525161
 ] 

Guozhang Wang commented on KAFKA-13768:
---

Hello [~ddrid], I think what you're describing here is aligned with this 
proposed KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling

Could you check and see if that's the case?

This KIP was proposed by another contributor, but he's no longer actively 
working on it. If you are interested in picking it up, please let us know.
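
For context, the usage pattern in question looks roughly like the sketch below 
(broker address, transactional id, and topic are illustrative); today an 
expired batch puts the producer into a fatal state before the catch block can 
recover, whereas under KIP-691 it would surface as an abortable error.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // illustrative
        props.put("transactional.id", "my-txn-id");          // illustrative
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                // fatal: the producer must be closed and recreated
                throw e;
            } catch (KafkaException e) {
                // the recovery path the reporter would like to reach for expired batches
                producer.abortTransaction();
            }
        }
    }
}
{code}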

> Transactional producer exits because of expiration in RecordAccumulator
> ---
>
> Key: KAFKA-13768
> URL: https://issues.apache.org/jira/browse/KAFKA-13768
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.0.0
>Reporter: xuexiaoyue
>Priority: Major
>
> Hi team, I'm using a transactional producer with request.timeout.ms set to a 
> rather small value, such as 10s, while zookeeper.session.timeout.ms is set 
> longer, such as 30s. 
> While the producer was sending records, one broker accidentally shut down, and 
> I noticed the producer throw 'org.apache.kafka.common.KafkaException: The 
> client hasn't received acknowledgment for some previously sent messages and 
> can no longer retry them. It isn't safe to continue' and exit.
> Looking into the code, I found that when a batch expires in the 
> RecordAccumulator, it is marked as unresolved in Sender#sendProducerData. 
> And if the producer is transactional, it is then doomed to 
> transitionToFatalError later.
> I'm wondering why we need to transitionToFatalError here? Would it be better 
> to abort this transaction instead? I know it's necessary to bump the epoch 
> for idempotent sending, but why do we let the producer crash in this case?
> I found that KAFKA-8805 (Bump producer epoch on recoverable errors, #7389) 
> fixes this by automatically bumping the producer epoch after aborting the 
> transaction, but why is it necessary to bump the epoch? What problem would 
> occur if we called transitionToAbortableError directly and let the user abort 
> the transaction?



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13765) Describe-consumer admin should not return unstable membership information

2022-04-20 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525150#comment-17525150
 ] 

Guozhang Wang commented on KAFKA-13765:
---

Hello [~rleslie], thanks for the careful thoughts. I agree 100% with your 
concerns about compatibility breaks.

I think we do not need to extend {{DescribeConsumerGroupsOptions}} or 
{{DescribeConsumerGroupsResult}}, since the existing fields are sufficient. What 
I had in mind is just the wrapper tools (like kafka-consumer-groups.sh), which 
could look at {{DescribeConsumerGroupsResult#state}} to decide which members to 
print out. Of course we'd still need a flag in the shell script to indicate 
"do not print members when the state is rebalancing", etc., which would still 
need a KIP.
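
To make that concrete, a minimal sketch of what such a tool-side check could 
look like (illustrative only; the broker address, group id, and the exact flag 
behavior would be defined in the KIP):

{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.ConsumerGroupState;

public class DescribeGroupSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        String groupId = "my-group";                        // illustrative

        try (Admin admin = Admin.create(props)) {
            ConsumerGroupDescription description =
                admin.describeConsumerGroups(List.of(groupId)).describedGroups().get(groupId).get();

            if (description.state() == ConsumerGroupState.PREPARING_REBALANCE) {
                // hypothetical tool behavior: membership is unstable, so skip printing members
                System.out.println("Group is rebalancing; member list omitted.");
            } else {
                description.members().forEach(m ->
                    System.out.println(m.consumerId() + " -> " + m.assignment().topicPartitions()));
            }
        }
    }
}
{code}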

> Describe-consumer admin should not return unstable membership information
> -
>
> Key: KAFKA-13765
> URL: https://issues.apache.org/jira/browse/KAFKA-13765
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Guozhang Wang
>Priority: Major
>
> When a consumer group is in the “prepare-rebalance” phase, it's unclear 
> whether all of its currently registered members will re-join in the new 
> generation. In this case, simply returning the current members map in the 
> describe-consumer response may be misleading, as users would get spurious 
> results that may contain dropping or even zombie consumers.
> So I think during the prepare-rebalance phase we should either return only 
> members whose join-group requests have already been received, OR simply 
> return the response with no members and indicate via the prepare-rebalance 
> state that the membership info is unstable and hence is not returned.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13799) Improve documentation for Kafka zero-copy

2022-04-20 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13799.
---
Fix Version/s: 3.3.0
 Assignee: RivenSun
   Resolution: Fixed

> Improve documentation for Kafka zero-copy
> -
>
> Key: KAFKA-13799
> URL: https://issues.apache.org/jira/browse/KAFKA-13799
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.3.0
>
>
> From the documentation https://kafka.apache.org/documentation/#maximizingefficiency
> and [https://kafka.apache.org/documentation/#networklayer],
> we can see that Kafka combines the page cache and zero-copy when reading messages 
> from files on disk, which greatly improves the consumption rate of messages.
> But after browsing the source code (look directly at the 
> *FileRecords.writeTo(...)* method):
> 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), 
> which at the bottom calls the sendfile system call to implement zero-copy data 
> transfer.
> 2. The logic of the SslTransportLayer.transferFrom() method: 
> {code:java}
> fileChannel.read(fileChannelBuffer, pos) 
> -> 
> sslEngine.wrap(src, netWriteBuffer) 
> -> 
> flush(ByteBuffer buf) && socketChannel.write(buf){code}
> That is, first read the data on the disk or directly from the page cache, 
> then encrypt the data, and finally send the encrypted data to the network. 
> {*}FileChannel.transferTo() is not used in the whole process{*}.
>  
> Conclusion: 
> PlaintextTransportLayer and SslTransportLayer both use pagecache, but 
> SslTransportLayer does not implement zero-copy.
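> For illustration, a minimal sketch of the two transfer styles in plain NIO 
> (illustrative helper class, not the Kafka transport-layer code itself):
> {code:java}
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.channels.WritableByteChannel;
> 
> public final class TransferStyles {
> 
>     // Zero-copy path (what the plaintext transport relies on): the kernel moves bytes
>     // from the page cache to the socket via sendfile, without copying into user space.
>     public static long zeroCopy(FileChannel file, WritableByteChannel socket,
>                                 long position, long count) throws IOException {
>         return file.transferTo(position, count, socket);
>     }
> 
>     // Read-then-write path (conceptually what the SSL path must do, since the data
>     // has to pass through user space to be encrypted before it is written out).
>     public static int copyThroughUserSpace(FileChannel file, WritableByteChannel socket,
>                                            long position, int count) throws IOException {
>         ByteBuffer buffer = ByteBuffer.allocate(count);
>         file.read(buffer, position);
>         buffer.flip();
>         // ... encryption would happen here in the SSL case ...
>         return socket.write(buffer);
>     }
> }
> {code}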



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13834) Some problems with producers choosing batches of messages to send

2022-04-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17523804#comment-17523804
 ] 

Guozhang Wang commented on KAFKA-13834:
---

Also, I'm wondering if you could resize the embedded pictures (they are very 
illustrative, by the way, thanks) to be a bit smaller, for better rendering?

> Some problems with producers choosing batches of messages to send
> -
>
> Key: KAFKA-13834
> URL: https://issues.apache.org/jira/browse/KAFKA-13834
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 
> 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1
>Reporter: shizhenzhen
>Priority: Trivial
>  Labels: producer
> Attachments: image-2022-04-18-17-36-47-393.png
>
>
> h3. Problem code
> RecordAccumulator#drainBatchesForOneNode
> !https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png!
> The problem lies in this field: private int drainIndex;
> h3. Code expectations
> The logic of this code is to compute the ProducerBatches to send to each Node; 
> they are sent in batches. Because the amount of data sent in one request is 
> limited (max.request.size), only a few ProducerBatches may be sent at a time. 
> So after each send we need to record which batch the traversal has reached, so 
> that the next traversal can continue from where the previous one left off.
> Simply put, it works as shown below.
> !image-2022-04-18-17-36-47-393.png!
> h3. The actual situation
> However, the drainIndex above is a single field shared by the whole 
> RecordAccumulator. There are usually many Nodes to traverse, and the index 
> left behind by one Node is then reused by the second and third Nodes, so it is 
> impossible to traverse every TopicPartition in a balanced and fair way.
> Under normal circumstances this is not a problem; as long as no extreme case 
> occurs, everything is eventually traversed.
> The worry is the extreme case, in which many TopicPartitions are never 
> reached, so some messages can never be sent out.
> h3. Impact
> Some messages can never be sent out, or take a very long time to be sent out.
> h3. A case that triggers the issue
> The scenario is as follows:
>  # The producer sends messages to 3 Nodes.
>  # Each Node has 3 TopicPartitions.
>  # Every TopicPartition queue is continuously being written to.
>  # max.request.size can hold exactly one ProducerBatch.
> Under these conditions, each Node only ever receives ProducerBatch messages 
> from a single TopicPartition queue.
> At the beginning drainIndex=0 and we start traversing the first Node-0. Node-0 
> traverses the ProducerBatches in the queues below it, incrementing drainIndex 
> by one per queue; after traversing one queue, the request for this batch is 
> already full.
> Then we start traversing Node-1; at this point drainIndex=1, so the second 
> TopicPartition is visited first, and after one Batch the request is full again.
> Then we start traversing Node-2; at this point drainIndex=2, so the third 
> TopicPartition is visited first, and after one Batch the request is full again.
> After this round of Node traversal is over and the messages have been sent, 
> the same request flow repeats, and at this point drainIndex=3.
> Traversing Node-0, which TopicPartition does the modulo calculation select 
> this time? Still the first one. The subsequent flow is exactly the same as 
> above.

[jira] [Commented] (KAFKA-13834) Some problems with producers choosing batches of messages to send

2022-04-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17523801#comment-17523801
 ] 

Guozhang Wang commented on KAFKA-13834:
---

[~shizhenzhen] Thanks for filing the JIRA ticket. Could you translate the 
description into English so that everyone in the community can understand the 
issue? Thanks!

> Some problems with producers choosing batches of messages to send
> -
>
> Key: KAFKA-13834
> URL: https://issues.apache.org/jira/browse/KAFKA-13834
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0, 2.7.1, 
> 2.6.2, 3.1.0, 2.6.3, 2.7.2, 2.8.1, 3.0.0, 3.0.1
>Reporter: shizhenzhen
>Priority: Trivial
>  Labels: producer
> Attachments: image-2022-04-18-17-36-47-393.png
>
>
> h3. Problem code
> RecordAccumulator#drainBatchesForOneNode
> !https://img-blog.csdnimg.cn/a4e309723c364586a46df8d94e49291f.png!
> The problem is this field: private int drainIndex;
> h3. Code expectations
> The logic of this code is to compute the ProducerBatches to be sent to each 
> Node; they are sent in batches.
> Because the amount of data that can be sent in one request is limited 
> (max.request.size), only a few ProducerBatches may be sent at a time. After a 
> send, the code therefore needs to remember which batch the traversal stopped 
> at, so that the next traversal can continue from where the previous one left 
> off.
> Simply put, it works as shown below.
>  
> !image-2022-04-18-17-36-47-393.png!
>  
> h3. The actual situation
> However, the index drainIndex is a single field shared by the whole 
> RecordAccumulator. Since there are usually many Nodes to traverse, the index 
> left behind by one Node is picked up by the second and third Nodes, so the 
> traversal is not balanced and not every TopicPartition is guaranteed to be 
> reached.
> Under normal circumstances this is not a problem; absent extreme conditions, 
> every partition is eventually traversed.
> The danger is the extreme case, in which many TopicPartitions are never 
> reached, so part of the messages can never be sent out.
> h3. Impact
> Some messages cannot be sent out at all, or take a very long time to be sent 
> out.
> h3. A case that triggers the issue
> The case scenario is as follows:
>  # The producer sends messages to 3 Nodes.
>  # Each Node has 3 TopicPartitions.
>  # Messages are continuously written to every TopicPartition queue.
>  # max.request.size can only hold the size of one ProducerBatch.
> Under these conditions, each Node can only receive the ProducerBatch of one 
> TopicPartition queue per request:
> At the beginning drainIndex=0. Traversal starts with Node-0, which begins going 
> through its queues; each queue visited increments drainIndex by 1, and after 
> one queue the request for this batch is already full.
> Then Node-1 is traversed with drainIndex=1, so its second TopicPartition is 
> visited first, and again one Batch fills the request.
> Then Node-2 is traversed with drainIndex=2, so its third TopicPartition is 
> visited first, and again one Batch fills the request.
> After this pass over the Nodes the messages are sent, and the next pass starts 
> with drainIndex=3.
> When Node-0 is traversed again, which TopicPartition does the modulo 
> calculation pick? Still the first one, so the process repeats exactly as above.
> As a result, the ProducerBatches in the second and third TopicPartition queues 
> of each Node are never traversed and can never be sent.
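A minimal, self-contained sketch of the starvation pattern described above (a toy model, not the actual RecordAccumulator code; the node names and queue counts mirror the reported case): with three nodes, three queues per node, room for only one batch per request, and a drain index shared across nodes, each node keeps starting at the same queue on every pass.

{code:java}
import java.util.Arrays;
import java.util.List;

public class DrainIndexSketch {
    // Shared across all nodes, mirroring the reported behavior of `private int drainIndex`.
    static int drainIndex = 0;

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node-0", "node-1", "node-2");
        int queuesPerNode = 3;
        for (int pass = 0; pass < 3; pass++) {
            for (String node : nodes) {
                // The start position is derived from the shared index, modulo the queue count.
                int start = drainIndex % queuesPerNode;
                // Only one batch fits into max.request.size, so only this one queue is drained.
                System.out.println(node + " drains partition queue " + start);
                drainIndex++;
            }
        }
        // Prints the same assignment on every pass: node-0 always drains queue 0,
        // node-1 queue 1, node-2 queue 2 -- the other two queues on each node are never reached.
    }
}
{code}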

[jira] [Updated] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-04-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13773:
--
Priority: Critical  (was: Major)

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.8.0, 3.1.0, 2.8.1
>Reporter: Tim Alkemade
>Priority: Critical
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip, kafka-start-to-finish.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for kafka 
> filled up, which led to all 3 nodes crashing. I increased the disk size for 
> all three nodes and started up kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older 
> than the 2 week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 ( 2022-03-28, 
> 09:48:08 UTC, which is today). However, since this segment was the 
> 'latest/current' segment, much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to account for 
> this and just reads the last position in the file; because the file is mostly 
> empty, it reads 0 for that position.
> The cleaner code seems to take this into account since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc, however in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3Gb in size when unzipped)
>  
> I've encountered this problem with both kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2: The issue still occurs.
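As an aside, a small standalone sketch of the failure mode described above, under two assumptions that are not taken from this ticket: that a time-index entry is a 12-byte record (8-byte timestamp plus 4-byte relative offset) and that the index file is preallocated and zero-filled. Reading the entry at the last file position of a mostly-empty index then yields timestamp 0, i.e. 1970-01-01:

{code:java}
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;

public class LastTimeIndexEntrySketch {
    private static final int ENTRY_SIZE = 12; // assumed layout: 8-byte timestamp + 4-byte relative offset

    public static void main(String[] args) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile("sketch.timeindex", "rw")) {
            raf.setLength(10 * 1024 * 1024);       // preallocated, zero-filled index file
            ByteBuffer entry = ByteBuffer.allocate(ENTRY_SIZE);
            entry.putLong(1648460888636L);         // the real largest timestamp, written near the start
            entry.putInt(42);                      // relative offset
            entry.flip();
            raf.getChannel().write(entry, 0);

            // A "last entry" lookup that simply reads the final position of the file
            // sees only the zero-filled tail, not the real entry written above.
            ByteBuffer last = ByteBuffer.allocate(ENTRY_SIZE);
            raf.getChannel().read(last, raf.length() - ENTRY_SIZE);
            last.flip();
            System.out.println("last timestamp = " + last.getLong()); // prints 0, i.e. 1970-01-01
        }
    }
}
{code}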



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13773) Data loss after recovery from crash due to full hard disk

2022-04-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13773:
--
Component/s: log
 (was: log cleaner)

> Data loss after recovery from crash due to full hard disk
> -
>
> Key: KAFKA-13773
> URL: https://issues.apache.org/jira/browse/KAFKA-13773
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.8.0, 3.1.0, 2.8.1
>Reporter: Tim Alkemade
>Priority: Critical
> Attachments: DiskAndOffsets.png, kafka-.zip, kafka-2.7.0vs2.8.0.zip, 
> kafka-2.8.0-crash.zip, kafka-logfiles.zip, kafka-start-to-finish.zip
>
>
> While doing some testing of Kafka on Kubernetes, the data disk for kafka 
> filled up, which led to all 3 nodes crashing. I increased the disk size for 
> all three nodes and started up kafka again (one by one, waiting for the 
> previous node to become available before starting the next one). After a 
> little while two out of three nodes had no data anymore.
> According to the logs, the log cleaner kicked in and decided that the latest 
> timestamp on those partitions was '0' (i.e. 1970-01-01), and that is older 
> than the 2 week limit specified on the topic.
>  
> {code:java}
> 2022-03-28 12:17:19,740 INFO [LocalLog partition=audit-trail-0, 
> dir=/var/lib/kafka/data-0/kafka-log1] Deleting segment files 
> LogSegment(baseOffset=0, size=249689733, lastModifiedTime=1648460888636, 
> largestRecordTimestamp=Some(0)) (kafka.log.LocalLog$) [kafka-scheduler-0]
> 2022-03-28 12:17:19,753 INFO Deleted log 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.log.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted offset index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.index.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]
> 2022-03-28 12:17:19,754 INFO Deleted time index 
> /var/lib/kafka/data-0/kafka-log1/audit-trail-0/.timeindex.deleted.
>  (kafka.log.LogSegment) [kafka-scheduler-0]{code}
> Using kafka-dump-log.sh I was able to determine that the greatest timestamp 
> in that file (before deletion) was actually 1648460888636 ( 2022-03-28, 
> 09:48:08 UTC, which is today). However, since this segment was the 
> 'latest/current' segment, much of the file is empty. The code that determines 
> the last entry (TimeIndex.lastEntryFromIndexFile) doesn't seem to account for 
> this and just reads the last position in the file; because the file is mostly 
> empty, it reads 0 for that position.
> The cleaner code seems to take this into account since 
> UnifiedLog.deleteOldSegments is never supposed to delete the current segment, 
> judging by the scaladoc, however in this case the check doesn't seem to do 
> its job. Perhaps the detected highWatermark is wrong?
> I've attached the logs and the zipped data directories (data files are over 
> 3Gb in size when unzipped)
>  
> I've encountered this problem with both kafka 2.8.1 and 3.1.0.
> I've also tried changing min.insync.replicas to 2: The issue still occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13799) Improve documentation for Kafka zero-copy

2022-04-14 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522601#comment-17522601
 ] 

Guozhang Wang commented on KAFKA-13799:
---

[~RivenSun] I agree with you assessment. We can update the doc accordingly.

> Improve documentation for Kafka zero-copy
> -
>
> Key: KAFKA-13799
> URL: https://issues.apache.org/jira/browse/KAFKA-13799
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: RivenSun
>Priority: Major
>
> Via documentation https://kafka.apache.org/documentation/#maximizingefficiency
> and [https://kafka.apache.org/documentation/#networklayer] ,
> We can know that Kafka combines pagecache and zero-copy when reading messages 
> in files on disk, which greatly improves the consumption rate of messages.
> But after browsing the source code:
> Look directly at the *FileRecords.writeTo(...)* method,
> 1. Only PlaintextTransportLayer.transferFrom() uses fileChannel.transferTo(), 
> and the bottom layer calls the sendfile method to implement zero-copy data 
> transfer.
> 2. The logic of the SslTransportLayer.transferFrom() method: 
> {code:java}
> fileChannel.read(fileChannelBuffer, pos) 
> -> 
> sslEngine.wrap(src, netWriteBuffer) 
> -> 
> flush(ByteBuffer buf) && socketChannel.write(buf){code}
> That is, first read the data on the disk or directly from the page cache, 
> then encrypt the data, and finally send the encrypted data to the network. 
> {*}FileChannel.transferTo() is not used in the whole process{*}.
>  
> Conclusion: 
> PlaintextTransportLayer and SslTransportLayer both use pagecache, but 
> SslTransportLayer does not implement zero-copy.
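To make the contrast concrete, here is a rough sketch of the two paths described above; it is not the actual Kafka transport-layer code, and the method names and buffer handling (no handshake, no partial plaintext writes) are simplified assumptions:

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import javax.net.ssl.SSLEngine;

public class TransferSketch {
    // Plaintext path: sendfile-style zero-copy, no trip through user space.
    static long plaintextTransfer(FileChannel log, long pos, long count, SocketChannel socket) throws IOException {
        return log.transferTo(pos, count, socket);
    }

    // TLS path: read into user space, encrypt, then write; transferTo() cannot be used
    // because the bytes must pass through SSLEngine.wrap().
    static void tlsTransfer(FileChannel log, long pos, int count, SSLEngine engine, SocketChannel socket)
            throws IOException {
        ByteBuffer plain = ByteBuffer.allocate(count);
        log.read(plain, pos);                   // copy from the page cache into an application buffer
        plain.flip();
        ByteBuffer net = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
        engine.wrap(plain, net);                // encrypt into the network buffer
        net.flip();
        while (net.hasRemaining()) {
            socket.write(net);                  // finally write the encrypted bytes to the socket
        }
    }
}
{code}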



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-8178) KafkaProducer#send(ProducerRecord,Callback) may block for up to 60 seconds

2022-04-14 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522550#comment-17522550
 ] 

Guozhang Wang commented on KAFKA-8178:
--

I think it's fair to update the docs to explicitly state that the producer.send() 
call can block when 1) memory is full, indicating the sender thread cannot 
keep up with the caller thread's append rate, or 2) metadata is not available to 
determine which partition the record should go to. And in practice there are 
various reasons either 1) or 2) could be triggered, for example: a) the producer 
client is throttled at the broker, b) the broker cluster is unhealthy and hence 
cannot respond to either metadata or produce requests, c) more...
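For illustration, a minimal producer sketch showing where that blocking surfaces and how it is bounded; the broker address, topic name and the 5-second value are placeholders, and max.block.ms (default 60 seconds) is the config that caps the wait on both buffer memory and metadata:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class BoundedSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Bound how long send() may block waiting on metadata or free buffer memory.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5_000);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace(); // asynchronous failures arrive here
                    }
                });
            } catch (TimeoutException e) {
                // Thrown synchronously if metadata or buffer space was not available within max.block.ms.
                e.printStackTrace();
            }
        }
    }
}
{code}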

> KafkaProducer#send(ProducerRecord,Callback) may block for up to 60 seconds
> --
>
> Key: KAFKA-8178
> URL: https://issues.apache.org/jira/browse/KAFKA-8178
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Reporter: Sergei Egorov
>Priority: Major
>
> Hello. I was running reactor-kafka with [the BlockHound 
> agent|https://github.com/reactor/BlockHound] (you can see the progress 
> [here|https://github.com/reactor/reactor-kafka/pull/75] and even run it 
> yourself) and it detected a very dangerous blocking call in 
> KafkaProducer#send(ProducerRecord,Callback) which is supposed to be async:
> {code:java}
> java.lang.Error: Blocking call! java.lang.Object#wait
>   at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
>   at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
>   at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
>   at java.lang.Object.wait(Object.java)
>   at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:181)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:938)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
> {code}
> it blocks for up to "maxBlockTimeMs" (60 seconds by default)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10405:
--
Fix Version/s: 3.3.0
   (was: 3.2.0)

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-10405
> URL: https://issues.apache.org/jira/browse/KAFKA-10405
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.3.0
>
>
> From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/]
>  
> {noformat}
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > 
> shouldRestoreState FAILED
> 14:25:19 java.lang.AssertionError: Condition not met within timeout 
> 6. Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms.
> 14:25:19 at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
> 14:25:19 at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206){noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-10405) Flaky Test org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState

2022-03-25 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10405:
--
Fix Version/s: 3.2.0

> Flaky Test 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState
> ---
>
> Key: KAFKA-10405
> URL: https://issues.apache.org/jira/browse/KAFKA-10405
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.2.0
>
>
> From build [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1979/]
>  
> {noformat}
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest > 
> shouldRestoreState FAILED
> 14:25:19 java.lang.AssertionError: Condition not met within timeout 
> 6. Repartition topic 
> restore-test-KSTREAM-AGGREGATE-STATE-STORE-02-repartition not purged 
> data after 6 ms.
> 14:25:19 at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
> 14:25:19 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:388)
> 14:25:19 at 
> org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState(PurgeRepartitionTopicIntegrationTest.java:206){noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13771) Support to explicitly delete delegationTokens that have expired but have not been automatically cleaned up

2022-03-25 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17512444#comment-17512444
 ] 

Guozhang Wang commented on KAFKA-13771:
---

cc [~rsivaram] could you take a look at this ticket when you are free?

> Support to explicitly delete delegationTokens that have expired but have not 
> been automatically cleaned up
> --
>
> Key: KAFKA-13771
> URL: https://issues.apache.org/jira/browse/KAFKA-13771
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: RivenSun
>Priority: Major
>
> Quoting the official documentation
> {quote}
> Tokens can also be cancelled explicitly. If a token is not renewed by the 
> token’s expiration time or if token is beyond the max life time, it will be 
> deleted from all broker caches as well as from zookeeper.
> {quote}
> 1. The first point above means that after the `AdminClient` initiates the 
> EXPIRE_DELEGATION_TOKEN request, in the DelegationTokenManager.expireToken() 
> method on the KafkaServer side, if the user passes in expireLifeTimeMs less 
> than 0, KafaServer will delete the corresponding delegationToken directly.
> 2. There is a thread named "delete-expired-tokens" on the KafkaServer side, 
> which is responsible for regularly cleaning up expired tokens. The execution 
> interval is `delegation.token.expiry.check.interval.ms`, and the default 
> value is one hour.
> But a careful analysis of the code logic in DelegationTokenManager.expireToken() 
> shows that *Kafka currently does not allow users to delete an expired 
> delegationToken that they no longer use or renew. If a user tries to do this, 
> they will receive a DelegationTokenExpiredException.*
> In the worst case, an expired delegationToken may still be usable for up to 
> {*}an hour{*}, even though the broker can shorten 
> delegation.token.expiry.check.interval.ms as much as possible to reduce this 
> window.
> The solution is simple: adjust the order of the `if` branches in 
> DelegationTokenManager.expireToken().
> {code:java}
> if (!allowedToRenew(principal, tokenInfo)) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_OWNER_MISMATCH, -1)
> } else if (expireLifeTimeMs < 0) { //expire immediately
>   removeToken(tokenInfo.tokenId)
>   info(s"Token expired for token: ${tokenInfo.tokenId} for owner: 
> ${tokenInfo.owner}")
>   expireResponseCallback(Errors.NONE, now)
> } else if (tokenInfo.maxTimestamp < now || tokenInfo.expiryTimestamp < now) {
>   expireResponseCallback(Errors.DELEGATION_TOKEN_EXPIRED, -1)
> } else {
>   //set expiry time stamp
>  ..
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13692) stream thread blocked-time-ns-total metric does not include producer metadata wait time

2022-03-24 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13692.
---
Fix Version/s: 3.3.0
   (was: 3.2.0)
   Resolution: Fixed

> stream thread blocked-time-ns-total metric does not include producer metadata 
> wait time
> ---
>
> Key: KAFKA-13692
> URL: https://issues.apache.org/jira/browse/KAFKA-13692
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Rohan Desai
>Assignee: Rohan Desai
>Priority: Major
> Fix For: 3.3.0
>
>
> The stream thread blocked-time-ns-total metric does not include producer 
> metadata wait time (time spent in `KafkaProducer.waitOnMetadata`). This can 
> contribute significantly to actual total blocked time in some cases. For 
> example, if a user deletes the streams sink topic, producers will wait until 
> the max block timeout. This time does not get included in total blocked time 
> when it should.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-13692) stream thread blocked-time-ns-total metric does not include producer metadata wait time

2022-03-24 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-13692:
-

Assignee: Rohan Desai

> stream thread blocked-time-ns-total metric does not include producer metadata 
> wait time
> ---
>
> Key: KAFKA-13692
> URL: https://issues.apache.org/jira/browse/KAFKA-13692
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Rohan Desai
>Assignee: Rohan Desai
>Priority: Major
> Fix For: 3.2.0
>
>
> The stream thread blocked-time-ns-total metric does not include producer 
> metadata wait time (time spent in `KafkaProducer.waitOnMetadata`). This can 
> contribute significantly to actual total blocked time in some cases. For 
> example, if a user deletes the streams sink topic, producers will wait until 
> the max block timeout. This time does not get included in total blocked time 
> when it should.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-24 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13689:
--
Fix Version/s: 3.3.0
   (was: 3.2.0)

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.3.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}, the default value of this 
> configuration is 6.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set unused() {
> Set keys = new HashSet<>(originals.keySet());
> keys.removeAll(used);
> return keys;
> } {code}
> If a configuration has not been requested, the configuration will not be put 
> into the used variable. SourceCode see as below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
> if (!values.containsKey(key))
> throw new ConfigException(String.format("Unknown configuration '%s'", 
> key));
> used.add(key);
> return values.get(key);
> } {code}
> h1.  
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Modify the log printing information of this method,and the unused 
> configuration log print level can be changed to {*}INFO{*}, what do you think?
> {code:java}
> /**
>  * Log infos for any unused configurations
>  */
> public void logUnused() {     for (String key : unused())
> log.info("The configuration '{}' was supplied but isn't a used 
> config.", key);
> }{code}
>  
>  
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void logUnknown() {
> for (String key : unknown())
> log.warn("The configuration '{}' was supplied but isn't a known 
> config.", key);
> } {code}
>  
> {code:java}
> public Set unknown() {
> Set keys = new HashSet<>(originals.keySet());
> keys.removeAll(values.keySet());
> return keys;
> } {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13766) Use `max.poll.interval.ms` as the timeout during complete-rebalance phase

2022-03-23 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511534#comment-17511534
 ] 

Guozhang Wang commented on KAFKA-13766:
---

cc [~dajac]

> Use `max.poll.interval.ms` as the timeout during complete-rebalance phase
> -
>
> Key: KAFKA-13766
> URL: https://issues.apache.org/jira/browse/KAFKA-13766
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> The lifetime of a consumer can be categorized in three phases:
> 1) During normal processing, the broker expects a hb request periodically 
> from consumer, and that is timed by the `session.timeout.ms`.
> 2) During the prepare_rebalance, the broker would expect a join-group request 
> to be received within the rebalance.timeout, which is piggy-backed as the 
> `max.poll.interval.ms`.
> 3) During the complete_rebalance, the broker would expect a sync-group 
> request to be received again within the `session.timeout.ms`.
> So during different phases of the life of the consumer, different timeout 
> would be used to bound the timer.
> Nowadays with cooperative rebalance protocol, we can still return records and 
> process them in the middle of a rebalance from {{consumer.poll}}. In that 
> case, for phase 3) we should also use the `max.poll.interval.ms` to bound the 
> timer, which is in practice larger than `session.timeout.ms`.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13766) Use `max.poll.interval.ms` as the timeout during complete-rebalance phase

2022-03-23 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13766:
-

 Summary: Use `max.poll.interval.ms` as the timeout during 
complete-rebalance phase
 Key: KAFKA-13766
 URL: https://issues.apache.org/jira/browse/KAFKA-13766
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Guozhang Wang


The lifetime of a consumer can be categorized in three phases:

1) During normal processing, the broker expects a hb request periodically from 
consumer, and that is timed by the `session.timeout.ms`.

2) During the prepare_rebalance, the broker would expect a join-group request 
to be received within the rebalance.timeout, which is piggy-backed as the 
`max.poll.interval.ms`.

3) During the complete_rebalance, the broker would expect a sync-group request 
to be received again within the `session.timeout.ms`.

So during different phases of the life of the consumer, different timeout would 
be used to bound the timer.

Nowadays with cooperative rebalance protocol, we can still return records and 
process them in the middle of a rebalance from {{consumer.poll}}. In that case, 
for phase 3) we should also use the `max.poll.interval.ms` to bound the timer, 
which is in practice larger than `session.timeout.ms`.
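For reference, a minimal consumer-configuration sketch naming the two timeouts discussed above; the broker address, group id and values are illustrative only:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class RebalanceTimeoutsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Phase 1 (and, today, phase 3): how long the broker waits for a heartbeat / sync-group.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45_000);
        // Phase 2: rebalance timeout, piggy-backed on max.poll.interval.ms, and the value
        // this ticket proposes to use for phase 3 as well.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
    }
}
{code}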



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13765) Describe-consumer admin should not return unstable membership information

2022-03-23 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13765:
-

 Summary: Describe-consumer admin should not return unstable 
membership information
 Key: KAFKA-13765
 URL: https://issues.apache.org/jira/browse/KAFKA-13765
 Project: Kafka
  Issue Type: Bug
  Components: admin
Reporter: Guozhang Wang


When a consumer group is in the “prepare-rebalance” phase, it is unclear whether 
all of its currently registered members will still re-join in the new 
generation. In this case, if we simply return the current members map to 
the describe-consumer request, it may be misleading: users would get 
spurious results that may contain dropping or even zombie consumers.

So I think during the prepare-rebalance phase we should either only return 
members whose join-group requests have already been received, OR simply 
return the response with no members and indicate, via the prepare-rebalance 
state, that the membership info is unstable and hence is not returned.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-23 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17511522#comment-17511522
 ] 

Guozhang Wang commented on KAFKA-13689:
---

Hi [~RivenSun] I think I agree with you on the general note. My concern is that 
since we do not control when `logUnused` is called, if it is called before all 
provided values have been retrieved, we would log `config ... is not used.`, 
which would be a bit misleading since those configs are likely going to be used 
later. But on second thought, the semantics are fine if we just say 
`config ... is not yet used.` at the time `logUnused` is called; it is then 
really the caller's responsibility to decide when to call this function to 
check which configs are not yet used.

So I think we can log it as you said (just adding `yet` at the end of the 
sentence).

{code}
public void logUnused() {
    Set<String> unusedKeys = unused();
    if (!unusedKeys.isEmpty()) {
        log.warn("These configurations '{}' were supplied but are not used yet.", unusedKeys);
    }
}
{code}

Since we always call `logUnused` at the end of the constructor of the 
producer/consumer/admin client, it is very likely that those unknown configs 
have not been retrieved yet and hence would be logged. For that, I'd say we 
update our web docs to describe this behavior exactly, to clear up any possible 
confusion.

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}, the default value of this 
> configuration is 6.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set unused() {
> Set keys = new HashSet<>(originals.keySet());
> keys.removeAll(used);
> return keys;
> } {code}
> If a configuration has not been requested, the configuration will not be put 
> into the used variable. SourceCode see as below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
> if (!values.containsKey(key))
> throw new ConfigException(String.format("Unknown configuration '%s'", 
> key));
> used.add(key);
> return values.get(key);
> } {code}
> h1.  
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Modify the log printing information of this method,and the unused 
> configuration log print level can be changed to {*}INFO{*}, what do you think?
> {code:java}
> /**
>  * Log infos for any unused configurations
>  */
> public void logUnused() {     for (String key : unused())
> log.info("The configuration '{}' was supplied but isn't a used 
> config.", key);
> }{code}
>  
>  
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void log

[jira] [Comment Edited] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-22 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510767#comment-17510767
 ] 

Guozhang Wang edited comment on KAFKA-13689 at 3/22/22, 5:14 PM:
-

Okay I think we are really on the same page here, just are using different 
terminologies :) Where I'm from is the Streams usage, e.g. let's say we create 
a StreamsConfig object from:

{code}
StreamsConfig config = new StreamsConfig(map);
{code}

If we call

{code}
config.get(PREDEFINED_CONFIG_NAME)
{code}

then the param must be a defined config name, otherwise it will throw;

If we call

{code}
config.originals().get(ANY_CONFIG_NAME)
{code}

then it tries to get from the underlying maps directly, and any config names 
including the custom unknown configs can be retrieved. So that's how custom 
configs are retrieved (like you said, some modules use `(config.originals() 
UNION config.values()) .get(ANY_CONFIG_NAME).` as well).

Now, I realize that since `logUnused` is triggered during construction of the 
consumer/producer clients, the embedded modules' custom configs are possibly 
not yet retrieved at that point, as those modules will only be constructed and 
initialized later, in which case the `used` set would not yet contain any of 
those unknown configs. As a result:

1) all defined, used configs should be included in `used` since they are 
retrieved via the first call above. This is the case we want to guarantee and 
WARN about if not, since a violation may indicate a bug.
2) all defined, but disabled configs are included in `used` since they are 
called via `config.ignore()`. This is what we want to fix in this JIRA.
3) all unknown configs may or may not be included in `used` and that's out of 
the AbstractConfig object's control.

So by doing the above two, we can fix this JIRA ticket which is case 2), but 
the `logUnused` could still contain case 1) or case 3) whereas we really only 
want to WARN on case 1) above. The latter issue is a bit out of the scope of 
this JIRA, hence I said just doing the above two is sufficient for this issue.



If we want to resolve the second as well in this JIRA, I'm thinking we can do 
sth. aligned with your proposal in the description as "AbstractConfig provides 
two new methods: logUnknown() and unknown()" but to avoid creating a KIP for 
adding public APIs, we can just do them inside the existing `logUnused`, as 
something like:

{code}
public Set<String> unused() {
    // here we take the diff between values and used
    Set<String> keys = new HashSet<>(values.keySet());
    keys.removeAll(used);
    return keys;
}

// keep it private
private Set<String> unknown() {
    // here we take the diff between originals and values
    Set<String> keys = new HashSet<>(originals.keySet());
    keys.removeAll(values.keySet());
    return keys;
}

public void logUnused() {
    // here we still log one line per config as a WARN
    for (String key : unused())
        log.warn("The configuration '{}' was supplied but isn't used.", key);

    // here we log one line for all unknown configs as INFO
    Set<String> unknownKeys = unknown();
    if (!unknownKeys.isEmpty()) {
        log.info("These configurations '{}' were not known.", unknownKeys);
    }
}
{code}



was (Author: guozhang):
Okay I think we are really on the same page here, just are using different 
terminologies :) Where I'm from is the Streams usage, e.g. let's say we create 
a StreamsConfig object from:

```
StreamsConfig config = new StreamsConfig(map);
```

If we call

```
config.get(PREDEFINED_CONFIG_NAME)
```

then the param must be a defined config name, otherwise it will throw;

If we call

```
config.originals().get(ANY_CONFIG_NAME)
```

then it tries to get from the underlying maps directly, and any config names 
including the custom unknown configs can be retrieved. So that's how custom 
configs are retrieved (like you said, some modules use `(config.originals() 
UNION config.values()) .get(ANY_CONFIG_NAME).` as well).

Now, I realize since `logUnused` is triggered at the construction of the 
consumer/producer clients, whereas those embedded module's custom configs are 
possibly not yet retrieved as they will only be constructed and initialized at 
a later time, in which case `used` set would not yet contain any of those 
unknown configs yet. As a result:

1) all defined, used configs should be included in `used` since they are 
retrieved via the first call above. This is the case we want to guarantee and 
WARN if not since it may indicates a bug.
2) all defined, but disabled configs are included in `used` since they are 
called via `config.ignore()`. This is what we want to fix in this JIRA.
3) all unknown configs may or may not be included in `used` and that's out of 
the AbstractConfig object's control.

So by doing the above two, we can fix this JIRA ticket which is case 2), bu

[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-22 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510767#comment-17510767
 ] 

Guozhang Wang commented on KAFKA-13689:
---

Okay I think we are really on the same page here, just are using different 
terminologies :) Where I'm from is the Streams usage, e.g. let's say we create 
a StreamsConfig object from:

```
StreamsConfig config = new StreamsConfig(map);
```

If we call

```
config.get(PREDEFINED_CONFIG_NAME)
```

then the param must be a defined config name, otherwise it will throw;

If we call

```
config.originals().get(ANY_CONFIG_NAME)
```

then it tries to get from the underlying maps directly, and any config names 
including the custom unknown configs can be retrieved. So that's how custom 
configs are retrieved (like you said, some modules use `(config.originals() 
UNION config.values()) .get(ANY_CONFIG_NAME).` as well).

Now, I realize that since `logUnused` is triggered during construction of the 
consumer/producer clients, the embedded modules' custom configs are possibly 
not yet retrieved at that point, as those modules will only be constructed and 
initialized later, in which case the `used` set would not yet contain any of 
those unknown configs. As a result:

1) all defined, used configs should be included in `used` since they are 
retrieved via the first call above. This is the case we want to guarantee and 
WARN about if not, since a violation may indicate a bug.
2) all defined, but disabled configs are included in `used` since they are 
called via `config.ignore()`. This is what we want to fix in this JIRA.
3) all unknown configs may or may not be included in `used` and that's out of 
the AbstractConfig object's control.

So by doing the above two, we can fix this JIRA ticket which is case 2), but 
the `logUnused` could still contain case 1) or case 3) whereas we really only 
want to WARN on case 1) above. The latter issue is a bit out of the scope of 
this JIRA, hence I said just doing the above two is sufficient for this issue.



If we want to resolve the second as well in this JIRA, I'm thinking we can do 
sth. aligned with your proposal in the description as "AbstractConfig provides 
two new methods: logUnknown() and unknown()" but to avoid creating a KIP for 
adding public APIs, we can just do them inside the existing `logUnused`, as 
something like:

```
public Set<String> unused() {
    // here we take the diff between values and used
    Set<String> keys = new HashSet<>(values.keySet());
    keys.removeAll(used);
    return keys;
}

// keep it private
private Set<String> unknown() {
    // here we take the diff between originals and values
    Set<String> keys = new HashSet<>(originals.keySet());
    keys.removeAll(values.keySet());
    return keys;
}

public void logUnused() {
    // here we still log one line per config as a WARN
    for (String key : unused())
        log.warn("The configuration '{}' was supplied but isn't used.", key);

    // here we log one line for all unknown configs as INFO
    Set<String> unknownKeys = unknown();
    if (!unknownKeys.isEmpty()) {
        log.info("These configurations '{}' were not known.", unknownKeys);
    }
}
```

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer 

[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-21 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510209#comment-17510209
 ] 

Guozhang Wang commented on KAFKA-13689:
---

Hello [~RivenSun] I agree with you except some pondering about this statement:

4. `used` will also contain unknownConfigKeys after users retrieve their own 
custom  configuration, so `used` will not necessarily be a subset of `values`.

Since `used` will only be updated with either `get` or `ignore`, so assume we 
do not call `ignore` at the moment, `get` calls could only be used to retrieve 
those known configs, and unknown configs can only be retrieved from the 
original map itself. So `used` should not contain any unknown configs at all.

With that said, I think we are still in agreement on the two improvement points 
you mentioned in the last message since the `unused()` returned should still 
just have those not-used configs, just that I think those unknown ones would 
not be included.

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}, the default value of this 
> configuration is 6.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set unused() {
> Set keys = new HashSet<>(originals.keySet());
> keys.removeAll(used);
> return keys;
> } {code}
> If a configuration has not been requested, the configuration will not be put 
> into the used variable. SourceCode see as below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
> if (!values.containsKey(key))
> throw new ConfigException(String.format("Unknown configuration '%s'", 
> key));
> used.add(key);
> return values.get(key);
> } {code}
> h1.  
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Modify the log printing information of this method,and the unused 
> configuration log print level can be changed to {*}INFO{*}, what do you think?
> {code:java}
> /**
>  * Log infos for any unused configurations
>  */
> public void logUnused() {     for (String key : unused())
> log.info("The configuration '{}' was supplied but isn't a used 
> config.", key);
> }{code}
>  
>  
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void logUnknown() {
> for (String key : unknown())
> log.warn("The configuration '{}' was supplied but isn't a known 
> config.", key);
> } {code}
>  
> {code:java}
> public Set unknown() {
> Set keys = new HashSet<>(originals.keySet());
> keys.removeAll(values.keySet());
> return keys;
> } {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13728) PushHttpMetricsReporter no longer pushes metrics when network failure is recovered.

2022-03-21 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13728.
---
Fix Version/s: 3.2.0
   Resolution: Fixed

> PushHttpMetricsReporter no longer pushes metrics when network failure is 
> recovered.
> ---
>
> Key: KAFKA-13728
> URL: https://issues.apache.org/jira/browse/KAFKA-13728
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.1.0
>Reporter: XiaoyiPeng
>Priority: Minor
> Fix For: 3.2.0
>
>
> The class *PushHttpMetricsReporter* no longer pushes metrics when network 
> failure is recovered.
> I debugged the code and found the problem here :
> [https://github.com/apache/kafka/blob/dc36dedd28ff384218b669de13993646483db966/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java#L214-L221]
>  
> When we submit a task to the *ScheduledThreadPoolExecutor* that needs to be 
> executed periodically, if the task throws an exception and is not swallowed, 
> the task will no longer be scheduled to execute.
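A small standalone sketch of that ScheduledThreadPoolExecutor behavior (not the PushHttpMetricsReporter code itself; pushMetrics() is a hypothetical stand-in for the HTTP push): if a periodic task lets an exception escape, its future executions are silently cancelled, so the usual fix is to catch and log inside the task body.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicTaskSketch {
    // Hypothetical stand-in for the metrics push; fails half of the time.
    static void pushMetrics() {
        if (Math.random() < 0.5) throw new RuntimeException("simulated network failure");
        System.out.println("metrics pushed");
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        // Broken: the first failure escapes run(), and the executor silently stops
        // scheduling this task, so pushes never resume even if the network recovers.
        executor.scheduleAtFixedRate(PeriodicTaskSketch::pushMetrics, 0, 200, TimeUnit.MILLISECONDS);

        // Fixed: swallow (and log) the failure inside the task so the schedule keeps running.
        executor.scheduleAtFixedRate(() -> {
            try {
                pushMetrics();
            } catch (RuntimeException e) {
                System.err.println("push failed, will retry on next tick: " + e);
            }
        }, 0, 200, TimeUnit.MILLISECONDS);

        Thread.sleep(1_000);
        executor.shutdownNow();
    }
}
{code}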



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509372#comment-17509372
 ] 

Guozhang Wang commented on KAFKA-13689:
---

> So for these unKnown configs, they will never appear in the logUnused() 
> output because they are not in the `originals` set. But unKnown configs may 
> be stored in `used` set, right?

The `values` map is constructed as `this.values = 
definition.parse(this.originals);` so it should just be a subset of `originals` 
that are defined by the config itself. Those custom, hence unknown configs 
would not be in the `values` but would be in `originals`, and pluggable 
components would use `originals().get()` to retrieve those custom config values.

Since `unused` is calculated as the diff between `originals` and `used` (and 
`used` would only be a subset of `values` if we exclude the ignored ones), those 
custom configs could appear in the logUnused output.

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}, the default value of this 
> configuration is 6.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the configurations related to the KafkaProducer transaction will not be 
> requested.
> See the source code: KafkaProducer#configureTransactionState(...) .
> 2) AbstractConfig#logUnused() -> AbstractConfig#unused()
> {code:java}
> public Set unused() {
> Set keys = new HashSet<>(originals.keySet());
> keys.removeAll(used);
> return keys;
> } {code}
> If a configuration has not been requested, the configuration will not be put 
> into the used variable. SourceCode see as below:
> AbstractConfig#get(String key)
>  
> {code:java}
> protected Object get(String key) {
> if (!values.containsKey(key))
> throw new ConfigException(String.format("Unknown configuration '%s'", 
> key));
> used.add(key);
> return values.get(key);
> } {code}
> h1.  
> h1. Solution:
> 1. AbstractConfig#logUnused() method
> Modify the log printing information of this method,and the unused 
> configuration log print level can be changed to {*}INFO{*}, what do you think?
> {code:java}
> /**
>  * Log infos for any unused configurations
>  */
> public void logUnused() {     for (String key : unused())
> log.info("The configuration '{}' was supplied but isn't a used 
> config.", key);
> }{code}
>  
>  
> 2. AbstractConfig provides two new methods: logUnknown() and unknown()
> {code:java}
> /**
>  * Log warnings for any unknown configurations
>  */
> public void logUnknown() {
> for (String key : unknown())
> log.warn("The configuration '{}' was supplied but isn't a known 
> config.", key);
> } {code}
>  
> {code:java}
> public Set unknown() {
> Set keys = new HashSet<>(originals.keySet());
> keys.removeAll(values.keySet());
> return keys;
> } {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13689) AbstractConfig log print information is incorrect

2022-03-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509357#comment-17509357
 ] 

Guozhang Wang commented on KAFKA-13689:
---

Hi, I think we can first revisit the meaningfulness of the "logUnused" function 
itself: in "AbstractConfig" we only record "used" configs from those parsed 
definitions, and the `unused` set is the diff of the "originals" (note, not the 
"values") and the "used". This means:

1) Any pluggable component's configs, since they are not defined in the 
corresponding Producer/Consumer/etc. Config, would always be in the "unused" 
set. The pluggable components themselves would not try to use the "get" call to 
retrieve them anyway, but would usually use `originals().get()`. I.e. they are 
really just "unknown".
2) Any configs that are not called via "get" or "ignore" would be in this 
"unused" set. I.e. they are "unused".

The intention of "logUnused" function itself, is to warn people just about the 
second case (as this JIRA is trying to resolve), since assuming that the code 
is bug-free, this should NOT happen --- thinking about this, when certain 
component is not enabled, we should call `config.ignore(...)` on the 
corresponding configs, like `TRANSACTION_TIMEOUT_CONFIG` in this case. With 
that, we should not have "unused" configs at all. On the other hand, have a log 
line for each "unknown" config may be a bit too verbose, e.g. in Streams we 
have a bunch of pluggable component resulting in quite a lot of log lines for 
each of their custom config. I think this is not the original intention of the 
"logUnused" function to actually warn about the "unknown" configs at all, and 
it's less valuable to do so as well.

There's an additional scenario though, for deprecated configs: when we 
deprecate a config, the source code may not request it anymore, but if the 
user does not change their code, they may still set values for those deprecated 
configs, which would then end up in the `unused` set. So we should also call 
`config.ignore()` on those deprecated configs if they are no longer used inside 
the source code (note that to achieve backward compatibility, some deprecated 
configs may still be retrieved inside the code; in that case we would not need 
to call `ignore()`).

So if you agree with me that "logUnused" should really be a WARN for the second 
case above, since this should never happen, I'd suggest we:

1) use "config.ignore" in cases where certain components are disabled and hence 
their corresponding configs would not be requested, or where the config is 
deprecated and no longer requested (a rough sketch follows below).
2) change the log line to "was supplied but isn't used" (doesn't it feel 
weird to log "it's not known" in a function called "logUnused"? :P).
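
To make (1) concrete, here is a rough sketch of what it could look like for this 
particular example. This is not the actual KafkaProducer code, just the shape of the 
idea, reusing the existing `config.ignore(...)` call:

{code:java}
// Sketch only (not the actual KafkaProducer#configureTransactionState body): when
// idempotence is disabled, the transaction-related configs are never requested via
// get(), so mark them as intentionally ignored and logUnused() will stop reporting them.
if (!config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
    config.ignore(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
    config.ignore(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
}
{code}

With that in place, whatever remains in `unused()` is genuinely suspicious, and a WARN 
with the reworded message makes sense.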

> AbstractConfig log print information is incorrect
> -
>
> Key: KAFKA-13689
> URL: https://issues.apache.org/jira/browse/KAFKA-13689
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Assignee: RivenSun
>Priority: Major
> Fix For: 3.2.0
>
>
> h1. 1.Example
> KafkaClient version is 3.1.0, KafkaProducer init properties:
>  
> {code:java}
> Properties props = new Properties();
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60003);{code}
>  
>  
> Partial log of KafkaProducer initialization:
> {code:java}
>     ssl.truststore.location = C:\Personal 
> File\documents\KafkaSSL\client.truststore.jks
>     ssl.truststore.password = [hidden]
>     ssl.truststore.type = JKS
>     transaction.timeout.ms = 60003
>     transactional.id = null
>     value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer[main] INFO 
> org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully 
> logged in.
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration 'transaction.timeout.ms' was supplied but isn't a known config.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.1.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
> 37edeed0777bacb3
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
> 1645602332999 {code}
> From the above log, you can see that KafkaProducer has applied the user's 
> configuration, {*}transaction.timeout.ms=60003{*}; the default value of this 
> configuration is 60000.
> But we can see another line of log:
> [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The 
> configuration *'transaction.timeout.ms'* was supplied but isn't a 
> *{color:#ff}known{color}* config.
>  
> h1. 2.RootCause:
> 1) ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is set to {*}false{*}.
> So the c

[jira] [Created] (KAFKA-13746) Flaky kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed

2022-03-15 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13746:
-

 Summary: Flaky 
kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed
 Key: KAFKA-13746
 URL: https://issues.apache.org/jira/browse/KAFKA-13746
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
 Fix For: 3.2.0


Example: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/

{code}
java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
at 
kafka.admin.TopicCommandIntegrationTest.testDescribeUnderMinIsrPartitionsMixed(TopicCommandIntegrationTest.scala:686)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13745) Flaky kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone

2022-03-15 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13745:
-

 Summary: Flaky 
kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone
 Key: KAFKA-13745
 URL: https://issues.apache.org/jira/browse/KAFKA-13745
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
 Fix For: 3.2.0


Example: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11796/7/tests/

{code}
org.opentest4j.AssertionFailedError: expected: <false> but was: <true>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:40)
at org.junit.jupiter.api.AssertFalse.assertFalse(AssertFalse.java:35)
at org.junit.jupiter.api.Assertions.assertFalse(Assertions.java:227)
at 
kafka.network.SocketServerTest.testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone(SocketServerTest.scala:751)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-6089) Transient failure in kafka.network.SocketServerTest.configureNewConnectionException

2022-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6089:
-
Fix Version/s: 3.2.0

> Transient failure in 
> kafka.network.SocketServerTest.configureNewConnectionException
> ---
>
> Key: KAFKA-6089
> URL: https://issues.apache.org/jira/browse/KAFKA-6089
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.2.0
>
>
> Stack trace:
> {quote}
> java.lang.AssertionError: Channels not removed
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:347)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:861)
>   at 
> kafka.network.SocketServerTest.kafka$network$SocketServerTest$$assertProcessorHealthy(SocketServerTest.scala:888)
>   at 
> kafka.network.SocketServerTest$$anonfun$configureNewConnectionException$1.apply(SocketServerTest.scala:654)
>   at 
> kafka.network.SocketServerTest$$anonfun$configureNewConnectionException$1.apply(SocketServerTest.scala:645)
>   at 
> kafka.network.SocketServerTest.withTestableServer(SocketServerTest.scala:863)
>   at 
> kafka.network.SocketServerTest.configureNewConnectionException(SocketServerTest.scala:645)
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13584) Fix `kafka.network.SocketServerTest.testUnmuteChannelWithBufferedReceives` flaky test

2022-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13584:
--
Priority: Blocker  (was: Major)

> Fix `kafka.network.SocketServerTest.testUnmuteChannelWithBufferedReceives` 
> flaky test
> -
>
> Key: KAFKA-13584
> URL: https://issues.apache.org/jira/browse/KAFKA-13584
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Priority: Blocker
>
> {noformat}
> java.lang.IllegalStateException: Channel closed too early
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$5(SocketServerTest.scala:1515)
>   at scala.Option.getOrElse(Option.scala:189)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1515)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1486)
>   at 
> kafka.network.SocketServerTest.remoteCloseWithIncompleteBufferedReceive(SocketServerTest.scala:1407)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13584) Fix `kafka.network.SocketServerTest.testUnmuteChannelWithBufferedReceives` flaky test

2022-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13584:
--
Fix Version/s: 3.2.0

> Fix `kafka.network.SocketServerTest.testUnmuteChannelWithBufferedReceives` 
> flaky test
> -
>
> Key: KAFKA-13584
> URL: https://issues.apache.org/jira/browse/KAFKA-13584
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Priority: Blocker
> Fix For: 3.2.0
>
>
> {noformat}
> java.lang.IllegalStateException: Channel closed too early
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$5(SocketServerTest.scala:1515)
>   at scala.Option.getOrElse(Option.scala:189)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1515)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1486)
>   at 
> kafka.network.SocketServerTest.remoteCloseWithIncompleteBufferedReceive(SocketServerTest.scala:1407)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13584) Fix `kafka.network.SocketServerTest.testUnmuteChannelWithBufferedReceives` flaky test

2022-03-15 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13584:
--
Issue Type: Bug  (was: Improvement)

> Fix `kafka.network.SocketServerTest.testUnmuteChannelWithBufferedReceives` 
> flaky test
> -
>
> Key: KAFKA-13584
> URL: https://issues.apache.org/jira/browse/KAFKA-13584
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Priority: Major
>
> {noformat}
> java.lang.IllegalStateException: Channel closed too early
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$5(SocketServerTest.scala:1515)
>   at scala.Option.getOrElse(Option.scala:189)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1515)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1486)
>   at 
> kafka.network.SocketServerTest.remoteCloseWithIncompleteBufferedReceive(SocketServerTest.scala:1407)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-8785) Flakey test LeaderElectionCommandTest#testPathToJsonFile

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8785:
-
Fix Version/s: 3.2.0

> Flakey test LeaderElectionCommandTest#testPathToJsonFile
> 
>
> Key: KAFKA-8785
> URL: https://issues.apache.org/jira/browse/KAFKA-8785
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.2.0
>
>
> *https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/895/console*
> *2:35* kafka.admin.LeaderElectionCommandTest > testPathToJsonFile 
> STARTED*13:23:16* kafka.admin.LeaderElectionCommandTest.testPathToJsonFile 
> failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/core/build/reports/testOutput/kafka.admin.LeaderElectionCommandTest.testPathToJsonFile.test.stdout*13:23:16*
>  *13:23:16* kafka.admin.LeaderElectionCommandTest > testPathToJsonFile 
> FAILED*13:23:16* kafka.common.AdminCommandFailedException: Timeout 
> waiting for election results*13:23:16* at 
> kafka.admin.LeaderElectionCommand$.electLeaders(LeaderElectionCommand.scala:133)*13:23:16*
>  at 
> kafka.admin.LeaderElectionCommand$.run(LeaderElectionCommand.scala:88)*13:23:16*
>  at 
> kafka.admin.LeaderElectionCommand$.main(LeaderElectionCommand.scala:41)*13:23:16*
>  at 
> kafka.admin.LeaderElectionCommandTest.$anonfun$testPathToJsonFile$1(LeaderElectionCommandTest.scala:160)*13:23:16*
>  at 
> kafka.admin.LeaderElectionCommandTest.$anonfun$testPathToJsonFile$1$adapted(LeaderElectionCommandTest.scala:137)*13:23:16*
>  at kafka.utils.TestUtils$.resource(TestUtils.scala:1591)*13:23:16*   
>   at 
> kafka.admin.LeaderElectionCommandTest.testPathToJsonFile(LeaderElectionCommandTest.scala:137)*13:23:16*
>  *13:23:16* Caused by:*13:23:16* 
> org.apache.kafka.common.errors.TimeoutException: Aborted due to 
> timeout.*13:23:16*



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13737) Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13737:
--
Priority: Blocker  (was: Major)

> Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection
> 
>
> Key: KAFKA-13737
> URL: https://issues.apache.org/jira/browse/KAFKA-13737
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: describeTopics
>   at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>   at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>   at 
> kafka.utils.TestUtils$.$anonfun$waitForLeaderToBecome$1(TestUtils.scala:1812)
>   at scala.util.Try$.apply(Try.scala:210)
>   at kafka.utils.TestUtils$.currentLeader$1(TestUtils.scala:1811)
>   at kafka.utils.TestUtils$.waitForLeaderToBecome(TestUtils.scala:1819)
>   at kafka.utils.TestUtils$.assertLeader(TestUtils.scala:1789)
>   at 
> kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection(LeaderElectionCommandTest.scala:172)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13736:
--
Priority: Blocker  (was: Major)

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13735) Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13735:
--
Fix Version/s: 3.2.0

> Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives
> ---
>
> Key: KAFKA-13735
> URL: https://issues.apache.org/jira/browse/KAFKA-13735
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11705/13/tests
> {code}
> Stacktrace
> java.lang.IllegalStateException: Channel closed too early
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$5(SocketServerTest.scala:1511)
>   at scala.Option.getOrElse(Option.scala:201)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1511)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1482)
>   at 
> kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives(SocketServerTest.scala:1393)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13737) Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13737:
--
Fix Version/s: 3.2.0

> Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection
> 
>
> Key: KAFKA-13737
> URL: https://issues.apache.org/jira/browse/KAFKA-13737
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: describeTopics
>   at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>   at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
>   at 
> kafka.utils.TestUtils$.$anonfun$waitForLeaderToBecome$1(TestUtils.scala:1812)
>   at scala.util.Try$.apply(Try.scala:210)
>   at kafka.utils.TestUtils$.currentLeader$1(TestUtils.scala:1811)
>   at kafka.utils.TestUtils$.waitForLeaderToBecome(TestUtils.scala:1819)
>   at kafka.utils.TestUtils$.assertLeader(TestUtils.scala:1789)
>   at 
> kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection(LeaderElectionCommandTest.scala:172)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-8785) Flakey test LeaderElectionCommandTest#testPathToJsonFile

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8785:
-
Priority: Blocker  (was: Major)

> Flakey test LeaderElectionCommandTest#testPathToJsonFile
> 
>
> Key: KAFKA-8785
> URL: https://issues.apache.org/jira/browse/KAFKA-8785
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Luke Chen
>Priority: Blocker
>
> *https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/895/console*
> *2:35* kafka.admin.LeaderElectionCommandTest > testPathToJsonFile 
> STARTED*13:23:16* kafka.admin.LeaderElectionCommandTest.testPathToJsonFile 
> failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/core/build/reports/testOutput/kafka.admin.LeaderElectionCommandTest.testPathToJsonFile.test.stdout*13:23:16*
>  *13:23:16* kafka.admin.LeaderElectionCommandTest > testPathToJsonFile 
> FAILED*13:23:16* kafka.common.AdminCommandFailedException: Timeout 
> waiting for election results*13:23:16* at 
> kafka.admin.LeaderElectionCommand$.electLeaders(LeaderElectionCommand.scala:133)*13:23:16*
>  at 
> kafka.admin.LeaderElectionCommand$.run(LeaderElectionCommand.scala:88)*13:23:16*
>  at 
> kafka.admin.LeaderElectionCommand$.main(LeaderElectionCommand.scala:41)*13:23:16*
>  at 
> kafka.admin.LeaderElectionCommandTest.$anonfun$testPathToJsonFile$1(LeaderElectionCommandTest.scala:160)*13:23:16*
>  at 
> kafka.admin.LeaderElectionCommandTest.$anonfun$testPathToJsonFile$1$adapted(LeaderElectionCommandTest.scala:137)*13:23:16*
>  at kafka.utils.TestUtils$.resource(TestUtils.scala:1591)*13:23:16*   
>   at 
> kafka.admin.LeaderElectionCommandTest.testPathToJsonFile(LeaderElectionCommandTest.scala:137)*13:23:16*
>  *13:23:16* Caused by:*13:23:16* 
> org.apache.kafka.common.errors.TimeoutException: Aborted due to 
> timeout.*13:23:16*



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13736:
--
Fix Version/s: 3.2.0

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13735) Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13735:
--
Priority: Blocker  (was: Major)

> Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives
> ---
>
> Key: KAFKA-13735
> URL: https://issues.apache.org/jira/browse/KAFKA-13735
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11705/13/tests
> {code}
> Stacktrace
> java.lang.IllegalStateException: Channel closed too early
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$5(SocketServerTest.scala:1511)
>   at scala.Option.getOrElse(Option.scala:201)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1511)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1482)
>   at 
> kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives(SocketServerTest.scala:1393)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-7957) Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7957:
-
Priority: Blocker  (was: Critical)

> Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate
> -
>
> Key: KAFKA-7957
> URL: https://issues.apache.org/jira/browse/KAFKA-7957
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/18/]
> {quote}java.lang.AssertionError: Messages not sent at 
> kafka.utils.TestUtils$.fail(TestUtils.scala:356) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:766) at 
> kafka.server.DynamicBrokerReconfigurationTest.startProduceConsume(DynamicBrokerReconfigurationTest.scala:1270)
>  at 
> kafka.server.DynamicBrokerReconfigurationTest.testMetricsReporterUpdate(DynamicBrokerReconfigurationTest.scala:650){quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-7957) Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7957:
-
Fix Version/s: 3.2.0

> Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate
> -
>
> Key: KAFKA-7957
> URL: https://issues.apache.org/jira/browse/KAFKA-7957
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/18/]
> {quote}java.lang.AssertionError: Messages not sent at 
> kafka.utils.TestUtils$.fail(TestUtils.scala:356) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:766) at 
> kafka.server.DynamicBrokerReconfigurationTest.startProduceConsume(DynamicBrokerReconfigurationTest.scala:1270)
>  at 
> kafka.server.DynamicBrokerReconfigurationTest.testMetricsReporterUpdate(DynamicBrokerReconfigurationTest.scala:650){quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-8541) Flakey test LeaderElectionCommandTest#testTopicPartition

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8541:
-
Fix Version/s: 3.2.0

> Flakey test LeaderElectionCommandTest#testTopicPartition
> 
>
> Key: KAFKA-8541
> URL: https://issues.apache.org/jira/browse/KAFKA-8541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Boyang Chen
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.2.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5528/consoleFull]
>  
> *01:37:34* kafka.admin.LeaderElectionCommandTest > testTopicPartition 
> STARTED*01:38:13* kafka.admin.LeaderElectionCommandTest.testTopicPartition 
> failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.admin.LeaderElectionCommandTest.testTopicPartition.test.stdout*01:38:13*
>  *01:38:13* kafka.admin.LeaderElectionCommandTest > testTopicPartition 
> FAILED*01:38:13* kafka.common.AdminCommandFailedException: Timeout 
> waiting for election results*01:38:13* at 
> kafka.admin.LeaderElectionCommand$.electLeaders(LeaderElectionCommand.scala:134)*01:38:13*
>  at 
> kafka.admin.LeaderElectionCommand$.run(LeaderElectionCommand.scala:89)*01:38:13*
>  at 
> kafka.admin.LeaderElectionCommand$.main(LeaderElectionCommand.scala:42)*01:38:13*
>  at 
> kafka.admin.LeaderElectionCommandTest.$anonfun$testTopicPartition$1(LeaderElectionCommandTest.scala:125)*01:38:13*
>  at 
> kafka.admin.LeaderElectionCommandTest.$anonfun$testTopicPartition$1$adapted(LeaderElectionCommandTest.scala:103)*01:38:13*
>  at kafka.utils.TestUtils$.resource(TestUtils.scala:1528)*01:38:13*   
>   at 
> kafka.admin.LeaderElectionCommandTest.testTopicPartition(LeaderElectionCommandTest.scala:103)*01:38:13*
>  *01:38:13* Caused by:*01:38:13* 
> org.apache.kafka.common.errors.TimeoutException: Aborted due to 
> timeout.*01:38:13*



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-8541) Flakey test LeaderElectionCommandTest#testTopicPartition

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8541:
-
Priority: Blocker  (was: Major)

> Flakey test LeaderElectionCommandTest#testTopicPartition
> 
>
> Key: KAFKA-8541
> URL: https://issues.apache.org/jira/browse/KAFKA-8541
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Boyang Chen
>Assignee: Luke Chen
>Priority: Blocker
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5528/consoleFull]
>  
> *01:37:34* kafka.admin.LeaderElectionCommandTest > testTopicPartition 
> STARTED*01:38:13* kafka.admin.LeaderElectionCommandTest.testTopicPartition 
> failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.admin.LeaderElectionCommandTest.testTopicPartition.test.stdout*01:38:13*
>  *01:38:13* kafka.admin.LeaderElectionCommandTest > testTopicPartition 
> FAILED*01:38:13* kafka.common.AdminCommandFailedException: Timeout 
> waiting for election results*01:38:13* at 
> kafka.admin.LeaderElectionCommand$.electLeaders(LeaderElectionCommand.scala:134)*01:38:13*
>  at 
> kafka.admin.LeaderElectionCommand$.run(LeaderElectionCommand.scala:89)*01:38:13*
>  at 
> kafka.admin.LeaderElectionCommand$.main(LeaderElectionCommand.scala:42)*01:38:13*
>  at 
> kafka.admin.LeaderElectionCommandTest.$anonfun$testTopicPartition$1(LeaderElectionCommandTest.scala:125)*01:38:13*
>  at 
> kafka.admin.LeaderElectionCommandTest.$anonfun$testTopicPartition$1$adapted(LeaderElectionCommandTest.scala:103)*01:38:13*
>  at kafka.utils.TestUtils$.resource(TestUtils.scala:1528)*01:38:13*   
>   at 
> kafka.admin.LeaderElectionCommandTest.testTopicPartition(LeaderElectionCommandTest.scala:103)*01:38:13*
>  *01:38:13* Caused by:*01:38:13* 
> org.apache.kafka.common.errors.TimeoutException: Aborted due to 
> timeout.*01:38:13*



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13672) Flaky test kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13672:
--
Fix Version/s: 3.2.0

> Flaky test 
> kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
> ---
>
> Key: KAFKA-13672
> URL: https://issues.apache.org/jira/browse/KAFKA-13672
> Project: Kafka
>  Issue Type: Bug
>Reporter: Bruno Cadonna
>Assignee: Liam Clarke-Hutchinson
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Stacktrace:
> {code:java}
> org.opentest4j.AssertionFailedError: expected: <2> but was: <1>
>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at 
> app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
>   at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
>   at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
>   at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfigOnServer$1(DynamicBrokerReconfigurationTest.scala:1500)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.waitForConfigOnServer(DynamicBrokerReconfigurationTest.scala:1500)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfig$1(DynamicBrokerReconfigurationTest.scala:1495)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfig$1$adapted(DynamicBrokerReconfigurationTest.scala:1495)
>   at app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>   at 
> app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>   at app//scala.collection.AbstractIterable.foreach(Iterable.scala:926)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.waitForConfig(DynamicBrokerReconfigurationTest.scala:1495)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.reconfigureServers(DynamicBrokerReconfigurationTest.scala:1440)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.resizeThreadPool$1(DynamicBrokerReconfigurationTest.scala:775)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$testThreadPoolResize$3(DynamicBrokerReconfigurationTest.scala:768)
>   at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:784)
> {code}
> Job: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11751/5/testReport/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13672) Flaky test kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13672:
--
Priority: Blocker  (was: Major)

> Flaky test 
> kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
> ---
>
> Key: KAFKA-13672
> URL: https://issues.apache.org/jira/browse/KAFKA-13672
> Project: Kafka
>  Issue Type: Bug
>Reporter: Bruno Cadonna
>Assignee: Liam Clarke-Hutchinson
>Priority: Blocker
>
> Stacktrace:
> {code:java}
> org.opentest4j.AssertionFailedError: expected: <2> but was: <1>
>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at 
> app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
>   at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
>   at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
>   at 
> app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfigOnServer$1(DynamicBrokerReconfigurationTest.scala:1500)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.waitForConfigOnServer(DynamicBrokerReconfigurationTest.scala:1500)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfig$1(DynamicBrokerReconfigurationTest.scala:1495)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfig$1$adapted(DynamicBrokerReconfigurationTest.scala:1495)
>   at app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>   at 
> app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>   at app//scala.collection.AbstractIterable.foreach(Iterable.scala:926)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.waitForConfig(DynamicBrokerReconfigurationTest.scala:1495)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.reconfigureServers(DynamicBrokerReconfigurationTest.scala:1440)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.resizeThreadPool$1(DynamicBrokerReconfigurationTest.scala:775)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$testThreadPoolResize$3(DynamicBrokerReconfigurationTest.scala:768)
>   at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>   at 
> app//kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:784)
> {code}
> Job: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11751/5/testReport/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-8280) Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8280:
-
Priority: Blocker  (was: Major)

> Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable
> ---
>
> Key: KAFKA-8280
> URL: https://issues.apache.org/jira/browse/KAFKA-8280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: John Roesler
>Priority: Blocker
>
> I saw this fail again on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3979/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/
> {noformat}
> Error Message
> java.lang.AssertionError: Unclean leader not elected
> Stacktrace
> java.lang.AssertionError: Unclean leader not elected
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:510)
> {noformat}
> {noformat}
> Standard Output
> Completed Updating config for entity: brokers '0'.
> Completed Updating config for entity: brokers '1'.
> Completed Updating config for entity: brokers '2'.
> [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition testtopic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition testtopic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition testtopic-7 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition testtopic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=1] Error for partition testtopic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,761] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=1] Error for partition testtopic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition testtopic-3 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition testtopic-9 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,779] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-8 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-38 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-2 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-14 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic

[jira] [Updated] (KAFKA-8280) Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8280:
-
Fix Version/s: 3.2.0

> Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable
> ---
>
> Key: KAFKA-8280
> URL: https://issues.apache.org/jira/browse/KAFKA-8280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 3.2.0
>
>
> I saw this fail again on 
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3979/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/
> {noformat}
> Error Message
> java.lang.AssertionError: Unclean leader not elected
> Stacktrace
> java.lang.AssertionError: Unclean leader not elected
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:510)
> {noformat}
> {noformat}
> Standard Output
> Completed Updating config for entity: brokers '0'.
> Completed Updating config for entity: brokers '1'.
> Completed Updating config for entity: brokers '2'.
> [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition testtopic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,690] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition testtopic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition testtopic-7 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,722] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition testtopic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=1] Error for partition testtopic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,761] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=1] Error for partition testtopic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition testtopic-3 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:11,765] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition testtopic-9 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,779] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-8 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-38 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-2 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-23 01:17:13,780] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=1] Error for partition __consumer_offsets-14 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does

[jira] [Updated] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6527:
-
Priority: Blocker  (was: Major)

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> 
>
> Key: KAFKA-6527
> URL: https://issues.apache.org/jira/browse/KAFKA-6527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13421:
--
Fix Version/s: 3.2.0

> Fix 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> -
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Blocker
> Fix For: 3.2.0
>
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
>  is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup()
>  failed, log available in 
> /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>   
>   
> ConsumerBounceTest > 
> testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() 
> FAILED
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode 
> = NodeExists
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:126) 
>
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>  
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
> at 
> kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
> at 
> kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:2
> 12)
> at 
> scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
> at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
> at 
> kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsB
> igGroup$1(ConsumerBounceTest.scala:327)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
> at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(C
> onsumerBounceTest.scala:319) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-6527) Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6527:
-
Fix Version/s: 3.2.0

> Transient failure in DynamicBrokerReconfigurationTest.testDefaultTopicConfig
> 
>
> Key: KAFKA-6527
> URL: https://issues.apache.org/jira/browse/KAFKA-6527
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.2.0
>
>
> {code:java}
> java.lang.AssertionError: Log segment size increase not applied
>   at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testDefaultTopicConfig(DynamicBrokerReconfigurationTest.scala:348)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7540:
-
Fix Version/s: 3.2.0

> Flaky Test ConsumerBounceTest#testClose
> ---
>
> Key: KAFKA-7540
> URL: https://issues.apache.org/jira/browse/KAFKA-7540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0
>Reporter: John Roesler
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.2.0
>
>
> Observed on Java 8: 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/]
>  
> Stacktrace:
> {noformat}
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at 
> kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146)
>   at 
> kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238)
>   at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHub

[jira] [Updated] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13421:
--
Priority: Blocker  (was: Major)

> Fix 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> -
>
> Key: KAFKA-13421
> URL: https://issues.apache.org/jira/browse/KAFKA-13421
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Priority: Blocker
>
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
>  is failing with this error:
> {code}
> ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup()
>  failed, log available in 
> /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout
>   
>   
> ConsumerBounceTest > 
> testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() 
> FAILED
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode 
> = NodeExists
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:126) 
>
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
>  
> at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
> at 
> kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:320)
> at 
> kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:212)
> at 
> scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
> at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
> at 
> kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203)
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:327)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
> at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319) 
> {code}
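
The NodeExistsException above suggests the restarted broker tries to re-register while the ephemeral znode from its previous ZooKeeper session still exists. As a minimal, hypothetical test-side guard (the name awaitDeregistration and the pathExists predicate are illustrative, not the harness API), the test could wait for the old registration path to disappear before restarting the broker:

{code:java}
import java.util.function.BooleanSupplier;

public final class BrokerRestartGuard {

    // Hypothetical helper: poll until the broker's ephemeral registration znode
    // (e.g. /brokers/ids/<id>) is gone, so a restart does not race the expiry of
    // the previous ZooKeeper session. The pathExists check stands in for whatever
    // mechanism the test harness uses to query ZooKeeper.
    public static void awaitDeregistration(BooleanSupplier pathExists,
                                           long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (pathExists.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Broker znode still present after " + timeoutMs + " ms");
            }
            Thread.sleep(100L); // poll interval for the registration path
        }
    }
}
{code}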



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7540:
-
Priority: Blocker  (was: Critical)

> Flaky Test ConsumerBounceTest#testClose
> ---
>
> Key: KAFKA-7540
> URL: https://issues.apache.org/jira/browse/KAFKA-7540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0
>Reporter: John Roesler
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
>
> Observed on Java 8: 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/]
>  
> Stacktrace:
> {noformat}
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at 
> kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146)
>   at 
> kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238)
>   at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConne

[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12495:
--
Priority: Blocker  (was: Major)

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.2.0
>
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance 
> algorithm based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
> However, the implementation makes a bad assumption: after the revoking 
> rebalance completes, the member (worker) count will be the same as in the 
> previous round of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's see what happens in this 
> example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> but we didn't revoke any more C/T in this round, which causes an unbalanced 
> distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive 
> rebalances (under the same leader), we end up with this uneven distribution 
> in this situation. We should allow a consecutive rebalance to trigger another 
> round of revocation that moves C/T to the other members in this case (a 
> sketch of that idea follows the expected trace below).
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> **and also revoked some C/T** 
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the new revoked C/T to the other 
> members
> W1 rejoins with assignment: [AC0, AT1, AT2] 
> Rebalance is triggered 
> W2 joins with assignment: [AT4, AT5, BC0] 
> W3 joins with assignment: [BT1, BT2, BT4]
> W4 joins with assignment: [B
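
For illustration only, here is a minimal sketch of the balancing step described above; it is not the actual IncrementalCooperativeAssignor code, and the class and method names are made up. The idea is to compute a per-worker target and revoke the excess from overloaded workers even when the previous rebalance already performed a revocation, so a follow-up round can hand the revoked C/T to the newly joined member:

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class RevocationSketch {

    // Hypothetical helper: given the current assignment per worker, return what
    // each overloaded worker should give up so the group converges towards
    // ceil(total / workers) resources per member. Allowing this in consecutive
    // rebalances is the behaviour the ticket asks for.
    public static Map<String, List<String>> computeRevocations(Map<String, List<String>> assignment) {
        int total = assignment.values().stream().mapToInt(List::size).sum();
        int workers = assignment.size();
        int maxPerWorker = (total + workers - 1) / workers; // ceiling division

        Map<String, List<String>> revocations = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
            List<String> owned = e.getValue();
            if (owned.size() > maxPerWorker) {
                // revoke everything beyond the per-worker target
                revocations.put(e.getKey(), new ArrayList<>(owned.subList(maxPerWorker, owned.size())));
            }
        }
        return revocations;
    }
}
{code}

With the example above (12 connectors/tasks spread over W1-W4), the target is three per worker, so W1 would revoke one of its four resources, which matches the expected trace where W1 gives up AT3 and W4 picks it up in the next round.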

[jira] [Updated] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8391:
-
Priority: Blocker  (was: Critical)

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}
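
The failure comes from the poll-until-condition pattern used throughout the Kafka tests: evaluate a predicate repeatedly until it holds or a deadline passes, then fail with "Condition not met within timeout". A minimal sketch of that pattern follows; it is an illustrative re-creation, not the actual org.apache.kafka.test.TestUtils source, and the fixed 100 ms poll interval is an assumption:

{code:java}
import java.util.function.BooleanSupplier;

public final class WaitForCondition {

    // Illustrative re-creation of the polling pattern behind the failure message;
    // the real helper lives in org.apache.kafka.test.TestUtils.
    public static void waitForCondition(BooleanSupplier condition,
                                        long maxWaitMs,
                                        String conditionDetails) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(
                    "Condition not met within timeout " + maxWaitMs + ". " + conditionDetails);
            }
            Thread.sleep(100L); // assumed poll interval
        }
    }
}
{code}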



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8391:
-
Fix Version/s: 3.2.0
   (was: 2.3.2)
   (was: 2.6.0)
   (was: 2.4.2)
   (was: 2.5.1)

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.2.0
>
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13737) Flaky kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection

2022-03-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13737:
-

 Summary: Flaky 
kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection
 Key: KAFKA-13737
 URL: https://issues.apache.org/jira/browse/KAFKA-13737
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


Examples:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests

{code}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. Call: describeTopics
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at 
kafka.utils.TestUtils$.$anonfun$waitForLeaderToBecome$1(TestUtils.scala:1812)
at scala.util.Try$.apply(Try.scala:210)
at kafka.utils.TestUtils$.currentLeader$1(TestUtils.scala:1811)
at kafka.utils.TestUtils$.waitForLeaderToBecome(TestUtils.scala:1819)
at kafka.utils.TestUtils$.assertLeader(TestUtils.scala:1789)
at 
kafka.admin.LeaderElectionCommandTest.testPreferredReplicaElection(LeaderElectionCommandTest.scala:172)
{code}
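
The timeout surfaces while the test polls describeTopics to learn the partition leader. Below is a minimal sketch of such a retry loop, assuming a caller-supplied leaderLookup (backed by Admin#describeTopics in the real test) that may throw transient failures; the actual logic lives in kafka.utils.TestUtils.waitForLeaderToBecome and may differ:

{code:java}
import java.util.OptionalInt;
import java.util.function.Supplier;

public final class LeaderWait {

    // Illustrative sketch of waiting for a partition leader: retry the lookup until
    // it reports the expected broker id or the deadline passes. Transient lookup
    // failures (e.g. a wrapped TimeoutException from describeTopics) are swallowed
    // and retried instead of immediately failing the test.
    public static void waitForLeaderToBecome(Supplier<OptionalInt> leaderLookup,
                                             int expectedLeader,
                                             long maxWaitMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (System.currentTimeMillis() < deadline) {
            try {
                OptionalInt leader = leaderLookup.get();
                if (leader.isPresent() && leader.getAsInt() == expectedLeader) {
                    return;
                }
            } catch (RuntimeException transientFailure) {
                // retry until the deadline
            }
            Thread.sleep(100L); // assumed poll interval
        }
        throw new AssertionError("Leader did not become " + expectedLeader + " within " + maxWaitMs + " ms");
    }
}
{code}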



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Reopened] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-13736:
---

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13736.
---
Resolution: Duplicate

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

