[jira] [Commented] (KAFKA-9877) ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed (kafka.log.LogManager)
[ https://issues.apache.org/jira/browse/KAFKA-9877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376281#comment-17376281 ] Kiran commented on KAFKA-9877: -- I am seeing the same issue in Kafka 1.0 as well.

org.apache.kafka.common.errors.KafkaStorageException: Error while deleting segments for testtopic-0 in dir /kafka-logs
Caused by: java.io.IOException: Delete of log .log.deleted failed.
	at kafka.log.LogSegment.delete(LogSegment.scala:496)
	at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply$mcV$sp(Log.scala:1596)
	at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
	at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596)
	at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
	at kafka.log.Log.kafka$log$Log$$deleteSeg$1(Log.scala:1595)
	at kafka.log.Log$$anonfun$kafka$log$Log$$asyncDeleteSegment$1.apply$mcV$sp(Log.scala:1599)
	at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
	at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

I have compaction enabled for the topic with the config below:
segment.ms=100ms
delete.retention.ms=100ms

I also see a lot of the errors below.
ERROR Error while processing data for partition testtopic1-18 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.errors.KafkaStorageException: Replica 3 is in an offline log directory for partition testtopic-10

> ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed
> (kafka.log.LogManager)
>
> Key: KAFKA-9877
> URL: https://issues.apache.org/jira/browse/KAFKA-9877
> Project: Kafka
> Issue Type: Bug
> Components: log cleaner
> Affects Versions: 2.1.1
> Environment: Redhat
> Reporter: Hawking Du
> Priority: Major
> Attachments: server-125.log
>
> A very confusing problem has been with me for a long time: the Kafka server often stops unexpectedly, apparently caused by the log cleaning process. Here are some of the logs from the server. Can anyone give me some ideas for fixing it?
> {code:java}
> [2020-04-04 02:07:57,410] INFO [GroupMetadataManager brokerId=5] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
> [2020-04-04 02:17:57,410] INFO [GroupMetadataManager brokerId=5] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
> [2020-04-04 02:27:57,410] INFO [GroupMetadataManager brokerId=5] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
> [2020-04-04 02:30:22,272] INFO [ProducerStateManager partition=__consumer_offsets-35] Writing producer snapshot at offset 741037 (kafka.log.ProducerStateManager)
> [2020-04-04 02:30:22,274] INFO [Log partition=__consumer_offsets-35, dir=/tmp/kafka-logs] Rolled new log segment at offset 741037 in 3 ms. (kafka.log.Log)
> [2020-04-04 02:30:26,289] ERROR Failed to clean up log for __consumer_offsets-35 in dir /tmp/kafka-logs due to IOException (kafka.server.LogDirFailureChannel)
> java.nio.file.NoSuchFileException: /tmp/kafka-logs/__consumer_offsets-35/.log
> 	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> 	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> 	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> 	at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
> 	at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
> 	at java.nio.file.Files.move(Files.java:1395)
> 	at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:815)
> 	at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:224)
> 	at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:508)
> 	at kafka.log.Log.asyncDeleteSegment(Log.scala:1962)
> 	at kafka.log.Log.$anonfun$replaceSegments$6(Log.scala:2025)
> 	at kafka.log.Log.$anonfun$replaceSegments$6$adapted(Log.scala:2020) at >
[GitHub] [kafka] ableegoldman commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0
ableegoldman commented on pull request #10903: URL: https://github.com/apache/kafka/pull/10903#issuecomment-875272278 Merged to trunk. Thanks @showuon ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0
ableegoldman merged pull request #10903: URL: https://github.com/apache/kafka/pull/10903
[GitHub] [kafka] ableegoldman opened a new pull request #10986: KAFKA-12983: reset needsJoinPrepare flag before rejoining the group
ableegoldman opened a new pull request #10986: URL: https://github.com/apache/kafka/pull/10986 The `#onJoinPrepare` callback is not always invoked before a member (re)joins the group, but only once when it first enters the rebalance. This means that any updates or events that occur during the join phase can be lost from the internal state: for example, clearing the SubscriptionState (and thus the "ownedPartitions" used for cooperative rebalancing) after losing its memberId during a rebalance. We should reset the `needsJoinPrepare` flag inside the resetStateAndRejoin() method.
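The guard-flag pattern the PR describes can be sketched as below. This is a hypothetical simplification, not the actual ConsumerCoordinator code: the class and method bodies are mine, and only the `needsJoinPrepare` / `onJoinPrepare` / `resetStateAndRejoin` names come from the discussion above.

```java
// Hypothetical sketch: onJoinPrepare should run once per rebalance, so it is
// guarded by a flag. The fix is that resetting state to rejoin must re-arm
// the flag, so the callback observes the post-reset state on the next join.
class RejoinSketch {
    boolean needsJoinPrepare = true;
    int prepareInvocations = 0; // for illustration only

    void joinGroupIfNeeded() {
        if (needsJoinPrepare) {
            onJoinPrepare();          // e.g. revoke partitions, clear state
            needsJoinPrepare = false; // skip on retries within the same rebalance
        }
        // ... send the JoinGroup request ...
    }

    void onJoinPrepare() {
        prepareInvocations++;
    }

    // The fix: re-arm the flag whenever we reset state and rejoin, instead of
    // leaving it false for the remainder of the rebalance.
    void resetStateAndRejoin() {
        // ... reset generation / memberId ...
        needsJoinPrepare = true;
    }
}
```

Without the last line, a member that loses its memberId mid-rebalance would rejoin without ever re-running the prepare step, which is the bug described above.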
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r665031448

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

## @@ -218,15 +256,14 @@ private boolean allSubscriptionsEqual(Set allTopics,
allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
// this consumer is potential maxQuota candidate since we're still under the number of expected members
// with more than the minQuota partitions. Note, if the number of expected members with more than
-// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+// the minQuota partitions is 0, it means minQuota == maxQuota, and there are no potentially unfilled
if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
-unfilledMembers.add(consumer);
+potentiallyUnfilledMembersAtMinQuota.add(consumer);

Review comment: This is part of fix #3 -- basically we have to handle these members separately: once they get up to `minQuota` they are only "potentially" unfilled. Once the last member allowed to reach `maxQuota` does so, all of these `minQuota` members are suddenly considered filled. cc @showuon
[jira] [Updated] (KAFKA-13039) kafka 0.10.1 gradle build failed
[ https://issues.apache.org/jira/browse/KAFKA-13039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hantaoluo updated KAFKA-13039: -- Description: When I use Gradle 3.0 or 3.5 to build Kafka, it fails.
* What went wrong:
A problem occurred evaluating root project 'kafka-0.10.1.0-src'.
> Could not find method scoverage() for arguments [build_280jc8bwyhuf6s4pit1n8ckjs$_run_closure30$_closure86@799527c6] on project ':core' of type org.gradle.api.Project.

was: When I use Gradle 3.0 or 3.5 to build Kafka, it fails.
* What went wrong:
A problem occurred evaluating root project 'kafka-0.10.1.0-src'.
> Could not find method scoverage() for arguments [build_280jc8bwyhuf6s4pit1n8ckjs$_run_closure30$_closure86@799527c6] on project ':core' of type org.gradle.api.Project.

> kafka 0.10.1 gradle build failed
>
> Key: KAFKA-13039
> URL: https://issues.apache.org/jira/browse/KAFKA-13039
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.1.0
> Environment: windows10 gradle3.5 or 3.7
> Reporter: hantaoluo
> Priority: Major
>
> When I use Gradle 3.0 or 3.5 to build Kafka, it fails.
> * What went wrong:
> A problem occurred evaluating root project 'kafka-0.10.1.0-src'.
> > Could not find method scoverage() for arguments [build_280jc8bwyhuf6s4pit1n8ckjs$_run_closure30$_closure86@799527c6] on project ':core' of type org.gradle.api.Project.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13039) kafka 0.10.1 gradle build failed
[ https://issues.apache.org/jira/browse/KAFKA-13039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hantaoluo updated KAFKA-13039: -- Priority: Major (was: Minor)

> kafka 0.10.1 gradle build failed
>
> Key: KAFKA-13039
> URL: https://issues.apache.org/jira/browse/KAFKA-13039
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r665030959

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

## @@ -186,16 +212,25 @@ private boolean allSubscriptionsEqual(Set allTopics,
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota)));
List assignedPartitions = new ArrayList<>();
-// Reassign previously owned partitions to the expected number
+// Reassign previously owned partitions, up to the expected number of partitions per consumer
for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List ownedPartitions = consumerEntry.getValue();
List consumerAssignment = assignment.get(consumer);
+for (TopicPartition doublyClaimedPartition : partitionsWithMultiplePreviousOwners) {
+if (ownedPartitions.contains(doublyClaimedPartition)) {
+log.warn("Found partition {} still claimed as owned by consumer {}, despite being claimed by multiple"

Review comment: Strictly speaking this should never happen: even if we do get these "impossible" doubly-claimed partitions, we're also removing them from the `ownedPartitions` above (that's fix #2). But I put in a safeguard just in case; it shouldn't hurt (performance-wise we should generally not even enter this loop, since `partitionsWithMultiplePreviousOwners` should almost always be empty).
[GitHub] [kafka] wuYin commented on pull request #10977: MINOR: Reuse hasDefault instead of comparing with NO_DEFAULT_VALUE directly
wuYin commented on pull request #10977: URL: https://github.com/apache/kafka/pull/10977#issuecomment-875267298 @showuon Thank you for the patient explanation; I added this. Now `NO_DEFAULT_VALUE` is no longer used for direct comparison.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r665030173

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

## @@ -130,19 +146,26 @@ private boolean allSubscriptionsEqual(Set allTopics,
for (final TopicPartition tp : memberData.partitions) {
// filter out any topics that no longer exist or aren't part of the current subscription
if (allTopics.contains(tp.topic())) {
-ownedPartitions.add(tp);
+
+if (!allPreviousPartitionsToOwner.containsKey(tp)) {
+allPreviousPartitionsToOwner.put(tp, consumer);
+ownedPartitions.add(tp);
+} else {
+String otherConsumer = allPreviousPartitionsToOwner.get(tp);
+log.warn("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "

Review comment: This is fix #2 -- if we somehow still get multiple consumers claiming a partition in the same generation, we have to consider both claims invalid and remove the partition from their `ownedPartitions`.
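The invalidation logic discussed in this review thread can be sketched in isolation as follows. This is a hypothetical, simplified model (partitions are plain Strings and the class name is mine), not the actual AbstractStickyAssignor code: on a conflicting claim, the partition is removed from every claimant's owned list, since no claim can be trusted.

```java
import java.util.*;

// Hypothetical sketch of "fix #2": if two consumers claim the same partition
// in the same generation, neither claim is trusted and the partition is
// dropped from both owned lists.
class ClaimSketch {
    static Map<String, List<String>> resolveClaims(Map<String, List<String>> claimed) {
        Map<String, String> firstOwner = new HashMap<>();
        Set<String> doublyClaimed = new HashSet<>();
        Map<String, List<String>> owned = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : claimed.entrySet()) {
            List<String> kept = new ArrayList<>();
            for (String tp : e.getValue()) {
                String prev = firstOwner.putIfAbsent(tp, e.getKey());
                if (prev == null) {
                    kept.add(tp);          // first claim wins, provisionally
                } else {
                    doublyClaimed.add(tp); // conflicting claim: invalidate
                }
            }
            owned.put(e.getKey(), kept);
        }
        // Strip doubly-claimed partitions from the provisional first owner too.
        for (List<String> kept : owned.values()) {
            kept.removeAll(doublyClaimed);
        }
        return owned;
    }
}
```

For example, if c1 claims [p0, p1] and c2 claims [p1, p2], the resolved ownership is c1=[p0] and c2=[p2]: p1 is dropped from both.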
[GitHub] [kafka] ableegoldman commented on a change in pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment b
ableegoldman commented on a change in pull request #10985: URL: https://github.com/apache/kafka/pull/10985#discussion_r665029950

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

## @@ -122,6 +131,13 @@ private boolean allSubscriptionsEqual(Set allTopics,
// If the current member's generation is higher, all the previously owned partitions are invalid
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+
+allPreviousPartitionsToOwner.clear();
+partitionsWithMultiplePreviousOwners.clear();
+for (String droppedOutConsumer : membersWithOldGeneration) {
+consumerToOwnedPartitions.get(droppedOutConsumer).clear();
+}

Review comment: This part I just moved here to keep things up to date as we go; before, we were clearing them after the loop.
[GitHub] [kafka] ableegoldman commented on pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug
ableegoldman commented on pull request #10985: URL: https://github.com/apache/kafka/pull/10985#issuecomment-875265727 Ready for review @dajac @guozhangwang @hachikuji @showuon
[GitHub] [kafka] ableegoldman opened a new pull request #10985: KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug
ableegoldman opened a new pull request #10985: URL: https://github.com/apache/kafka/pull/10985

The primary goal of this PR is to address the problem we've seen in the wild in which the ConsumerCoordinator fails to update its SubscriptionState and ultimately feeds invalid `ownedPartitions` data as input to the assignor. Previously the assignor would detect that something was wrong and just throw an exception; now we make several efforts to detect this earlier in the assignment process, fix it if possible, and work around it if not. Specifically, this PR does a few things:

1) Bring the `generation` field back to the CooperativeStickyAssignor so we don't need to rely so heavily on the ConsumerCoordinator properly updating its SubscriptionState after, e.g., falling out of the group. The plain StickyAssignor always used the generation since it had to, so we just make sure the CooperativeStickyAssignor has this tool as well.

2) In case of unforeseen problems or further bugs that slip past the `generation` field safety net, the assignor will now explicitly look out for partitions that are claimed as owned by multiple consumers in the same generation. Such a case should never occur, but if it does, we have to invalidate this partition from the `ownedPartitions` of both consumers, since we can't tell who, if anyone, has the valid claim to it.

3) Fix a subtle bug that I discovered while writing tests for the above two fixes: in the constrained algorithm, we compute the exact number of partitions each consumer should end up with, and keep track of the "unfilled" members who must -- or _might_ -- require more partitions to hit their quota. The problem was that members at the `minQuota` were being considered "unfilled" even after we had already hit the maximum number of consumers allowed to go up to the `maxQuota`, meaning those `minQuota` members could not and should not accept any more partitions.
I believe this was introduced in [#10509](https://github.com/apache/kafka/pull/10509), so it shouldn't be in any released versions and does not need to be backported.
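The quota arithmetic behind fix #3 can be illustrated with a small sketch. The helper names below are mine, not the actual AbstractStickyAssignor fields; the point is the relationship between the floor/ceiling quotas and how many members are allowed to exceed the floor.

```java
// Hypothetical illustration of the constrained-assignment quota math:
// each consumer ends up with either floor(P/C) or ceil(P/C) partitions,
// and exactly (P mod C) consumers may be filled up to the ceiling.
class QuotaSketch {
    static int minQuota(int partitions, int consumers) {
        return partitions / consumers;                   // floor(P/C)
    }

    static int maxQuota(int partitions, int consumers) {
        return (partitions + consumers - 1) / consumers; // ceil(P/C)
    }

    // Only this many members may go over minQuota; once they are all filled,
    // every remaining minQuota member must be considered filled too.
    static int expectedMembersOverMinQuota(int partitions, int consumers) {
        return partitions % consumers;
    }
}
```

For example, with 7 partitions and 3 consumers the quotas are 2 and 3, and only one member may take 3 partitions; once it does, the two members sitting at 2 are filled as well, which is exactly the case the buggy code mishandled. When P divides evenly, minQuota equals maxQuota and nobody is over the floor.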
[GitHub] [kafka] showuon commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0
showuon commented on pull request #10903: URL: https://github.com/apache/kafka/pull/10903#issuecomment-875264149 @ableegoldman , the failed tests are unrelated. Thank you.
```
Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
```
[GitHub] [kafka] showuon commented on a change in pull request #10983: mention IdentityReplicationPolicy in ops docs
showuon commented on a change in pull request #10983: URL: https://github.com/apache/kafka/pull/10983#discussion_r665019951

## File path: docs/ops.html

## @@ -845,6 +845,11 @@
<a href="https://github.com/apache/kafka/blob/trunk/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java">IdentityReplicationPolicy</a> (since version 3.0.0), which does not rename topics. This is useful for simple one-way replication topologies, where topic renaming is not strictly necessary; however, be careful not to introduce cycles when using IdentityReplicationPolicy, since this can result in the same records being replicated in an infinite loop.

Review comment: I'm not sure if we really need to put the source code link here.
[jira] [Comment Edited] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376189#comment-17376189 ] Luke Chen edited comment on KAFKA-13008 at 7/7/21, 3:30 AM: Actually, I'm not very familiar with the details of incremental sessions either. But from [KIP-227|https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability], we can see the old session will be evicted only when it matches one of the following 3 conditions: !image-2021-07-07-11-19-55-630.png|width=762,height=161! And because in step 5 none of the above 3 conditions match, the new session won't evict the old session. Also, during that time, the old session already contains the "up-to-date" partition info of partition A-1 (because partition A-1 was assigned to this session), so no partition A-1 update will be received. This is my understanding. Please correct me if I'm wrong. Thank you.

was (Author: showuon): Actually, I'm not very familiar with the details of incremental sessions either. But from [KIP-227|https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability], we can see the old session will be evicted only when it matches one of the following 3 conditions: !image-2021-07-07-11-19-55-630.png|width=762,height=161! And because in step 5 none of the above 3 conditions match, the new session won't evict the old session. Also, during that time, the old session already contains the "up-to-date" partition info of partition A-1, so no partition A-1 update will be received. This is my understanding. Please correct me if I'm wrong. Thank you.
> Stream will stop processing data for a long time while waiting for the partition lag
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.0.0
> Reporter: Luke Chen
> Priority: Major
> Attachments: image-2021-07-07-11-19-55-630.png
[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376189#comment-17376189 ] Luke Chen commented on KAFKA-13008: --- Actually, I'm not very familiar with the details of incremental sessions either. But from [KIP-227|https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability], we can see the old session will be evicted only when it matches one of the following 3 conditions: !image-2021-07-07-11-19-55-630.png|width=762,height=161! And because in step 5 none of the above 3 conditions match, the new session won't evict the old session. Also, during that time, the old session already contains the "up-to-date" partition info of partition A-1, so no partition A-1 update will be received. This is my understanding. Please correct me if I'm wrong. Thank you.

> Stream will stop processing data for a long time while waiting for the partition lag
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.0.0
> Reporter: Luke Chen
> Priority: Major
> Attachments: image-2021-07-07-11-19-55-630.png
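The staleness mechanism described in this thread can be sketched with a toy model. This is a hypothetical simplification of incremental fetch sessions, not Kafka's actual broker or client code: the broker only includes a partition in a response when its state changed since the last response for that session, so a consumer that locally reinitializes its metadata never hears about a partition with no new data.

```java
import java.util.*;

// Hypothetical toy model of an incremental fetch session: the "broker" only
// sends a partition's high watermark when it differs from what was last sent
// in this session, so locally-cleared consumer metadata stays stale for slow
// partitions.
class FetchSessionSketch {
    final Map<String, Long> brokerHighWatermark = new HashMap<>();
    final Map<String, Long> lastSent = new HashMap<>();       // per-session state
    final Map<String, Long> consumerCache = new HashMap<>();  // consumer metadata

    // One incremental fetch round-trip: include only changed partitions.
    void fetch() {
        for (Map.Entry<String, Long> e : brokerHighWatermark.entrySet()) {
            if (!e.getValue().equals(lastSent.get(e.getKey()))) {
                lastSent.put(e.getKey(), e.getValue());
                consumerCache.put(e.getKey(), e.getValue());
            }
        }
    }
}
```

In the scenario above: the consumer clears its cached metadata for A-1 during the rebalance, but the session already sent A-1's (unchanged) high watermark, so subsequent fetches return nothing for A-1 and the cache stays empty until new data arrives or the session is evicted.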
[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-13008: -- Attachment: image-2021-07-07-11-19-55-630.png > Stream will stop processing data for a long time while waiting for the > partition lag > > > Key: KAFKA-13008 > URL: https://issues.apache.org/jira/browse/KAFKA-13008 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Luke Chen >Priority: Major > Attachments: image-2021-07-07-11-19-55-630.png > > > In KIP-695, we improved the task idling mechanism by checking partition lag. > It's a good improvement for timestamp sync. But I found it will cause the > stream stop processing the data for a long time while waiting for the > partition metadata. > > I've been investigating this case for a while, and figuring out the issue > will happen in below situation (or similar situation): > # start 2 streams (each with 1 thread) to consume from a topicA (with 3 > partitions: A-0, A-1, A-2) > # After 2 streams started, the partitions assignment are: (I skipped some > other processing related partitions for simplicity) > stream1-thread1: A-0, A-1 > stream2-thread1: A-2 > # start processing some data, assume now, the position and high watermark is: > A-0: offset: 2, highWM: 2 > A-1: offset: 2, highWM: 2 > A-2: offset: 2, highWM: 2 > # Now, stream3 joined, so trigger rebalance with this assignment: > stream1-thread1: A-0 > stream2-thread1: A-2 > stream3-thread1: A-1 > # Suddenly, stream3 left, so now, rebalance again, with the step 2 > assignment: > stream1-thread1: A-0, *A-1* > stream2-thread1: A-2 > (note: after initialization, the position of A-1 will be: position: null, > highWM: null) > # Now, note that, the partition A-1 used to get assigned to stream1-thread1, > and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 > record per 30 mins), and partition A-0 has fast input (ex: 10K records / > sec). 
So now stream1-thread1 won't process any data until it gets input from partition A-1 (even if partition A-0 has a lot of buffered data and `{{max.task.idle.ms}}` is set to 0). The reason stream1-thread1 won't process any data is that we can't get the lag of partition A-1. And why can't we get the lag? Because: # In KIP-695, we use the consumer's cache to get the partition lag, to avoid a remote call. # The lag for a partition will be cleared if the assignment in this round doesn't include that partition; check [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272]. So, in the above example, the metadata cache for partition A-1 is cleared in step 4 and re-initialized (to null) in step 5. # In KIP-227, we introduced fetch sessions for incremental fetch requests/responses. That is, if a session exists, the client (consumer) only gets an update when the fetched partition has one (ex: new data). So in the above case, where partition A-1 has slow input (ex: 1 record per 30 mins), it won't get an update until the next 30 minutes pass, or until the fetch session has been inactive for (default) 2 mins and is evicted. Either way, the metadata won't be updated for a while. In KIP-695, if we can't get the partition lag, we can't determine the partition's data status for timestamp sync, so we keep waiting and don't process any data. That's why this issue happens. *Proposed solution:* # If we don't get the current lag for a partition, or the current lag is > 0, start waiting for max.task.idle.ms, and reset the deadline when we get the partition lag, like what we did previously in KIP-353. # Introduce a waiting-time config for when there is no partition lag, or the partition lag stays > 0 (needs a KIP). [~vvcephei] [~guozhang] , any suggestions?
cc [~ableegoldman] [~mjsax] , this is the root cause of what we discussed in [https://github.com/apache/kafka/pull/10736,] where we thought there was a data loss situation. FYI. -- This message was sent by Atlassian Jira (v8.3.4#803005)
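The gating condition described in this issue can be sketched as a small stand-alone model. This is a hypothetical simplification for illustration (names like `allPartitionsReady` are invented, not Kafka Streams' actual code): a task processes only when every input partition either has buffered data or a *known* cached lag of zero, so a partition whose lag cache was just reset (as with A-1 after the re-assignment in step 5) blocks the whole task.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Toy model of the KIP-695-style idling check. An empty OptionalLong models
// the "position: null, highWM: null" state after re-assignment.
class TaskIdlingSketch {
    // A partition is "ready" if we already buffered data for it, or if its
    // cached lag is known and zero. An unknown lag keeps the task idle,
    // even with max.task.idle.ms = 0 and data buffered on other partitions.
    static boolean allPartitionsReady(Map<String, OptionalLong> cachedLags,
                                      Set<String> partitionsWithBufferedData) {
        for (Map.Entry<String, OptionalLong> e : cachedLags.entrySet()) {
            if (partitionsWithBufferedData.contains(e.getKey())) {
                continue; // data already fetched for this partition
            }
            OptionalLong lag = e.getValue();
            if (!lag.isPresent() || lag.getAsLong() > 0) {
                return false; // lag unknown or behind: keep waiting
            }
        }
        return true;
    }

    // The scenario from the issue: A-0 has buffered data, but A-1 was just
    // re-assigned so its cached lag is unknown -- the task stalls.
    static boolean scenarioFromIssue() {
        return allPartitionsReady(
                Map.of("A-0", OptionalLong.of(0), "A-1", OptionalLong.empty()),
                Set.of("A-0"));
    }

    // Once A-1's lag is known (e.g. 0), processing can resume.
    static boolean afterLagKnown() {
        return allPartitionsReady(
                Map.of("A-0", OptionalLong.of(0), "A-1", OptionalLong.of(0)),
                Set.of("A-0"));
    }
}
```

The real consumer exposes the cached value via `Consumer#currentLag(TopicPartition)`, which similarly returns an empty `OptionalLong` when the lag is not locally known.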
[GitHub] [kafka] ccding opened a new pull request #10984: MINOR: fix typoe in LogCleanerTest.scala
ccding opened a new pull request #10984: URL: https://github.com/apache/kafka/pull/10984 `are rename to` -> `are renamed to` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
kamalcph commented on pull request #10602: URL: https://github.com/apache/kafka/pull/10602#issuecomment-875230369 > Hi @kamalcph , what's the status of this PR? I noticed there are some unresolved code review comments, and there are some conflicts. I'll address the pending review comments and resolve the conflicts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10977: MINOR: Reuse hasDefault instead of comparing with NO_DEFAULT_VALUE directly
showuon commented on pull request #10977: URL: https://github.com/apache/kafka/pull/10977#issuecomment-875220116 @wuYin > This small change is mainly to unify the way to determine if the user has set the default value Yes, I know, so I think this line should also be updated. https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L1146 ``` NO_DEFAULT_VALUE.equals(defaultValue) ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type); ``` It is also trying to determine if the default value is set, so, we can update to: ``` hasDefault() ? parseType(name, defaultValue, type) : NO_DEFAULT_VALUE; ``` What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
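The sentinel-plus-accessor pattern under discussion can be shown with a toy class. This `MiniConfigKey` is invented for the example and is not Kafka's actual `ConfigDef`; it only illustrates why hiding the `NO_DEFAULT_VALUE` comparison behind `hasDefault()` keeps call sites consistent.

```java
// Toy model of ConfigDef's "no default" sentinel: a unique Object marks the
// absence of a default, and hasDefault() centralizes the sentinel check so
// call sites never compare against NO_DEFAULT_VALUE directly.
class MiniConfigKey {
    static final Object NO_DEFAULT_VALUE = new Object();

    final Object defaultValue;

    MiniConfigKey(Object defaultValue) {
        this.defaultValue = defaultValue;
    }

    boolean hasDefault() {
        return defaultValue != NO_DEFAULT_VALUE;
    }

    // Shape of the suggested rewrite:
    //   hasDefault() ? parseType(...) : NO_DEFAULT_VALUE
    // (here "parsing" is just String.valueOf for illustration)
    Object parsedDefault() {
        return hasDefault() ? String.valueOf(defaultValue) : NO_DEFAULT_VALUE;
    }
}
```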
[GitHub] [kafka] showuon commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0
showuon commented on pull request #10903: URL: https://github.com/apache/kafka/pull/10903#issuecomment-875214820 @ableegoldman , I've added a test in `ConsumerConfigTest` to verify the default partition assignor. We cannot verify the assignor via `KafkaConsumer` or `ConsumerCoordinator` unless we add/update methods for testing. I think that's unnecessary. Let me know if you have other opinions. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
dengziming commented on pull request #9769: URL: https://github.com/apache/kafka/pull/9769#issuecomment-875209647 Replace deprecated methods and rebase on trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r664983511 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java ## @@ -51,6 +58,7 @@ */ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager { private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class); +private static final long INITIALIZATION_RETRY_INTERVAL_MS = 3L; Review comment: I prefer adding the L suffix for longs, which I missed at the other declaration. AFAIK, omitting it does not cause any issues here, as the value is automatically converted to a `long` via a widening conversion. The compiler also rejects integer literals that are too large for an `int`, so nothing can be silently truncated on the way to a `long`. Thanks for catching it; I will make it consistent by adding the suffix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
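The point about widening conversions can be demonstrated with a two-line sketch (constants invented for illustration): an `int` literal assigned to a `long` is widened at compile time, while a literal that doesn't fit in an `int` must carry the `L` suffix or it won't compile at all.

```java
// Widening of int literals to long, as discussed in the review comment.
class WideningDemo {
    // Plain int literal: compiles fine, widened to long automatically.
    static final long SMALL_INTERVAL_MS = 3;

    // 3_000_000_000 exceeds Integer.MAX_VALUE, so without the trailing L
    // this line would be a compile-time error -- nothing is ever truncated.
    static final long LARGE_INTERVAL_MS = 3_000_000_000L;
}
```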
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r664980913 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -220,16 +227,16 @@ public boolean isPartitionAssigned(int partition) { return assignedMetaPartitions.contains(partition); } -private void ensureNotClosed() { -if (closing) { -throw new IllegalStateException("This instance is already closed"); -} -} - public void close() { if (!closing) { -closing = true; -consumer.wakeup(); +synchronized (assignPartitionsLock) { +// Closing should be updated only after acquiring the lock to avoid race in +// maybeWaitForPartitionsAssignment() where it waits on assignPartitionsLock. It should not wait +// if the closing is already set. +closing = true; Review comment: `close()` will not be called concurrently here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
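The ordering concern in the diff above can be sketched with a minimal stand-alone class (field and method names simplified from the actual `ConsumerTask`; this is an illustrative reduction, not the real implementation): `closing` is set while holding the lock so that a thread blocked in `wait()` is reliably woken and observes the flag, instead of sleeping through the close.

```java
// Minimal sketch of the close/wait handshake described in the review thread.
class CloseableTaskSketch {
    private final Object assignPartitionsLock = new Object();
    private volatile boolean closing = false;
    private volatile boolean assignPartitions = false;

    void close() {
        synchronized (assignPartitionsLock) {
            // Set under the lock so a waiter can't check the flag, decide to
            // wait, and then miss this notification.
            closing = true;
            assignPartitionsLock.notifyAll();
        }
    }

    void maybeWaitForPartitionsAssignment() throws InterruptedException {
        synchronized (assignPartitionsLock) {
            // Do not wait if closing has already been requested.
            while (!assignPartitions && !closing) {
                assignPartitionsLock.wait();
            }
        }
    }

    boolean isClosing() {
        return closing;
    }

    // Demonstrates that a wait attempted after close() returns immediately.
    static boolean demo() {
        CloseableTaskSketch task = new CloseableTaskSketch();
        task.close();
        try {
            task.maybeWaitForPartitionsAssignment(); // returns without blocking
        } catch (InterruptedException e) {
            return false;
        }
        return task.isClosing();
    }
}
```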
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r664976317 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME; + +/** + * This class is responsible for consuming messages from remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}) + * partitions and maintain the state of the remote log segment metadata. It gives an API to add or remove + * for what topic partition's metadata should be consumed by this instance using + * {{@link #addAssignmentsForPartitions(Set)}} and {@link #removeAssignmentsForPartitions(Set)} respectively. + * + * When a broker is started, controller sends topic partitions that this broker is leader or follower for and the + * partitions to be deleted. 
This class receives those notifications with + * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} assigns consumer for the + * respective remote log metadata partitions by using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}. + * Any leadership changes later are called through the same API. We will remove the partitions that are deleted from + * this broker which are received through {@link #removeAssignmentsForPartitions(Set)}. + * + * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)}, + * which maintains in-memory representation of the state of {@link RemoteLogSegmentMetadata}. + */ +class ConsumerTask implements Runnable, Closeable { +private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); + +private static final long POLL_INTERVAL_MS = 100; + +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); +private final KafkaConsumer consumer; +private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; +private final RemoteLogMetadataTopicPartitioner topicPartitioner; + +// It indicates whether the closing process has been started or not. If it is set as true, +// consumer will stop consuming messages and it will not allow partition assignments to be updated. +private volatile boolean closing = false; +// It indicates whether the consumer needs to assign the partitions or not. This is set when it is +// determined that the consumer needs to be assigned with the updated partitions. +private volatile boolean assignPartitions = false; + +private final Object assignPartitionsLock = new Object(); + +// Remote log metadata topic partitions that consumer is assigned to. +private volatile Set assignedMetaPartitions = Collections.emptySet(); + +// User topic partitions that this broker is a leader/follower for. 
+private Set assignedTopicPartitions = Collections.emptySet(); + +// Map of remote log metadata
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r664970217 ## File path: storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleManager.java ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import java.util.Optional; + +/** + * This interface defines the lifecycle methods for {@code RemoteLogSegmentMetadata}. {@link RemoteLogSegmentLifecycleTest} tests + * different implementations of this interface. This is responsible for managing all the segments for a given {@code topicIdPartition} + * registered with {@link #initialize(TopicIdPartition)}. 
+ */ +public interface RemoteLogSegmentLifecycleManager extends Closeable { Review comment: `RemoteLogSegmentLifecycleManager` is already under tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0
showuon commented on pull request #10903: URL: https://github.com/apache/kafka/pull/10903#issuecomment-875185428 Make sense! Will add later. Thanks for the suggestion :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0
ableegoldman commented on pull request #10903: URL: https://github.com/apache/kafka/pull/10903#issuecomment-875181165 > So, in this PR, I've added 1 test to test it: testMultiConsumerDefaultAssignorAndVerifyAssignment Thanks Luke, but I actually meant something simpler (and stupider) that directly verifies that the chosen assignor is the RangeAssignor, rather than inferring it from the assignment that's produced. Like something in ConsumerConfigTest, or KafkaConsumerTest. Does that make sense? I just think we want to have both a test: a more complicated test that validates what we actually care about, the resulting assignment, but also a plain dumb test that we can be 100% sure will break if/when the default assignor is changed, no matter what it gets changed to (ie even if the new assignor happens to produce the same assignment as the RangeAssignor in some cases) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #10982: MINOR: Update dropwizard metrics to 4.1.12.1
rondagostino commented on pull request #10982: URL: https://github.com/apache/kafka/pull/10982#issuecomment-875181066 ``` SESSION REPORT (ALL TESTS) ducktape version: 0.8.1 session_id: 2021-07-06--001 run time: 177 minutes 4.952 seconds tests run:876 passed: 667 failed: 12 ignored: 197 ``` https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-07-06--001.1625614045--rondagostino--minor_dropwizard_metrics_4.1.12.1--362a8b250/report.html Failures appear unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376128#comment-17376128 ] Kirk True edited comment on KAFKA-12879 at 7/7/21, 12:24 AM: - Almost all of the {{Admin}} client's public API methods are meant to be consumed asynchronously. Most of the exceptions that can occur are thrown when the {{Future#get}} method is invoked, not when the {{Admin}} API call itself is invoked. However, there are some exceptions (pun intended): # Creation of the client propagates errors if any occur during configuration # {{close}} throws an error if the given {{timeout}} is negative # {{deleteTopics}} throws an {{IllegalArgumentException}} if the {{TopicCollection}} parameter is not of an expected sub-class # {{updateFeatures}} throws an {{IllegalArgumentException}} if the {{featureUpdates}} map is empty or contains a blank feature name The above are just those that are thrown directly in the {{KafkaAdminClient}} itself. Of the above list, item four stands out as extra "picky" compared to, say, {{describeTransactions}}, which doesn't check the collection it's given for emptiness. was (Author: kirktrue): Almost all of the {{Admin}} client's public API methods are meant to be consumed asynchronously. Most of the exceptions that can occur are thrown when the {{Future#get}} method is invoked. However, there are some exceptions (pun intended): # Creation of the client propagates errors if any occur during configuration # {{close}} throws an error if the given {{timeout}} is negative # {{deleteTopics}} throws an {{IllegalArgumentException}} if the {{TopicCollection}} parameter is not of an expected sub-class # {{updateFeatures}} throws an {{IllegalArgumentException}} if the {{featureUpdates}} map is empty or contains a blank feature name The above are just those that are thrown directly in the {{KafkaAdminClient}} itself.
Of the above list, item four stands out as extra "picky" compared to, say, {{describeTransactions}}, which doesn't check the collection it's given for emptiness. > Compatibility break in Admin.listOffsets() > -- > > Key: KAFKA-12879 > URL: https://issues.apache.org/jira/browse/KAFKA-12879 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.8.0, 2.7.1, 2.6.2 >Reporter: Tom Bentley >Assignee: Kirk True >Priority: Major > > KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). > Previously it would fail with {{UnknownTopicOrPartitionException}} when a > topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. > It seems this was more or less intentional, even though it would break code > which was expecting and handling the {{UnknownTopicOrPartitionException}}. A > workaround is to use {{retries=1}} and inspect the cause of the > {{TimeoutException}}, but this isn't really suitable for cases where the same > Admin client instance is being used for other calls where retries is > desirable. > Furthermore as well as the intended effect on {{listOffsets()}} it seems that > the change could actually affect other methods of Admin. > More generally, the Admin client API is vague about which exceptions can > propagate from which methods. This means that it's not possible to say, in > cases like this, whether the calling code _should_ have been relying on the > {{UnknownTopicOrPartitionException}} or not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376128#comment-17376128 ] Kirk True commented on KAFKA-12879: --- Almost all of the {{Admin}} client's public API methods are meant to be consumed asynchronously. Most of the exceptions that can occur are thrown when the {{Future#get}} method is invoked. However, there are some exceptions (pun intended): # Creation of the client propagates errors if any occur during configuration # {{close}} throws an error if the given {{timeout}} is negative # {{deleteTopics}} throws an {{IllegalArgumentException}} if the {{TopicCollection}} parameter is not of an expected sub-class # {{updateFeatures}} throws an {{IllegalArgumentException}} if the {{featureUpdates}} map is empty or contains a blank feature name The above are just those that are thrown directly in the {{KafkaAdminClient}} itself. Of the above list, item four stands out as extra "picky" compared to, say, {{describeTransactions}}, which doesn't check the collection it's given for emptiness. > Compatibility break in Admin.listOffsets() > -- > > Key: KAFKA-12879 > URL: https://issues.apache.org/jira/browse/KAFKA-12879 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.8.0, 2.7.1, 2.6.2 >Reporter: Tom Bentley >Assignee: Kirk True >Priority: Major > > KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). > Previously it would fail with {{UnknownTopicOrPartitionException}} when a > topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. > It seems this was more or less intentional, even though it would break code > which was expecting and handling the {{UnknownTopicOrPartitionException}}.
A > workaround is to use {{retries=1}} and inspect the cause of the > {{TimeoutException}}, but this isn't really suitable for cases where the same > Admin client instance is being used for other calls where retries is > desirable. > Furthermore as well as the intended effect on {{listOffsets()}} it seems that > the change could actually affect other methods of Admin. > More generally, the Admin client API is vague about which exceptions can > propagate from which methods. This means that it's not possible to say, in > cases like this, whether the calling code _should_ have been relying on the > {{UnknownTopicOrPartitionException}} or not. -- This message was sent by Atlassian Jira (v8.3.4#803005)
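The "inspect the cause of the {{TimeoutException}}" workaround boils down to walking the cause chain of the failure returned by `Admin.listOffsets(...).all().get()`. The helper below is a generic sketch of that pattern using stand-in exception types (it does not depend on Kafka's classes; with the real client you would look for `UnknownTopicOrPartitionException` inside the `ExecutionException`/`TimeoutException`):

```java
// Generic cause-chain inspection, as in the workaround described above.
class CauseWalker {
    // Returns true if `target` (or a subclass) appears anywhere in the
    // cause chain of `t`, including `t` itself.
    static boolean hasCause(Throwable t, Class<? extends Throwable> target) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (target.isInstance(c)) {
                return true;
            }
        }
        return false;
    }
}
```

With the Admin client, the call site would catch `ExecutionException` from `Future#get` and pass it to such a helper, but as the issue notes, lowering `retries` for this one call affects every other call made through the same client instance.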
[GitHub] [kafka] showuon commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0
showuon commented on pull request #10903: URL: https://github.com/apache/kafka/pull/10903#issuecomment-875165545 @ableegoldman , no, there are no tests to verify that the RangeAssignor will be chosen by default when creating a Consumer. So, in this PR, I've added 1 test to test it: `testMultiConsumerDefaultAssignorAndVerifyAssignment`. Thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10903: KAFKA-13023: make "range, cooperative-sticky" as the default assignor in V3.0
showuon commented on pull request #10903: URL: https://github.com/apache/kafka/pull/10903#issuecomment-875157335 @ableegoldman , as discussed, we agreed this should be put in v3.0. Please take a look when available. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)
cmccabe merged pull request #10931: URL: https://github.com/apache/kafka/pull/10931 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13040) Increase minimum value of segment.ms and segment.bytes
[ https://issues.apache.org/jira/browse/KAFKA-13040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Badai Aqrandista updated KAFKA-13040: - Description: Raised for KIP-760 (linked). Many times, Kafka brokers in production crash with "Too many open files" error or "Out of memory" errors because some Kafka topics have a lot of segment files as a result of small {{segment.ms}} or {{segment.bytes}}. These two configurations can be set by any user who is authorized to create topics or modify topic configuration. To prevent these two configurations from causing a Kafka broker crash, they should have a minimum value that is big enough. was: Many times, Kafka brokers in production crash with "Too many open files" error or "Out of memory" errors because some Kafka topics have a lot of segment files as a result of small {{segment.ms}} or {{segment.bytes}}. These two configurations can be set by any user who is authorized to create topics or modify topic configuration. To prevent these two configurations from causing a Kafka broker crash, they should have a minimum value that is big enough. > Increase minimum value of segment.ms and segment.bytes > -- > > Key: KAFKA-13040 > URL: https://issues.apache.org/jira/browse/KAFKA-13040 > Project: Kafka > Issue Type: Improvement >Reporter: Badai Aqrandista >Assignee: Badai Aqrandista >Priority: Minor > > Raised for KIP-760 (linked). > Many times, Kafka brokers in production crash with "Too many open files" > error or "Out of memory" errors because some Kafka topics have a lot of > segment files as a result of small {{segment.ms}} or {{segment.bytes}}. These > two configurations can be set by any user who is authorized to create topics or > modify topic configuration. > To prevent these two configurations from causing a Kafka broker crash, they > should have a minimum value that is big enough. -- This message was sent by Atlassian Jira (v8.3.4#803005)
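For context, these are per-topic dynamic configs, so any user with alter-configs permission can set them arbitrarily small. A sketch of how such overrides are applied (the broker address and topic name are hypothetical; the values shown are deliberately small to illustrate the failure mode the KIP wants to prevent):

```
# Per-topic overrides via the standard config tool (needs a running broker):
#   bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
#     --entity-type topics --entity-name my-topic \
#     --add-config segment.ms=100,segment.bytes=14
# With values like these, every few records roll a new segment, and each
# segment costs open file handles and mmap'd index memory on the broker.
segment.ms=100
segment.bytes=14
```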
[jira] [Assigned] (KAFKA-13040) Increase minimum value of segment.ms and segment.bytes
[ https://issues.apache.org/jira/browse/KAFKA-13040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Badai Aqrandista reassigned KAFKA-13040: Assignee: Badai Aqrandista > Increase minimum value of segment.ms and segment.bytes > -- > > Key: KAFKA-13040 > URL: https://issues.apache.org/jira/browse/KAFKA-13040 > Project: Kafka > Issue Type: Improvement >Reporter: Badai Aqrandista >Assignee: Badai Aqrandista >Priority: Minor > > Many times, Kafka brokers in production crash with "Too many open files" > error or "Out of memory" errors because some Kafka topics have a lot of > segment files as a result of small {{segment.ms}} or {{segment.bytes}}. These > two configuration can be set by any user who is authorized to create topic or > modify topic configuration. > To prevent these two configuration from causing Kafka broker crash, they > should have a minimum value that is big enough. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] skaundinya15 edited a comment on pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 edited a comment on pull request #10962: URL: https://github.com/apache/kafka/pull/10962#issuecomment-875128420 > @skaundinya15 Thanks for the updates. Left comments about indexing of `groups` (starts from 0) in the tests with the change to collection in the last commit. Apart from that LGTM if tests pass. @rajinisivaram Just pushed changes for this to fix the failing tests - thanks for pointing it out -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13041) Support debugging system tests with ducker-ak
[ https://issues.apache.org/jira/browse/KAFKA-13041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Vodetskyi updated KAFKA-13041: Summary: Support debugging system tests with ducker-ak (was: Support debugging system tests) > Support debugging system tests with ducker-ak > - > > Key: KAFKA-13041 > URL: https://issues.apache.org/jira/browse/KAFKA-13041 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Stanislav Vodetskyi >Priority: Major > > Currently when you're using ducker-ak to run system tests locally -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13041) Support debugging system tests with ducker-ak
[ https://issues.apache.org/jira/browse/KAFKA-13041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376104#comment-17376104 ] Stanislav Vodetskyi commented on KAFKA-13041: - https://github.com/apache/kafka/pull/10915 > Support debugging system tests with ducker-ak > - > > Key: KAFKA-13041 > URL: https://issues.apache.org/jira/browse/KAFKA-13041 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Stanislav Vodetskyi >Priority: Major > > Currently when you're using ducker-ak to run system tests locally, your only > debug option is to add print/log messages. > It should be possible to connect to a ducker-ak test with a remote debugger. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13041) Support debugging system tests with ducker-ak
[ https://issues.apache.org/jira/browse/KAFKA-13041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Vodetskyi updated KAFKA-13041: Description: Currently when you're using ducker-ak to run system tests locally, your only debug option is to add print/log messages. It should be possible to connect to a ducker-ak test with a remote debugger. was:Currently when you're using ducker-ak to run system tests locally > Support debugging system tests with ducker-ak > - > > Key: KAFKA-13041 > URL: https://issues.apache.org/jira/browse/KAFKA-13041 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Stanislav Vodetskyi >Priority: Major > > Currently when you're using ducker-ak to run system tests locally, your only > debug option is to add print/log messages. > It should be possible to connect to a ducker-ak test with a remote debugger. -- This message was sent by Atlassian Jira (v8.3.4#803005)
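The ticket above asks for remote-debugger support in ducker-ak. As a hedged illustration only (the actual wiring is done in PR #10915, and the variable name `KAFKA_DEBUG_OPTS` below is hypothetical), attaching a remote debugger to a JVM generally means passing standard JDWP agent flags to the test JVM:

```shell
# Hypothetical sketch: standard JDWP flags for attaching a remote debugger.
# The env var name is illustrative; the real ducker-ak hook lives in PR #10915.
# address=*:5005 is the JDK 9+ form; on JDK 8 use address=5005.
export KAFKA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005"
echo "$KAFKA_DEBUG_OPTS"
```

With `suspend=y` the JVM blocks at startup until a debugger (for example VS Code's Java debugger) attaches on port 5005.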
[GitHub] [kafka] stan-confluent commented on pull request #10915: Enable connecting VS Code remote debugger
stan-confluent commented on pull request #10915: URL: https://github.com/apache/kafka/pull/10915#issuecomment-875132069 @omkreddy https://issues.apache.org/jira/browse/KAFKA-13041
[jira] [Updated] (KAFKA-13041) Support debugging system tests
[ https://issues.apache.org/jira/browse/KAFKA-13041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Vodetskyi updated KAFKA-13041: Description: Currently when you're using ducker-ak to run system tests locally > Support debugging system tests > -- > > Key: KAFKA-13041 > URL: https://issues.apache.org/jira/browse/KAFKA-13041 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Stanislav Vodetskyi >Priority: Major > > Currently when you're using ducker-ak to run system tests locally
[jira] [Created] (KAFKA-13041) Support debugging system tests
Stanislav Vodetskyi created KAFKA-13041: --- Summary: Support debugging system tests Key: KAFKA-13041 URL: https://issues.apache.org/jira/browse/KAFKA-13041 Project: Kafka Issue Type: Improvement Components: system tests Reporter: Stanislav Vodetskyi
[GitHub] [kafka] skaundinya15 commented on pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 commented on pull request #10962: URL: https://github.com/apache/kafka/pull/10962#issuecomment-875128420 > @skaundinya15 Thanks for the updates. Left comments about indexing of `groups` (starts from 0) in the tests with the change to collection in the last commit. Apart from that LGTM if tests pass. Just pushed changes for this to fix the failing tests - thanks for pointing it out
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664918688 ## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ## @@ -1358,17 +1367,222 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // note there's only one broker, so no need to lookup the group coordinator // without describe permission on the topic, we shouldn't be able to fetch offsets -val offsetFetchRequest = new requests.OffsetFetchRequest.Builder(group, false, null, false).build() +val offsetFetchRequest = createOffsetFetchRequestAllPartitions var offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) -assertEquals(Errors.NONE, offsetFetchResponse.error) -assertTrue(offsetFetchResponse.responseData.isEmpty) +assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) +assertTrue(offsetFetchResponse.partitionDataMap(group).isEmpty) // now add describe permission on the topic and verify that the offset can be fetched addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, DESCRIBE, ALLOW)), topicResource) offsetFetchResponse = connectAndReceive[OffsetFetchResponse](offsetFetchRequest) -assertEquals(Errors.NONE, offsetFetchResponse.error) -assertTrue(offsetFetchResponse.responseData.containsKey(tp)) -assertEquals(offset, offsetFetchResponse.responseData.get(tp).offset) +assertEquals(Errors.NONE, offsetFetchResponse.groupLevelError(group)) +assertTrue(offsetFetchResponse.partitionDataMap(group).containsKey(tp)) +assertEquals(offset, offsetFetchResponse.partitionDataMap(group).get(tp).offset) + } + + @Test + def testOffsetFetchMultipleGroupsAuthorization(): Unit = { +val groups = (0 until 5).map(i => s"group$i") +val groupResources = groups.map(group => new ResourcePattern(GROUP, group, LITERAL)) + +val topic1 = "topic1" +val topic1List = singletonList(new TopicPartition(topic1, 0)) +val topicOneResource = new ResourcePattern(TOPIC, 
topic1, LITERAL) +val topic2 = "topic2" +val topic1And2List = util.Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)) +val topicTwoResource = new ResourcePattern(TOPIC, topic2, LITERAL) +val topic3 = "topic3" +val allTopicsList = util.Arrays.asList( + new TopicPartition(topic1, 0), + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1), + new TopicPartition(topic3, 0), + new TopicPartition(topic3, 1), + new TopicPartition(topic3, 2)) +val topicThreeResource = new ResourcePattern(TOPIC, topic3, LITERAL) + +// create group to partition map to build batched offsetFetch request +val groupToPartitionMap = new util.HashMap[String, util.List[TopicPartition]]() +groupToPartitionMap.put(groups(1), topic1List) Review comment: groups(0) to group(4) ## File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala ## @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData +import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{BeforeEach, Test} + +import java.util +import java.util.Collections.singletonList +import scala.jdk.CollectionConverters._ +import java.util.{Optional, Properties} + +class OffsetFetchRequestTest extends BaseRequestTest { + + override def brokerCount: Int = 1 + + val brokerId: Integer = 0 + val offset = 15L + val leaderEpoch: Optional[Integer] = Optional.of(3) + val metadata = "metadata" + val topic = "topic" + val groupId = "groupId" + val groups: Seq[String] = (0 until 5).map(i =>
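The review comment above flags that the batched request should cover groups(0) through groups(4). A minimal, self-contained sketch of the group-to-partitions map the test assembles, indexed from group0 as the reviewer suggests (plain strings stand in for Kafka's `TopicPartition`; all names here are illustrative, not the PR's actual code):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupMapSketch {
    // Builds the kind of group -> partitions map a batched OffsetFetch
    // request test uses, with group indexing starting at 0.
    static Map<String, List<String>> buildGroupToPartitionMap() {
        List<String> topic1List = List.of("topic1-0");
        List<String> topic1And2List = List.of("topic1-0", "topic2-0", "topic2-1");
        List<String> allTopicsList = List.of("topic1-0", "topic2-0", "topic2-1",
                                             "topic3-0", "topic3-1", "topic3-2");
        Map<String, List<String>> groupToPartitions = new LinkedHashMap<>();
        groupToPartitions.put("group0", topic1List);
        groupToPartitions.put("group1", topic1And2List);
        groupToPartitions.put("group2", allTopicsList);
        groupToPartitions.put("group3", allTopicsList);
        groupToPartitions.put("group4", allTopicsList);
        return groupToPartitions;
    }

    public static void main(String[] args) {
        System.out.println("groups=" + buildGroupToPartitionMap().size());
    }
}
```

One request carrying all five groups replaces five separate OffsetFetch round trips, which is the point of the KIP-709 batching.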
[GitHub] [kafka] junrao commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
junrao commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r664866782 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME; + +/** + * This class is responsible for consuming messages from remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}) + * partitions and maintain the state of the remote log segment metadata. It gives an API to add or remove + * for what topic partition's metadata should be consumed by this instance using + * {{@link #addAssignmentsForPartitions(Set)}} and {@link #removeAssignmentsForPartitions(Set)} respectively. + * + * When a broker is started, controller sends topic partitions that this broker is leader or follower for and the + * partitions to be deleted. 
This class receives those notifications with + * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} assigns consumer for the + * respective remote log metadata partitions by using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}. + * Any leadership changes later are called through the same API. We will remove the partitions that are deleted from + * this broker which are received through {@link #removeAssignmentsForPartitions(Set)}. + * + * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)}, + * which maintains in-memory representation of the state of {@link RemoteLogSegmentMetadata}. + */ +class ConsumerTask implements Runnable, Closeable { +private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); + +private static final long POLL_INTERVAL_MS = 100; + +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); +private final KafkaConsumer consumer; +private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; +private final RemoteLogMetadataTopicPartitioner topicPartitioner; + +// It indicates whether the closing process has been started or not. If it is set as true, +// consumer will stop consuming messages and it will not allow partition assignments to be updated. +private volatile boolean closing = false; +// It indicates whether the consumer needs to assign the partitions or not. This is set when it is +// determined that the consumer needs to be assigned with the updated partitions. +private volatile boolean assignPartitions = false; + +private final Object assignPartitionsLock = new Object(); + +// Remote log metadata topic partitions that consumer is assigned to. +private volatile Set assignedMetaPartitions = Collections.emptySet(); + +// User topic partitions that this broker is a leader/follower for. 
+private Set assignedTopicPartitions = Collections.emptySet(); + +// Map of remote log metadata
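The ConsumerTask described above coordinates its poll loop with two volatile flags (`closing`, `assignPartitions`) and a lock guarding reassignment. A stripped-down sketch of that shutdown/reassignment pattern, with no Kafka dependencies (class and member names here are illustrative, not the actual implementation):

```java
import java.util.Collections;
import java.util.Set;

class PollLoopSketch implements Runnable {
    // Once true, the loop stops; volatile so the polling thread sees it.
    private volatile boolean closing = false;
    // Set when the assignment changed and must be re-applied by the loop.
    private volatile boolean assignPartitions = false;
    private final Object assignPartitionsLock = new Object();
    private Set<Integer> assignedMetaPartitions = Collections.emptySet();

    void updateAssignments(Set<Integer> partitions) {
        synchronized (assignPartitionsLock) {
            assignedMetaPartitions = partitions;
            assignPartitions = true;
        }
    }

    void close() {
        closing = true;
    }

    @Override
    public void run() {
        while (!closing) {
            if (assignPartitions) {
                synchronized (assignPartitionsLock) {
                    // re-assign the consumer to assignedMetaPartitions here
                    assignPartitions = false;
                }
            }
            // consumer.poll(...) and record handling would go here
        }
    }
}
```

The volatile flags let `close()` and `updateAssignments()` be called from other threads without the loop holding the lock on every iteration; the lock is only taken while the assignment set is actually swapped.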
[jira] [Commented] (KAFKA-13037) "Thread state is already PENDING_SHUTDOWN" log spam
[ https://issues.apache.org/jira/browse/KAFKA-13037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376066#comment-17376066 ] A. Sophie Blee-Goldman commented on KAFKA-13037: Hey [~gray.john], would you be interested in submitting a PR for this? I completely agree that logging this at INFO on every iteration is wildly inappropriate, I just didn't push it at the time since I figured someone would file a ticket if it was really bothering people. And here we are :) > "Thread state is already PENDING_SHUTDOWN" log spam > --- > > Key: KAFKA-13037 > URL: https://issues.apache.org/jira/browse/KAFKA-13037 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0, 2.7.1 >Reporter: John Gray >Priority: Major > > KAFKA-12462 introduced a > [change|https://github.com/apache/kafka/commit/4fe4cdc4a61cbac8e070a8b5514403235194015b#diff-76f629d0df8bd30b2593cbcf4a2dc80de3167ebf55ef8b5558e6e6285a057496R722] > that increased this "Thread state is already {}" logger to info instead of > debug. We are running into a problem with our streams apps when they hit an > unrecoverable exception that shuts down the streams thread, we get this log > printed about 50,000 times per second per thread. I am guessing it is once > per record we have queued up when the exception happens? We have temporarily > raised the StreamThread logger to WARN instead of INFO to avoid the spam, but > we do miss the other good logs we get on INFO in that class. Could this log > be reverted back to debug? Thank you! -- This message was sent by Atlassian Jira (v8.3.4#803005)
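The workaround the reporter describes — raising the StreamThread logger to WARN — looks roughly like this in a Log4j 1.x properties file (the exact logger name below is an assumption based on the class's usual package; verify it against your Kafka Streams version):

```properties
# Assumed logger name for Kafka Streams' StreamThread; verify for your version.
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN
```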
[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664895401 ## File path: clients/src/main/resources/common/message/OffsetFetchRequest.json ## @@ -31,19 +31,33 @@ // Version 6 is the first flexible version. // // Version 7 is adding the require stable flag. - "validVersions": "0-7", + // + // Version 8 is adding support for fetching offsets for multiple groups at a time + "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ -{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", +{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId", "about": "The group to fetch offsets for." }, -{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+", +{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7", "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName", "about": "The topic name."}, - { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+", + { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7", "about": "The partition indexes we would like to fetch offsets for." } ]}, +{ "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": "8+", Review comment: Sounds good, will change it. ## File path: clients/src/main/resources/common/message/OffsetFetchResponse.json ## @@ -30,30 +30,57 @@ // Version 6 is the first flexible version. // // Version 7 adds pending offset commit as new error response on partition level. 
- "validVersions": "0-7", + // + // Version 8 is adding support for fetching offsets for multiple groups + "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, -{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+", +{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7", "about": "The responses per topic.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName", "about": "The topic name." }, - { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+", + { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0-7", "about": "The responses per partition", "fields": [ -{ "name": "PartitionIndex", "type": "int32", "versions": "0+", +{ "name": "PartitionIndex", "type": "int32", "versions": "0-7", "about": "The partition index." }, -{ "name": "CommittedOffset", "type": "int64", "versions": "0+", +{ "name": "CommittedOffset", "type": "int64", "versions": "0-7", "about": "The committed message offset." }, -{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1", +{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", "default": "-1", "ignorable": true, "about": "The leader epoch." }, -{ "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+", +{ "name": "Metadata", "type": "string", "versions": "0-7", "nullableVersions": "0-7", "about": "The partition metadata." }, -{ "name": "ErrorCode", "type": "int16", "versions": "0+", +{ "name": "ErrorCode", "type": "int16", "versions": "0-7", "about": "The error code, or 0 if there was no error." 
} ]} ]}, -{ "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", "ignorable": true, - "about": "The top-level error code, or 0 if there was no error." } +{ "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true, + "about": "The top-level error code, or 0 if there was no error." }, +{"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": "8+", Review comment: Sounds good, will change it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
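Assembled from the quoted v8 schema, a batched version-8 OffsetFetch request would carry one entry per group instead of a single top-level `GroupId`. The fragment below is illustrative only: it keeps the `GroupIds` field name from the diff (before the reviewer's suggested rename to `Groups`), and the field spellings inside each group entry are an assumption, since the diff truncates before listing them:

```json
{
  "GroupIds": [
    { "GroupId": "group0",
      "Topics": [ { "Name": "topic1", "PartitionIndexes": [0] } ] },
    { "GroupId": "group1",
      "Topics": null }
  ]
}
```

As in versions 2-7, a null `Topics` entry requests offsets for all topics of that group.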
[GitHub] [kafka] kkonstantine merged pull request #10981: Bump trunk to 3.1.0-SNAPSHOT
kkonstantine merged pull request #10981: URL: https://github.com/apache/kafka/pull/10981
[GitHub] [kafka] kkonstantine commented on pull request #10981: Bump trunk to 3.1.0-SNAPSHOT
kkonstantine commented on pull request #10981: URL: https://github.com/apache/kafka/pull/10981#issuecomment-875092481 The few failures across builds are not relevant to the changes.
[jira] [Commented] (KAFKA-7760) Add broker configuration to set minimum value for segment.bytes and segment.ms
[ https://issues.apache.org/jira/browse/KAFKA-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376037#comment-17376037 ] Badai Aqrandista commented on KAFKA-7760: - [~dulvinw] I have created [KIP-760|https://cwiki.apache.org/confluence/display/KAFKA/KIP-760%3A+Increase+minimum+value+of+segment.ms+and+segment.bytes] and created KAFKA-13040 for it. I will close this ticket to focus on the work in KIP-760. > Add broker configuration to set minimum value for segment.bytes and segment.ms > -- > > Key: KAFKA-7760 > URL: https://issues.apache.org/jira/browse/KAFKA-7760 > Project: Kafka > Issue Type: Improvement >Reporter: Badai Aqrandista >Assignee: Dulvin Witharane >Priority: Major > Labels: kip, newbie > > If someone set segment.bytes or segment.ms at topic level to a very small > value (e.g. segment.bytes=1000 or segment.ms=1000), Kafka will generate a > very high number of segment files. This can bring down the whole broker due > to hitting the maximum open file (for log) or maximum number of mmap-ed file > (for index). > To prevent that from happening, I would like to suggest adding two new items > to the broker configuration: > * min.topic.segment.bytes, defaults to 1048576: The minimum value for > segment.bytes. When someone sets topic configuration segment.bytes to a value > lower than this, Kafka throws an error INVALID VALUE. > * min.topic.segment.ms, defaults to 360: The minimum value for > segment.ms. When someone sets topic configuration segment.ms to a value lower > than this, Kafka throws an error INVALID VALUE. > Thanks > Badai -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-7760) Add broker configuration to set minimum value for segment.bytes and segment.ms
[ https://issues.apache.org/jira/browse/KAFKA-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Badai Aqrandista resolved KAFKA-7760. - Resolution: Duplicate > Add broker configuration to set minimum value for segment.bytes and segment.ms > -- > > Key: KAFKA-7760 > URL: https://issues.apache.org/jira/browse/KAFKA-7760 > Project: Kafka > Issue Type: Improvement >Reporter: Badai Aqrandista >Assignee: Dulvin Witharane >Priority: Major > Labels: kip, newbie > > If someone set segment.bytes or segment.ms at topic level to a very small > value (e.g. segment.bytes=1000 or segment.ms=1000), Kafka will generate a > very high number of segment files. This can bring down the whole broker due > to hitting the maximum open file (for log) or maximum number of mmap-ed file > (for index). > To prevent that from happening, I would like to suggest adding two new items > to the broker configuration: > * min.topic.segment.bytes, defaults to 1048576: The minimum value for > segment.bytes. When someone sets topic configuration segment.bytes to a value > lower than this, Kafka throws an error INVALID VALUE. > * min.topic.segment.ms, defaults to 360: The minimum value for > segment.ms. When someone sets topic configuration segment.ms to a value lower > than this, Kafka throws an error INVALID VALUE. > Thanks > Badai -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe edited a comment on pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
cmccabe edited a comment on pull request #10864: URL: https://github.com/apache/kafka/pull/10864#issuecomment-875089636 Merged to trunk and 3.0. Thanks, @mumrah.
[GitHub] [kafka] cmccabe closed pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
cmccabe closed pull request #10864: URL: https://github.com/apache/kafka/pull/10864
[GitHub] [kafka] cmccabe commented on pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
cmccabe commented on pull request #10864: URL: https://github.com/apache/kafka/pull/10864#issuecomment-875089636 Merged
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664880346 ## File path: clients/src/main/resources/common/message/OffsetFetchResponse.json ## @@ -30,30 +30,57 @@ // Version 6 is the first flexible version. // // Version 7 adds pending offset commit as new error response on partition level. - "validVersions": "0-7", + // + // Version 8 is adding support for fetching offsets for multiple groups + "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, -{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+", +{ "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7", "about": "The responses per topic.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName", "about": "The topic name." }, - { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+", + { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0-7", "about": "The responses per partition", "fields": [ -{ "name": "PartitionIndex", "type": "int32", "versions": "0+", +{ "name": "PartitionIndex", "type": "int32", "versions": "0-7", "about": "The partition index." }, -{ "name": "CommittedOffset", "type": "int64", "versions": "0+", +{ "name": "CommittedOffset", "type": "int64", "versions": "0-7", "about": "The committed message offset." }, -{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5+", "default": "-1", +{ "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", "default": "-1", "ignorable": true, "about": "The leader epoch." 
}, -{ "name": "Metadata", "type": "string", "versions": "0+", "nullableVersions": "0+", +{ "name": "Metadata", "type": "string", "versions": "0-7", "nullableVersions": "0-7", "about": "The partition metadata." }, -{ "name": "ErrorCode", "type": "int16", "versions": "0+", +{ "name": "ErrorCode", "type": "int16", "versions": "0-7", "about": "The error code, or 0 if there was no error." } ]} ]}, -{ "name": "ErrorCode", "type": "int16", "versions": "2+", "default": "0", "ignorable": true, - "about": "The top-level error code, or 0 if there was no error." } +{ "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true, + "about": "The top-level error code, or 0 if there was no error." }, +{"name": "GroupIds", "type": "[]OffsetFetchResponseGroup", "versions": "8+", Review comment: As with the response, should we call this `Groups` rather than `GroupIds`? ## File path: clients/src/main/resources/common/message/OffsetFetchRequest.json ## @@ -31,19 +31,33 @@ // Version 6 is the first flexible version. // // Version 7 is adding the require stable flag. - "validVersions": "0-7", + // + // Version 8 is adding support for fetching offsets for multiple groups at a time + "validVersions": "0-8", "flexibleVersions": "6+", "fields": [ -{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", +{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId", "about": "The group to fetch offsets for." 
}, -{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+", +{ "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7", "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName", "about": "The topic name."}, - { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+", + { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7", "about": "The partition indexes we would like to fetch offsets for." } ]}, +{ "name": "GroupIds", "type": "[]OffsetFetchRequestGroup", "versions": "8+", Review comment: Should we call this `Groups` rather than `GroupIds` since it is not just the group id? ## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ## @@ -1358,17 +1367,233 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // note there's
[jira] [Created] (KAFKA-13040) Increase minimum value of segment.ms and segment.bytes
Badai Aqrandista created KAFKA-13040: Summary: Increase minimum value of segment.ms and segment.bytes Key: KAFKA-13040 URL: https://issues.apache.org/jira/browse/KAFKA-13040 Project: Kafka Issue Type: Improvement Reporter: Badai Aqrandista Many times, Kafka brokers in production crash with "Too many open files" or "Out of memory" errors because some Kafka topics have a lot of segment files as a result of small {{segment.ms}} or {{segment.bytes}}. These two configurations can be set by any user who is authorized to create topics or modify topic configuration. To prevent these two configurations from crashing the broker, they should have a minimum value that is big enough.
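The guard this ticket proposes — rejecting topic-level values below a broker-level floor — can be sketched as follows. The config names `min.topic.segment.bytes` and `min.topic.segment.ms` come from KAFKA-7760's description; this is not actual Kafka code:

```java
public class SegmentConfigValidator {
    private final long minSegmentBytes;
    private final long minSegmentMs;

    SegmentConfigValidator(long minSegmentBytes, long minSegmentMs) {
        this.minSegmentBytes = minSegmentBytes;
        this.minSegmentMs = minSegmentMs;
    }

    // Throws if a topic-level override falls below the broker-level minimum,
    // mirroring the "INVALID VALUE" rejection the ticket asks for.
    void validate(long segmentBytes, long segmentMs) {
        if (segmentBytes < minSegmentBytes) {
            throw new IllegalArgumentException("INVALID VALUE: segment.bytes="
                + segmentBytes + " < min.topic.segment.bytes=" + minSegmentBytes);
        }
        if (segmentMs < minSegmentMs) {
            throw new IllegalArgumentException("INVALID VALUE: segment.ms="
                + segmentMs + " < min.topic.segment.ms=" + minSegmentMs);
        }
    }
}
```

Validating at topic create/alter time keeps a tiny `segment.ms` from ever producing the segment-file explosion described above.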
[jira] [Commented] (KAFKA-13000) Improve handling of UnsupportedVersionException in MockClient
[ https://issues.apache.org/jira/browse/KAFKA-13000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376022#comment-17376022 ] Kirk True commented on KAFKA-13000: --- [~mimaison] - is this ticket still valid? Looking at the code in {{trunk}}, I don't see anywhere in {{MockClient}} where an {{UnsupportedVersionException}} is thrown. I've done some related work in [KAFKA-12989|https://issues.apache.org/jira/browse/KAFKA-12989], so I'm hoping to work on this too, if it's still an issue. Thanks! > Improve handling of UnsupportedVersionException in MockClient > - > > Key: KAFKA-13000 > URL: https://issues.apache.org/jira/browse/KAFKA-13000 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Kirk True >Priority: Major > > MockClient handles UnsupportedVersionException slightly differently than > NetworkClient. In some cases, it may throw this exception while instead it > should return always return a ClientResponse. > Background: https://github.com/apache/kafka/pull/10743#discussion_r655922760 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664873858 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); +return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; +} + +// Custom collector to filter a single element +private Collector toSingleton() { Review comment: You can probably set the list in `OffsetFetchRequestData` directly in the test. But let's leave that for a follow-on PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13000) Improve handling of UnsupportedVersionException in MockClient
[ https://issues.apache.org/jira/browse/KAFKA-13000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-13000: - Assignee: Kirk True > Improve handling of UnsupportedVersionException in MockClient > - > > Key: KAFKA-13000 > URL: https://issues.apache.org/jira/browse/KAFKA-13000 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Kirk True >Priority: Major > > MockClient handles UnsupportedVersionException slightly differently than > NetworkClient. In some cases, it may throw this exception while instead it > should always return a ClientResponse. > Background: https://github.com/apache/kafka/pull/10743#discussion_r655922760
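The NetworkClient-style behavior the ticket describes, returning a response that carries the version error instead of throwing it out of the send path, can be sketched in isolation. All types below are simplified stand-ins: `FakeResponse` is not Kafka's `ClientResponse`, and the exception is a placeholder for `UnsupportedVersionException`:

```java
// Hypothetical sketch only: demonstrates "wrap the error into the response"
// rather than throwing, so callers exercise the same path as with the real client.
class MockClientSketch {
    static class FakeResponse {
        final RuntimeException versionMismatch; // null when the request succeeded
        FakeResponse(RuntimeException versionMismatch) {
            this.versionMismatch = versionMismatch;
        }
        boolean failedDueToVersion() {
            return versionMismatch != null;
        }
    }

    // Instead of propagating an exception out of send(), return a response
    // object that records the failure.
    static FakeResponse send(boolean apiVersionSupported) {
        if (!apiVersionSupported) {
            return new FakeResponse(
                new UnsupportedOperationException("API version not supported by broker"));
        }
        return new FakeResponse(null);
    }

    public static void main(String[] args) {
        System.out.println(send(true).failedDueToVersion());  // false
        System.out.println(send(false).failedDueToVersion()); // true
    }
}
```

The design point is that test code asserting on the returned response object keeps working identically whether the failure is a version mismatch or any other error, which is harder to guarantee when the mock throws in some cases and returns in others.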
[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
mumrah commented on a change in pull request #10864: URL: https://github.com/apache/kafka/pull/10864#discussion_r664870271 ## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ## @@ -316,6 +316,11 @@ public void flush() { lastFlushedOffset = endOffset().offset; } +@Override +public boolean maybeClean() { +return false; +} Review comment: I'm going to address this further in a follow-up PR
[jira] [Commented] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17376019#comment-17376019 ] A. Sophie Blee-Goldman commented on KAFKA-13008: Nice find! I agree, this does not seem like the expected behavior, and given that it's been causing a test to fail regularly I think we can assert that this should not happen. One thing I don't understand, and maybe this is because I don't have much context on the incremental fetch internals, is: why would we not get the metadata again after the partition is re-assigned in step 5? Surely if a partition was removed from the assignment and then added back, this should constitute a new 'session', and thus it should get the metadata again on assignment? If not, that sounds like a bug in the incremental fetch to me, but again, I'm not too familiar with it so there could be a valid reason it works this way. If so, then maybe we should consider allowing metadata to remain around after a partition is unassigned, in case it gets this same partition back within the session? Could there be other consequences of this lack of metadata, outside of Streams? > Stream will stop processing data for a long time while waiting for the > partition lag > > > Key: KAFKA-13008 > URL: https://issues.apache.org/jira/browse/KAFKA-13008 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Luke Chen >Priority: Major > > In KIP-695, we improved the task idling mechanism by checking partition lag. > It's a good improvement for timestamp sync. But I found it will cause the > stream to stop processing data for a long time while waiting for the > partition metadata.
> > I've been investigating this case for a while, and figuring out the issue > will happen in below situation (or similar situation): > # start 2 streams (each with 1 thread) to consume from a topicA (with 3 > partitions: A-0, A-1, A-2) > # After 2 streams started, the partitions assignment are: (I skipped some > other processing related partitions for simplicity) > stream1-thread1: A-0, A-1 > stream2-thread1: A-2 > # start processing some data, assume now, the position and high watermark is: > A-0: offset: 2, highWM: 2 > A-1: offset: 2, highWM: 2 > A-2: offset: 2, highWM: 2 > # Now, stream3 joined, so trigger rebalance with this assignment: > stream1-thread1: A-0 > stream2-thread1: A-2 > stream3-thread1: A-1 > # Suddenly, stream3 left, so now, rebalance again, with the step 2 > assignment: > stream1-thread1: A-0, *A-1* > stream2-thread1: A-2 > (note: after initialization, the position of A-1 will be: position: null, > highWM: null) > # Now, note that, the partition A-1 used to get assigned to stream1-thread1, > and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 > record per 30 mins), and partition A-0 has fast input (ex: 10K records / > sec). So, now, the stream1-thread1 won't process any data until we got input > from partition A-1 (even if partition A-0 is buffered a lot, and we have > `{{max.task.idle.ms}}` set to 0). > > The reason why the stream1-thread1 won't process any data is because we can't > get the lag of partition A-1. And why we can't get the lag? It's because > # In KIP-695, we use consumer's cache to get the partition lag, to avoid > remote call > # The lag for a partition will be cleared if the assignment in this round > doesn't have this partition. check > [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272]. 
> So, in the above example, the metadata cache for partition A-1 will be > cleared in step 4, and re-initialized (to null) in step 5 > # In KIP-227, we introduced a fetch session to have incremental fetch > request/response. That is, if the session exists, the client (consumer) will > get an update only when a fetched partition has an update (ex: new data). > So, in the above case, since partition A-1 has slow input (ex: 1 record per 30 > mins), it won't get an update for up to the next 30 mins, unless the fetch session > becomes inactive for the (default) 2 mins and is evicted. Either way, the metadata > won't be updated for a while. > > In KIP-695, if we don't get the partition lag, we can't determine the > partition data status to do timestamp sync, so we'll keep waiting and not > process any data. That's why this issue happens. > > *Proposed solution:* > # If we don't get the current lag for a partition, or the current lag > 0, > we start to wait for max.task.idle.ms, and reset the deadline when we get the > partition lag, like what we did in previous KIP-353 > # Introduce a waiting time config when no partition lag, or
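The first proposed solution boils down to a small piece of deadline bookkeeping. The sketch below is purely illustrative (hypothetical class and method names, not the actual Kafka Streams implementation): when the cached lag is unknown, an idle deadline of `max.task.idle.ms` is armed, and the deadline is reset whenever a lag value does arrive.

```java
import java.util.OptionalLong;

// Hypothetical sketch of proposed solution #1 above; the class and method
// names are illustrative, not Kafka Streams' real API.
class TaskIdlingSketch {
    private static final long NO_DEADLINE = -1L;
    private final long maxTaskIdleMs;
    private long deadlineMs = NO_DEADLINE;

    TaskIdlingSketch(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    // Returns true when the task may go ahead and process buffered records.
    boolean readyToProcess(OptionalLong cachedLag, long nowMs) {
        if (cachedLag.isPresent()) {
            deadlineMs = NO_DEADLINE;           // lag is known again: reset the deadline
            if (cachedLag.getAsLong() == 0L) {
                return true;                    // caught up on this partition
            }
        }
        if (deadlineMs == NO_DEADLINE) {
            deadlineMs = nowMs + maxTaskIdleMs; // unknown lag (or lag > 0): arm the idle clock
        }
        return nowMs >= deadlineMs;             // proceed once we've idled long enough
    }

    public static void main(String[] args) {
        TaskIdlingSketch idling = new TaskIdlingSketch(100L);
        System.out.println(idling.readyToProcess(OptionalLong.empty(), 0L));   // false: start waiting
        System.out.println(idling.readyToProcess(OptionalLong.empty(), 100L)); // true: deadline reached
    }
}
```

Under this scheme, a thread stuck with no lag metadata for partition A-1 would resume processing partition A-0's buffered records after at most `max.task.idle.ms`, instead of blocking until the fetch session delivers new metadata.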
[GitHub] [kafka] kirktrue commented on pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse
kirktrue commented on pull request #10980: URL: https://github.com/apache/kafka/pull/10980#issuecomment-875058699 @hachikuji - would you be willing to assign a reviewer for this PR? The failing tests look like they're related to KRaft tests, so I don't _think_ they're related.
[GitHub] [kafka] kirktrue commented on pull request #10951: KAFKA-12841: NPE from the provided metadata in client callback in case of ApiException
kirktrue commented on pull request #10951: URL: https://github.com/apache/kafka/pull/10951#issuecomment-875058547 @hachikuji - would you be willing to assign a reviewer for this PR? The failing tests look like they're related to KRaft tests, so I don't _think_ they're related.
[GitHub] [kafka] ableegoldman edited a comment on pull request #10943: Fix verification of version probing
ableegoldman edited a comment on pull request #10943: URL: https://github.com/apache/kafka/pull/10943#issuecomment-875052948 Oh nevermind, I see, I just missed bumping one instance of it in the message. In that case this LGTM
[GitHub] [kafka] ableegoldman commented on pull request #10943: Fix verification of version probing
ableegoldman commented on pull request #10943: URL: https://github.com/apache/kafka/pull/10943#issuecomment-875052948 Oh nevermind, I see the problem, I just missed bumping one instance of it in the message. In that case this LGTM
[GitHub] [kafka] ableegoldman commented on pull request #10943: Fix verification of version probing
ableegoldman commented on pull request #10943: URL: https://github.com/apache/kafka/pull/10943#issuecomment-875051446 Wait, I'm confused. I definitely bumped the version number in this test in that PR you referenced (scroll down to the end of the changes, it's the last file). Did we for some reason bump the protocol version number again since then?
[jira] [Updated] (KAFKA-12644) Add Missing Class-Level Javadoc to Descendants of org.apache.kafka.common.errors.ApiException
[ https://issues.apache.org/jira/browse/KAFKA-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Israel Ekpo updated KAFKA-12644: Summary: Add Missing Class-Level Javadoc to Descendants of org.apache.kafka.common.errors.ApiException (was: Add Missing Class-Level Javadoc to Decendants of org.apache.kafka.common.errors.ApiException) > Add Missing Class-Level Javadoc to Descendants of > org.apache.kafka.common.errors.ApiException > - > > Key: KAFKA-12644 > URL: https://issues.apache.org/jira/browse/KAFKA-12644 > Project: Kafka > Issue Type: Improvement > Components: clients, documentation >Affects Versions: 3.0.0, 2.8.1 >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Blocker > Labels: documentation > Fix For: 3.0.0, 2.8.1 > > > I noticed that class-level Javadocs are missing from some classes in the > org.apache.kafka.common.errors package. This issue is for tracking the work > of adding the missing class-level javadocs for those Exception classes. > https://kafka.apache.org/27/javadoc/org/apache/kafka/common/errors/package-summary.html > https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/errors > Basic class-level documentation could be derived by mapping the error > conditions documented in the protocol > https://kafka.apache.org/protocol#protocol_constants
[jira] [Updated] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Israel Ekpo updated KAFKA-12994: Priority: Blocker (was: Major) > Migrate all Tests to New API and Remove Suppression for Deprecation Warnings > related to KIP-633 > --- > > Key: KAFKA-12994 > URL: https://issues.apache.org/jira/browse/KAFKA-12994 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Blocker > Labels: kip, kip-633 > Fix For: 3.0.0 > > > Due to the API changes for KIP-633 a lot of deprecation warnings have been > generated in tests that are using the old deprecated APIs. There are a lot of > tests using the deprecated methods. We should absolutely migrate them all to > the new APIs and then get rid of all the applicable annotations for > suppressing the deprecation warnings. > This applies to all Java and Scala examples and tests using the deprecated > APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows > classes. > > This is based on the feedback from reviewers in this PR > > https://github.com/apache/kafka/pull/10926
[jira] [Updated] (KAFKA-13021) Improve Javadocs for API Changes from KIP-633
[ https://issues.apache.org/jira/browse/KAFKA-13021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Israel Ekpo updated KAFKA-13021: Priority: Blocker (was: Minor) > Improve Javadocs for API Changes from KIP-633 > - > > Key: KAFKA-13021 > URL: https://issues.apache.org/jira/browse/KAFKA-13021 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Affects Versions: 3.0.0 >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Blocker > > There are Javadoc changes from the following PR that need to be completed > prior to the 3.0 release. This Jira item is to track that work > [https://github.com/apache/kafka/pull/10926]
[GitHub] [kafka] cmccabe commented on pull request #10931: KAFKA-12998: Implement broker-side KRaft snapshots (WIP)
cmccabe commented on pull request #10931: URL: https://github.com/apache/kafka/pull/10931#issuecomment-875047031 Sorry, I forgot to push earlier. Also fixed an additional issue with default quotas.
[jira] [Updated] (KAFKA-12644) Add Missing Class-Level Javadoc to Decendants of org.apache.kafka.common.errors.ApiException
[ https://issues.apache.org/jira/browse/KAFKA-12644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Israel Ekpo updated KAFKA-12644: Priority: Blocker (was: Major) > Add Missing Class-Level Javadoc to Decendants of > org.apache.kafka.common.errors.ApiException > > > Key: KAFKA-12644 > URL: https://issues.apache.org/jira/browse/KAFKA-12644 > Project: Kafka > Issue Type: Improvement > Components: clients, documentation >Affects Versions: 3.0.0, 2.8.1 >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Blocker > Labels: documentation > Fix For: 3.0.0, 2.8.1 > > > I noticed that class-level Javadocs are missing from some classes in the > org.apache.kafka.common.errors package. This issue is for tracking the work > of adding the missing class-level javadocs for those Exception classes. > https://kafka.apache.org/27/javadoc/org/apache/kafka/common/errors/package-summary.html > https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/common/errors > Basic class-level documentation could be derived by mapping the error > conditions documented in the protocol > https://kafka.apache.org/protocol#protocol_constants
[jira] [Commented] (KAFKA-12458) Implementation of Tiered Storage Integration with Azure Storage (ADLS + Blob Storage)
[ https://issues.apache.org/jira/browse/KAFKA-12458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17375986#comment-17375986 ] Israel Ekpo commented on KAFKA-12458: - This issue is not tied to any release since the code will be hosted outside of the Kafka Repo > Implementation of Tiered Storage Integration with Azure Storage (ADLS + Blob > Storage) > - > > Key: KAFKA-12458 > URL: https://issues.apache.org/jira/browse/KAFKA-12458 > Project: Kafka > Issue Type: Sub-task >Reporter: Israel Ekpo >Assignee: Israel Ekpo >Priority: Major > > Task to cover integration support for Azure Storage > * Azure Blob Storage > * Azure Data Lake Store > Will split task up later into distinct tracks and components
[GitHub] [kafka] ryannedolan opened a new pull request #10983: mention IdentityReplicationPolicy in ops docs
ryannedolan opened a new pull request #10983: URL: https://github.com/apache/kafka/pull/10983 IdentityReplicationPolicy was added to MM2 in 3.0.0, so we note it in the docs.
[GitHub] [kafka] rondagostino commented on a change in pull request #10918: KAFKA-12756: Update ZooKeeper to v3.6.3
rondagostino commented on a change in pull request #10918: URL: https://github.com/apache/kafka/pull/10918#discussion_r664827028 ## File path: gradle/dependencies.gradle ## @@ -61,6 +61,7 @@ versions += [ bcpkix: "1.66", checkstyle: "8.36.2", commonsCli: "1.4", + dropwizardMetrics: "3.2.5", Review comment: https://github.com/apache/kafka/pull/10982 contains this change. Will attach system test results there when the tests complete.
[GitHub] [kafka] rondagostino opened a new pull request #10982: MINOR: Update dropwizard metrics to 4.1.12.1
rondagostino opened a new pull request #10982: URL: https://github.com/apache/kafka/pull/10982 ZooKeeper was updated to v3.6.3 via https://github.com/apache/kafka/pull/10918. However, it was noted in that PR discussion (https://github.com/apache/kafka/pull/10918#discussion_r663412933) that the dropwizard metrics-core library has since been updated from v3.2.5 to v4.1.12.1 for 3.7.x releases (via https://github.com/apache/zookeeper/commit/13fe0d0ffb9fd2c379b9b430aaaf9ee75acfceba). Since there were no code changes associated with this library version bump in ZooKeeper, and since we wish to avoid potential CVEs if possible, we can consider updating to this newer version now assuming that system tests pass. I will attach system test results when they complete. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] skaundinya15 commented on pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 commented on pull request #10962: URL: https://github.com/apache/kafka/pull/10962#issuecomment-875021173 @rajinisivaram Thank you for the reviews, I have updated the PR addressing all your comments. Whenever you get a chance, the PR is ready for review - thank you!
[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664819056 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); +return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; +} + +// Custom collector to filter a single element +private Collector toSingleton() { Review comment: @rajinisivaram I'm not sure how we could test that case, as the `Builder` for the `OffsetFetchRequest` takes in a `Map>` which means all of the keys have to be unique. As a result if a group appears twice, the latest entry will take precedence. Not sure if this is how we want to handle it, but generally speaking I think we don't support having the same name group id appearing twice in the request - as in it's not possible to construct a request like that, at least not using the `Builder`
[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664807523 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); +return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; +} + +// Custom collector to filter a single element +private Collector toSingleton() { Review comment: Ah okay, good point. Let's do this in a follow up PR, since we are trying to get this feature into 3.0 right now.
[GitHub] [kafka] kkonstantine opened a new pull request #10981: Bump trunk to 3.1.0-SNAPSHOT
kkonstantine opened a new pull request #10981: URL: https://github.com/apache/kafka/pull/10981 Typical version bumps on `trunk`, after the creation of the `3.0` release branch. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] kirktrue removed a comment on pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse
kirktrue removed a comment on pull request #10980: URL: https://github.com/apache/kafka/pull/10980#issuecomment-874998780 Can someone from @confluentinc/clients review, please?
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664806203 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); +return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; +} + +// Custom collector to filter a single element +private Collector toSingleton() { Review comment: Same test class `OffsetFetchRequestTest.scala`, a test with v8 with a batched request where the same group appears twice, perhaps with different topics. The response should be either InvalidRequestException because we want to treat it as an error OR actual offsets because we handle the request correctly.
[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664805037 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); Review comment: Ah gotcha, makes sense. For now I have removed the `toSingleton()` helper method and collected it into a list that gets the first element instead.
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664804113 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); Review comment: The helper method `toSingleton()` throws IllegalStateException if the list size is greater than one. If a request contains the same group twice, it can appear twice in the list.
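For readers following along, a collector with the behavior described here can be built on `Collectors.collectingAndThen`. This is a standalone sketch, not the PR's exact code:

```java
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Standalone sketch of the singleton collector discussed above.
class SingletonCollectorSketch {
    // Collect a stream into exactly one element; throw if zero or several match.
    static <T> Collector<T, ?, T> toSingleton() {
        return Collectors.collectingAndThen(Collectors.toList(), list -> {
            if (list.size() != 1) {
                throw new IllegalStateException(
                    "Expected exactly one element but found " + list.size());
            }
            return list.get(0);
        });
    }

    public static void main(String[] args) {
        String match = Stream.of("group1", "group2", "group3")
            .filter(g -> g.equals("group2"))
            .collect(toSingleton());
        System.out.println(match); // prints group2

        try {
            // A request naming the same group twice would hit this branch.
            Stream.of("group1", "group1").collect(toSingleton());
        } catch (IllegalStateException e) {
            System.out.println("duplicate rejected: " + e.getMessage());
        }
    }
}
```

This is why the duplicate-group case matters: with a strict singleton collector, a v8 request carrying the same group id twice would surface as an `IllegalStateException` on the broker side rather than a clean `InvalidRequestException` or a normal offsets response, which is the behavior the review asks to pin down with a test.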
[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664803616 ## File path: core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala ## @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetAndMetadata} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{AbstractResponse, OffsetFetchRequest, OffsetFetchResponse} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.{BeforeEach, Test} + +import java.util +import java.util.Collections.singletonList +import scala.jdk.CollectionConverters._ +import java.util.{Optional, Properties} + +class OffsetFetchRequestTest extends BaseRequestTest{ + + override def brokerCount: Int = 1 + + val brokerId: Integer = 0 + val offset = 15L + val leaderEpoch: Optional[Integer] = Optional.of(3) + val metadata = "metadata" + + override def brokerPropertyOverrides(properties: Properties): Unit = { +properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) +properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") +properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") +properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") +properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") +properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") + } + + @BeforeEach + override def setUp(): Unit = { +doSetup(createOffsetsTopic = false) + +TestUtils.createOffsetsTopic(zkClient, servers) + } + + @Test + def testOffsetFetchRequestLessThanV8(): Unit = { +val topic = "topic" +createTopic(topic) + +val groupId = "groupId" +val tpList = singletonList(new TopicPartition(topic, 0)) +val topicOffsets = tpList.asScala.map{ + tp => (tp, new OffsetAndMetadata(offset, leaderEpoch, metadata)) +}.toMap.asJava + +consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) +val consumer = createConsumer() +consumer.assign(tpList) +consumer.commitSync(topicOffsets) +consumer.close() +// testing from version 1 onward since version 0 read 
offsets from ZK +for (version <- 1 to ApiKeys.OFFSET_FETCH.latestVersion()) { + if (version < 8) { +val request = + if (version < 7) { +new OffsetFetchRequest.Builder( + groupId, false, tpList, false) + .build(version.asInstanceOf[Short]) + } else { +new OffsetFetchRequest.Builder( + groupId, false, tpList, true) + .build(version.asInstanceOf[Short]) + } +val response = connectAndReceive[OffsetFetchResponse](request) +val topicData = response.data().topics().get(0) +val partitionData = topicData.partitions().get(0) +if (version < 3) { + assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()) +} +assertEquals(Errors.NONE, response.error()) +assertEquals(topic, topicData.name()) +assertEquals(0, partitionData.partitionIndex()) +assertEquals(offset, partitionData.committedOffset()) +if (version >= 5) { + // committed leader epoch introduced with V5 + assertEquals(leaderEpoch.get(), partitionData.committedLeaderEpoch()) +} +assertEquals(metadata, partitionData.metadata()) +assertEquals(Errors.NONE.code(), partitionData.errorCode()) + } +} + } + + @Test + def testOffsetFetchRequestV8AndAbove(): Unit = { +val groupOne = "group1" +val groupTwo = "group2" +val groupThree = "group3" +val groupFour = "group4" +val groupFive = "group5" Review comment: I refactored this test class a bit to make use of more helper methods - is there something additional we can do here to make it work on a collection of groups? Are you suggesting adding an additional method that can create some number of groups that we can use
[GitHub] [kafka] skaundinya15 commented on pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 commented on pull request #10962: URL: https://github.com/apache/kafka/pull/10962#issuecomment-875002777 @kkonstantine This is the PR I am trying to get in time for 3.0, if you will allow for extensions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664800832 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); +return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; +} + +// Custom collector to filter a single element +private Collector toSingleton() { Review comment: Would this be a test for single group requests with v8? I have added that case to the existing single group tests for older versions in `OffsetFetchRequestTest.scala` and will push those changes shortly. Is there another type of test you were thinking of?
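The `toSingleton()` helper discussed above (its generic signature was stripped by the mail renderer) can be sketched with `Collectors.collectingAndThen`. This is an illustrative standalone version, not the PR's actual code; the class name and the demo values are made up:

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class SingletonCollector {
    // Collects a stream that is expected to contain exactly one element
    // and returns that element, failing loudly otherwise.
    public static <T> Collector<T, ?, T> toSingleton() {
        return Collectors.collectingAndThen(
            Collectors.toList(),
            list -> {
                if (list.size() != 1) {
                    throw new IllegalStateException(
                        "Expected exactly one element, found " + list.size());
                }
                return list.get(0);
            });
    }

    public static void main(String[] args) {
        // Mirrors the filter-then-collect pattern in isAllPartitionsForGroup.
        List<String> groupIds = List.of("group1", "group2");
        String match = groupIds.stream()
            .filter(g -> g.equals("group1"))
            .collect(toSingleton());
        System.out.println(match); // prints "group1"
    }
}
```

Filtering on a group id that appears zero or more than once would throw `IllegalStateException`, which is the failure mode the custom collector is meant to surface.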
[GitHub] [kafka] skaundinya15 commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
skaundinya15 commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664798862 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -78,26 +85,107 @@ boolean isAllTopicPartitions() { return this.data.topics() == ALL_TOPIC_PARTITIONS; } +public Builder(Map> groupIdToTopicPartitionMap, +boolean requireStable, +boolean throwOnFetchStableOffsetsUnsupported) { +super(ApiKeys.OFFSET_FETCH); + +List groups = new ArrayList<>(); +for (Entry> entry : groupIdToTopicPartitionMap.entrySet()) { +String groupName = entry.getKey(); +List tpList = entry.getValue(); +final List topics; +if (tpList != null) { +Map offsetFetchRequestTopicMap = +new HashMap<>(); +for (TopicPartition topicPartition : tpList) { +String topicName = topicPartition.topic(); +OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault( +topicName, new OffsetFetchRequestTopics().setName(topicName)); + topic.partitionIndexes().add(topicPartition.partition()); +offsetFetchRequestTopicMap.put(topicName, topic); +} +topics = new ArrayList<>(offsetFetchRequestTopicMap.values()); +} else { +topics = ALL_TOPIC_PARTITIONS_BATCH; +} +groups.add(new OffsetFetchRequestGroup() +.setGroupId(groupName) +.setTopics(topics)); +} +this.data = new OffsetFetchRequestData() +.setGroupIds(groups) +.setRequireStable(requireStable); +this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; +} + @Override public OffsetFetchRequest build(short version) { if (isAllTopicPartitions() && version < 2) { throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " + "v" + version + ", but we need v2 or newer to request all topic partitions."); } - +if (data.groupIds().size() > 1 && version < 8) { +throw new NoBatchedOffsetFetchRequestException("Broker does not support" ++ " batching groups for fetch offset request on version " + version); +} if (data.requireStable() && version < 7) { 
if (throwOnFetchStableOffsetsUnsupported) { throw new UnsupportedVersionException("Broker unexpectedly " + "doesn't support requireStable flag on version " + version); } else { log.trace("Fallback the requireStable flag to false as broker " + - "only supports OffsetFetchRequest version {}. Need " + - "v7 or newer to enable this feature", version); +"only supports OffsetFetchRequest version {}. Need " + +"v7 or newer to enable this feature", version); return new OffsetFetchRequest(data.setRequireStable(false), version); } } - +if (version < 8) { +OffsetFetchRequestData oldDataFormat = null; +if (!data.groupIds().isEmpty()) { +OffsetFetchRequestGroup group = data.groupIds().get(0); +String groupName = group.groupId(); +List topics = group.topics(); +List oldFormatTopics = null; +if (topics != null) { +oldFormatTopics = topics +.stream() +.map(t -> +new OffsetFetchRequestTopic() +.setName(t.name()) +.setPartitionIndexes(t.partitionIndexes())) +.collect(Collectors.toList()); +} +oldDataFormat = new OffsetFetchRequestData() +.setGroupId(groupName) +.setTopics(oldFormatTopics) +.setRequireStable(data.requireStable()); +} +return new OffsetFetchRequest(oldDataFormat == null ? data : oldDataFormat, version); +} +// version 8 but have used old format of request, convert to version 8 of request Review comment: Ah okay, yes that makes sense, will make the change.
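The new map-based builder quoted above folds a flat `List<TopicPartition>` per group into one entry per topic. That fold can be sketched on its own with a stand-in record in place of Kafka's `TopicPartition`:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByTopic {
    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Collapse a flat partition list into topic -> partition indexes,
    // mirroring how the builder assembles OffsetFetchRequestTopics entries.
    static Map<String, List<Integer>> byTopic(List<TopicPartition> tps) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        for (TopicPartition tp : tps) {
            out.computeIfAbsent(tp.topic(), t -> new ArrayList<>())
               .add(tp.partition());
        }
        return out;
    }

    public static void main(String[] args) {
        List<TopicPartition> tps = List.of(
            new TopicPartition("topicA", 0),
            new TopicPartition("topicA", 1),
            new TopicPartition("topicB", 0));
        System.out.println(byTopic(tps)); // {topicA=[0, 1], topicB=[0]}
    }
}
```

A `null` partition list in the real builder maps to `ALL_TOPIC_PARTITIONS_BATCH` instead, signalling "fetch all partitions for this group".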
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664798710 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -174,6 +315,28 @@ public boolean isAllPartitions() { return data.topics() == ALL_TOPIC_PARTITIONS; } +public boolean isAllPartitionsForGroup(String groupId) { +OffsetFetchRequestGroup group = data +.groupIds() +.stream() +.filter(g -> g.groupId().equals(groupId)) +.collect(toSingleton()); +return group.topics() == ALL_TOPIC_PARTITIONS_BATCH; +} + +// Custom collector to filter a single element +private Collector toSingleton() { Review comment: Can we add a test in the new `kafka.server.OffsetFetchRequestTest`? Can do that in a separate PR.
[GitHub] [kafka] kirktrue commented on pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse
kirktrue commented on pull request #10980: URL: https://github.com/apache/kafka/pull/10980#issuecomment-874998780 Can someone from @confluentinc/clients review, please?
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664796679 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1308,29 +1308,31 @@ private OffsetFetchResponseHandler() { @Override public void handle(OffsetFetchResponse response, RequestFuture> future) { -if (response.hasError()) { -Errors error = response.error(); -log.debug("Offset fetch failed: {}", error.message()); +Errors responseError = response.groupLevelError(rebalanceConfig.groupId); +if (responseError != Errors.NONE) { +log.debug("Offset fetch failed: {}", responseError.message()); -if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) { +if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) { // just retry -future.raise(error); -} else if (error == Errors.NOT_COORDINATOR) { +future.raise(responseError); +} else if (responseError == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry -markCoordinatorUnknown(error); -future.raise(error); -} else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId)); } else { -future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); +future.raise(new KafkaException("Unexpected error in fetch offset response: " + responseError.message())); } return; } Set unauthorizedTopics = null; -Map offsets = new HashMap<>(response.responseData().size()); +Map responseData = Review comment: ok, let's leave as is.
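The rewritten handler above dispatches on the group-level error instead of a single top-level error. The retry policy it encodes can be sketched with simplified stand-in types (the enum and action strings below are illustrative, not the client's API):

```java
public class OffsetFetchErrorPolicy {
    // Stand-ins for the error codes the coordinator handler branches on.
    enum Errors {
        NONE,
        COORDINATOR_LOAD_IN_PROGRESS,
        NOT_COORDINATOR,
        GROUP_AUTHORIZATION_FAILED,
        UNKNOWN_SERVER_ERROR
    }

    // Returns the action taken for each group-level error, mirroring
    // the if/else chain in OffsetFetchResponseHandler.handle.
    static String actionFor(Errors e) {
        switch (e) {
            case NONE:                         return "use-offsets";
            case COORDINATOR_LOAD_IN_PROGRESS: return "retry";
            case NOT_COORDINATOR:              return "rediscover-coordinator-and-retry";
            case GROUP_AUTHORIZATION_FAILED:   return "fail-authorization";
            default:                           return "fail-unexpected";
        }
    }

    public static void main(String[] args) {
        System.out.println(actionFor(Errors.NOT_COORDINATOR));
        // prints "rediscover-coordinator-and-retry"
    }
}
```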
[GitHub] [kafka] kirktrue closed pull request #10979: WIP
kirktrue closed pull request #10979: URL: https://github.com/apache/kafka/pull/10979
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664795285 ## File path: clients/src/test/java/org/apache/kafka/common/requests/OffsetFetchRequestTest.java ## @@ -76,62 +73,169 @@ public void testConstructor() { } for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { -OffsetFetchRequest request = builder.build(version); -assertFalse(request.isAllPartitions()); -assertEquals(groupId, request.groupId()); -assertEquals(partitions, request.partitions()); - -OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); -assertEquals(Errors.NONE, response.error()); -assertFalse(response.hasError()); -assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 3 : 1), response.errorCounts(), -"Incorrect error count for version " + version); - -if (version <= 1) { -assertEquals(expectedData, response.responseData()); +if (version < 8) { +builder = new OffsetFetchRequest.Builder( +group1, +false, +partitions, +false); +assertFalse(builder.isAllTopicPartitions()); +OffsetFetchRequest request = builder.build(version); +assertFalse(request.isAllPartitions()); +assertEquals(group1, request.groupId()); +assertEquals(partitions, request.partitions()); + +OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); +assertEquals(Errors.NONE, response.error()); +assertFalse(response.hasError()); +assertEquals(Collections.singletonMap(Errors.NONE, version <= (short) 1 ? 
3 : 1), response.errorCounts(), +"Incorrect error count for version " + version); + +if (version <= 1) { +assertEquals(expectedData, response.responseDataV0ToV7()); +} + +if (version >= 3) { +assertEquals(throttleTimeMs, response.throttleTimeMs()); +} else { +assertEquals(DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); +} +} else { +builder = new Builder(Collections.singletonMap(group1, partitions), false, false); +OffsetFetchRequest request = builder.build(version); +Map> groupToPartitionMap = +request.groupIdsToPartitions(); +Map> groupToTopicMap = +request.groupIdsToTopics(); +assertFalse(request.isAllPartitionsForGroup(group1)); +assertTrue(groupToPartitionMap.containsKey(group1) && groupToTopicMap.containsKey( +group1)); +assertEquals(partitions, groupToPartitionMap.get(group1)); +OffsetFetchResponse response = request.getErrorResponse(throttleTimeMs, Errors.NONE); +assertEquals(Errors.NONE, response.groupLevelError(group1)); +assertFalse(response.groupHasError(group1)); +assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts(), +"Incorrect error count for version " + version); +assertEquals(throttleTimeMs, response.throttleTimeMs()); } +} +} + +@Test +public void testConstructorWithMultipleGroups() { +List topic1Partitions = Arrays.asList( +new TopicPartition(topicOne, partitionOne), +new TopicPartition(topicOne, partitionTwo)); +List topic2Partitions = Arrays.asList( +new TopicPartition(topicTwo, partitionOne), +new TopicPartition(topicTwo, partitionTwo)); +List topic3Partitions = Arrays.asList( +new TopicPartition(topicThree, partitionOne), +new TopicPartition(topicThree, partitionTwo)); +Map> groupToTp = new HashMap<>(); +groupToTp.put(group1, topic1Partitions); +groupToTp.put(group2, topic2Partitions); +groupToTp.put(group3, topic3Partitions); +groupToTp.put(group4, null); +groupToTp.put(group5, null); +int throttleTimeMs = 10; -if (version >= 3) { +for (short version : ApiKeys.OFFSET_FETCH.allVersions()) { +if (version >= 8) { 
+builder = new Builder(groupToTp, false, false); +OffsetFetchRequest request = builder.build(version); +Map> groupToPartitionMap = +request.groupIdsToPartitions(); +Map> groupToTopicMap = +request.groupIdsToTopics(); +assertEquals(groupToTp.keySet(),
[GitHub] [kafka] kirktrue opened a new pull request #10980: KAFKA-12989: MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse
kirktrue opened a new pull request #10980: URL: https://github.com/apache/kafka/pull/10980 Handle the case where `matches` returns `false` and throw the `InvalidStateException` as stated by the JavaDoc. We need to guard against this unexpected runtime error in the `KafkaAdminClient`'s `sendEligibleCalls` method with a try/catch. Not 100% sure if that's kosher or not. Included a targeted unit test for this case. The remaining tests in `KafkaAdminTestClient` continue to pass. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] rajinisivaram commented on a change in pull request #10962: KIP-709: Implement request/response for offsetFetch batching
rajinisivaram commented on a change in pull request #10962: URL: https://github.com/apache/kafka/pull/10962#discussion_r664793097 ## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java ## @@ -78,26 +85,107 @@ boolean isAllTopicPartitions() { return this.data.topics() == ALL_TOPIC_PARTITIONS; } +public Builder(Map> groupIdToTopicPartitionMap, +boolean requireStable, +boolean throwOnFetchStableOffsetsUnsupported) { +super(ApiKeys.OFFSET_FETCH); + +List groups = new ArrayList<>(); +for (Entry> entry : groupIdToTopicPartitionMap.entrySet()) { +String groupName = entry.getKey(); +List tpList = entry.getValue(); +final List topics; +if (tpList != null) { +Map offsetFetchRequestTopicMap = +new HashMap<>(); +for (TopicPartition topicPartition : tpList) { +String topicName = topicPartition.topic(); +OffsetFetchRequestTopics topic = offsetFetchRequestTopicMap.getOrDefault( +topicName, new OffsetFetchRequestTopics().setName(topicName)); + topic.partitionIndexes().add(topicPartition.partition()); +offsetFetchRequestTopicMap.put(topicName, topic); +} +topics = new ArrayList<>(offsetFetchRequestTopicMap.values()); +} else { +topics = ALL_TOPIC_PARTITIONS_BATCH; +} +groups.add(new OffsetFetchRequestGroup() +.setGroupId(groupName) +.setTopics(topics)); +} +this.data = new OffsetFetchRequestData() +.setGroupIds(groups) +.setRequireStable(requireStable); +this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; +} + @Override public OffsetFetchRequest build(short version) { if (isAllTopicPartitions() && version < 2) { throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " + "v" + version + ", but we need v2 or newer to request all topic partitions."); } - +if (data.groupIds().size() > 1 && version < 8) { +throw new NoBatchedOffsetFetchRequestException("Broker does not support" ++ " batching groups for fetch offset request on version " + version); +} if (data.requireStable() && version < 7) { 
if (throwOnFetchStableOffsetsUnsupported) { throw new UnsupportedVersionException("Broker unexpectedly " + "doesn't support requireStable flag on version " + version); } else { log.trace("Fallback the requireStable flag to false as broker " + - "only supports OffsetFetchRequest version {}. Need " + - "v7 or newer to enable this feature", version); +"only supports OffsetFetchRequest version {}. Need " + +"v7 or newer to enable this feature", version); return new OffsetFetchRequest(data.setRequireStable(false), version); } } - +if (version < 8) { +OffsetFetchRequestData oldDataFormat = null; +if (!data.groupIds().isEmpty()) { +OffsetFetchRequestGroup group = data.groupIds().get(0); +String groupName = group.groupId(); +List topics = group.topics(); +List oldFormatTopics = null; +if (topics != null) { +oldFormatTopics = topics +.stream() +.map(t -> +new OffsetFetchRequestTopic() +.setName(t.name()) +.setPartitionIndexes(t.partitionIndexes())) +.collect(Collectors.toList()); +} +oldDataFormat = new OffsetFetchRequestData() +.setGroupId(groupName) +.setTopics(oldFormatTopics) +.setRequireStable(data.requireStable()); +} +return new OffsetFetchRequest(oldDataFormat == null ? data : oldDataFormat, version); +} +// version 8 but have used old format of request, convert to version 8 of request Review comment: At the moment, the code seems to do: ``` if (version < 8) { do-conversion-if-necessary } //
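The `version < 8` branch discussed above down-converts a single-group request from the new batched layout to the pre-v8 single-group layout. A schematic version, with simple stand-in records in place of the generated `OffsetFetchRequestData` classes:

```java
import java.util.List;
import java.util.Map;

public class DownConvert {
    // Stand-ins: the new format keys topics by group id; the old
    // format carries exactly one group id plus its topics.
    record NewFormat(Map<String, List<String>> groupsToTopics) {}
    record OldFormat(String groupId, List<String> topics) {}

    // Legal only when a single group is present, i.e. when the request
    // can still be expressed in the pre-v8 single-group schema.
    static OldFormat toOldFormat(NewFormat data) {
        if (data.groupsToTopics().size() != 1) {
            throw new IllegalArgumentException(
                "Batched groups are not supported before version 8");
        }
        Map.Entry<String, List<String>> only =
            data.groupsToTopics().entrySet().iterator().next();
        return new OldFormat(only.getKey(), only.getValue());
    }

    public static void main(String[] args) {
        NewFormat batched = new NewFormat(Map.of("group1", List.of("topicA")));
        System.out.println(toOldFormat(batched));
    }
}
```

The guard matches the `NoBatchedOffsetFetchRequestException` thrown in `build(short version)` when more than one group is requested against an older broker.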
[jira] [Assigned] (KAFKA-12989) MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse
[ https://issues.apache.org/jira/browse/KAFKA-12989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-12989: - Assignee: Kirk True > MockClient should respect the request matcher passed to > prepareUnsupportedVersionResponse > - > > Key: KAFKA-12989 > URL: https://issues.apache.org/jira/browse/KAFKA-12989 > Project: Kafka > Issue Type: Bug >Reporter: David Jacot >Assignee: Kirk True >Priority: Major > > The {{MockClient}} offers the possibility to prepare an unsupported version > response with {{MockClient#prepareUnsupportedVersionResponse}}. That method > accepts a {{RequestMatcher}} but it is never applied. It should be applied, or we > should remove the matcher from the method. > {code:java} > UnsupportedVersionException unsupportedVersionException = null; > if (futureResp.isUnsupportedRequest) { > unsupportedVersionException = new UnsupportedVersionException( > "Api " + request.apiKey() + " with version " + > version); > } else { > AbstractRequest abstractRequest = > request.requestBuilder().build(version); > if (!futureResp.requestMatcher.matches(abstractRequest)) > throw new IllegalStateException("Request matcher did not > match next-in-line request " > + abstractRequest + " with prepared response " + > futureResp.responseBody); > } > ClientResponse resp = new > ClientResponse(request.makeHeader(version), request.callback(), > request.destination(), > request.createdTimeMs(), time.milliseconds(), > futureResp.disconnected, > unsupportedVersionException, null, > futureResp.responseBody); > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
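The fix the ticket asks for is to run the prepared matcher on the unsupported-version path too, not only the normal-response path. A schematic of that check, with simplified stand-in types rather than `MockClient`'s real ones:

```java
public class MatcherCheck {
    // Stand-in for MockClient.RequestMatcher, simplified to strings.
    interface RequestMatcher {
        boolean matches(String request);
    }

    // Apply the prepared matcher before handing back the prepared response,
    // throwing IllegalStateException on a mismatch as the JavaDoc promises.
    static String respond(RequestMatcher matcher, String request, String preparedResponse) {
        if (!matcher.matches(request)) {
            throw new IllegalStateException(
                "Request matcher did not match next-in-line request " + request);
        }
        return preparedResponse;
    }

    public static void main(String[] args) {
        RequestMatcher offsetFetchOnly = r -> r.startsWith("OFFSET_FETCH");
        System.out.println(respond(offsetFetchOnly, "OFFSET_FETCH v8", "prepared-response"));
        // prints "prepared-response"
    }
}
```

In the real `MockClient`, the same guard would sit in the `futureResp.isUnsupportedRequest` branch shown in the ticket, before the `ClientResponse` is constructed.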
[jira] [Resolved] (KAFKA-13035) Kafka Connect: Update documentation for POST /connectors/(string: name)/restart to include task Restart behavior
[ https://issues.apache.org/jira/browse/KAFKA-13035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-13035. --- Fix Version/s: 3.0.0 Reviewer: Randall Hauch Resolution: Fixed Merged to `trunk` in time for 3.0.0 > Kafka Connect: Update documentation for POST /connectors/(string: > name)/restart to include task Restart behavior > -- > > Key: KAFKA-13035 > URL: https://issues.apache.org/jira/browse/KAFKA-13035 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Kalpesh Patel >Assignee: Kalpesh Patel >Priority: Minor > Fix For: 3.0.0 > > > KAFKA-4793 updated the behavior of POST /connectors/(string: name)/restart > based on the query parameters onlyFailed and includeTasks, per > [KIP-745|https://cwiki.apache.org/confluence/display/KAFKA/KIP-745%3A+Connect+API+to+restart+connector+and+tasks]. > We should update documentation to reflect this.
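The restart endpoint and its two KIP-745 query parameters combine as in the sketch below; the worker address and connector name are placeholders:

```shell
# Restart endpoint per KIP-745: onlyFailed restricts restarts to FAILED
# instances, includeTasks restarts the tasks as well as the Connector object.
CONNECT_URL="http://localhost:8083"   # placeholder worker address
CONNECTOR="my-connector"              # placeholder connector name
RESTART_ENDPOINT="${CONNECT_URL}/connectors/${CONNECTOR}/restart?includeTasks=true&onlyFailed=true"
echo "$RESTART_ENDPOINT"
# To actually issue the restart (requires a running Connect worker):
# curl -X POST "$RESTART_ENDPOINT"
```

With both parameters omitted, the endpoint keeps its pre-KIP-745 behavior of restarting only the Connector object.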
[GitHub] [kafka] rhauch merged pull request #10975: KAFKA-13035 updated documentation for connector restart REST API to …
rhauch merged pull request #10975: URL: https://github.com/apache/kafka/pull/10975