[GitHub] [kafka] Stephan14 commented on a diff in pull request #12308: KAFKA-14009: update rebalance timeout in memory when consumers use st…
Stephan14 commented on code in PR #12308: URL: https://github.com/apache/kafka/pull/12308#discussion_r937373327 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ## @@ -1300,7 +1304,9 @@ class GroupCoordinator(val brokerId: Int, completeAndScheduleNextHeartbeatExpiration(group, member) val knownStaticMember = group.get(newMemberId) -group.updateMember(knownStaticMember, protocols, responseCallback) +val oldRebalanceTimeoutMs = knownStaticMember.rebalanceTimeoutMs +val oldSessionTimeoutMs = knownStaticMember.sessionTimeoutMs +group.updateMember(knownStaticMember, protocols, rebalanceTimeoutMs, sessionTimeoutMs, responseCallback) Review Comment: @dajac Can you review it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575018#comment-17575018 ] Sagar Rao commented on KAFKA-12495: --- Thanks [~ChrisEgerton] . I would take a look. I had another idea as well which is probably more extravagant. I was thinking we let the assignor do whatever it does and we let it stabilise. How about we run some checks afterwards to see if the assignments are balanced or skewed and then trigger a rebalance later on. IIRC, kafka streams does something similar by leveraging probably the Consumer#enforceRebalance. I can dig deep into it if needed. We can take it even a step further by even providing users the ability to define the idea of balanced assignments. Case in point could be running it on cloud v/s running it on prem. This would also alleviate some of the concerns raised in this ticket about rebalance storms(I know you think it's not a risk) or premature successive rebalances etc. WDYT? Does it make sense? > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Luke Chen >Priority: Critical > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement incremental cooperative rebalance algorithm > based on KIP-415 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. > However, we have a bad assumption in the algorithm implementation, which is: > after revoking rebalance completed, the member(worker) count will be the same > as the previous round of reblance. > > Let's take a look at the example in the KIP-415: > !image-2021-03-18-15-07-27-103.png|width=441,height=556! > It works well for most cases. But what if W4 added after 1st rebalance > completed and before 2nd rebalance started? Let's see what will happened? > Let's see this example: (we'll use 10 tasks here): > > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > but we didn't revoke any more C/T in this round, which cause unbalanced > distribution > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W2(delay: 0, assigned: [BT4, BT5], revoked: []) > {code} > Because we didn't allow to do consecutive revoke in two consecutive > rebalances (under the same leader), we will have this uneven distribution > under this situation. We should allow consecutive rebalance to have another > round of revocation to revoke the C/T to the other members in this case. > expected: > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered >
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12458: MINOR: Adds KRaft versions of most streams system tests
guozhangwang commented on code in PR #12458: URL: https://github.com/apache/kafka/pull/12458#discussion_r937261885 ## tests/kafkatest/tests/streams/streams_broker_bounce_test.py: ## @@ -251,8 +264,9 @@ def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time @cluster(num_nodes=7) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], -num_failures=[2]) -def test_many_brokers_bounce(self, failure_mode, num_failures): +num_failures=[2], +metadata_quorum=quorum.all_non_upgrade) +def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum=quorum.zk): Review Comment: Ah just saw @vvcephei 's comment earlier, that makes sense. Please feel free to ignore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12458: MINOR: Adds KRaft versions of most streams system tests
guozhangwang commented on code in PR #12458: URL: https://github.com/apache/kafka/pull/12458#discussion_r937256375 ## tests/kafkatest/tests/streams/streams_broker_bounce_test.py: ## @@ -205,11 +212,17 @@ def collect_results(self, sleep_time_secs): return data @cluster(num_nodes=7) +@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], +broker_type=["leader"], +num_threads=[1, 3], +sleep_time_secs=[120], +metadata_quorum=[quorum.remote_kraft]) Review Comment: Why we only want to test remote_kraft but not collocated kraft? ## tests/kafkatest/tests/streams/streams_broker_bounce_test.py: ## @@ -251,8 +264,9 @@ def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time @cluster(num_nodes=7) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], -num_failures=[2]) -def test_many_brokers_bounce(self, failure_mode, num_failures): +num_failures=[2], +metadata_quorum=quorum.all_non_upgrade) +def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum=quorum.zk): Review Comment: nit: since we already set the value range of `metadata_quorum` in the matrix, do we still need to set its default as `quorum.zk`? Seems the default value would never be used? Ditto elsewhere. ## tests/kafkatest/tests/streams/streams_static_membership_test.py: ## @@ -50,8 +55,10 @@ def __init__(self, test_context): acks=1) @cluster(num_nodes=8) -def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self): -self.zookeeper.start() Review Comment: Should we do this change in `streams_cooperative_rebalance_upgrade_test` also? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #12458: MINOR: Adds KRaft versions of most streams system tests
guozhangwang commented on PR #12458: URL: https://github.com/apache/kafka/pull/12458#issuecomment-1204622110 > In addition to addressing the review comments, can you post a link to the system test results with this change? It would be good to verify the impact before merging. +1. I'd also love to learn how much system test time increase this one would incur. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14140) Ensure a fenced or in-controlled-shutdown replica is not eligible to become leader in ZK mode
[ https://issues.apache.org/jira/browse/KAFKA-14140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-14140: -- Assignee: Justine Olshan > Ensure a fenced or in-controlled-shutdown replica is not eligible to become > leader in ZK mode > - > > Key: KAFKA-14140 > URL: https://issues.apache.org/jira/browse/KAFKA-14140 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.3.0 > > > KIP-841 introduced fencing on ISR in KRaft. We should also provide some of > these protections in ZK, since all the ground work is mostly there. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-13299) Accept listeners that have the same port but use IPv4 vs IPv6
[ https://issues.apache.org/jira/browse/KAFKA-13299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-13299: --- Fix Version/s: 3.4.0 (was: 3.3.0) > Accept listeners that have the same port but use IPv4 vs IPv6 > - > > Key: KAFKA-13299 > URL: https://issues.apache.org/jira/browse/KAFKA-13299 > Project: Kafka > Issue Type: Improvement >Reporter: Matthew de Detrich >Assignee: Matthew de Detrich >Priority: Major > Fix For: 3.4.0 > > > Currently we are going through a process where we want to migrate Kafka > brokers from IPv4 to IPv6. The simplest way for us to do this would be to > allow Kafka to have 2 listeners of the same port however one listener has an > IPv4 address allocated and another listener has an IPv6 address allocated. > Currently this is not possible in Kafka because it validates that all of the > listeners have a unique port. With some rudimentary testing if this > validation is removed (so we are able to have 2 listeners of the same port > but with different IP versions) there doesn't seem to be any immediate > problems, the kafka clusters works without any problems. > Is there some fundamental reason behind this limitation of having unique > ports? Consequently would there be any problems in loosening this limitation > (i.e. duplicate ports are allowed if the IP versions are different) or just > altogether removing the restriction -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13299) Accept listeners that have the same port but use IPv4 vs IPv6
[ https://issues.apache.org/jira/browse/KAFKA-13299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574967#comment-17574967 ] Jose Armando Garcia Sancio commented on KAFKA-13299: I am going to replace 3.3.0 with 3.4.0 for the fix version. > Accept listeners that have the same port but use IPv4 vs IPv6 > - > > Key: KAFKA-13299 > URL: https://issues.apache.org/jira/browse/KAFKA-13299 > Project: Kafka > Issue Type: Improvement >Reporter: Matthew de Detrich >Assignee: Matthew de Detrich >Priority: Major > Fix For: 3.3.0 > > > Currently we are going through a process where we want to migrate Kafka > brokers from IPv4 to IPv6. The simplest way for us to do this would be to > allow Kafka to have 2 listeners of the same port however one listener has an > IPv4 address allocated and another listener has an IPv6 address allocated. > Currently this is not possible in Kafka because it validates that all of the > listeners have a unique port. With some rudimentary testing if this > validation is removed (so we are able to have 2 listeners of the same port > but with different IP versions) there doesn't seem to be any immediate > problems, the kafka clusters works without any problems. > Is there some fundamental reason behind this limitation of having unique > ports? Consequently would there be any problems in loosening this limitation > (i.e. duplicate ports are allowed if the IP versions are different) or just > altogether removing the restriction -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14140) Ensure a fenced or in-controlled-shutdown replica is not eligible to become leader in ZK mode
Justine Olshan created KAFKA-14140: -- Summary: Ensure a fenced or in-controlled-shutdown replica is not eligible to become leader in ZK mode Key: KAFKA-14140 URL: https://issues.apache.org/jira/browse/KAFKA-14140 Project: Kafka Issue Type: Task Reporter: Justine Olshan Fix For: 3.3.0 KIP-841 introduced fencing on ISR in KRaft. We should also provide some of these protections in ZK, since all the ground work is mostly there. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14139: Description: We have been thinking about disk failure cases recently. Suppose that a disk has failed and the user needs to restart the disk from an empty state. The concern is whether this can lead to the unnecessary loss of committed data. For normal topic partitions, removal from the ISR during controlled shutdown buys us some protection. After the replica is restarted, it must prove its state to the leader before it can be added back to the ISR. And it cannot become a leader until it does so. An obvious exception to this is when the replica is the last member in the ISR. In this case, the disk failure itself has compromised the committed data, so some amount of loss must be expected. We have been considering other scenarios in which the loss of one disk can lead to data loss even when there are replicas remaining which have all of the committed entries. One such scenario is this: Suppose we have a partition with two replicas: A and B. Initially A is the leader and it is the only member of the ISR. # Broker B catches up to A, so A attempts to send an AlterPartition request to the controller to add B into the ISR. # Before the AlterPartition request is received, replica B has a hard failure. # The current controller successfully fences broker B. It takes no action on this partition since B is already out of the ISR. # Before the controller receives the AlterPartition request to add B, it also fails. # While the new controller is initializing, suppose that replica B finishes startup, but the disk has been replaced (all of the previous state has been lost). # The new controller sees the registration from broker B first. # Finally, the AlterPartition from A arrives which adds B back into the ISR even though it has an empty log. (Credit for coming up with this scenario goes to [~junrao] .) I tested this in KRaft and confirmed that this sequence is possible (even if perhaps unlikely). There are a few ways we could have potentially detected the issue. First, perhaps the leader should have bumped the leader epoch on all partitions when B was fenced. Then the inflight AlterPartition would be doomed no matter when it arrived. Alternatively, we could have relied on the broker epoch to distinguish the dead broker's state from that of the restarted broker. This could be done by including the broker epoch in both the `Fetch` request and in `AlterPartition`. Finally, perhaps even normal kafka replication should be using a unique identifier for each disk so that we can reliably detect when it has changed. For example, something like what was proposed for the metadata quorum here: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes.] was: We have been thinking about disk failure cases recently. Suppose that a disk has failed and the user needs to restart the disk from an empty state. The concern is whether this can lead to the unnecessary loss of committed data. For normal topic partitions, removal from the ISR during controlled shutdown buys us some protection. After the replica is restarted, it must prove its state to the leader before it can be added back to the ISR. And it cannot become a leader until it does so. An obvious exception to this is when the replica is the last member in the ISR. In this case, the disk failure itself has compromised the committed data, so some amount of loss must be expected. We have been considering other scenarios in which the loss of one disk can lead to data loss even when there are replicas remaining which have all of the committed entries. One such scenario is this: Suppose we have a partition with two replicas: A and B. Initially A is the leader and it is the only member of the ISR. # Broker B catches up to A, so A attempts to send an AlterPartition request to the controller to add B into the ISR. # Before the AlterPartition request is received, replica B has a hard failure. # The current controller successfully fences broker B. It takes no action on this partition since B is already out of the ISR. # Before the controller receives the AlterPartition request to add B, it also fails. # While the new controller is initializing, suppose that replica B finishes startup, but the disk has been replaced (all of the previous state has been lost). # The new controller sees the registration from broker B first. # Finally, the AlterPartition from A arrives which adds B back into the ISR. (Credit for coming up with this scenario goes to [~junrao] .) I tested this in KRaft and confirmed that this sequence is possible (even if perhaps unlikely). There are a few ways we could have potentially detected the issue. First, perhaps the leader should have bumped the leader epoch on all partiti
[jira] [Created] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
Jason Gustafson created KAFKA-14139: --- Summary: Replaced disk can lead to loss of committed data even with non-empty ISR Key: KAFKA-14139 URL: https://issues.apache.org/jira/browse/KAFKA-14139 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson We have been thinking about disk failure cases recently. Suppose that a disk has failed and the user needs to restart the disk from an empty state. The concern is whether this can lead to the unnecessary loss of committed data. For normal topic partitions, removal from the ISR during controlled shutdown buys us some protection. After the replica is restarted, it must prove its state to the leader before it can be added back to the ISR. And it cannot become a leader until it does so. An obvious exception to this is when the replica is the last member in the ISR. In this case, the disk failure itself has compromised the committed data, so some amount of loss must be expected. We have been considering other scenarios in which the loss of one disk can lead to data loss even when there are replicas remaining which have all of the committed entries. One such scenario is this: Suppose we have a partition with two replicas: A and B. Initially A is the leader and it is the only member of the ISR. # Broker B catches up to A, so A attempts to send an AlterPartition request to the controller to add B into the ISR. # Before the AlterPartition request is received, replica B has a hard failure. # The current controller successfully fences broker B. It takes no action on this partition since B is already out of the ISR. # Before the controller receives the AlterPartition request to add B, it also fails. # While the new controller is initializing, suppose that replica B finishes startup, but the disk has been replaced (all of the previous state has been lost). # The new controller sees the registration from broker B first. # Finally, the AlterPartition from A arrives which adds B back into the ISR. (Credit for coming up with this scenario goes to [~junrao] .) I tested this in KRaft and confirmed that this sequence is possible (even if perhaps unlikely). There are a few ways we could have potentially detected the issue. First, perhaps the leader should have bumped the leader epoch on all partitions when B was fenced. Then the inflight AlterPartition would be doomed no matter when it arrived. Alternatively, we could have relied on the broker epoch to distinguish the dead broker's state from that of the restarted broker. This could be done by including the broker epoch in both the `Fetch` request and in `AlterPartition`. Finally, perhaps even normal kafka replication should be using a unique identifier for each disk so that we can reliably detect when it has changed. For example, something like what was proposed for the metadata quorum here: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes.] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] guozhangwang merged pull request #12476: K14130: Reduce RackAwarenesssTest to unit Test
guozhangwang merged PR #12476: URL: https://github.com/apache/kafka/pull/12476 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #12380: MINOR: Remove ARM/PowerPC builds from Jenkinsfile
cmccabe merged PR #12380: URL: https://github.com/apache/kafka/pull/12380 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #12476: K14130: Reduce RackAwarenesssTest to unit Test
guozhangwang commented on PR #12476: URL: https://github.com/apache/kafka/pull/12476#issuecomment-1204551176 I'm merging in this PR now since it includes a hotfix on [StreamTaskTest.java] to turn the build to green. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #12380: MINOR: Remove ARM/PowerPC builds from Jenkinsfile
cmccabe commented on PR #12380: URL: https://github.com/apache/kafka/pull/12380#issuecomment-1204549585 Yeah, these builds are out of control, failing all the time. It's probably more appropriate to make these nightly builds anyway -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14138) The Exception Throwing Behavior of Transactional Producer is Inconsistent
Guozhang Wang created KAFKA-14138: - Summary: The Exception Throwing Behavior of Transactional Producer is Inconsistent Key: KAFKA-14138 URL: https://issues.apache.org/jira/browse/KAFKA-14138 Project: Kafka Issue Type: Improvement Components: producer Reporter: Guozhang Wang There's an issue for inconsistent error throwing inside Kafka Producer when transactions are enabled. In short, there are two places where the received error code from the brokers would be eventually thrown to the caller: * Recorded on the batch's metadata, via "Sender#failBatch" * Recorded on the txn manager, via "txnManager#handleFailedBatch". The former would be thrown from 1) the `Future` returned from the `send`; or 2) the `callback` inside `send(record, callback)`. Whereas, the latter would be thrown from `producer.send()` directly in which we call `txnManager.maybeAddPartition -> maybeFailWithError`. However, when thrown from the former, it's not wrapped hence the direct exception (e.g. ClusterAuthorizationException), whereas in the latter it's wrapped as, e.g. KafkaException(ClusterAuthorizationException). And which one would be thrown depend on a race condition since we cannot control by the time the caller thread calls `txnManager.maybeAddPartition`, if the previous produceRequest's error has been sent back or not. For example consider the following sequence: 1. caller thread: within future = producer.send(), call recordAccumulator.append 2. sender thread: drain the accumulator, send the produceRequest and get the error back. 3. caller thread: within future = producer.send(), call txnManager.maybeAddPartition 4. sender thread: get the addPartition token, send the txnRequest and get the error back. NOTE the sender thread could send these two requests in any order. 5. caller thread: future.get() In a sequence where then 3) happened before 2), we would only get the raw exception at step 5; in a sequence where 2) happened before 3), then we would throw the exception immediately at 3). This inconsistent error throwing is pretty annoying for users since they'd need to handle both cases, but many of them actually do not know this trickiness. We should make the error throwing consistent, e.g. we should consider: 1) which errors would be thrown from callback / future.get, and which would be thrown from the `send` call directly, and these errors should better be non-overlapping, 2) whether we should wrap the raw error or not, we should do so consistently. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cmccabe commented on pull request #12483: KAFKA-14136 Generate ConfigRecord even if the value is unchanged
cmccabe commented on PR #12483: URL: https://github.com/apache/kafka/pull/12483#issuecomment-1204544001 Can we do this just for the broker configs? We don't have this behavior for topic configs or other types of configs... LGTM other than that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10000) Atomic commit of source connector records and offsets
[ https://issues.apache.org/jira/browse/KAFKA-1?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574945#comment-17574945 ] Jose Armando Garcia Sancio commented on KAFKA-1: {quote} I can file a separate non-blocker ticket for those system tests and mark this one done. {quote} Let's do that please. > Atomic commit of source connector records and offsets > - > > Key: KAFKA-1 > URL: https://issues.apache.org/jira/browse/KAFKA-1 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Labels: needs-kip > Fix For: 3.3.0 > > > It'd be nice to be able to configure source connectors such that their > offsets are committed if and only if all records up to that point have been > ack'd by the producer. This would go a long way towards EOS for source > connectors. > > This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is > marked as {{WONTFIX}} since it only concerns enabling the idempotent producer > for source connectors and is not concerned with source connector offsets. > This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, > which had a lot of discussion around allowing connector-defined transaction > boundaries. The suggestion in this ticket is to only use source connector > offset commits as the transaction boundaries for connectors; allowing > connector-specified transaction boundaries can be addressed separately. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] niket-goel commented on a diff in pull request #12457: KAFKA-14104: Add CRC validation when iterating over Metadata Log Records
niket-goel commented on code in PR #12457: URL: https://github.com/apache/kafka/pull/12457#discussion_r937164272 ## raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java: ## @@ -85,18 +88,41 @@ public void testFileRecords( FileRecords fileRecords = FileRecords.open(TestUtils.tempFile()); fileRecords.append(memRecords); -testIterator(batches, fileRecords); +testIterator(batches, fileRecords, true); +} + +@Property +public void testCrcValidation( +@ForAll CompressionType compressionType, +@ForAll long seed +) throws IOException { +List> batches = createBatches(seed); +MemoryRecords memRecords = buildRecords(compressionType, batches); +// Corrupt the record buffer + memRecords.buffer().putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, new Random(seed).nextInt()); Review Comment: There is a non-zero probability that the test might fail due to the random int colliding with the actual value. Will modify the test to fix this. As for the field, it was one of the two exposed offsets that I could choose to corrupt. I am actually not happy about this either. I am going to expose the CRC_OFFSET as a public field and corrupt that instead. Should make for a more reliable test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah opened a new pull request, #12483: KAFKA-14136 Generate ConfigRecord even if the value is unchanged
mumrah opened a new pull request, #12483: URL: https://github.com/apache/kafka/pull/12483 This patch changes the AlterConfigs behavior in KRaft mode to match that of ZK. When we receive a LegacyAlterConfig or IncrementalAlterConfig that does _not_ change a value for a key, we will still generate a ConfigRecord. This is to allow certain refresh behavior on the broker side (e.g., reloading trust stores and key stores). The DynamicBrokerReconfigurationTests which reload key stores and trust stores are enabled in this PR to validate the new behavior. Also, a small fix for KAFKA-14115 is included. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13840) KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception
[ https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574928#comment-17574928 ] Guozhang Wang commented on KAFKA-13840: --- I think this issue is indeed fixed in the latest release (starting in 3.1) where upon `commitAsync` we would try to clear-and-discover the coordinator: https://github.com/apache/kafka/pull/12259/files#diff-0029e982555d1fae10943b862924da962ca8e247a3070cded92c5f5a5960244fR954 Could you kindly check that code change and see if it would avoid the scenario you observed in the previous version? > KafkaConsumer is unable to recover connection to group coordinator after > commitOffsetsAsync exception > - > > Key: KAFKA-13840 > URL: https://issues.apache.org/jira/browse/KAFKA-13840 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0 >Reporter: Kyle R Stehbens >Assignee: Luke Chen >Priority: Critical > > Hi, I've discovered an issue with the java Kafka client (consumer) whereby a > timeout or any other retry-able exception triggered during an async offset > commit, renders the client unable to recover its group co-coordinator and > leaves the client in a broken state. > > I first encountered this using v2.8.1 of the java client, and after going > through the code base for all versions of the client, have found it affects > all versions of the client from 2.6.1 onward. > I also confirmed that by rolling back to 2.5.1, the issue is not present. > > The issue stems from changes to how the FindCoordinatorResponseHandler in > 2.5.1 used to call clearFindCoordinatorFuture(); on both success and failure > here: > [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783] > > In all future version of the client this call is not made: > [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838] > > What this results in, is when the KafkaConsumer makes a call to > coordinator.commitOffsetsAsync(...), if an error occurs such that the > coordinator is unavailable here: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007] > > then the client will try call: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017] > However this will never be able to succeed as it perpetually returns a > reference to a failed future: findCoordinatorFuture that is never cleared out. > > This manifests in all future calls to commitOffsetsAsync() throwing a > "coordinator unavailable" exception forever going forward after any > retry-able exception causes the coordinator to close. > Note we discovered this when we upgraded the kafka client in our Flink > consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the > client. We noticed this occurring in our non-flink java consumers too running > 3.x client versions. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ijuma commented on pull request #12482: Merge up to Apache Kafka 3.3 branching point
ijuma commented on PR #12482: URL: https://github.com/apache/kafka/pull/12482#issuecomment-1204477274 Please disregard, wrong PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma opened a new pull request, #12482: Merge up to Apache Kafka 3.3 branching point
ijuma opened a new pull request, #12482: URL: https://github.com/apache/kafka/pull/12482 > $ git merge-base apache-github/3.3 apache-github/trunk > 23c92ce79366e86ca719e5e51c550c27324acd83 > $ git show 23c92ce79366e86ca719e5e51c550c27324acd83 > commit 23c92ce79366e86ca719e5e51c550c27324acd83 > Author: SC > Date: Mon Jul 11 11:36:56 2022 +0900 > >MINOR: Use String#format for niceMemoryUnits result (#12389) > >Reviewers: Luke Chen , Divij Vaidya ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma closed pull request #12482: Merge up to Apache Kafka 3.3 branching point
ijuma closed pull request #12482: Merge up to Apache Kafka 3.3 branching point URL: https://github.com/apache/kafka/pull/12482 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13530) Flaky test ReplicaManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-13530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574920#comment-17574920 ] Matthew de Detrich commented on KAFKA-13530: So I ran the tests inside a docker container to simulate limited resources and couldn't replicate the flakiness > Flaky test ReplicaManagerTest > - > > Key: KAFKA-13530 > URL: https://issues.apache.org/jira/browse/KAFKA-13530 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Reporter: Matthias J. Sax >Assignee: Matthew de Detrich >Priority: Critical > Labels: flaky-test > > kafka.server.ReplicaManagerTest.[1] usesTopicIds=true > {quote}org.opentest4j.AssertionFailedError: expected: but was: > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at > kafka.server.ReplicaManagerTest.assertFetcherHasTopicId(ReplicaManagerTest.scala:3502) > at > kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds(ReplicaManagerTest.scala:3572){quote} > STDOUT > {quote}[2021-12-07 16:19:35,906] ERROR Error while reading checkpoint file > /tmp/kafka-6310287969113820536/replication-offset-checkpoint > (kafka.server.LogDirFailureChannel:76) java.nio.file.NoSuchFileException: > /tmp/kafka-6310287969113820536/replication-offset-checkpoint at > sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at > sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214) > at java.nio.file.Files.newByteChannel(Files.java:361) at > java.nio.file.Files.newByteChannel(Files.java:407) at > java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384) > at java.nio.file.Files.newInputStream(Files.java:152) at > java.nio.file.Files.newBufferedReader(Files.java:2784) at > java.nio.file.Files.newBufferedReader(Files.java:2816) at > org.apache.kafka.server.common.CheckpointFile.read(CheckpointFile.java:104) > at > kafka.server.checkpoints.CheckpointFileWithFailureHandler.read(CheckpointFileWithFailureHandler.scala:48) > at > kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:70) > at > kafka.server.checkpoints.LazyOffsetCheckpointMap.offsets$lzycompute(OffsetCheckpointFile.scala:94) > at > kafka.server.checkpoints.LazyOffsetCheckpointMap.offsets(OffsetCheckpointFile.scala:94) > at > kafka.server.checkpoints.LazyOffsetCheckpointMap.fetch(OffsetCheckpointFile.scala:97) > at > kafka.server.checkpoints.LazyOffsetCheckpoints.fetch(OffsetCheckpointFile.scala:89) > at kafka.cluster.Partition.updateHighWatermark$1(Partition.scala:348) at > kafka.cluster.Partition.createLog(Partition.scala:361) at > kafka.cluster.Partition.maybeCreate$1(Partition.scala:334) at > kafka.cluster.Partition.createLogIfNotExists(Partition.scala:341) at > kafka.cluster.Partition.$anonfun$makeLeader$1(Partition.scala:546) at > kafka.cluster.Partition.makeLeader(Partition.scala:530) at > kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$3(ReplicaManager.scala:2163) > at scala.Option.foreach(Option.scala:437) at > kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$2(ReplicaManager.scala:2160) > at > kafka.server.ReplicaManager.$anonfun$applyLocalLeadersDelta$2$adapted(ReplicaManager.scala:2159) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355) > at > scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309) > at > kafka.server.ReplicaManager.applyLocalLeadersDelta(ReplicaManager.scala:2159) > at kafka.server.ReplicaManager.applyDelta(ReplicaManager.scala:2136) at > kafka.server.ReplicaManagerTest.testDeltaToLeaderOrFollowerMarksPartitionOfflineIfLogCantBeCreated(ReplicaManagerTest.scala:3349){quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2
mdedetrich commented on code in PR #12281: URL: https://github.com/apache/kafka/pull/12281#discussion_r937115822 ## streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java: ## @@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) { public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) { final String metricsRecorderName = metricsRecorderName(metricsRecorder); -if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) { +final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder); Review Comment: I agree with @C0urante here, if the method isn't meant to be invoked in a concurrent situation then we should either revert to the original `containsKey` OR add a comment along the lines of "Currently this code isn't called in a concurrent situation" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13840) KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception
[ https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13840: Priority: Critical (was: Major) > KafkaConsumer is unable to recover connection to group coordinator after > commitOffsetsAsync exception > - > > Key: KAFKA-13840 > URL: https://issues.apache.org/jira/browse/KAFKA-13840 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0 >Reporter: Kyle R Stehbens >Assignee: Luke Chen >Priority: Critical > > Hi, I've discovered an issue with the java Kafka client (consumer) whereby a > timeout or any other retry-able exception triggered during an async offset > commit, renders the client unable to recover its group co-coordinator and > leaves the client in a broken state. > > I first encountered this using v2.8.1 of the java client, and after going > through the code base for all versions of the client, have found it affects > all versions of the client from 2.6.1 onward. > I also confirmed that by rolling back to 2.5.1, the issue is not present. > > The issue stems from changes to how the FindCoordinatorResponseHandler in > 2.5.1 used to call clearFindCoordinatorFuture(); on both success and failure > here: > [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783] > > In all future version of the client this call is not made: > [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838] > > What this results in, is when the KafkaConsumer makes a call to > coordinator.commitOffsetsAsync(...), if an error occurs such that the > coordinator is unavailable here: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007] > > then the client will try call: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017] > However this will never be able to succeed as it perpetually returns a > reference to a failed future: findCoordinatorFuture that is never cleared out. > > This manifests in all future calls to commitOffsetsAsync() throwing a > "coordinator unavailable" exception forever going forward after any > retry-able exception causes the coordinator to close. > Note we discovered this when we upgraded the kafka client in our Flink > consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the > client. We noticed this occurring in our non-flink java consumers too running > 3.x client versions. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14137) Security Vulnerabilities reported in CVE-2021-45046 and CVE-2021-45046
venkat created KAFKA-14137: -- Summary: Security Vulnerabilities reported in CVE-2021-45046 and CVE-2021-45046 Key: KAFKA-14137 URL: https://issues.apache.org/jira/browse/KAFKA-14137 Project: Kafka Issue Type: Improvement Components: KafkaConnect Environment: Production Reporter: venkat Security vulnerabilties issues reported in CVE-2021-45046 and CVE-2021-45046 VUL0094706 -QID-376209: Apache Log4j Remote Code Execution (RCE) Vulnerability (CVE-2021-44832) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13840) KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception
[ https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574909#comment-17574909 ] Kyle R Stehbens commented on KAFKA-13840: - [~guozhang] In this instance, flink is continually calling commitAsync() on the consumer (in our case every 30 seconds) with new offsets but this doesn't recover the coordinator and all subsequent commits fail. > KafkaConsumer is unable to recover connection to group coordinator after > commitOffsetsAsync exception > - > > Key: KAFKA-13840 > URL: https://issues.apache.org/jira/browse/KAFKA-13840 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0 >Reporter: Kyle R Stehbens >Assignee: Luke Chen >Priority: Major > > Hi, I've discovered an issue with the java Kafka client (consumer) whereby a > timeout or any other retry-able exception triggered during an async offset > commit, renders the client unable to recover its group co-coordinator and > leaves the client in a broken state. > > I first encountered this using v2.8.1 of the java client, and after going > through the code base for all versions of the client, have found it affects > all versions of the client from 2.6.1 onward. > I also confirmed that by rolling back to 2.5.1, the issue is not present. > > The issue stems from changes to how the FindCoordinatorResponseHandler in > 2.5.1 used to call clearFindCoordinatorFuture(); on both success and failure > here: > [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783] > > In all future version of the client this call is not made: > [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838] > > What this results in, is when the KafkaConsumer makes a call to > coordinator.commitOffsetsAsync(...), if an error occurs such that the > coordinator is unavailable here: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007] > > then the client will try call: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017] > However this will never be able to succeed as it perpetually returns a > reference to a failed future: findCoordinatorFuture that is never cleared out. > > This manifests in all future calls to commitOffsetsAsync() throwing a > "coordinator unavailable" exception forever going forward after any > retry-able exception causes the coordinator to close. > Note we discovered this when we upgraded the kafka client in our Flink > consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the > client. We noticed this occurring in our non-flink java consumers too running > 3.x client versions. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dplavcic commented on pull request #12449: KAFKA-12947 [WIP]: Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest
dplavcic commented on PR #12449: URL: https://github.com/apache/kafka/pull/12449#issuecomment-1204406842 @ableegoldman, @cadonna could you please help review this changes in the streams test domain? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dplavcic commented on a diff in pull request #12449: KAFKA-12947 [WIP]: Replace EasyMock and PowerMock with Mockito for StreamsMetricsImplTest
dplavcic commented on code in PR #12449: URL: https://github.com/apache/kafka/pull/12449#discussion_r937067118 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java: ## @@ -287,7 +279,7 @@ public void shouldGetExistingThreadLevelSensor() { final Sensor actualSensor = streamsMetrics.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, recordingLevel); -verify(metrics); +verify(metrics).getSensor("internal.test-thread-1.s.sensor1"); Review Comment: `shouldGetExistingThreadLevelSensor` test invokes `setupGetExistingSensorTest(metrics)` which does the following: ```java private void setupGetExistingSensorTest(final Metrics metrics) { when(metrics.getSensor(anyString())).thenReturn(sensor); } ``` An explicit verify is added to be sure `metrics.getSensor()` is invoked with the exact string, and not `anyString()` as stated in stub in `setupGetExistingSensorTest` method. The same reasoning applies to other explicit `verify(metrics).getSensor(...)` calls. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java: ## @@ -305,7 +297,6 @@ public void shouldGetNewTaskLevelSensor() { recordingLevel ); -verify(metrics); Review Comment: `shouldGetNewTaskLevelSensor` invokes `setupGetNewSensorTest` which internally creates a stub with exact values, providing implicit verify. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot
ashmeet13 commented on PR #12414: URL: https://github.com/apache/kafka/pull/12414#issuecomment-1204360460 Hi @jsancio, I have defined an enum in the raft module for `SnapshotReason` and have used it in place of string messages that were being passed previously. I am working on making the changes for `RaftClient` and `KafkaMetadataLog`. These changes will log the snapshot reason in `createNewSnapshot` function of `KafkaMetadataLog`. I had two implementation queries, could you please help with them - 1. How do we handle multiple reasons for starting a snapshot in an enum? 2. I would also need to change the function signature of `createNewSnapshot()` in `KafkaMetadataLog` to accommodate the new `SnapshotReason` parameter. Below is the new signature. Would this be okay? ``` Optional createNewSnapshot(OffsetAndEpoch snapshotId, SnapshotReason reason); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot
jsancio commented on code in PR #12414: URL: https://github.com/apache/kafka/pull/12414#discussion_r937029349 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -129,16 +129,17 @@ class BrokerMetadataListener( } } - private def shouldSnapshot(): Option[String] = { + private def shouldSnapshot(): Option[SnapshotReason] = { val metadataVersionHasChanged: Boolean = metadataVersionChanged() val maxBytesHaveExceeded: Boolean = (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) +//TODO: Figure out how to handle multiple reasons for snapshot Review Comment: One solution is to return a `Set[SnapshotReason]` and pass the set when creating the snapshot. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12475: MINOR; Update scalafmt to latest version
mdedetrich commented on code in PR #12475: URL: https://github.com/apache/kafka/pull/12475#discussion_r937028065 ## checkstyle/.scalafmt.conf: ## @@ -12,7 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -docstrings = JavaDoc +version = 3.5.8 +runner.dialect = scala213 Review Comment: > Thanks, in that case, would you kindly review that PR please if you get some time? It's been pending for a while now waiting to get some attention. I just landed a review of the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dplavcic commented on a diff in pull request #12476: K14130: Reduce RackAwarenesssTest to unit Test
dplavcic commented on code in PR #12476: URL: https://github.com/apache/kafka/pull/12476#discussion_r937020156 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/RackAwarenessStreamsPartitionAssignorTest.java: ## @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; +import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; +import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockInternalTopicManager; +import org.apache.kafka.test.MockKeyValueStoreBuilder; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_4; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_5; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_6; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_7; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_8; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_9; +import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; +import static org.easymock.EasyMock.anyObject; Review Comment: [optional] @guozhangwang, if you add `@RunWith(MockitoJUnitRunner.StrictStubs.class)` at class level and re-run all tests you will see following exception (or warning, it depends on how you look at it): ``` Unnecessary stubbings detected in test class: RackAwarenessStreamsPartitionAssignorTest Clean & maintainable test code requires zero unnecessary code. Following stubbings are unnecessary (click to navigate to relevant line of code): 1. -> at org.apache.kafka.streams.processor.internals.RackAwarenessStreamsPartitionAssignorTest.createMockTaskManager(RackAwarenessStreamsPartitionAssignorTest.java:173) Please remove unnecessary stubbings or use 'lenient' strictness. More info: javadoc for UnnecessaryStubbingException class. ``` To read mo
[GitHub] [kafka] jsancio commented on a diff in pull request #12457: KAFKA-14104: Add CRC validation when iterating over Metadata Log Records
jsancio commented on code in PR #12457: URL: https://github.com/apache/kafka/pull/12457#discussion_r936987763 ## raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java: ## @@ -85,18 +88,41 @@ public void testFileRecords( FileRecords fileRecords = FileRecords.open(TestUtils.tempFile()); fileRecords.append(memRecords); -testIterator(batches, fileRecords); +testIterator(batches, fileRecords, true); +} + +@Property +public void testCrcValidation( +@ForAll CompressionType compressionType, +@ForAll long seed +) throws IOException { +List> batches = createBatches(seed); +MemoryRecords memRecords = buildRecords(compressionType, batches); +// Corrupt the record buffer + memRecords.buffer().putInt(DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET, new Random(seed).nextInt()); Review Comment: Does a random int always corrupt the batch? Meaning, it is possible that this test sometime fails because it was unlucky to pick a random int that didn't invalidate the CRC. Can you explain in a comment why this buffer/memory change specifically? ## raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java: ## @@ -179,8 +182,13 @@ private Optional> nextBatch() { return Optional.empty(); } -private Batch readBatch(DefaultRecordBatch batch) { +private Batch readBatch(DefaultRecordBatch batch) throws CorruptRecordException { final Batch result; +if (doCrcValidation) { +// Perform a CRC validity check on this block. Review Comment: typo; did you mean "on this batch."? ## raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java: ## @@ -179,8 +182,13 @@ private Optional> nextBatch() { return Optional.empty(); } -private Batch readBatch(DefaultRecordBatch batch) { +private Batch readBatch(DefaultRecordBatch batch) throws CorruptRecordException { final Batch result; +if (doCrcValidation) { Review Comment: You can move this block before `final Batch tresult` and keep `result` close to where it is initialized. ## raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java: ## @@ -49,17 +50,20 @@ // Number of bytes from records read up to now private int bytesRead = 0; private boolean isClosed = false; +private boolean doCrcValidation = false; Review Comment: This should be `private final boolean doCrcValidation;`. It should be moved with the rest of the `final` fields. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dplavcic commented on pull request #12459: KAFKA-13036: Replace EasyMock and PowerMock with Mockito for RocksDBMetricsRecorderTest
dplavcic commented on PR #12459: URL: https://github.com/apache/kafka/pull/12459#issuecomment-1204331217 Looks good to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14136) AlterConfigs in KRaft does not generate records for unchanged values
David Arthur created KAFKA-14136: Summary: AlterConfigs in KRaft does not generate records for unchanged values Key: KAFKA-14136 URL: https://issues.apache.org/jira/browse/KAFKA-14136 Project: Kafka Issue Type: Bug Reporter: David Arthur Assignee: David Arthur Fix For: 3.3.0, 3.4.0 In ZK, when handling LegacyAlterConfigs or IncrementalAlterConfigs, we call certain code paths regardless of what values are included in the request. We utilize this behavior to force a broker to reload a keystore or truststore from disk (we sent an AlterConfig with the keystore path unchanged). In KRaft, however, we have an optimization to only generate ConfigRecords if the incoming AtlerConfig request will result in actual config changes. This means the broker never receives any records for "no-op" config changes and we cannot trigger certain code paths. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] guozhangwang commented on pull request #7670: KAFKA-7016: Not hide the stack trace for ApiException
guozhangwang commented on PR #7670: URL: https://github.com/apache/kafka/pull/7670#issuecomment-1204324571 Hello @gitlw @garyrussell @kkolyan sorry for getting late on you. There's a concern which is not recorded on the PR here, which is that printing the stack trace on each ApiException may cause a log flooding on the broker side. @ijuma originally raised it so he may provide some more context. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13840) KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception
[ https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574893#comment-17574893 ] Guozhang Wang commented on KAFKA-13840: --- [~kyle.stehbens] Just to clarify, when the retriable error happens for the commitAsync, did the caller thread further triggers other functions on the consumer? In the current code, as long as the caller triggers "pull", or another "commitAsync", the "ensureCoordinatorReady" function would be triggered which would clear the failed future and mark coordinator unknown, so I'm still a bit less clear how the client would unable to recover its group co-coordinator and leaves the client in a broken state. > KafkaConsumer is unable to recover connection to group coordinator after > commitOffsetsAsync exception > - > > Key: KAFKA-13840 > URL: https://issues.apache.org/jira/browse/KAFKA-13840 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0 >Reporter: Kyle R Stehbens >Assignee: Luke Chen >Priority: Major > > Hi, I've discovered an issue with the java Kafka client (consumer) whereby a > timeout or any other retry-able exception triggered during an async offset > commit, renders the client unable to recover its group co-coordinator and > leaves the client in a broken state. > > I first encountered this using v2.8.1 of the java client, and after going > through the code base for all versions of the client, have found it affects > all versions of the client from 2.6.1 onward. > I also confirmed that by rolling back to 2.5.1, the issue is not present. > > The issue stems from changes to how the FindCoordinatorResponseHandler in > 2.5.1 used to call clearFindCoordinatorFuture(); on both success and failure > here: > [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783] > > In all future version of the client this call is not made: > [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838] > > What this results in, is when the KafkaConsumer makes a call to > coordinator.commitOffsetsAsync(...), if an error occurs such that the > coordinator is unavailable here: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007] > > then the client will try call: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017] > However this will never be able to succeed as it perpetually returns a > reference to a failed future: findCoordinatorFuture that is never cleared out. > > This manifests in all future calls to commitOffsetsAsync() throwing a > "coordinator unavailable" exception forever going forward after any > retry-able exception causes the coordinator to close. > Note we discovered this when we upgraded the kafka client in our Flink > consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the > client. We noticed this occurring in our non-flink java consumers too running > 3.x client versions. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14111) Dynamic config update fails for "password" configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-14111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-14111: - Fix Version/s: 3.3.0 3.4.0 > Dynamic config update fails for "password" configs in KRaft > --- > > Key: KAFKA-14111 > URL: https://issues.apache.org/jira/browse/KAFKA-14111 > Project: Kafka > Issue Type: Bug > Components: kraft >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.3.0, 3.4.0 > > > Two related bugs found when working on updating > DynamicBrokerReconfigurationTest for KRaft. > Firstly, if we issue an AlterConfig (or IncrementalAlterConfig) for a broker > config that is defined as a "password", it will succeed on the controller, > but throw an error when the broker handles it. > For example, on a vanilla cluster running "config/kraft/server.properties" > {code} > /bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --broker 1 > --add-config listener.name.external.ssl.key.password=foo > {code} > results in > {code} > [2022-07-26 16:24:05,049] ERROR Dynamic password config > listener.name.external.ssl.key.password could not be decoded, ignoring. > (kafka.server.DynamicBrokerConfig) > org.apache.kafka.common.config.ConfigException: Password encoder secret not > configured > at > kafka.server.DynamicBrokerConfig.$anonfun$passwordEncoder$1(DynamicBrokerConfig.scala:352) > at scala.Option.getOrElse(Option.scala:201) > at > kafka.server.DynamicBrokerConfig.passwordEncoder(DynamicBrokerConfig.scala:352) > at > kafka.server.DynamicBrokerConfig.decodePassword$1(DynamicBrokerConfig.scala:393) > at > kafka.server.DynamicBrokerConfig.$anonfun$fromPersistentProps$5(DynamicBrokerConfig.scala:404) > at > kafka.server.DynamicBrokerConfig.$anonfun$fromPersistentProps$5$adapted(DynamicBrokerConfig.scala:402) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at scala.collection.MapOps.foreachEntry(Map.scala:244) > at scala.collection.MapOps.foreachEntry$(Map.scala:240) > at scala.collection.AbstractMap.foreachEntry(Map.scala:405) > at > kafka.server.DynamicBrokerConfig.fromPersistentProps(DynamicBrokerConfig.scala:402) > at > kafka.server.DynamicBrokerConfig.$anonfun$updateBrokerConfig$1(DynamicBrokerConfig.scala:300) > at > kafka.server.DynamicBrokerConfig.updateBrokerConfig(DynamicBrokerConfig.scala:299) > at > kafka.server.BrokerConfigHandler.processConfigChanges(ConfigHandler.scala:221) > at > kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$15(BrokerMetadataPublisher.scala:212) > at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1008) > at > kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$14(BrokerMetadataPublisher.scala:190) > at > kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$14$adapted(BrokerMetadataPublisher.scala:189) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.metadata.BrokerMetadataPublisher.publish(BrokerMetadataPublisher.scala:189) > at > kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:293) > at > kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2(BrokerMetadataListener.scala:126) > at > kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2$adapted(BrokerMetadataListener.scala:126) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.run(BrokerMetadataListener.scala:126) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.base/java.lang.Thread.run(Thread.java:833) > {code}. > If a {{password.encoder.secret}} is supplied, this still fails but with: > {code} > [2022-07-26 16:27:23,247] ERROR Dynamic password config > listener.name.external.ssl.key.password could not be decoded, ignoring. > (kafka.server.DynamicBrokerConfig) > java.lang.StringIndexOutOfBoundsException: begin 0, end -1, length 3 > at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4604) > at java.base/java.lang.String.substring(String.java:2707) > at kafka.utils.CoreUtils$.$anonfun$parseCsvMap$1(CoreUtils.scala:173) > at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:929) > at kafka.utils.CoreUtils$.parseCsvMap(CoreUtils.scala:17
[jira] [Resolved] (KAFKA-14111) Dynamic config update fails for "password" configs in KRaft
[ https://issues.apache.org/jira/browse/KAFKA-14111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-14111. -- Resolution: Fixed > Dynamic config update fails for "password" configs in KRaft > --- > > Key: KAFKA-14111 > URL: https://issues.apache.org/jira/browse/KAFKA-14111 > Project: Kafka > Issue Type: Bug > Components: kraft >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 3.3.0, 3.4.0 > > > Two related bugs found when working on updating > DynamicBrokerReconfigurationTest for KRaft. > Firstly, if we issue an AlterConfig (or IncrementalAlterConfig) for a broker > config that is defined as a "password", it will succeed on the controller, > but throw an error when the broker handles it. > For example, on a vanilla cluster running "config/kraft/server.properties" > {code} > /bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --broker 1 > --add-config listener.name.external.ssl.key.password=foo > {code} > results in > {code} > [2022-07-26 16:24:05,049] ERROR Dynamic password config > listener.name.external.ssl.key.password could not be decoded, ignoring. > (kafka.server.DynamicBrokerConfig) > org.apache.kafka.common.config.ConfigException: Password encoder secret not > configured > at > kafka.server.DynamicBrokerConfig.$anonfun$passwordEncoder$1(DynamicBrokerConfig.scala:352) > at scala.Option.getOrElse(Option.scala:201) > at > kafka.server.DynamicBrokerConfig.passwordEncoder(DynamicBrokerConfig.scala:352) > at > kafka.server.DynamicBrokerConfig.decodePassword$1(DynamicBrokerConfig.scala:393) > at > kafka.server.DynamicBrokerConfig.$anonfun$fromPersistentProps$5(DynamicBrokerConfig.scala:404) > at > kafka.server.DynamicBrokerConfig.$anonfun$fromPersistentProps$5$adapted(DynamicBrokerConfig.scala:402) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at scala.collection.MapOps.foreachEntry(Map.scala:244) > at scala.collection.MapOps.foreachEntry$(Map.scala:240) > at scala.collection.AbstractMap.foreachEntry(Map.scala:405) > at > kafka.server.DynamicBrokerConfig.fromPersistentProps(DynamicBrokerConfig.scala:402) > at > kafka.server.DynamicBrokerConfig.$anonfun$updateBrokerConfig$1(DynamicBrokerConfig.scala:300) > at > kafka.server.DynamicBrokerConfig.updateBrokerConfig(DynamicBrokerConfig.scala:299) > at > kafka.server.BrokerConfigHandler.processConfigChanges(ConfigHandler.scala:221) > at > kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$15(BrokerMetadataPublisher.scala:212) > at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1008) > at > kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$14(BrokerMetadataPublisher.scala:190) > at > kafka.server.metadata.BrokerMetadataPublisher.$anonfun$publish$14$adapted(BrokerMetadataPublisher.scala:189) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.metadata.BrokerMetadataPublisher.publish(BrokerMetadataPublisher.scala:189) > at > kafka.server.metadata.BrokerMetadataListener.kafka$server$metadata$BrokerMetadataListener$$publish(BrokerMetadataListener.scala:293) > at > kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2(BrokerMetadataListener.scala:126) > at > kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.$anonfun$run$2$adapted(BrokerMetadataListener.scala:126) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.metadata.BrokerMetadataListener$HandleCommitsEvent.run(BrokerMetadataListener.scala:126) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:121) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:200) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:173) > at java.base/java.lang.Thread.run(Thread.java:833) > {code}. > If a {{password.encoder.secret}} is supplied, this still fails but with: > {code} > [2022-07-26 16:27:23,247] ERROR Dynamic password config > listener.name.external.ssl.key.password could not be decoded, ignoring. > (kafka.server.DynamicBrokerConfig) > java.lang.StringIndexOutOfBoundsException: begin 0, end -1, length 3 > at java.base/java.lang.String.checkBoundsBeginEnd(String.java:4604) > at java.base/java.lang.String.substring(String.java:2707) > at kafka.utils.CoreUtils$.$anonfun$parseCsvMap$1(CoreUtils.scala:173) > at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:929) > at kafka.utils.CoreUtils$.parseCsvMap(CoreUtils.scala:171) > at kafka.utils.
[GitHub] [kafka] mumrah commented on pull request #12455: KAFKA-14111 Fix sensitive dynamic broker configs in KRaft
mumrah commented on PR #12455: URL: https://github.com/apache/kafka/pull/12455#issuecomment-1204298550 Manually cherry-picked to 3.3 as a687d4d3f687 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] premkamal23 opened a new pull request, #12481: Kafka 14115
premkamal23 opened a new pull request, #12481: URL: https://github.com/apache/kafka/pull/12481 [KAFKA-14115] Password configs are logged in plaintext in KRaft While updating the config for a broker ConfigurationControlManager is logging sensitive config values (listener.name.external.ssl.key.password). ConfigResource(type=BROKER, name='1'): set configuration listener.name.external.ssl.key.password to bar We need to redact these values the same as BrokerMetadataPublisher Updating broker 1 with new configuration : listener.name.external.ssl.key.password -> [hidden] Changes: updated isSensitive method to check if the config name contains the string password and used the same while logging config values. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah merged pull request #12455: KAFKA-14111 Fix sensitive dynamic broker configs in KRaft
mumrah merged PR #12455: URL: https://github.com/apache/kafka/pull/12455 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #12455: KAFKA-14111 Fix sensitive dynamic broker configs in KRaft
mumrah commented on PR #12455: URL: https://github.com/apache/kafka/pull/12455#issuecomment-1204262912 After latest commit, only test failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12476: K14130: Reduce RackAwarenesssTest to unit Test
guozhangwang commented on code in PR #12476: URL: https://github.com/apache/kafka/pull/12476#discussion_r936948546 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/RackAwarenessStreamsPartitionAssignorTest.java: ## @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; +import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; +import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockInternalTopicManager; +import org.apache.kafka.test.MockKeyValueStoreBuilder; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_4; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_5; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_6; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_7; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_8; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_9; +import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; +import static org.easymock.EasyMock.anyObject; Review Comment: Thanks @divijvaidya , I've updated the class to switch to mockito. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison opened a new pull request, #12480: MINOR: Small cleanups in integration.kafka tests
mimaison opened a new pull request, #12480: URL: https://github.com/apache/kafka/pull/12480 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #12418: KAFKA-13414: Replace PowerMock/EasyMock with Mockito in connect.storage.KafkaOffsetBackingStoreTest
clolov commented on PR #12418: URL: https://github.com/apache/kafka/pull/12418#issuecomment-1204186138 Thank you for the review @C0urante! I will have a look and aim to provide answers (+ new commits) in the next couple of days. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] niket-goel commented on pull request #12457: KAFKA-14104: Add CRC validation when iterating over Metadata Log Records
niket-goel commented on PR #12457: URL: https://github.com/apache/kafka/pull/12457#issuecomment-1204185885 The build failures are a little cryptic. The same tests all pass locally, and the failing step in the build only complains of the script exiting with a non-zero status, but there are no actual failing tests. Will just try to trigger another run. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13877) Flaky RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags
[ https://issues.apache.org/jira/browse/KAFKA-13877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13877. --- Resolution: Fixed > Flaky > RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags > > > Key: KAFKA-13877 > URL: https://issues.apache.org/jira/browse/KAFKA-13877 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Labels: newbie > > The following test fails on local testbeds about once per 10-15 runs: > {code} > java.lang.AssertionError > at org.junit.Assert.fail(Assert.java:87) > at org.junit.Assert.assertTrue(Assert.java:42) > at org.junit.Assert.assertTrue(Assert.java:53) > at > org.apache.kafka.streams.integration.RackAwarenessIntegrationTest.shouldDistributeStandbyReplicasOverMultipleClientTags(RackAwarenessIntegrationTest.java:192) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:53) > at > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] guozhangwang merged pull request #12468: KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest
guozhangwang merged PR #12468: URL: https://github.com/apache/kafka/pull/12468 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12475: MINOR; Update scalafmt to latest version
divijvaidya commented on code in PR #12475: URL: https://github.com/apache/kafka/pull/12475#discussion_r936828320 ## checkstyle/.scalafmt.conf: ## @@ -12,7 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -docstrings = JavaDoc +version = 3.5.8 +runner.dialect = scala213 Review Comment: > Do you think it makes sense to format the docs or should I leave the PR as is? I think we should leave the PR as is. Converting the docs in the entire project will be cumbersome without any benefit. > In that case it's sensible to wait for your PR to land and then I can rebase. Thanks, in that case, would you kindly review that PR please if you get some time? It's been pending for a while now waiting to get some attention. ## checkstyle/.scalafmt.conf: ## @@ -12,7 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -docstrings = JavaDoc +version = 3.5.8 +runner.dialect = scala213 Review Comment: > Do you think it makes sense to format the docs or should I leave the PR as is? I think we should leave the PR as is. Converting the docs in the entire project will be cumbersome without any benefit. > In that case it's sensible to wait for your PR to land and then I can rebase. Thanks, in that case, would you kindly review that PR please if you get some time? It's been pending for a while now waiting to get some attention. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574771#comment-17574771 ] Chris Egerton commented on KAFKA-12495: --- [~sagarrao] yes, it's unassigned so anyone can feel free to take a shot at it. I should note that I don't believe the proposal to use the scheduled rebalance delay in between successive revocation rounds is safe, though. With that change, it would become impossible in some scenarios to scale up a cluster within the rebalance delay, and given that the current default for that delay is five minutes, this may cause some issues for Connect cluster administrators. In fact, I would even argue that it would qualify as a regression since the outcome for users in this scenario would be significantly worse than it is at the moment. I'm still not convinced that rebalance storms are a serious risk when removing the guard against successive revocation rounds, especially if we add some cycle-detection logic. However, if my colleagues still believe it's necessary to take extra precautions against storms, one possibility is that we can compromise and use exponential backoff between successive revocation rounds. This would be more complex to implement and require some potentially-tricky state tracking in the assignor class, but if done effectively, would allow us to start off by having little to no delay between successive revocation rounds, but, in the event that something goes wrong, eventually work our way up to waiting the full scheduled rebalance delay in between rounds. > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Luke Chen >Priority: Critical > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement incremental cooperative rebalance algorithm > based on KIP-415 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. > However, we have a bad assumption in the algorithm implementation, which is: > after revoking rebalance completed, the member(worker) count will be the same > as the previous round of reblance. > > Let's take a look at the example in the KIP-415: > !image-2021-03-18-15-07-27-103.png|width=441,height=556! > It works well for most cases. But what if W4 added after 1st rebalance > completed and before 2nd rebalance started? Let's see what will happened? > Let's see this example: (we'll use 10 tasks here): > > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > but we didn't revoke any more C/T in this round, which cause unbalanced > distribution > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W2(delay: 0, assigned: [BT4, BT5], revoked: []) > {code} > Because we didn't allow to do consecutive revoke in two consecutive > rebalances (under the same leader), we will have this uneven distribution > under this situation. We should allow consecutive rebalance to have another > round of revocation to revoke the C/T to the other members in this case. > expected: > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is
[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574767#comment-17574767 ] Sagar Rao commented on KAFKA-12495: --- [~showuon] , is this still open? I can take a stab at it. Let me know. Thanks! > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Luke Chen >Priority: Critical > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement incremental cooperative rebalance algorithm > based on KIP-415 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. > However, we have a bad assumption in the algorithm implementation, which is: > after revoking rebalance completed, the member(worker) count will be the same > as the previous round of reblance. > > Let's take a look at the example in the KIP-415: > !image-2021-03-18-15-07-27-103.png|width=441,height=556! > It works well for most cases. But what if W4 added after 1st rebalance > completed and before 2nd rebalance started? Let's see what will happened? > Let's see this example: (we'll use 10 tasks here): > > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > but we didn't revoke any more C/T in this round, which cause unbalanced > distribution > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W2(delay: 0, assigned: [BT4, BT5], revoked: []) > {code} > Because we didn't allow to do consecutive revoke in two consecutive > rebalances (under the same leader), we will have this uneven distribution > under this situation. We should allow consecutive rebalance to have another > round of revocation to revoke the C/T to the other members in this case. > expected: > {code:java} > Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, > BT5 > W1 is current leader > W2 joins with assignment: [] > Rebalance is triggered > W3 joins while rebalance is still active with assignment: [] > W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, > BT4, BT5] > W1 becomes leader > W1 computes and sends assignments: > W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, > BT2, BT4, BT4, BT5]) > W2(delay: 0, assigned: [], revoked: []) > W3(delay: 0, assigned: [], revoked: []) > W1 stops revoked resources > W1 rejoins with assignment: [AC0, AT1, AT2, AT3] > Rebalance is triggered > W2 joins with assignment: [] > W3 joins with assignment: [] > // one more member joined > W4 joins with assignment: [] > W1 becomes leader > W1 computes and sends assignments: > // We assigned all the previous revoked Connectors/Tasks to the new member, > **and also revoke some C/T** > W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3]) > W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) > W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) > W4(delay: 0, assigned: [BT4, BT5], revoked: []) > // another round of rebalance to assign the new revoked C/T to the other > members > W1 rejoins with assignment: [AC0, AT1, AT2] > Rebalance is triggered > W2 joins with assignment: [AT4, AT5, BC0] > W3 joins with assignment: [BT1, BT2, BT4]
[GitHub] [kafka] ahuang98 opened a new pull request, #12479: Convert DeleteTopicTest, DeleteTopicsRequestTest, and DeleteTopicsRequestWithDeletionDisabledTest to run in KRaft mode
ahuang98 opened a new pull request, #12479: URL: https://github.com/apache/kafka/pull/12479 DeleteTopicRequestTest and DeleteTopicsRequestWithDeletionDisabledTest are still a work in progress ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot
ashmeet13 commented on PR #12414: URL: https://github.com/apache/kafka/pull/12414#issuecomment-1204027403 Got it, working on these changes. Taking a bit of time to go through the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13952) Infinite retry timeout is not working
[ https://issues.apache.org/jira/browse/KAFKA-13952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574748#comment-17574748 ] Yash Mayya commented on KAFKA-13952: [~jmalek] this definitely looks like a bug in the implementation of RetryWithToleranceOperator. Thanks for discovering this issue and filing a bug JIRA. I've raised a PR to fix this issue - https://github.com/apache/kafka/pull/12478 > Infinite retry timeout is not working > - > > Key: KAFKA-13952 > URL: https://issues.apache.org/jira/browse/KAFKA-13952 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Jakub Malek >Priority: Minor > > The > [documentation|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L129] > for {{errors.retry.timeout}} property says: > {noformat} > The maximum duration in milliseconds that a failed operation will be > reattempted. The default is 0, which means no retries will be attempted. Use > -1 for infinite retries.{noformat} > But it seems that value {{-1}} is not respected by the > [RetryWithToleranceOperator|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java] > that simply compares elapsed time until {{startTime + errorRetryTimeout}} is > exceeded. > I was also not able to find any conversion of the raw config value before > {{RetryWithToleranceOperator}} is initialized. > I run a simple test with a connector using mocked transformation plugin that > throws the {{RetriableException}} and it seems to prove my claim. > I'm not sure if it's documentation or implementation error or maybe I've > missed something. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] yashmayya opened a new pull request, #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration
yashmayya opened a new pull request, #12478: URL: https://github.com/apache/kafka/pull/12478 - https://issues.apache.org/jira/browse/KAFKA-13952: RetryWithToleranceOperator doesn't respect infinite retries config - i.e. when `errors.retry.timeout` is set to `-1` - From https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect: >errors.retry.timeout: [-1, 0, 1, ... Long.MAX_VALUE], where -1 means infinite duration. - Also, from the config doc: https://github.com/apache/kafka/blob/0c4da23098f8b8ae9542acd7fbaa1e5c16384a39/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L129-L130 - This PR fixes the issue in `RetryWithToleranceOperator` along with a couple of unit tests to ensure the same -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13840) KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception
[ https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574735#comment-17574735 ] Luke Chen commented on KAFKA-13840: --- I'll take a look. > KafkaConsumer is unable to recover connection to group coordinator after > commitOffsetsAsync exception > - > > Key: KAFKA-13840 > URL: https://issues.apache.org/jira/browse/KAFKA-13840 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0 >Reporter: Kyle R Stehbens >Assignee: Luke Chen >Priority: Major > > Hi, I've discovered an issue with the java Kafka client (consumer) whereby a > timeout or any other retry-able exception triggered during an async offset > commit, renders the client unable to recover its group co-coordinator and > leaves the client in a broken state. > > I first encountered this using v2.8.1 of the java client, and after going > through the code base for all versions of the client, have found it affects > all versions of the client from 2.6.1 onward. > I also confirmed that by rolling back to 2.5.1, the issue is not present. > > The issue stems from changes to how the FindCoordinatorResponseHandler in > 2.5.1 used to call clearFindCoordinatorFuture(); on both success and failure > here: > [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783] > > In all future version of the client this call is not made: > [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838] > > What this results in, is when the KafkaConsumer makes a call to > coordinator.commitOffsetsAsync(...), if an error occurs such that the > coordinator is unavailable here: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007] > > then the client will try call: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017] > However this will never be able to succeed as it perpetually returns a > reference to a failed future: findCoordinatorFuture that is never cleared out. > > This manifests in all future calls to commitOffsetsAsync() throwing a > "coordinator unavailable" exception forever going forward after any > retry-able exception causes the coordinator to close. > Note we discovered this when we upgraded the kafka client in our Flink > consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the > client. We noticed this occurring in our non-flink java consumers too running > 3.x client versions. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] see-quick opened a new pull request, #12477: [MINOR] - update scala version in bin scripts (2.13.8)
see-quick opened a new pull request, #12477: URL: https://github.com/apache/kafka/pull/12477 Signed-off-by: morsak This PR updates scala versions inside `bin/scripts` (i.e., `kafka-run-class.sh`). https://github.com/apache/kafka/pull/12273 fix modified only inside `gradle.properties`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14135) KafkaConfig value sasl.server.callback.handler.class is set to null even when custom class is used
[ https://issues.apache.org/jira/browse/KAFKA-14135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gábor Nagy updated KAFKA-14135: --- Description: As mentioned in the summary, the value `sasl.server.callback.handler.class` is set to `null` in the terminal printout of the KafkaConfig object, even though a custom handler class has been configured, and has been confirmed to be used by Kafka (by testing the callback's behavior). This issue can (has) lead to hours of unnecessary debugging, as well as potential security issues, since not knowing if your brokers are using, for example, the insecure SASL/OAUTHBEARER default handler bundled with Kafka, or some custom implementation, can lead to security breaches. was: As mentioned in the summary, the value `sasl.server.callback.handler.class` is set to `null` in the terminal printout of the KafkaConfig object, even though a custom handler class has been configured, and has been confirmed to be used by Kafka (by testing the callback's behavior). This issue can (has) lead to hours of unnecessary debugging, as well as potential security issues, since not knowing if you brokers are using, for example, the insecure SASL/OAUTHBEARER default handler bundled with Kafka, or some custom implementation, can lead to security breaches. > KafkaConfig value sasl.server.callback.handler.class is set to null even when > custom class is used > -- > > Key: KAFKA-14135 > URL: https://issues.apache.org/jira/browse/KAFKA-14135 > Project: Kafka > Issue Type: Bug > Components: config, security >Affects Versions: 3.2.0 >Reporter: Gábor Nagy >Priority: Major > Attachments: KafkaConfigPrintoutPartial.png, > ServerPropertiesPartial.png > > > As mentioned in the summary, the value `sasl.server.callback.handler.class` > is set to `null` in the terminal printout of the KafkaConfig object, even > though a custom handler class has been configured, and has been confirmed to > be used by Kafka (by testing the callback's behavior). > > This issue can (has) lead to hours of unnecessary debugging, as well as > potential security issues, since not knowing if your brokers are using, for > example, the insecure SASL/OAUTHBEARER default handler bundled with Kafka, or > some custom implementation, can lead to security breaches. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14135) KafkaConfig value sasl.server.callback.handler.class is set to null even when custom class is used
Gábor Nagy created KAFKA-14135: -- Summary: KafkaConfig value sasl.server.callback.handler.class is set to null even when custom class is used Key: KAFKA-14135 URL: https://issues.apache.org/jira/browse/KAFKA-14135 Project: Kafka Issue Type: Bug Components: config, security Affects Versions: 3.2.0 Reporter: Gábor Nagy Attachments: KafkaConfigPrintoutPartial.png, ServerPropertiesPartial.png As mentioned in the summary, the value `sasl.server.callback.handler.class` is set to `null` in the terminal printout of the KafkaConfig object, even though a custom handler class has been configured, and has been confirmed to be used by Kafka (by testing the callback's behavior). This issue can (has) lead to hours of unnecessary debugging, as well as potential security issues, since not knowing if you brokers are using, for example, the insecure SASL/OAUTHBEARER default handler bundled with Kafka, or some custom implementation, can lead to security breaches. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12476: K14130: Reduce RackAwarenesssTest to unit Test
divijvaidya commented on code in PR #12476: URL: https://github.com/apache/kafka/pull/12476#discussion_r936500749 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/RackAwarenessStreamsPartitionAssignorTest.java: ## @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; +import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; +import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; +import org.apache.kafka.test.MockApiProcessorSupplier; +import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockInternalTopicManager; +import org.apache.kafka.test.MockKeyValueStoreBuilder; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_4; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_5; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_6; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_7; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_8; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_9; +import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; +import static org.easymock.EasyMock.anyObject; Review Comment: We are actively trying to [get rid of EasyMock/PowerMock](https://issues.apache.org/jira/browse/KAFKA-14132) from all classes. Please use Mockito in this new code. Here are a couple of helpful tips that you might useful: 1. `EasyMock.createNiceMock`/`EasyMock.createMock` can be replaced with `mock` from Mockito. All mocks in Mockito are nice. 2. Mockito doesn't require `replay` so you can get rid of those lines completely. 3. You might want to consider running the test using @RunWith(MockitoJUnitRunner.StrictStubs.class) since it comes with a lot of benefits highlighted in the docs https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/junit/MockitoJUnitRunner.html -- This is an automated message from the Apache Git Service. To resp
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12475: MINOR; Update scalafmt to latest version
mdedetrich commented on code in PR #12475: URL: https://github.com/apache/kafka/pull/12475#discussion_r936511846 ## checkstyle/.scalafmt.conf: ## @@ -12,7 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -docstrings = JavaDoc +version = 3.5.8 +runner.dialect = scala213 Review Comment: The Scala 2.13 dialect is backwards compatible with 2.12. I have applied the dialect when updating from scalafmt 2 to scalafmt 3 with various popular OSS Scala projects and there isn't any issues (in any case the Scala code that is used im Kafka is quite conservative) > I have verified that the style guide we follow for this project is not prescriptive about formatting of the docs. Hence, your change should be ok. Do you think it makes sense to format the docs or should I leave the PR as is? > Note that build is failing related to changes in this PR. We would probably need to fix the error but I think I might have already fixed it in MINOR: Catch InvocationTargetException explicitly and propagate underlying cause #12230 If that lands, then the check should pass here. In that case it's sensible to wait for your PR to land and then I can rebase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12475: MINOR; Update scalafmt to latest version
mdedetrich commented on code in PR #12475: URL: https://github.com/apache/kafka/pull/12475#discussion_r936511846 ## checkstyle/.scalafmt.conf: ## @@ -12,7 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -docstrings = JavaDoc +version = 3.5.8 +runner.dialect = scala213 Review Comment: The Scala 2.13 dialect is backwards compatible with 2.12. I have applied the dialect when updating from scalafmt 2 to scalafmt 3 with various popular OSS Scala projects and there isn't any issues (in any case the Scala code that is used im Kafka is quite conservative) > I have verified that the style guide we follow for this project is not prescriptive about formatting of the docs. Hence, your change should be ok. Do you think it makes sense to format the docs or should I leave the PR as os? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mdedetrich commented on a diff in pull request #12475: MINOR; Update scalafmt to latest version
mdedetrich commented on code in PR #12475: URL: https://github.com/apache/kafka/pull/12475#discussion_r936511846 ## checkstyle/.scalafmt.conf: ## @@ -12,7 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -docstrings = JavaDoc +version = 3.5.8 +runner.dialect = scala213 Review Comment: The Scala 2.13 dialect is backwards compatible with 2.12. I have applied the dialect when updating from scalafmt 2 to scalafmt 3 with various popular OSS Scala projects and there isn't any issues (in any case the Scala code that is used im Kafka is quite conservative) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12473: KAFKA-13133: Replace EasyMock and PowerMock with Mockito for AbstractHerderTest
divijvaidya commented on code in PR #12473: URL: https://github.com/apache/kafka/pull/12473#discussion_r936497559 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java: ## @@ -236,31 +198,23 @@ public void connectorStatus() { assertEquals(0, taskState.id()); assertEquals("UNASSIGNED", taskState.state()); assertEquals(workerId, taskState.workerId()); - -PowerMock.verifyAll(); } @Test public void taskStatus() { ConnectorTaskId taskId = new ConnectorTaskId("connector", 0); String workerId = "workerId"; -AbstractHerder herder = partialMockBuilder(AbstractHerder.class) -.withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class, - ConnectorClientConfigOverridePolicy.class) -.withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) -.addMockedMethod("generation") -.createMock(); +AbstractHerder herder = mock(AbstractHerder.class, withSettings() +.useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) +.defaultAnswer(CALLS_REAL_METHODS)); -EasyMock.expect(herder.generation()).andStubReturn(5); +when(herder.generation()).thenReturn(5); Review Comment: Removed in the latest revision -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12473: KAFKA-13133: Replace EasyMock and PowerMock with Mockito for AbstractHerderTest
divijvaidya commented on code in PR #12473: URL: https://github.com/apache/kafka/pull/12473#discussion_r936496903 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java: ## @@ -141,89 +141,51 @@ public class AbstractHerderTest { private final String connector = "connector"; private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); -@MockStrict private Worker worker; -@MockStrict private WorkerConfigTransformer transformer; -@MockStrict private ConfigBackingStore configStore; -@MockStrict private StatusBackingStore statusStore; -@MockStrict private ClassLoader classLoader; -@Mock private Plugins plugins; +private final Worker worker = mock(Worker.class); +private final WorkerConfigTransformer transformer = mock(WorkerConfigTransformer.class); +private final Plugins plugins = mock(Plugins.class); +private final ClassLoader classLoader = mock(ClassLoader.class); +private final ConfigBackingStore configStore = mock(ConfigBackingStore.class); +private final StatusBackingStore statusStore = mock(StatusBackingStore.class); +private ClassLoader loader; +private Connector insConnector; -@Test -public void testConnectors() { -AbstractHerder herder = partialMockBuilder(AbstractHerder.class) -.withConstructor( -Worker.class, -String.class, -String.class, -StatusBackingStore.class, -ConfigBackingStore.class, -ConnectorClientConfigOverridePolicy.class -) -.withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) -.addMockedMethod("generation") -.createMock(); - -EasyMock.expect(herder.generation()).andStubReturn(generation); -EasyMock.expect(herder.rawConfig(connector)).andReturn(null); -EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT); -replayAll(); -assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors())); -PowerMock.verifyAll(); +@Before +public void before() { +loader = Utils.getContextOrKafkaClassLoader(); +} + +@After +public void tearDown() { +if (loader != null) Plugins.compareAndSwapLoaders(loader); } @Test -public void testConnectorStatus() { -ConnectorTaskId taskId = new ConnectorTaskId(connector, 0); -AbstractHerder herder = partialMockBuilder(AbstractHerder.class) -.withConstructor( -Worker.class, -String.class, -String.class, -StatusBackingStore.class, -ConfigBackingStore.class, -ConnectorClientConfigOverridePolicy.class -) -.withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) -.addMockedMethod("generation") -.createMock(); - -EasyMock.expect(herder.generation()).andStubReturn(generation); -EasyMock.expect(herder.rawConfig(connector)).andReturn(null); -EasyMock.expect(statusStore.get(connector)) -.andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation)); -EasyMock.expect(statusStore.getAll(connector)) -.andReturn(Collections.singletonList( -new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation))); - -replayAll(); -ConnectorStateInfo csi = herder.connectorStatus(connector); -PowerMock.verifyAll(); +public void testConnectors() { +AbstractHerder herder = mock(AbstractHerder.class, withSettings() +.useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) +.defaultAnswer(CALLS_REAL_METHODS)); + +when(configStore.snapshot()).thenReturn(SNAPSHOT); +assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors())); } @Test public void connectorStatus() { Review Comment: done in the latest revision. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574595#comment-17574595 ] Christo Lolov edited comment on KAFKA-14132 at 8/3/22 10:01 AM: Great, thank you, I will put your alias next to them in the description :) was (Author: christo_lolov): Great, thank you, I will put your alias next to the in the description :) > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#DE350B}Some of the tests below use EasyMock as well. For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > # ErrorHandlingTaskTest (owner: Divij) > # SourceTaskOffsetCommiterTest (owner: Divij) > # WorkerMetricsGroupTest (owner: Divij) > # WorkerSinkTaskTest (owner: Divij) > # WorkerSinkTaskThreadedTest (owner: Divij) > # WorkerTaskTest > # ErrorReporterTest > # RetryWithToleranceOperatorTest > # WorkerErrantRecordReporterTest > # ConnectorsResourceTest > # StandaloneHerderTest > # KafkaConfigBackingStoreTest > # KafkaOffsetBackingStoreTest > # KafkaBasedLogTest > # RetryUtilTest > # RepartitionTopicTest (streams) (owner: Christo) > # StateManagerUtilTest (streams) (owner: Christo) > *The coverage report for the above tests after the change should be >= to > what the coverage is now.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-14132: -- Description: {color:#DE350B}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: # ErrorHandlingTaskTest (owner: Divij) # SourceTaskOffsetCommiterTest (owner: Divij) # WorkerMetricsGroupTest (owner: Divij) # WorkerSinkTaskTest (owner: Divij) # WorkerSinkTaskThreadedTest (owner: Divij) # WorkerTaskTest # ErrorReporterTest # RetryWithToleranceOperatorTest # WorkerErrantRecordReporterTest # ConnectorsResourceTest # StandaloneHerderTest # KafkaConfigBackingStoreTest # KafkaOffsetBackingStoreTest # KafkaBasedLogTest # RetryUtilTest # RepartitionTopicTest (streams) (owner: Christo) # StateManagerUtilTest (streams) (owner: Christo) *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#DE350B}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: # ErrorHandlingTaskTest (owner: Divij) # SourceTaskOffsetCommiterTest (owner: Divij) # WorkerMetricsGroupTest (owner: Divij) # WorkerSinkTaskTest (owner: Divij) # WorkerSinkTaskThreadedTest (owner: Divij) # WorkerTaskTest # ErrorReporterTest # RetryWithToleranceOperatorTest # WorkerErrantRecordReporterTest # ConnectorsResourceTest # StandaloneHerderTest # KafkaConfigBackingStoreTest # KafkaOffsetBackingStoreTest # KafkaBasedLogTest # RetryUtilTest # RepartitionTopicTest (streams) # StateManagerUtilTest (streams) *The coverage report for the above tests after the change should be >= to what the coverage is now.* > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#DE350B}Some of the tests below use EasyMock as well. For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > # ErrorHandlingTaskTest (owner: Divij) > # SourceTaskOffsetCommiterTest (owner: Divij) > # WorkerMetricsGroupTest (owner: Divij) > # WorkerSinkTaskTest (owner: Divij) > # WorkerSinkTaskThreadedTest (owner: Divij) > # WorkerTaskTest > # ErrorReporterTest > # RetryWithToleranceOperatorTest > # WorkerErrantRecordReporterTest > # ConnectorsResourceTest > # StandaloneHerderTest > # KafkaConfigBackingStoreTest > # KafkaOffsetBackingStoreTest > # KafkaBasedLogTest > # RetryUtilTest > # RepartitionTopicTest (streams) (owner: Christo) > # StateManagerUtilTest (streams) (owner: Christo) > *The coverage report for the above tests after the change should be >= to > what the coverage is now.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] divijvaidya commented on pull request #12474: KAFKA-13133 Replace EasyMock and PowerMock with Mockito for AbstractHerderTest
divijvaidya commented on PR #12474: URL: https://github.com/apache/kafka/pull/12474#issuecomment-1203704542 > #12473 got here first, closing this one :) Ah duplicated effort! Please assign the JIRA to yourself (or add a comment) if you beginning to work on something. I helps avoid the wasted effort. Meanwhile if you are looking for more tests to fix, please grab some from https://issues.apache.org/jira/browse/KAFKA-14132 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12472: KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest
divijvaidya commented on code in PR #12472: URL: https://github.com/apache/kafka/pull/12472#discussion_r936433188 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java: ## @@ -60,15 +64,15 @@ public class WorkerConnectorTest extends EasyMockSupport { public ConnectorConfig connectorConfig; public MockConnectMetrics metrics; -@Mock Plugins plugins; -@Mock SourceConnector sourceConnector; -@Mock SinkConnector sinkConnector; -@Mock Connector connector; -@Mock CloseableConnectorContext ctx; -@Mock ConnectorStatus.Listener listener; -@Mock CloseableOffsetStorageReader offsetStorageReader; -@Mock ConnectorOffsetBackingStore offsetStore; -@Mock ClassLoader classLoader; +private final Plugins plugins = mock(Plugins.class); +private final SourceConnector sourceConnector = mock(SourceConnector.class); +private final SinkConnector sinkConnector = mock(SinkConnector.class); +private final CloseableConnectorContext ctx = mock(CloseableConnectorContext.class); +private final ConnectorStatus.Listener listener = mock(ConnectorStatus.Listener.class); +private final CloseableOffsetStorageReader offsetStorageReader = mock(CloseableOffsetStorageReader.class); +private final ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); +private final ClassLoader classLoader = mock(ClassLoader.class); +private Connector connector; Review Comment: ok, I don't have strong opinion on this one. We can keep it as it is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #12230: MINOR: Catch InvocationTargetException explicitly and propagate underlying cause
divijvaidya commented on PR #12230: URL: https://github.com/apache/kafka/pull/12230#issuecomment-1203693391 @mimaison kindly review when you get a chance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12475: MINOR; Update scalafmt to latest version
divijvaidya commented on code in PR #12475: URL: https://github.com/apache/kafka/pull/12475#discussion_r936426643 ## checkstyle/.scalafmt.conf: ## @@ -12,7 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -docstrings = JavaDoc +version = 3.5.8 +runner.dialect = scala213 Review Comment: We still support scala 2.12 in this project. By specifying the dialect as 2.13, would we fail where syntax for 2.12 is used? If yes, are there any such cases in the project? (The build should tell us that after the existing spotless check passes). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-14133: -- Description: {color:#DE350B}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which still require to be moved from EasyMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: # WorkerConnectorTest (connect) (owner: Yash) # WorkerCoordinatorTest (connect) # RootResourceTest (connect) # ByteArrayProducerRecordEquals (connect) # TopologyTest # KStreamFlatTransformTest # KStreamFlatTransformValuesTest # KStreamPrintTest # KStreamRepartitionTest # MaterializedInternalTest # TransformerSupplierAdapterTest # KTableSuppressProcessorMetricsTest # KTableSuppressProcessorTest # ClientUtilsTest # HighAvailabilityStreamsPartitionAssignorTest # InternalTopicManagerTest # ProcessorContextImplTest # ProcessorStateManagerTest # StandbyTaskTest # StoreChangelogReaderTest # StreamTaskTest # StreamThreadTest # StreamsAssignmentScaleTest # StreamsPartitionAssignorTest # StreamsRebalanceListenerTest # TaskManagerTest # TimestampedKeyValueStoreMaterializerTest # WriteConsistencyVectorTest # AssignmentTestUtils # StreamsMetricsImplTest # CachingInMemoryKeyValueStoreTest # CachingInMemorySessionStoreTest # CachingPersistentSessionStoreTest # CachingPersistentWindowStoreTest # ChangeLoggingKeyValueBytesStoreTest # ChangeLoggingSessionBytesStoreTest # ChangeLoggingTimestampedKeyValueBytesStoreTest # ChangeLoggingTimestampedWindowBytesStoreTest # ChangeLoggingWindowBytesStoreTest # CompositeReadOnlyWindowStoreTest # KeyValueStoreBuilderTest # MeteredTimestampedWindowStoreTest # RocksDBStoreTest # StreamThreadStateStoreProviderTest # TimeOrderedCachingPersistentWindowStoreTest # TimeOrderedWindowStoreTest *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#DE350B}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which still require to be moved from EasyMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: # WorkerConnectorTest (connect) # WorkerCoordinatorTest (connect) # RootResourceTest (connect) # ByteArrayProducerRecordEquals (connect) # TopologyTest # KStreamFlatTransformTest # KStreamFlatTransformValuesTest # KStreamPrintTest # KStreamRepartitionTest # MaterializedInternalTest # TransformerSupplierAdapterTest # KTableSuppressProcessorMetricsTest # KTableSuppressProcessorTest # ClientUtilsTest # HighAvailabilityStreamsPartitionAssignorTest # InternalTopicManagerTest # ProcessorContextImplTest # ProcessorStateManagerTest # StandbyTaskTest # StoreChangelogReaderTest # StreamTaskTest # StreamThreadTest # StreamsAssignmentScaleTest # StreamsPartitionAssignorTest # StreamsRebalanceListenerTest # TaskManagerTest # TimestampedKeyValueStoreMaterializerTest # WriteConsistencyVectorTest # AssignmentTestUtils # StreamsMetricsImplTest # CachingInMemoryKeyValueStoreTest # CachingInMemorySessionStoreTest # CachingPersistentSessionStoreTest # CachingPersistentWindowStoreTest # ChangeLoggingKeyValueBytesStoreTest # ChangeLoggingSessionBytesStoreTest # ChangeLoggingTimestampedKeyValueBytesStoreTest # ChangeLoggingTimestampedWindowBytesStoreTest # ChangeLoggingWindowBytesStoreTest # CompositeReadOnlyWindowStoreTest # KeyValueStoreBuilderTest # MeteredTimestampedWindowStoreTest # RocksDBStoreTest # StreamThreadStateStoreProviderTest # TimeOrderedCachingPersistentWindowStoreTest # TimeOrderedWindowStoreTest *The coverage report for the above tests after the change should be >= to what the coverage is now.* > Remaining EasyMock to Mockito tests > --- > > Key: KAFKA-14133 > URL: https://issues.apache.org/jira/browse/KAFKA-14133 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#DE350B}There are tests which use both PowerMock and EasyMock. I have > put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here > rely solely on EasyMock.{color} > Unless stated in brackets the tests are in the streams module. > A list of tests which still require to be moved from EasyMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > # WorkerConnectorTest
[jira] [Commented] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574604#comment-17574604 ] Christo Lolov commented on KAFKA-14133: --- I wanted to collect all tests which are remaining so we can start ticking them off one by one in the next weeks in order to draw this migration to an end. Feel free to pick any of the tests on the list if you want to push them to completion :) > Remaining EasyMock to Mockito tests > --- > > Key: KAFKA-14133 > URL: https://issues.apache.org/jira/browse/KAFKA-14133 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#DE350B}There are tests which use both PowerMock and EasyMock. I have > put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here > rely solely on EasyMock.{color} > Unless stated in brackets the tests are in the streams module. > A list of tests which still require to be moved from EasyMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > # WorkerConnectorTest (connect) > # WorkerCoordinatorTest (connect) > # RootResourceTest (connect) > # ByteArrayProducerRecordEquals (connect) > # TopologyTest > # KStreamFlatTransformTest > # KStreamFlatTransformValuesTest > # KStreamPrintTest > # KStreamRepartitionTest > # MaterializedInternalTest > # TransformerSupplierAdapterTest > # KTableSuppressProcessorMetricsTest > # KTableSuppressProcessorTest > # ClientUtilsTest > # HighAvailabilityStreamsPartitionAssignorTest > # InternalTopicManagerTest > # ProcessorContextImplTest > # ProcessorStateManagerTest > # StandbyTaskTest > # StoreChangelogReaderTest > # StreamTaskTest > # StreamThreadTest > # StreamsAssignmentScaleTest > # StreamsPartitionAssignorTest > # StreamsRebalanceListenerTest > # TaskManagerTest > # TimestampedKeyValueStoreMaterializerTest > # WriteConsistencyVectorTest > # AssignmentTestUtils > # StreamsMetricsImplTest > # CachingInMemoryKeyValueStoreTest > # CachingInMemorySessionStoreTest > # CachingPersistentSessionStoreTest > # CachingPersistentWindowStoreTest > # ChangeLoggingKeyValueBytesStoreTest > # ChangeLoggingSessionBytesStoreTest > # ChangeLoggingTimestampedKeyValueBytesStoreTest > # ChangeLoggingTimestampedWindowBytesStoreTest > # ChangeLoggingWindowBytesStoreTest > # CompositeReadOnlyWindowStoreTest > # KeyValueStoreBuilderTest > # MeteredTimestampedWindowStoreTest > # RocksDBStoreTest > # StreamThreadStateStoreProviderTest > # TimeOrderedCachingPersistentWindowStoreTest > # TimeOrderedWindowStoreTest > *The coverage report for the above tests after the change should be >= to > what the coverage is now.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-14132: -- Description: {color:#DE350B}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: # ErrorHandlingTaskTest (owner: Divij) # SourceTaskOffsetCommiterTest (owner: Divij) # WorkerMetricsGroupTest (owner: Divij) # WorkerSinkTaskTest (owner: Divij) # WorkerSinkTaskThreadedTest (owner: Divij) # WorkerTaskTest # ErrorReporterTest # RetryWithToleranceOperatorTest # WorkerErrantRecordReporterTest # ConnectorsResourceTest # StandaloneHerderTest # KafkaConfigBackingStoreTest # KafkaOffsetBackingStoreTest # KafkaBasedLogTest # RetryUtilTest # RepartitionTopicTest (streams) # StateManagerUtilTest (streams) *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#DE350B}Some of the tests below use EasyMock as well. For those migrate both PowerMock and EasyMock to Mockito.{color} Unless stated in brackets the tests are in the connect module. A list of tests which still require to be moved from PowerMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: # ErrorHandlingTaskTest # SourceTaskOffsetCommiterTest # WorkerMetricsGroupTest # WorkerSinkTaskTest # WorkerSinkTaskThreadedTest # WorkerTaskTest # ErrorReporterTest # RetryWithToleranceOperatorTest # WorkerErrantRecordReporterTest # ConnectorsResourceTest # StandaloneHerderTest # KafkaConfigBackingStoreTest # KafkaOffsetBackingStoreTest # KafkaBasedLogTest # RetryUtilTest # RepartitionTopicTest (streams) # StateManagerUtilTest (streams) *The coverage report for the above tests after the change should be >= to what the coverage is now.* > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#DE350B}Some of the tests below use EasyMock as well. For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > # ErrorHandlingTaskTest (owner: Divij) > # SourceTaskOffsetCommiterTest (owner: Divij) > # WorkerMetricsGroupTest (owner: Divij) > # WorkerSinkTaskTest (owner: Divij) > # WorkerSinkTaskThreadedTest (owner: Divij) > # WorkerTaskTest > # ErrorReporterTest > # RetryWithToleranceOperatorTest > # WorkerErrantRecordReporterTest > # ConnectorsResourceTest > # StandaloneHerderTest > # KafkaConfigBackingStoreTest > # KafkaOffsetBackingStoreTest > # KafkaBasedLogTest > # RetryUtilTest > # RepartitionTopicTest (streams) > # StateManagerUtilTest (streams) > *The coverage report for the above tests after the change should be >= to > what the coverage is now.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574595#comment-17574595 ] Christo Lolov commented on KAFKA-14132: --- Great, thank you, I will put your alias next to the in the description :) > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#DE350B}Some of the tests below use EasyMock as well. For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module. > A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > # ErrorHandlingTaskTest > # SourceTaskOffsetCommiterTest > # WorkerMetricsGroupTest > # WorkerSinkTaskTest > # WorkerSinkTaskThreadedTest > # WorkerTaskTest > # ErrorReporterTest > # RetryWithToleranceOperatorTest > # WorkerErrantRecordReporterTest > # ConnectorsResourceTest > # StandaloneHerderTest > # KafkaConfigBackingStoreTest > # KafkaOffsetBackingStoreTest > # KafkaBasedLogTest > # RetryUtilTest > # RepartitionTopicTest (streams) > # StateManagerUtilTest (streams) > *The coverage report for the above tests after the change should be >= to > what the coverage is now.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17574561#comment-17574561 ] Yash Mayya commented on KAFKA-14133: Hi [~christo_lolov], are you planning to work on migrating all of these tests to Mockito? Just a heads up to avoid duplicate work, I'd created https://issues.apache.org/jira/browse/KAFKA-14134 and [https://github.com/apache/kafka/pull/12472] for WorkerConnectorTest yesterday. > Remaining EasyMock to Mockito tests > --- > > Key: KAFKA-14133 > URL: https://issues.apache.org/jira/browse/KAFKA-14133 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#DE350B}There are tests which use both PowerMock and EasyMock. I have > put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here > rely solely on EasyMock.{color} > Unless stated in brackets the tests are in the streams module. > A list of tests which still require to be moved from EasyMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > # WorkerConnectorTest (connect) > # WorkerCoordinatorTest (connect) > # RootResourceTest (connect) > # ByteArrayProducerRecordEquals (connect) > # TopologyTest > # KStreamFlatTransformTest > # KStreamFlatTransformValuesTest > # KStreamPrintTest > # KStreamRepartitionTest > # MaterializedInternalTest > # TransformerSupplierAdapterTest > # KTableSuppressProcessorMetricsTest > # KTableSuppressProcessorTest > # ClientUtilsTest > # HighAvailabilityStreamsPartitionAssignorTest > # InternalTopicManagerTest > # ProcessorContextImplTest > # ProcessorStateManagerTest > # StandbyTaskTest > # StoreChangelogReaderTest > # StreamTaskTest > # StreamThreadTest > # StreamsAssignmentScaleTest > # StreamsPartitionAssignorTest > # StreamsRebalanceListenerTest > # TaskManagerTest > # TimestampedKeyValueStoreMaterializerTest > # WriteConsistencyVectorTest > # AssignmentTestUtils > # StreamsMetricsImplTest > # CachingInMemoryKeyValueStoreTest > # CachingInMemorySessionStoreTest > # CachingPersistentSessionStoreTest > # CachingPersistentWindowStoreTest > # ChangeLoggingKeyValueBytesStoreTest > # ChangeLoggingSessionBytesStoreTest > # ChangeLoggingTimestampedKeyValueBytesStoreTest > # ChangeLoggingTimestampedWindowBytesStoreTest > # ChangeLoggingWindowBytesStoreTest > # CompositeReadOnlyWindowStoreTest > # KeyValueStoreBuilderTest > # MeteredTimestampedWindowStoreTest > # RocksDBStoreTest > # StreamThreadStateStoreProviderTest > # TimeOrderedCachingPersistentWindowStoreTest > # TimeOrderedWindowStoreTest > *The coverage report for the above tests after the change should be >= to > what the coverage is now.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] yashmayya commented on pull request #12472: KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest
yashmayya commented on PR #12472: URL: https://github.com/apache/kafka/pull/12472#issuecomment-1203578055 Thanks for the reviews @divijvaidya and @C0urante! I've incorporated your feedback, could you please take another look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org