[GitHub] [kafka] ItherNiT commented on pull request #6329: KAFKA-1194: Fix renaming open files on Windows
ItherNiT commented on pull request #6329: URL: https://github.com/apache/kafka/pull/6329#issuecomment-799172904 Any update on when this will be integrated into Kafka? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7870) Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was disconnected before the response was read.
[ https://issues.apache.org/jira/browse/KAFKA-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301470#comment-17301470 ] ShiminHuang commented on KAFKA-7870: We actually found this in version 2.4.1 as well > Error sending fetch request (sessionId=1578860481, epoch=INITIAL) to node 2: > java.io.IOException: Connection to 2 was disconnected before the response was > read. > > > Key: KAFKA-7870 > URL: https://issues.apache.org/jira/browse/KAFKA-7870 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Chakhsu Lau >Priority: Blocker > > We build a kafka cluster with 5 brokers. But one of brokers suddenly stopped > running during the run. And it happened twice in the same broker. Here is the > log and is this a bug in kafka ? > {code:java} > [2019-01-25 12:57:14,686] INFO [ReplicaFetcher replicaId=3, leaderId=2, > fetcherId=0] Error sending fetch request (sessionId=1578860481, > epoch=INITIAL) to node 2: java.io.IOException: Connection to 2 was > disconnected before the response was read. 
> (org.apache.kafka.clients.FetchSessionHandler) > [2019-01-25 12:57:14,687] WARN [ReplicaFetcher replicaId=3, leaderId=2, > fetcherId=0] Error in response for fetch request (type=FetchRequest, > replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={api-result-bi-heatmap-8=(offset=0, logStartOffset=0, > maxBytes=1048576, currentLeaderEpoch=Optional[4]), > api-result-bi-heatmap-save-12=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[4]), api-result-bi-heatmap-task-2=(offset=2, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), > api-result-bi-flow-39=(offset=1883206, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[4]), __consumer_offsets-47=(offset=349437, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), > api-result-bi-heatmap-track-6=(offset=1039889, logStartOffset=0, > maxBytes=1048576, currentLeaderEpoch=Optional[4]), > api-result-bi-heatmap-task-17=(offset=0, logStartOffset=0, maxBytes=1048576, > currentLeaderEpoch=Optional[4]), __consumer_offsets-2=(offset=0, > logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[4]), > api-result-bi-heatmap-aggs-19=(offset=1255056, logStartOffset=0, > maxBytes=1048576, currentLeaderEpoch=Optional[4])}, > isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1578860481, > epoch=INITIAL)) (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 2 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) > at > kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) > at > 
kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12465) Decide whether inconsistent cluster id error are fatal
dengziming created KAFKA-12465: -- Summary: Decide whether inconsistent cluster id error are fatal Key: KAFKA-12465 URL: https://issues.apache.org/jira/browse/KAFKA-12465 Project: Kafka Issue Type: Sub-task Reporter: dengziming Currently, we just log an error when an inconsistent cluster id occurs. We should define a window during startup in which these errors are fatal; after that window, we no longer treat them as fatal.
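The window-based policy being proposed could be sketched roughly as follows. This is a standalone illustration only; the class and parameter names are hypothetical and not part of the Kafka codebase:

```java
import java.util.concurrent.TimeUnit;

/**
 * Sketch of a policy that treats inconsistent cluster-id errors as fatal
 * only within a fixed window after startup, and merely logs them afterwards.
 * All names here are hypothetical.
 */
public class ClusterIdErrorPolicy {
    private final long startupTimeMs;
    private final long fatalWindowMs;

    public ClusterIdErrorPolicy(long startupTimeMs, long fatalWindowMs) {
        this.startupTimeMs = startupTimeMs;
        this.fatalWindowMs = fatalWindowMs;
    }

    /** Fatal only if the error occurs within the startup window. */
    public boolean isFatal(long nowMs) {
        return nowMs - startupTimeMs <= fatalWindowMs;
    }

    public static void main(String[] args) {
        ClusterIdErrorPolicy policy =
            new ClusterIdErrorPolicy(0L, TimeUnit.MINUTES.toMillis(5));
        System.out.println(policy.isFatal(TimeUnit.MINUTES.toMillis(1)));  // within the window: fatal
        System.out.println(policy.isFatal(TimeUnit.MINUTES.toMillis(10))); // after the window: logged only
    }
}
```

As the PR discussion below notes, the open question is that such a window cannot simply be a fixed configuration value; this sketch only shows the shape of the check itself.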
[GitHub] [kafka] dengziming commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot
dengziming commented on a change in pull request #10289: URL: https://github.com/apache/kafka/pull/10289#discussion_r594127628 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -939,12 +951,27 @@ private FetchResponseData buildEmptyFetchResponse( ); } -private boolean hasValidClusterId(FetchRequestData request) { +private boolean hasValidClusterId(ApiMessage request) { Review comment: It's a bit difficult to figure out how to add the window; we could not simply rely on a fixed configuration. I filed a ticket to track this problem: https://issues.apache.org/jira/browse/KAFKA-12465.
[jira] [Updated] (KAFKA-12465) Decide whether inconsistent cluster id error are fatal
[ https://issues.apache.org/jira/browse/KAFKA-12465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming updated KAFKA-12465: --- Description: Currently, we just log an error when an inconsistent cluster-id occurred. We should set a window during startup when these errors are fatal but after that window, we no longer treat them to be fatal. see https://github.com/apache/kafka/pull/10289#discussion_r592853088 (was: Currently, we just log an error when an inconsistent cluster-id occurred. We should set a window during startup when these errors are fatal but after that window, we no longer treat them to be fatal.) > Decide whether inconsistent cluster id error are fatal > -- > > Key: KAFKA-12465 > URL: https://issues.apache.org/jira/browse/KAFKA-12465 > Project: Kafka > Issue Type: Sub-task >Reporter: dengziming >Priority: Major > > Currently, we just log an error when an inconsistent cluster-id occurred. We > should set a window during startup when these errors are fatal but after that > window, we no longer treat them to be fatal. see > https://github.com/apache/kafka/pull/10289#discussion_r592853088
[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
dajac commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r594184162 ## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -39,19 +39,31 @@ object LogDirsCommand { def describe(args: Array[String], out: PrintStream): Unit = { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) -val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) -val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { -case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray -} +val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty) +var nonExistBrokers: Set[Int] = Set.empty +try { +val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet Review comment: nit: We can remove specifying `Set[Int]`. 
## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -39,19 +39,31 @@ object LogDirsCommand { def describe(args: Array[String], out: PrintStream): Unit = { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) -val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) -val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { -case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray -} +val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty) +var nonExistBrokers: Set[Int] = Set.empty +try { +val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet +val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { +case Some(brokerListStr) => +val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet Review comment: ditto. 
## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -39,19 +39,31 @@ object LogDirsCommand { def describe(args: Array[String], out: PrintStream): Unit = { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) -val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) -val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { -case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray -} +val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty) +var nonExistBrokers: Set[Int] = Set.empty +try { +val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet +val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { +case Some(brokerListStr) => +val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet +nonExistBrokers = inputBrokers.diff(clusterBrokers) +inputBrokers +case None => clusterBrokers +} Review comment: nit: We usually avoid using mutable variable unless it is really necessary. In this case, I would rather return the `nonExistingBrokers` when the argument is processed. 
Something like this:
```
val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
    case Some(brokerListStr) =>
        val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
        (inputBrokers, inputBrokers.diff(clusterBrokers))
    case None => (clusterBrokers, Set.empty)
}
```
## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala ## @@ -39,19 +39,31 @@ object LogDirsCommand { def describe(args: Array[String], out: PrintStream): Unit = { val opts = new LogDirsCommandOptions(args) val adminClient = createAdminClient(opts) -val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty) -val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match { -case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt) -case None => adminClient.descr
[GitHub] [kafka] ikdekker edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
ikdekker edited a comment on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-798355603 Hello Kafka Committers, this contribution is part of a university course. We would appreciate any kind of feedback. This initial PR is purposefully a very simple one for us to get familiar with the process of PRs in this repo. We attempted running tests (by executing `./gradlew tests`), but reports were not generated, even though the test output did indicate a report was generated into the builds directory. Is this a known issue, or expected behaviour? We followed the Readme as stated in the contributing page (https://github.com/apache/kafka/blob/trunk/README.md). On the [PR guideline page](https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes), it says we should update the status of https://issues.apache.org/jira/browse/KAFKA-12456 to submit a patch. It seems we do not have the rights to do this; is that correct? Thanks for your time!
[GitHub] [kafka] mimaison merged pull request #10308: MINOR: Update year in NOTICE
mimaison merged pull request #10308: URL: https://github.com/apache/kafka/pull/10308
[GitHub] [kafka] cadonna opened a new pull request #10317: KAFKA-10357: Add setup method to internal topics
cadonna opened a new pull request #10317: URL: https://github.com/apache/kafka/pull/10317 For KIP-698, we need a way to set up internal topics without validating them. This PR adds a setup method to the InternalTopicManager for that purpose. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cadonna commented on a change in pull request #10317: KAFKA-10357: Add setup method to internal topics
cadonna commented on a change in pull request #10317: URL: https://github.com/apache/kafka/pull/10317#discussion_r594287719 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java ## @@ -653,56 +798,22 @@ public void shouldReportMisconfigurationsOfCleanupPolicyForWindowedChangelogTopi @Test public void shouldReportMisconfigurationsOfCleanupPolicyForRepartitionTopics() { final long retentionMs = 1000; -mockAdminClient.addTopic( Review comment: Just refactorings from here to the end.
[GitHub] [kafka] cadonna commented on pull request #10317: KAFKA-10357: Add setup method to internal topics
cadonna commented on pull request #10317: URL: https://github.com/apache/kafka/pull/10317#issuecomment-799376130 Call for review: @rodesai
[jira] [Updated] (KAFKA-12464) Enhance constrained sticky Assign algorithm
[ https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-12464: -- Description: In KAFKA-9987, we made a great improvement for the case when all consumers are subscribed to the same set of topics. The algorithm contains 4 phases:

1. Reassign as many previously owned partitions as possible, up to the maxQuota
2. Fill remaining members up to minQuota
3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions from the over-full consumers at max capacity
4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we should just distribute one partition each to all consumers at min capacity

Take an example for better understanding. Current status: 2 consumers (C0, C1) and 10 topic partitions: t1p0, t1p1, ... t1p9. Suppose the current assignment is:

C0: t1p0, t1p1, t1p2, t1p3, t1p4
C1: t1p5, t1p6, t1p7, t1p8, t1p9

Now a new consumer C2 is added, so we do the following:

1. Reassign as many previously owned partitions as possible, up to the maxQuota (which is 4 here). After this phase, the assignment is:
C0: t1p0, t1p1, t1p2, t1p3
C1: t1p5, t1p6, t1p7, t1p8
2. Fill remaining members up to minQuota. After this phase:
C0: t1p0, t1p1, t1p2, t1p3
C1: t1p5, t1p6, t1p7, t1p8
C2: t1p4, t1p9
3. Since we ran out of unassigned partitions before filling all consumers, we steal partitions from the over-full consumers at max capacity. After this phase:
C0: t1p0, t1p1, t1p2
C1: t1p5, t1p6, t1p7, t1p8
C2: t1p4, t1p9, t1p3
4. (Not needed in this example.)

As we can see, we need 3 phases to complete the assignment, but it can actually be completed in 2. Here is the updated algorithm:

1. Reassign as many previously owned partitions as possible, up to the maxQuota, while also capping the number of maxQuota members by the remainder of (partitions / consumers)
2. Fill remaining members up to maxQuota while the current number of maxQuota members is below that cap, otherwise up to minQuota

By tracking the allowed number of maxQuota members, step 1 is no longer so aggressive that it assigns too many partitions to consumers, and step 2 is no longer so conservative that it assigns too few, so steps 3 and 4 are not needed to balance them. The updated pseudo-code sketch of the algorithm:

{code}
C_f := (P/N)_floor, the floor capacity
C_c := (P/N)_ceil, the ceiling capacity
C_r := (P%N), the allowed number of members with C_c partitions assigned
num_max_capacity_members := current number of members with C_c partitions assigned (default 0)
members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible,
// taking num_max_capacity_members into account
for member : members
    remove any partitions that are no longer in the subscription from its owned partitions
    remove all owned_partitions if the generation is old
    if member.owned_partitions.size < C_f
        assign all owned partitions to member and remove from unassigned_partitions
        add member to unfilled_members
    else if member.owned_partitions.size >= C_c && num_max_capacity_members < C_r
        assign first C_c owned_partitions to member and remove from unassigned_partitions
        num_max_capacity_members++
    else
        assign first C_f owned_partitions to member and remove from unassigned_partitions

sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1 (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members up to C_c for at most C_r members, otherwise up to C_f
for member : unfilled_members
    compute the remaining capacity C:
        if num_max_capacity_members < C_r:
            C = C_c - num_assigned_partitions
            num_max_capacity_members++
        else
            C = C_f - num_assigned_partitions
    pop the first C partitions from unassig
{code}
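The quota arithmetic the updated algorithm relies on can be illustrated in isolation. The class below is a hypothetical standalone sketch, not the actual assignor code:

```java
/**
 * Standalone sketch of the quotas used by the constrained sticky assignment:
 * with P partitions and N consumers, C_f = floor(P/N) is the floor capacity,
 * C_c = ceil(P/N) is the ceiling capacity, and at most C_r = P % N members
 * may end up with C_c partitions. (Hypothetical names, for illustration only.)
 */
public class ConstrainedQuotas {
    public static int floorQuota(int partitions, int consumers) {
        return partitions / consumers;                   // C_f
    }

    public static int ceilQuota(int partitions, int consumers) {
        return (partitions + consumers - 1) / consumers; // C_c
    }

    public static int allowedMaxQuotaMembers(int partitions, int consumers) {
        return partitions % consumers;                   // C_r
    }

    public static void main(String[] args) {
        // The example from the issue: 10 partitions, 3 consumers (C0, C1, C2).
        int p = 10, n = 3;
        System.out.printf("C_f=%d C_c=%d C_r=%d%n",
            floorQuota(p, n), ceilQuota(p, n), allowedMaxQuotaMembers(p, n));
        // C_f=3, C_c=4, C_r=1: exactly one consumer may keep 4 partitions,
        // which is why phase 1 can cap reassignment up front and the
        // partition-stealing phase becomes unnecessary.
    }
}
```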
[jira] [Commented] (KAFKA-10582) Mirror Maker 2 not replicating new topics until restart
[ https://issues.apache.org/jira/browse/KAFKA-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301648#comment-17301648 ] Ravishankar S R commented on KAFKA-10582: - Hi, I'm facing the same problem on Kafka 2.6.1. In my case, restarting Kafka on one instance works if the restart is done within a few minutes of the new topic's creation. However, if the restart takes a long time, restarting one instance doesn't help and both instances need to be restarted in parallel (stop both instances, then start both instances). Please let me know if there is any workaround for this problem. Best Regards, Ravi > Mirror Maker 2 not replicating new topics until restart > --- > > Key: KAFKA-10582 > URL: https://issues.apache.org/jira/browse/KAFKA-10582 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.1 > Environment: RHEL 7 Linux. >Reporter: Robert Martin >Priority: Minor > > We are using Mirror Maker 2 from the 2.5.1 release for replication on some > clusters. Replication is working as expected for existing topics. When we > create a new topic, however, Mirror Maker 2 creates the replicated topic as > expected but never starts replicating it. If we restart Mirror Maker 2 > within 2-3 minutes the topic starts replicating as expected. From > documentation we have seen, it appears this should start replicating without > a restart based on the settings we have.
> *Example:* > Create topic "mytesttopic" on source cluster > MirrorMaker 2 creates "source.mytesttopic" on target cluster with no issue > MirrorMaker 2 does not replicate "mytesttopic" -> "source.mytesttopic" > Restart MirrorMaker 2 and now replication works for "mytesttopic" -> > "source.mytesttopic" > *Example config:* > name = source->target > group.id = source-to-target > clusters = source, target > source.bootstrap.servers = sourcehosts:9092 > target.bootstrap.servers = targethosts:9092 > source->target.enabled = true > source->target.topics = .* > target->source = false > target->source.topics = .* > replication.factor=3 > checkpoints.topic.replication.factor=3 > heartbeats.topic.replication.factor=3 > offset-syncs.topic.replication.factor=3 > offset.storage.replication.factor=3 > status.storage.replication.factor=3 > config.storage.replication.factor=3 > tasks.max = 16 > refresh.topics.enabled = true > sync.topic.configs.enabled = true > refresh.topics.interval.seconds = 300 > refresh.groups.interval.seconds = 300 > readahead.queue.capacity = 100 > emit.checkpoints.enabled = true > emit.checkpoints.interval.seconds = 5
[GitHub] [kafka] dajac opened a new pull request #10318: KAFKA-12330; FetchSessionCache may cause starvation for partitions when FetchResponse is full
dajac opened a new pull request #10318: URL: https://github.com/apache/kafka/pull/10318 The incremental FetchSessionCache deprioritizes partitions for which a response was returned. This may happen if log metadata such as log start offset, hwm, etc. is returned, or if data for that partition is returned. When a fetch response fills up to maxBytes, data may not be returned for some partitions even if their fetch offset is lower than the fetch upper bound. However, the fetch response will still contain updates to metadata such as hwm if that metadata has changed. This can lead to degenerate behavior where a partition's hwm or log start offset is updated, resulting in the next fetch for that partition being unnecessarily skipped. At first this appeared to be worse, as hwm updates occur frequently, but starvation should cause hwm movement to become blocked, allowing a fetch to go through and become unstuck. Even so, it will still require one more fetch request than necessary to do so. Consumers may be affected more than replica fetchers; however, they often remove partitions with fetched data from the next fetch request, and this may be helping prevent starvation. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac opened a new pull request #10319: MINOR; Various code cleanups
dajac opened a new pull request #10319: URL: https://github.com/apache/kafka/pull/10319 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
wenbingshen commented on pull request #10304: URL: https://github.com/apache/kafka/pull/10304#issuecomment-799498049 > @wenbingshen Thanks for the updates. I have left a few more minor comments. Also, it seems that the build failed. Could you check it? Thank you for your review and suggestions. I have submitted the latest code, and it has been tested and compiles successfully. Please review it again, thank you.
[jira] [Comment Edited] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301327#comment-17301327 ] Chris Egerton edited comment on KAFKA-12463 at 3/15/21, 3:26 PM: - Ah, thanks [~ableegoldman], I'd misread the javadocs for the cooperative sticky assignor. Updated the description to point to 2.4 instead of 2.3. RE clearness on the upgrade section in KIP-429: I didn't see a specific section for Connect, and both of the sections that were there ("Consumer" and "Streams") provided a different procedure than the one I proposed here. It seems like an implicit goal of both of them is to arrive at an end state where all consumers provide only the cooperative assignor in their list of supported assignors, instead of the cooperative assignor first with the other, older assignor behind it. I'm wondering if the lack of that goal is why this different approach (which only requires one-step rolling as opposed to two) is viable here but not necessarily for other applications?
> Update default consumer partition assignor for sink tasks > - > > Key: KAFKA-12463 > URL: https://issues.apache.org/jira/browse/KAFKA-12463 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > Kafka consumers have a pluggable [partition assignment > interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] > that comes with several out-of-the-box implementations including the > [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], > > [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], > > [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], > and > [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. > If no partition assignor is configured with a consumer, the {{RangeAssignor}} > is used by default. Although there are some benefits to this assignor > including stability of assignment across generations and simplicity of > design, it comes with a major drawback: the number of active consumers in a > group is limited to the number of partitions in the topic(s) with the most > partitions. For an example of the worst case, in a consumer group where every > member is subscribed to ten topics that each have one partition, only one > member of that group will be assigned any topic partitions. > This can end up producing counterintuitive and even frustrating behavior when > a sink connector is brought up with N tasks to read from some collection of > topics with a total of N topic partitions, but some tasks end up idling and > not processing any data. 
> > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > introduced the {{CooperativeStickyAssignor}}, which seeks to provide a > stable assignment across generations wherever possible, provide the most even > assignment possible (taking into account possible differences in > subscriptions across consumers in the group), and allow consumers to continue > processing data during rebalance. The documentation for the assignor states > that "Users should prefer this assignor for newer clusters." > We should alter the default consumer configuration for sink tasks to use the > new {{CooperativeStickyAssignor}}. In order to do this in a > backwards-compatible fashion that also enables rolling upgrades, this should > be implemented by setting the {{partition.assignment.strategy}} property of > sink task consumers to the list > {{org.apache.kafka.clients.consumer.CooperativeStickyAssigno
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301706#comment-17301706 ] Chris Egerton commented on KAFKA-12463: --- cc [~rhauch] what do you think about this? > Update default consumer partition assignor for sink tasks > - > > Key: KAFKA-12463 > URL: https://issues.apache.org/jira/browse/KAFKA-12463 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > Kafka consumers have a pluggable [partition assignment > interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] > that comes with several out-of-the-box implementations including the > [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], > > [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], > > [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], > and > [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. > If no partition assignor is configured with a consumer, the {{RangeAssignor}} > is used by default. Although there are some benefits to this assignor > including stability of assignment across generations and simplicity of > design, it comes with a major drawback: the number of active consumers in a > group is limited to the number of partitions in the topic(s) with the most > partitions. For an example of the worst case, in a consumer group where every > member is subscribed to ten topics that each have one partition, only one > member of that group will be assigned any topic partitions. 
> This can end up producing counterintuitive and even frustrating behavior when > a sink connector is brought up with N tasks to read from some collection of > topics with a total of N topic partitions, but some tasks end up idling and > not processing any data. > > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > introduced the {{CooperativeStickyAssignor}}, which seeks to provide a > stable assignment across generations wherever possible, provide the most even > assignment possible (taking into account possible differences in > subscriptions across consumers in the group), and allow consumers to continue > processing data during rebalance. The documentation for the assignor states > that "Users should prefer this assignor for newer clusters." > We should alter the default consumer configuration for sink tasks to use the > new {{CooperativeStickyAssignor}}. In order to do this in a > backwards-compatible fashion that also enables rolling upgrades, this should > be implemented by setting the {{partition.assignment.strategy}} property of > sink task consumers to the list > {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, > org.apache.kafka.clients.consumer.RangeAssignor}} when no worker-level or > connector-level override is present. > This way, consumer groups for sink connectors on Connect clusters in the > process of being upgraded will continue to use the {{RangeAssignor}} until > all workers in the cluster have been upgraded, and then will switch over to > the new {{CooperativeStickyAssignor}} automatically. > > This improvement is viable as far back as -2.3- 2.4, when the > {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug > fix, should only be applied to the Connect framework in an upcoming minor > release. 
This does not preclude users from following the steps outlined here > to improve sink connector behavior on existing clusters by modifying their > worker configs to use {{consumer.partition.assignment.strategy =}} > {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, > org.apache.kafka.clients.consumer.RangeAssignor}}, or doing the same on a > per-connector basis using the > {{consumer.override.partition.assignment.strategy}} property. -- This message was sent by Atlassian Jira (v8.3.4#803005)
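The rolling-upgrade recipe described in this ticket can already be applied by hand. A sketch of the worker-level override (property names as given above; `consumer.` is the standard Connect worker prefix for sink-task consumer configs):

```properties
# Worker config: list the cooperative assignor first, with RangeAssignor as a
# fallback so sink-task consumers on not-yet-upgraded workers can stay in the
# same group during a rolling upgrade.
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
```

Because each consumer advertises every assignor in its list, the group leader selects the most-preferred assignor supported by all members; once every worker lists both, the group switches to cooperative rebalancing automatically, which is exactly the backwards-compatible behavior the ticket proposes.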
[GitHub] [kafka] abbccdda merged pull request #10135: KAFKA-10348: Share client channel between forwarding and auto creation manager
abbccdda merged pull request #10135: URL: https://github.com/apache/kafka/pull/10135 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda opened a new pull request #10320: MINOR: revert stream logging level back to ERROR
abbccdda opened a new pull request #10320: URL: https://github.com/apache/kafka/pull/10320 https://github.com/apache/kafka/pull/9579 accidentally changed the logging level for Streams; this corrects it.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
dajac commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r594476389

## File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
## @@ -0,0 +1,52 @@
+package unit.kafka.admin
Review comment: We must add the licence header here. You can copy it from another file.

## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
## @@ -39,19 +39,29 @@ object LogDirsCommand {
 def describe(args: Array[String], out: PrintStream): Unit = {
 val opts = new LogDirsCommandOptions(args)
 val adminClient = createAdminClient(opts)
-val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-}
+val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+try {
+val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+case Some(brokerListStr) =>
+val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+(inputBrokers, inputBrokers.diff(clusterBrokers))
+case None => (clusterBrokers, Set.empty)
+}
-out.println("Querying brokers for log directories information")
-val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+if (nonExistingBrokers.nonEmpty) {
+out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist node(s): ${clusterBrokers.mkString(",")}")
+} else {
+out.println("Querying brokers for log directories information")
+val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)
Review comment (on the `ERROR:` println line): nit: Should we say `--broker-list` instead of `broker-list`? Also, should we say `broker(s)` instead of `node(s)` to be consistent with the message below?
Review comment (on the `describeLogDirsResult` line): nit: `DescribeLogDirsResult` can be removed.
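Outside of Kafka's codebase, the set-difference check under review can be sketched with plain Java collections (class and field names here are illustrative, not the actual `LogDirsCommand` API):

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class BrokerListCheck {
    final Set<Integer> existing = new LinkedHashSet<>();
    final Set<Integer> nonExisting = new LinkedHashSet<>();

    // Partition the comma-separated requested broker IDs against the cluster's
    // known broker set, mirroring inputBrokers.diff(clusterBrokers) in the Scala hunk.
    BrokerListCheck(Set<Integer> clusterBrokers, String brokerListArg) {
        for (String s : brokerListArg.split(",")) {
            if (s.isEmpty()) continue; // tolerate empty segments like "1,,2"
            int id = Integer.parseInt(s);
            (clusterBrokers.contains(id) ? existing : nonExisting).add(id);
        }
    }
}
```

The tool can then report `nonExisting` as an error, exactly as the diff's `if (nonExistingBrokers.nonEmpty)` branch does, instead of sending unknown IDs to the admin client.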
[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
wenbingshen commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r594482210

## File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
## @@ -0,0 +1,52 @@
+package unit.kafka.admin
Review comment: Sorry, after checking the compilation report I realized this problem, and I have made the changes.
[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
wenbingshen commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r594483345

## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
(Same hunk as quoted above, on the `ERROR:` println line)
Review comment: Good idea. I will act right away.
[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
wenbingshen commented on a change in pull request #10304: URL: https://github.com/apache/kafka/pull/10304#discussion_r594484169

## File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
(Same hunk as quoted above, on the `describeLogDirsResult` line)
Review comment: Sorry, I forgot this. I will change it right away.
[GitHub] [kafka] guozhangwang merged pull request #10232: KAFKA-12352: Make sure all rejoin group and reset state has a reason
guozhangwang merged pull request #10232: URL: https://github.com/apache/kafka/pull/10232
[GitHub] [kafka] jsancio commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot
jsancio commented on a change in pull request #10289: URL: https://github.com/apache/kafka/pull/10289#discussion_r594490953

## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
## @@ -939,12 +951,27 @@ private FetchResponseData buildEmptyFetchResponse(
 );
 }
-private boolean hasValidClusterId(FetchRequestData request) {
+private boolean hasValidClusterId(ApiMessage request) {
Review comment: We can implement that when handling a response; an invalid cluster id is fatal unless a previous response contained a valid cluster id.
[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
wenbingshen commented on pull request #10304: URL: https://github.com/apache/kafka/pull/10304#issuecomment-799585108 > @wenbingshen Thanks for the updates. Left a few more minor comments. Thank you for your comments. I submitted the latest code; please review it, thank you!
[GitHub] [kafka] jsancio commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch
jsancio commented on a change in pull request #10276: URL: https://github.com/apache/kafka/pull/10276#discussion_r594509680

## File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
## @@ -732,13 +760,17 @@ public int hashCode() { }
 private void appendAsLeader(Collection<SimpleRecord> records, int epoch) {
+appendAsLeader(records, epoch, log.endOffset().offset);
+}
+
+private void appendAsLeader(Collection<SimpleRecord> records, int epoch, long initialOffset) {
 log.appendAsLeader(
-MemoryRecords.withRecords(
-log.endOffset().offset,
-CompressionType.NONE,
-records.toArray(new SimpleRecord[records.size()])
-),
-epoch
+MemoryRecords.withRecords(
+initialOffset,
+CompressionType.NONE,
+records.toArray(new SimpleRecord[records.size()])
+),
+epoch
Review comment: Indentation looks off. We indent 4 spaces.

## File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
## @@ -413,22 +413,53 @@ final class KafkaMetadataLogTest {
 assertTrue(log.deleteBeforeSnapshot(snapshotId))
 val resultOffsetAndEpoch = log.validateOffsetAndEpoch(offset, epoch)
-assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.getType())
+assertEquals(ValidOffsetAndEpoch.Type.VALID, resultOffsetAndEpoch.getType)
 assertEquals(new OffsetAndEpoch(offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
 }
 @Test
-def testValidateEpochUnknown(): Unit = {
+def testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot(): Unit = {
+val offset = 10
+val numOfRecords = 5
+val log = buildMetadataLog(tempDir, mockTime)
+log.updateHighWatermark(new LogOffsetMetadata(offset))
+val snapshotId = new OffsetAndEpoch(offset, 1)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+snapshot.freeze()
+}
+log.truncateToLatestSnapshot()
-val numberOfRecords = 1
-val epoch = 1
-append(log, numberOfRecords, epoch)
+append(log, numOfRecords, epoch = 1, initialOffset = 10)
+append(log, numOfRecords, epoch = 2, initialOffset = 15)
+append(log, numOfRecords, epoch = 4, initialOffset = 20)
-val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 10)
-assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.getType())
-assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+// offset is not equal to oldest snapshot's offset
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(100, 3)
+assertEquals(ValidOffsetAndEpoch.Type.DIVERGING, resultOffsetAndEpoch.getType)
+assertEquals(new OffsetAndEpoch(20, 2), resultOffsetAndEpoch.offsetAndEpoch())
+}
+
+@Test
+def testValidateUnknownEpochLessThanLeaderGreaterThanOldestSnapshot(): Unit = {
Review comment: How about `testValidateEpochLessThanFirstEpochInLog`? If you agree, let's change it in `MockLogTest` also.

## File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
## @@ -25,7 +27,7 @@
 this.offsetAndEpoch = offsetAndEpoch;
 }
-public Type type() {
+public Type getType() {
Review comment: By the way we can also just change the name of the type and field to something like `public Kind kind()`

## File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
## @@ -750,4 +782,13 @@ private void appendBatch(int numRecords, int epoch) {
 appendAsLeader(records, epoch);
 }
+
+private void appendBatch(int numRecords, int epoch, long initialOffset) {
Review comment: How about `private void appendBatch(int numberOfRecords, int epoch)` and always use the LEO like `appendAsLeader(Collection records, int epoch)`?

## File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
(Same `@@ -413,22 +413,53 @@` hunk as quoted above.)
[GitHub] [kafka] andrewegel commented on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)
andrewegel commented on pull request #10056: URL: https://github.com/apache/kafka/pull/10056#issuecomment-799593438 At the time of my comment here, jcenter is throwing 500s to my kafka build processes: https://status.bintray.com/incidents/ctv4bdfz08bg Now is as good a time as any to take this change and remove this project's dependence on the jcenter service.
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r594552602

## File path: core/src/main/scala/kafka/cluster/Partition.scala
## @@ -308,27 +308,29 @@ class Partition(val topicPartition: TopicPartition,
 s"different from the requested log dir $logDir")
 false
 case None =>
-createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
+// not sure if topic ID should be none here, but not sure if we have access in ReplicaManager where this is called.
Review comment: TODO: remove this when we decide if we want to pass in None or a topicID
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r594553183

## File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
## @@ -145,7 +145,7 @@ public void setup() {
 OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
 for (TopicPartition topicPartition : topicPartitions) {
 final Partition partition = this.replicaManager.createPartition(topicPartition);
-partition.createLogIfNotExists(true, false, checkpoints);
+partition.createLogIfNotExists(true, false, checkpoints, Option.empty());
Review comment: I think we can just not set the topic ID here, but want to confirm.
[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager
jolshan commented on a change in pull request #10282: URL: https://github.com/apache/kafka/pull/10282#discussion_r594558352

## File path: core/src/main/scala/kafka/log/Log.scala
## @@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 // Delete partition metadata file if the version does not support topic IDs.
 // Recover topic ID if present and topic IDs are supported
+// If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist,
+// write to the partition metadata file.
+// Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
 if (partitionMetadataFile.exists()) {
 if (!keepPartitionMetadataFile)
 partitionMetadataFile.delete()
-else
-topicId = partitionMetadataFile.read().topicId
+else {
+val fileTopicId = partitionMetadataFile.read().topicId
+if (topicId.isDefined && fileTopicId != topicId.get)
+throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")
Review comment: I don't know if it is possible to get to this error message. I think in most cases, the log should be grabbed if it already exists in the makeLeader/makeFollower path. In the log loading path, the topicId should be None. I thought it would be good to throw this error to know that something was wrong with the code, but maybe there is a better way. (Like maybe if topicId is defined in general)
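The invariant under discussion — a log may be assigned a topic ID once, and a conflicting reassignment is an error — can be sketched in isolation with plain Java (class and method names here are illustrative, not the Kafka `Log` API):

```java
import java.util.Optional;
import java.util.UUID;

public class TopicIdHolder {
    private Optional<UUID> topicId = Optional.empty();

    // Assign the ID if unset; if already set (e.g. read from the on-disk
    // partition metadata file), verify the proposed ID matches, as the diff does.
    void assignOrVerify(UUID proposed) {
        if (topicId.isEmpty()) {
            topicId = Optional.of(proposed);
        } else if (!topicId.get().equals(proposed)) {
            throw new IllegalStateException(
                "Tried to assign topic ID " + proposed + " but ID " + topicId.get() + " is already set");
        }
    }
}
```

Whether the mismatch branch is reachable in practice is exactly jolshan's question; the check still documents the invariant and fails loudly if the surrounding code ever violates it.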
[GitHub] [kafka] C0urante commented on pull request #10014: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes
C0urante commented on pull request #10014: URL: https://github.com/apache/kafka/pull/10014#issuecomment-799623900 @rhauch ping 🙂 This has been waiting for a while and the only concern that's been raised is unrelated to the fix at hand. Can you take another look?
[GitHub] [kafka] abbccdda merged pull request #10320: MINOR: revert stream logging level back to ERROR
abbccdda merged pull request #10320: URL: https://github.com/apache/kafka/pull/10320
[jira] [Commented] (KAFKA-12464) Enhance constrained sticky Assign algorithm
[ https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301824#comment-17301824 ] A. Sophie Blee-Goldman commented on KAFKA-12464: Thanks for the proposal – I think the basic idea makes sense. There is an existing scale test in AbstractStickyAssignorTest called testLargeAssignmentAndGroupWithUniformSubscription() which you can use to measure the improvement. It already has pretty good performance but further optimizations are welcome! If this has a significant impact we should also scale up the number of consumers and/or partitions in that test. The aim is to have it complete in a few seconds when running locally. Right now I believe we run it with 1mil partitions and 2,000 consumers; the 1mil partitions is probably already close (or beyond) the limits of what kafka can handle for a single consumer group in general, but I'd be interested in seeing how much we can push the size of the consumer group with this enhancement. Some quick notes about the second enhancement: (1) we need to track the revoked partitions as well (this is for the CooperativeStickyAssignor, needed for cooperative rebalancing), so we should also have something like allRevokedPartitions.removeAll(ownedPartitions.subList(maxQuota,ownedPartitions.size())); and (2), I think the second line should be unassignedPartitions.removeAll(ownedPartitions.subList(0, maxQuota)); since we assigned the first sublist of the ownedPartitions list, not the second. (also (3), I'm guessing you just left this out for brevity but we still need to assign partitions in the case ownedPartitions.size() < maxQuota. Might be good to include this branch in the ticket description for clarity) Looking forward to the PR! 
> Enhance constrained sticky Assign algorithm > --- > > Key: KAFKA-12464 > URL: https://issues.apache.org/jira/browse/KAFKA-12464 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.7.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > In KAFKA-9987, we did a great improvement for the case when all consumers > were subscribed to same set of topics. The algorithm contains 4 phases: > # Reassign as many previously owned partitions as possible, up to the > maxQuota > # Fill remaining members up to minQuota > # If we ran out of unassigned partitions before filling all consumers, we > need to start stealing partitions from the over-full consumers at max capacity > # Otherwise we may have run out of unfilled consumers before assigning all > partitions, in which case we should just distribute one partition each to all > consumers at min capacity > > Take an example for better understanding: > *example:* > Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, > ... 
t1p9 > Suppose, current assignment is: > _C0: t1p0, t1p1, t1p2, t1p3, t1p4_ > _C1: t1p5, t1p6, t1p7, t1p8, t1p9_ > Now, new consumer added: C2, so we'll do: > # Reassign as many previously owned partitions as possible, up to the > maxQuota > After this phase, the assignment will be: (maxQuota will be 4) > _C0: t1p0, t1p1, t1p2, t1p3_ > _C1: t1p5, t1p6, t1p7, t1p8_ > # Fill remaining members up to minQuota > After this phase, the assignment will be: > _C0: t1p0, t1p1, t1p2, t1p3_ > _C1: t1p5, t1p6, t1p7, t1p8_ > _C2: t1p4, t1p9_ > # If we ran out of unassigned partitions before filling all consumers, we > need to start stealing partitions from the over-full consumers at max capacity > After this phase, the assignment will be: > _C0: t1p0, t1p1, t1p2_ > _C1: t1p5, t1p6, t1p7, t1p8_ > _C2: t1p4, t1p9,_ _t1p3_ > # Otherwise we may have run out of unfilled consumers before assigning all > partitions, in which case we should just distribute one partition each to all > consumers at min capacity > > > As we can see, we need 3 phases to complete the assignment. But we can > actually completed with 2 phases. Here's the updated algorithm: > # Reassign as many previously owned partitions as possible, up to the > maxQuota, and also considering the numMaxQuota by the remainder of > (Partitions / Consumers) > # Fill remaining members up to maxQuota if current maxQuotaMember < > numMaxQuota, otherwise, to minQuota > > By considering the numMaxQuota, the original step 1 won't be too aggressive > to assign too many partitions to consumers, and the step 2 won't be too > conservative to assign not enough partitions to consumers, so that we don't > need step 3 and step 4 to balance them. > > {{So, the updated Pseudo-code sketch of the algorithm:}} > C_f := (P/N)_floor, the floor capacity > C_c := (P/N)_ceil, the ceiling capacity > *C_r := (P%N) the allowed number of members with C_c partitions assigned* > *num_max_capacity_members :=
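For reference, the floor/ceiling quotas named in the pseudo-code (C_f, C_c, C_r) reduce to simple integer arithmetic; a minimal sketch in Java (class name illustrative):

```java
// Quota bookkeeping for the constrained sticky assignment described above:
// with P partitions and N consumers, C_f = floor(P/N), C_c = ceil(P/N),
// and exactly C_r = P % N members are allowed C_c partitions.
public class StickyQuotas {
    final int minQuota;              // C_f
    final int maxQuota;              // C_c
    final int numMaxCapacityMembers; // C_r

    StickyQuotas(int partitions, int consumers) { // assumes consumers > 0
        this.minQuota = partitions / consumers;
        this.numMaxCapacityMembers = partitions % consumers;
        this.maxQuota = numMaxCapacityMembers == 0 ? minQuota : minQuota + 1;
    }
}
```

In the ticket's example (10 partitions, 3 consumers after C2 joins) this gives minQuota = 3, maxQuota = 4, and only one member permitted 4 partitions, which is why phase 1 no longer over-assigns and phases 3 and 4 become unnecessary.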
[GitHub] [kafka] junrao commented on a change in pull request #10218: KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
junrao commented on a change in pull request #10218: URL: https://github.com/apache/kafka/pull/10218#discussion_r594509836 ## File path: clients/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java ## @@ -87,4 +89,27 @@ public byte id() { public static RemoteLogSegmentState forId(byte id) { return STATE_TYPES.get(id); } + +public static boolean isValidTransition(RemoteLogSegmentState srcState, RemoteLogSegmentState targetState) { +Objects.requireNonNull(targetState, "targetState can not be null"); + +if (srcState == null) { +// If the source state is null, check the target state as the initial state viz DELETE_PARTITION_MARKED Review comment: DELETE_PARTITION_MARKED is not part of RemoteLogSegmentState. ## File path: remote-storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataCache.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.storage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.stream.Collectors; + +/** + * This class provides an inmemory cache of remote log segment metadata. This maintains the lineage of segments + * with respect to epoch evolution. It also keeps track of segments which are not considered to be copied to remote + * storage. + */ +public class RemoteLogMetadataCache { +private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCache.class); + +private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata += new ConcurrentHashMap<>(); + +// It keeps the segments which are not yet reached to COPY_SEGMENT_FINISHED state. +private final Set<RemoteLogSegmentId> remoteLogSegmentIdInProgress = new HashSet<>(); + +// It will have all the segments except with state as COPY_SEGMENT_STARTED. 
+private final ConcurrentMap<Integer, NavigableMap<Long, RemoteLogSegmentId>> leaderEpochToOffsetToId += new ConcurrentHashMap<>(); + +private void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { +log.debug("Adding remote log segment metadata: [{}]", remoteLogSegmentMetadata); +idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), remoteLogSegmentMetadata); +Map<Integer, Long> leaderEpochToOffset = remoteLogSegmentMetadata.segmentLeaderEpochs(); +for (Map.Entry<Integer, Long> entry : leaderEpochToOffset.entrySet()) { +leaderEpochToOffsetToId.computeIfAbsent(entry.getKey(), k -> new ConcurrentSkipListMap<>()) +.put(entry.getValue(), remoteLogSegmentMetadata.remoteLogSegmentId()); +} +} + +public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch, long offset) { +NavigableMap<Long, RemoteLogSegmentId> offsetToId = leaderEpochToOffsetToId.get(leaderEpoch); +if (offsetToId == null || offsetToId.isEmpty()) { +return Optional.empty(); +} + +// look for floor entry as the given offset may exist in this entry. +Map.Entry<Long, RemoteLogSegmentId> entry = offsetToId.floorEntry(offset); +if (entry == null) { +// if the offset is lower than the minimum offset available in metadata then return empty. +return Optional.empty(); +} + +RemoteLogSegmentMetadata metadata = idToSegmentMetadata.get(entry.getValue()); +// check whether the given offset with leaderEpoch exists in this segment. +// check for epoch's offset boundaries with in this segment. +// 1. get the next epoch's start offset -1 if exists +// 2. if no next epoch exists, then segment end offset can be considered as epoch's relative end offset. +Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs() +.higherEntry(leaderEpoch); +long epochEndOf
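The floorEntry-based lookup in the quoted method can be illustrated in isolation. Below is a minimal, self-contained sketch of the idea (segment names like `seg-0` are invented for illustration; this is not the actual RemoteLogMetadataCache API): map each segment's start offset to a segment id in a sorted map, then resolve a fetch offset to the segment whose start offset is the greatest one not exceeding it.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

public class FloorLookupSketch {
    // Resolve an offset to the segment whose start offset is the greatest
    // key <= offset, mirroring the floorEntry usage in the code above.
    static Optional<String> lookup(NavigableMap<Long, String> offsetToSegment, long offset) {
        Map.Entry<Long, String> entry = offsetToSegment.floorEntry(offset);
        // A null entry means the offset is below the minimum offset in the metadata.
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> offsetToSegment = new TreeMap<>();
        offsetToSegment.put(0L, "seg-0");     // segment starting at offset 0
        offsetToSegment.put(100L, "seg-100"); // segment starting at offset 100
        offsetToSegment.put(250L, "seg-250"); // segment starting at offset 250

        System.out.println(lookup(offsetToSegment, 175L)); // Optional[seg-100]
        System.out.println(lookup(offsetToSegment, -5L));  // Optional.empty
    }
}
```

The real method additionally bounds the hit by the epoch's end offset (via `higherEntry`, as the truncated code above begins to do) before returning it.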
[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji commented on a change in pull request #10309: URL: https://github.com/apache/kafka/pull/10309#discussion_r594599970 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) { } } +@Test +public void checkSingleNodeCommittedDataLossQuorumSizeThree() { +checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0)); +} + +private void checkSingleNodeCommittedDataLoss(QuorumConfig config) { +assertTrue(config.numVoters > 2, +"This test requires the cluster to be able to recover from one failed node"); + +for (int seed = 0; seed < 100; seed++) { +// We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark` +// invariants since the loss of committed data on one node can violate them. + +Cluster cluster = new Cluster(config, seed); +EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time); +scheduler.addInvariant(new MonotonicHighWatermark(cluster)); +scheduler.addInvariant(new SingleLeader(cluster)); +scheduler.addValidation(new ConsistentCommittedData(cluster)); + +MessageRouter router = new MessageRouter(cluster); + +cluster.startAll(); +schedulePolling(scheduler, cluster, 3, 5); +scheduler.schedule(router::deliverAll, 0, 2, 5); +scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); +scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + +RaftNode node = cluster.randomRunning().orElseThrow(() -> +new AssertionError("Failed to find running node") +); + +// Kill a random node and drop all of its persistent state. The Raft +// protocol guarantees should still ensure we lose no committed data +// as long as a new leader is elected before the failed node is restarted. 
+cluster.kill(node.nodeId); +cluster.deletePersistentState(node.nodeId); +scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader()); Review comment: It is checking consistent `ElectionState`, which is basically the same as verifying all `quorum-state` files match. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 opened a new pull request #10321: Minor:timeout issue in Remove thread
wcarlson5 opened a new pull request #10321: URL: https://github.com/apache/kafka/pull/10321 timeout is a duration not a point in time ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
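The PR description's point, that a timeout is a duration rather than a point in time, is a common bug class. A hedged sketch of the distinction (illustrative only, not the actual Streams remove-thread code):

```java
import java.time.Duration;

public class TimeoutVsDeadlineSketch {
    public static void main(String[] args) {
        long nowMs = 1_000_000L;                  // hypothetical current wall-clock time
        Duration timeout = Duration.ofSeconds(30);

        // Bug: treating the Duration itself as an absolute deadline. 30000 ms
        // is far in the "past" relative to nowMs, so a wait bounded by it
        // would appear to have expired immediately.
        long wrongDeadlineMs = timeout.toMillis();

        // Fix: the deadline is the current time plus the timeout duration.
        long rightDeadlineMs = nowMs + timeout.toMillis();

        System.out.println(wrongDeadlineMs < nowMs);  // true -> immediate timeout
        System.out.println(rightDeadlineMs - nowMs);  // 30000
    }
}
```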
[GitHub] [kafka] wcarlson5 commented on pull request #10321: Minor:timeout issue in Remove thread
wcarlson5 commented on pull request #10321: URL: https://github.com/apache/kafka/pull/10321#issuecomment-799685448 @cadonna @ableegoldman Can I get a look at this? @vvcephei This will need to be picked to 2.8 as well
[jira] [Commented] (KAFKA-12313) Consider deprecating the default.windowed.serde.inner.class configs
[ https://issues.apache.org/jira/browse/KAFKA-12313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301917#comment-17301917 ] A. Sophie Blee-Goldman commented on KAFKA-12313: Ah, I didn't notice (or forgot ;)) that we had started throwing an IllegalArgumentException instead of setting it to MAX_VALUE when windowSize = null during KIP-659. I guess we don't have to worry about that here In that case we should also do something similar for the innerClass: ie if (innerClassDeserializer != null && configInnerClassDeserializer != null) { throw new IllegalArgumentException("Inner class deserializer should not be set in both the time windowed deserializer constructor and the windowed.deserializer.inner.class config"); } But that still leaves the door open for users to _only_ set the config for both windowSize & innerClassSerde, which is what we want to discourage. That was what I was trying to propose in item #3 – that the configs should only be used for the console consumer, while for use within Kafka Streams the Serde should always be instantiated directly. Does that make sense? Here's what I'm thinking: We deprecate both existing configs, and replace them with a single windowed.deserializer.inner.class config. Since we technically only need this config for the console consumer, but not the producer, we can actually just make the config accept a deserializer class directly (rather than the serde class). That way it seems especially obvious that this config is not intended for use within Kafka Streams, since it's just a deserializer. WDYT? Also, Re: your question in #5: you're asking if we should ensure that both the innerClassDeserializer and the windowSize parameters are passed in to the configs, right? If so, then yes, that makes sense to me. 
> Consider deprecating the default.windowed.serde.inner.class configs > --- > > Key: KAFKA-12313 > URL: https://issues.apache.org/jira/browse/KAFKA-12313 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > Fix For: 3.0.0 > > > During the discussion of KIP-659 we discussed whether it made sense to have a > "default" class for the serdes of windowed inner classes across Streams. > Using these configs instead of specifying an actual Serde object can lead to > subtle bugs, since the WindowedDeserializer requires a windowSize in addition > to the inner class. If the default constructor is invoked, as it will be when > falling back on the config, this windowSize defaults to MAX_VALUE. > If the downstream program doesn't care about the window end time in the > output, then this can go unnoticed and technically there is no problem. But > if anything does depend on the end time, or the user just wants to manually > read the output for testing purposes, then the MAX_VALUE will result in a > garbage timestamp. > We should consider whether the convenience of specifying a config instead of > instantiating a Serde in each operator is really worth the risk of a user > accidentally failing to specify a windowSize -- This message was sent by Atlassian Jira (v8.3.4#803005)
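The constructor-vs-config conflict check sketched in the comment above could look like the following. This is a hypothetical sketch; the parameter names and the `windowed.deserializer.inner.class` config key follow the discussion, not the actual Kafka Streams source:

```java
public class InnerDeserializerConflictSketch {
    // Reject the ambiguous case where the inner deserializer is supplied both
    // via the constructor and via the (proposed) config, as discussed above.
    static void validate(Object constructorInner, Object configInner) {
        if (constructorInner != null && configInner != null) {
            throw new IllegalArgumentException(
                "Inner class deserializer should not be set in both the time " +
                "windowed deserializer constructor and the " +
                "windowed.deserializer.inner.class config");
        }
    }

    public static void main(String[] args) {
        validate("fromConstructor", null);   // fine: only one source supplied
        validate(null, "fromConfig");        // fine: only one source supplied
        try {
            validate("fromConstructor", "fromConfig");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: conflicting inner deserializers");
        }
    }
}
```

This mirrors the point of the discussion: within Streams the serde should be instantiated directly, while the config is intended only for tools such as the console consumer.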
[GitHub] [kafka] jsancio commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
jsancio commented on a change in pull request #10309: URL: https://github.com/apache/kafka/pull/10309#discussion_r594638669 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) { } } +@Test +public void checkSingleNodeCommittedDataLossQuorumSizeThree() { +checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0)); +} + +private void checkSingleNodeCommittedDataLoss(QuorumConfig config) { +assertTrue(config.numVoters > 2, +"This test requires the cluster to be able to recover from one failed node"); + +for (int seed = 0; seed < 100; seed++) { +// We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark` +// invariants since the loss of committed data on one node can violate them. + +Cluster cluster = new Cluster(config, seed); +EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time); +scheduler.addInvariant(new MonotonicHighWatermark(cluster)); +scheduler.addInvariant(new SingleLeader(cluster)); +scheduler.addValidation(new ConsistentCommittedData(cluster)); + +MessageRouter router = new MessageRouter(cluster); + +cluster.startAll(); +schedulePolling(scheduler, cluster, 3, 5); +scheduler.schedule(router::deliverAll, 0, 2, 5); +scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); +scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + +RaftNode node = cluster.randomRunning().orElseThrow(() -> +new AssertionError("Failed to find running node") +); + +// Kill a random node and drop all of its persistent state. The Raft +// protocol guarantees should still ensure we lose no committed data +// as long as a new leader is elected before the failed node is restarted. +cluster.kill(node.nodeId); +cluster.deletePersistentState(node.nodeId); +scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader()); Review comment: Yes. 
I meant to say `ElectionState` instead of `LeaderState`. `ElectionState` has a field called `votedIdOpt` for which `equals` checks for equality. This is not strictly required for having a "consistent" leader. I think for having a consistent leader for an epoch, only the `epoch` and `leaderIdOpt` need to match for all of the replicas.
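The distinction drawn above, that only `epoch` and `leaderIdOpt` need to match across replicas while `votedIdOpt` may differ, can be sketched as a small predicate. The types here are simplified stand-ins, not the actual raft `ElectionState`:

```java
import java.util.List;
import java.util.OptionalInt;

public class ConsistentLeaderSketch {
    // Simplified stand-in for ElectionState: epoch, elected leader, voted-for.
    record Election(int epoch, OptionalInt leaderIdOpt, OptionalInt votedIdOpt) {}

    // Replicas have a consistent leader when all agree on the epoch and on a
    // present leader id; votedIdOpt is ignored, per the discussion above.
    static boolean hasConsistentLeader(List<Election> states) {
        Election first = states.get(0);
        if (first.leaderIdOpt().isEmpty()) return false;
        return states.stream().allMatch(s ->
            s.epoch() == first.epoch() && s.leaderIdOpt().equals(first.leaderIdOpt()));
    }

    public static void main(String[] args) {
        List<Election> agree = List.of(
            new Election(5, OptionalInt.of(1), OptionalInt.of(1)),
            new Election(5, OptionalInt.of(1), OptionalInt.of(2)), // voted differently
            new Election(5, OptionalInt.of(1), OptionalInt.empty()));
        List<Election> disagree = List.of(
            new Election(5, OptionalInt.of(1), OptionalInt.of(1)),
            new Election(6, OptionalInt.of(2), OptionalInt.of(2))); // different epoch/leader
        System.out.println(hasConsistentLeader(agree));    // true
        System.out.println(hasConsistentLeader(disagree)); // false
    }
}
```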
[jira] [Updated] (KAFKA-10310) Kafka Raft Snapshot
[ https://issues.apache.org/jira/browse/KAFKA-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-10310: --- Description: Tracking issue for KIP-630: Kafka Raft Snapshot https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot was:Tracking issue for KIP-630: Kafka Raft Snapshot > Kafka Raft Snapshot > --- > > Key: KAFKA-10310 > URL: https://issues.apache.org/jira/browse/KAFKA-10310 > Project: Kafka > Issue Type: New Feature >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > > Tracking issue for KIP-630: Kafka Raft Snapshot > https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10310) Kafka Raft Snapshot
[ https://issues.apache.org/jira/browse/KAFKA-10310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-10310: --- Description: Tracking issue for [KIP-630: Kafka Raft Snapshot|https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot] (was: Tracking issue for KIP-630: Kafka Raft Snapshot https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot) > Kafka Raft Snapshot > --- > > Key: KAFKA-10310 > URL: https://issues.apache.org/jira/browse/KAFKA-10310 > Project: Kafka > Issue Type: New Feature >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > > Tracking issue for [KIP-630: Kafka Raft > Snapshot|https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12466) Controller and Broker Metadata Snapshots
Jose Armando Garcia Sancio created KAFKA-12466: -- Summary: Controller and Broker Metadata Snapshots Key: KAFKA-12466 URL: https://issues.apache.org/jira/browse/KAFKA-12466 Project: Kafka Issue Type: New Feature Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12467) Disabled implementation for generating controller snapshots
Jose Armando Garcia Sancio created KAFKA-12467: -- Summary: Disabled implementation for generating controller snapshots Key: KAFKA-12467 URL: https://issues.apache.org/jira/browse/KAFKA-12467 Project: Kafka Issue Type: Sub-task Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio Controller implementation for generating snapshots, but with the default configuration snapshots are not generated. The default configuration won't generate snapshots because neither the Controller nor the Broker knows how to load them at this point. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12455) OffsetValidationTest.test_broker_rolling_bounce failing for Raft quorums
[ https://issues.apache.org/jira/browse/KAFKA-12455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301952#comment-17301952 ] Ron Dagostino commented on KAFKA-12455: --- I looked at the brokers' metadata caches for the two separate configurations -- ZK vs. Raft -- to find out what percentage of the time they showed 1 alive broker instead of 2. I was expecting the ZooKeeper configuration to show relatively little time with just 1 alive broker since the clients are never seeing that metadata situation, and I was expecting the Raft configuration to show a much higher percentage of time with just 1 alive broker since the clients do see that metadata situation. I did not find what I was expecting to find. The amounts of time where the brokers were advertising just 1 alive broker in their metadata caches are as follows: *ZooKeeper Configuration*: BrokerId=1: 37 seconds out of 61 seconds of that broker's availability during the test, or 61% of the time with just 1 alive broker in metadata cache BrokerId=2: 39 seconds out of 61 seconds of that broker's availability during the test, or 64% of the time with just 1 alive broker in metadata cache *Raft Configuration*: BrokerId=1: 37 seconds out of 88 seconds of that broker's availability during the test, or 42% of the time with just 1 alive broker in metadata cache BrokerId=2: 52 seconds out of 88 seconds of that broker's availability during the test, or 59% of the time with just 1 alive broker in metadata cache So the brokers in the ZooKeeper configuration consider just 1 broker to be alive more often than the brokers in the Raft configuration consider just 1 broker to be alive! It is still not clear why the consumers never see just a single alive broker in the ZooKeeper configuration. 
From the above it does not appear to be due to any difference in metadata cache population -- if it were just that then we would see the test failing in the ZooKeeper configuration since that actually advertises a single alive broker more frequently in terms of percentage of test time. > OffsetValidationTest.test_broker_rolling_bounce failing for Raft quorums > > > Key: KAFKA-12455 > URL: https://issues.apache.org/jira/browse/KAFKA-12455 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Blocker > > OffsetValidationTest.test_broker_rolling_bounce in `consumer_test.py` is > failing because the consumer group is rebalancing unexpectedly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12455) OffsetValidationTest.test_broker_rolling_bounce failing for Raft quorums
[ https://issues.apache.org/jira/browse/KAFKA-12455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301956#comment-17301956 ] Ron Dagostino commented on KAFKA-12455: --- The test is using the default value of metadata.max.age.ms=300000 (5 minutes). When I explicitly turn it down to metadata.max.age.ms=5000 (5 seconds) the test passes for Raft but then fails for ZK (2 unexpected group rebalances in that case). I increased it to 10 seconds and then the Raft configuration failed with 3 unexpected rebalances and the ZK configuration failed with 1 unexpected rebalance. I decreased it to a very aggressive 1 second -- and they both passed. We have historically seen some flakiness in the ZooKeeper version of this test, and the fact that the test suddenly failed if we set metadata.max.age.ms to 5 or 10 seconds indicates that it is just plain luck that the test is passing today. Given that the current client-side code doesn't fall back to the bootstrap brokers when it sees no brokers available, I think any test really needs to make it *impossible* for the client to see cluster metadata with just a single broker. Decreasing the metadata max age decreases the possibility of it happening but doesn't make it impossible. Another experiment was to keep metadata.max.age.ms=300000 but define session.timeout.ms = 30000 instead of the 10000 it was setting before -- this is longer than the broker roll time, and in fact this change allows both configurations to pass. A further experiment was to keep metadata.max.age.ms=300000 and session.timeout.ms = 10000 but expand to 3 brokers instead of just 2. This should fix the issue since there would never be a situation where just 1 broker is available, and a METADATA response would always have at least 2 brokers for the consumer to use. Both configurations pass. 
> OffsetValidationTest.test_broker_rolling_bounce failing for Raft quorums > > > Key: KAFKA-12455 > URL: https://issues.apache.org/jira/browse/KAFKA-12455 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Blocker > > OffsetValidationTest.test_broker_rolling_bounce in `consumer_test.py` is > failing because the consumer group is rebalancing unexpectedly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] bbejeck commented on a change in pull request #10150: KAFKA-3745: Add access to read-only key in value joiner
bbejeck commented on a change in pull request #10150: URL: https://github.com/apache/kafka/pull/10150#discussion_r594665281 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.GlobalKTable; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static org.junit.Assert.assertEquals; + +public class KStreamImplValueJoinerWithKeyTest { + +private KStream leftStream; +private KStream rightStream; +private KTable ktable; +private GlobalKTable globalKTable; +private StreamsBuilder builder; + +private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private final String leftTopic = "left"; +private final String rightTopic = "right"; +private final String ktableTopic = "ktableTopic"; +private final String globalTopic = "globalTopic"; +private final String outputTopic = "joined-result"; + +private final ValueJoinerWithKey valueJoinerWithKey = 
+(key, lv, rv) -> key + ":" + (lv + (rv == null ? 0 : rv)); +private final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)); +private final StreamJoined streamJoined = +StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()); +private final Joined joined = +Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()); +private final KeyValueMapper keyValueMapper = +(k, v) -> k; + +@Before +public void setup() { +builder = new StreamsBuilder(); +leftStream = builder.stream(leftTopic, Consumed.with(Serdes.String(), Serdes.Integer())); +rightStream = builder.stream(rightTopic, Consumed.with(Serdes.String(), Serdes.Integer())); +ktable = builder.table(ktableTopic, Consumed.with(Serdes.String(), Serdes.Integer())); +globalKTable = builder.globalTable(globalTopic, Consumed.with(Serdes.String(), Serdes.Integer())); +} + +@Test +public void shouldIncludeKeyInStreamSteamJoinResults() { +leftStream.join( +rightStream, +valueJoinerWithKey, +joinWindows, +streamJoined +).to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); +// Left KV A, 3, Right KV A, 5 +runJoinTopology(builder, +Collections.singletonList(KeyValue.pair("A", "A:5")), +false, +rightTopic +); +} + +@Test +public void shouldIncludeKeyInStreamLeftJoinResults() { +leftStream.leftJoin( +rightStream, +valueJoinerWithKey, +joinWindows, +streamJoined +).to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); +// Left KV A, 3, Right KV A, 5 +// TTD pipes reco
[GitHub] [kafka] bbejeck commented on a change in pull request #10150: KAFKA-3745: Add access to read-only key in value joiner
bbejeck commented on a change in pull request #10150: URL: https://github.com/apache/kafka/pull/10150#discussion_r594665349 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplValueJoinerWithKeyTest.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.GlobalKTable; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.test.StreamsTestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static java.time.Duration.ofMillis; +import static org.junit.Assert.assertEquals; + +public class KStreamImplValueJoinerWithKeyTest { + +private KStream leftStream; +private KStream rightStream; +private KTable ktable; +private GlobalKTable globalKTable; +private StreamsBuilder builder; + +private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); +private final String leftTopic = "left"; +private final String rightTopic = "right"; +private final String ktableTopic = "ktableTopic"; +private final String globalTopic = "globalTopic"; +private final String outputTopic = "joined-result"; + +private final ValueJoinerWithKey valueJoinerWithKey = 
+(key, lv, rv) -> key + ":" + (lv + (rv == null ? 0 : rv)); +private final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)); +private final StreamJoined streamJoined = +StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()); +private final Joined joined = +Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()); +private final KeyValueMapper keyValueMapper = +(k, v) -> k; + +@Before +public void setup() { +builder = new StreamsBuilder(); +leftStream = builder.stream(leftTopic, Consumed.with(Serdes.String(), Serdes.Integer())); +rightStream = builder.stream(rightTopic, Consumed.with(Serdes.String(), Serdes.Integer())); +ktable = builder.table(ktableTopic, Consumed.with(Serdes.String(), Serdes.Integer())); +globalKTable = builder.globalTable(globalTopic, Consumed.with(Serdes.String(), Serdes.Integer())); +} + +@Test +public void shouldIncludeKeyInStreamSteamJoinResults() { +leftStream.join( +rightStream, +valueJoinerWithKey, +joinWindows, +streamJoined +).to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); +// Left KV A, 3, Right KV A, 5 +runJoinTopology(builder, +Collections.singletonList(KeyValue.pair("A", "A:5")), +false, +rightTopic +); +} + +@Test +public void shouldIncludeKeyInStreamLeftJoinResults() { +leftStream.leftJoin( +rightStream, +valueJoinerWithKey, +joinWindows, +streamJoined +).to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); +// Left KV A, 3, Right KV A, 5 +// TTD pipes reco
[jira] [Created] (KAFKA-12468) Initial offsets are copied from source to target cluster
Bart De Neuter created KAFKA-12468: -- Summary: Initial offsets are copied from source to target cluster Key: KAFKA-12468 URL: https://issues.apache.org/jira/browse/KAFKA-12468 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 2.7.0 Reporter: Bart De Neuter We have an active-passive setup where the 3 connectors from mirror maker 2 (heartbeat, checkpoint and source) are running on a dedicated Kafka Connect cluster on the target cluster. Offset syncing is enabled as specified by KIP-545. But when activated, it seems the offsets from the source cluster are initially copied to the target cluster without translation. This causes a negative lag for all synced consumer groups. Only when we reset the offsets for each topic/partition on the target cluster and produce a record on the topic/partition in the source does the sync start working correctly. I would expect that the consumer groups are synced but that the current offsets of the source cluster are not copied to the target cluster. 
This is the configuration we are currently using: Heartbeat connector {code:xml} { "name": "mm2-mirror-heartbeat", "config": { "name": "mm2-mirror-heartbeat", "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "source.cluster.alias": "eventador", "target.cluster.alias": "msk", "source.cluster.bootstrap.servers": "", "target.cluster.bootstrap.servers": "", "topics": ".*", "groups": ".*", "tasks.max": "1", "replication.policy.class": "CustomReplicationPolicy", "sync.group.offsets.enabled": "true", "sync.group.offsets.interval.seconds": "5", "emit.checkpoints.enabled": "true", "emit.checkpoints.interval.seconds": "30", "emit.heartbeats.interval.seconds": "30", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" } } {code} Checkpoint connector: {code:xml} { "name": "mm2-mirror-checkpoint", "config": { "name": "mm2-mirror-checkpoint", "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "source.cluster.alias": "eventador", "target.cluster.alias": "msk", "source.cluster.bootstrap.servers": "", "target.cluster.bootstrap.servers": "", "topics": ".*", "groups": ".*", "tasks.max": "40", "replication.policy.class": "CustomReplicationPolicy", "sync.group.offsets.enabled": "true", "sync.group.offsets.interval.seconds": "5", "emit.checkpoints.enabled": "true", "emit.checkpoints.interval.seconds": "30", "emit.heartbeats.interval.seconds": "30", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" } } {code} Source connector: {code:xml} { "name": "mm2-mirror-source", "config": { "name": "mm2-mirror-source", "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "source.cluster.alias": "eventador", "target.cluster.alias": "msk", "source.cluster.bootstrap.servers": "", "target.cluster.bootstrap.servers": "", 
"topics": ".*", "groups": ".*", "tasks.max": "40", "replication.policy.class": "CustomReplicationPolicy", "sync.group.offsets.enabled": "true", "sync.group.offsets.interval.seconds": "5", "emit.checkpoints.enabled": "true", "emit.checkpoints.interval.seconds": "30", "emit.heartbeats.interval.seconds": "30", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" } } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
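For context, the behavior the reporter expects (per KIP-545) is that group offsets are *translated* through MirrorMaker 2's offset-sync/checkpoint mechanism rather than copied verbatim. The sketch below is illustrative only, not MirrorMaker 2's actual implementation; the function name and the sync data are hypothetical, and a 1:1 replication of records between sync points is assumed:

```python
import bisect

# Illustrative sketch, NOT MirrorMaker 2's actual code: MM2 records "offset
# syncs" pairing an upstream (source) offset with the downstream (target)
# offset of the same replicated record. A group offset committed on the
# source must be translated through the nearest preceding sync pair; copying
# it verbatim, as this report describes, yields wrong positions and the
# negative lag observed.
def translate_offset(syncs, upstream_committed):
    """syncs: list of (upstream_offset, downstream_offset), sorted ascending."""
    upstream_points = [u for u, _ in syncs]
    idx = bisect.bisect_right(upstream_points, upstream_committed) - 1
    if idx < 0:
        return None  # no sync at or before the committed offset yet
    up, down = syncs[idx]
    # Assume 1:1 replication between sync points, so both logs advance equally.
    return down + (upstream_committed - up)

# Downstream log began replicating later, so copying the raw source offset
# (1200) would overshoot the downstream log's actual position.
syncs = [(0, 0), (1000, 400)]
print(translate_offset(syncs, 1200))  # 600, not 1200
```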
[GitHub] [kafka] kowshik commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
kowshik commented on pull request #10271: URL: https://github.com/apache/kafka/pull/10271#issuecomment-799747349 @satishd should this PR be reviewed after #10218 has been reviewed and merged? It seems like this PR contains some of the code that's seen in #10218, e.g. `InmemoryRemoteLogMetadataManager.java`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on a change in pull request #10150: KAFKA-3745: Add access to read-only key in value joiner
bbejeck commented on a change in pull request #10150: URL: https://github.com/apache/kafka/pull/10150#discussion_r594678897 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ## @@ -1315,15 +1315,15 @@ void to(final TopicNameExtractor topicExtractor, final JoinWindows windows); /** - * Join records of this stream with another {@code KStream}'s records using windowed inner equi join using the Review comment: >I guess you actually inserted this as a copy from above with slight modification only. Yes, I wanted to keep everything in the same order. I'm not sure how to handle how the diffs get rendered in this situation
[jira] [Created] (KAFKA-12469) The topic names in the metrics do not retain their format when extracting through JMX.
Rafał Chmielewski created KAFKA-12469: - Summary: The topic names in the metrics do not retain their format when extracting through JMX. Key: KAFKA-12469 URL: https://issues.apache.org/jira/browse/KAFKA-12469 Project: Kafka Issue Type: Bug Components: metrics Reporter: Rafał Chmielewski I have topic names that have a period in the name: product.order product.offering.price However, for the metrics issued by JMX by a program that is a consumer of Kafka messages, the dots are replaced with an underscore: kafka.consumer<client-id=consumer-export-4, topic=product_offering_price, partition=1><>records-lead But for the producer, this problem doesn't occur: kafka.producer<client-id=bss.data.verification.pi_1, topic=product.offering.price><>record-send-total As a consumer I use Akka Alpakka. But I think it's using the Apache library to connect to Kafka and report metrics via JMX. -- This message was sent by Atlassian Jira (v8.3.4#803005)
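The substitution described in this report looks like metric-name sanitization, where a metrics reporter maps characters outside an allowed set to `_` before exposing tag values. The sketch below is a hypothetical sanitizer that merely reproduces the reported behavior; the regex and allowed character set are assumptions, not the code of any particular reporter in the Kafka/Alpakka stack:

```python
import re

# Hypothetical sanitizer mirroring the behavior the reporter observed:
# any character outside [A-Za-z0-9_-] in a tag value (such as a topic name)
# is replaced with '_', turning "product.offering.price" into
# "product_offering_price" on the consumer-side metrics.
def sanitize_tag(value: str) -> str:
    return re.sub(r"[^A-Za-z0-9_-]", "_", value)

print(sanitize_tag("product.offering.price"))  # product_offering_price
```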
[jira] [Created] (KAFKA-12470) The topic names in the metrics do not retain their format when extracting through JMX.
Rafał Chmielewski created KAFKA-12470: - Summary: The topic names in the metrics do not retain their format when extracting through JMX. Key: KAFKA-12470 URL: https://issues.apache.org/jira/browse/KAFKA-12470 Project: Kafka Issue Type: Bug Components: metrics Reporter: Rafał Chmielewski I have topic names that have a period in the name: product.order product.offering.price However, for the metrics issued by JMX by a program that is a consumer of Kafka messages, the dots are replaced with an underscore: kafka.consumer<client-id=consumer-export-4, topic=product_offering_price, partition=1><>records-lead But for the producer, this problem doesn't occur: kafka.producer<client-id=bss.data.verification.pi_1, topic=product.offering.price><>record-send-total As a consumer I use Akka Alpakka. But I think it's using the Apache library to connect to Kafka and report metrics via JMX. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10666) Kafka doesn't use keystore / key / truststore passwords for named SSL connections
[ https://issues.apache.org/jira/browse/KAFKA-10666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302015#comment-17302015 ] Jason commented on KAFKA-10666: --- This might have been user error. In revisiting my code, it looks like my settings may have been missing the .ssl. Sorry :D > Kafka doesn't use keystore / key / truststore passwords for named SSL > connections > - > > Key: KAFKA-10666 > URL: https://issues.apache.org/jira/browse/KAFKA-10666 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.5.0, 2.6.0 > Environment: kafka in an openjdk-11 docker container, the client java > application is in an alpine container. zookeeper in a separate container. >Reporter: Jason >Priority: Minor > > When configuring named listener SSL connections with ssl key and keystore > with passwords including listener.name.ourname.ssl.key.password, > listener.name.ourname.ssl.keystore.password, and > listener.name.ourname.ssl.truststore.password via the AdminClient the > settings are not used and the setting is not accepted if the default > ssl.key.password or ssl.keystore.password are not set. We configure all > keystore and truststore values for the named listener in a single batch using > incrementalAlterConfigs. Additionally, when ssl.keystore.password is set to > the value of our keystore password the keystore is loaded for SSL > communication without issue, however if ssl.keystore.password is incorrect > and listener.name.ourname.keystore.password is correct, we are unable to load > the keystore with bad password errors. It appears that only the default > ssl.xxx.password settings are used. This setting is immutable as when we > attempt to set it we get an error indicating that the listener.name. setting > cannot be set. -- This message was sent by Atlassian Jira (v8.3.4#803005)
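For reference, per-listener overrides follow the `listener.name.<listener>.<config>` pattern, and, as the reporter's follow-up comment suggests, the `ssl.` segment must be present in the key. A hypothetical broker config sketch (listener name from the ticket; locations and passwords are placeholders):

```properties
# Hypothetical snippet: per-listener SSL settings must carry the full
# "ssl." prefix after the listener name (the missing ".ssl." piece the
# reporter mentions). All paths and passwords below are placeholders.
listeners=OURNAME://0.0.0.0:9093
listener.security.protocol.map=OURNAME:SSL
listener.name.ourname.ssl.keystore.location=/etc/kafka/keystore.jks
listener.name.ourname.ssl.keystore.password=changeit
listener.name.ourname.ssl.key.password=changeit
listener.name.ourname.ssl.truststore.location=/etc/kafka/truststore.jks
listener.name.ourname.ssl.truststore.password=changeit
```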
[jira] [Resolved] (KAFKA-12470) The topic names in the metrics do not retain their format when extracting through JMX.
[ https://issues.apache.org/jira/browse/KAFKA-12470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rafał Chmielewski resolved KAFKA-12470. --- Resolution: Duplicate > The topic names in the metrics do not retain their format when extracting > through JMX. > -- > > Key: KAFKA-12470 > URL: https://issues.apache.org/jira/browse/KAFKA-12470 > Project: Kafka > Issue Type: Bug > Components: metrics >Reporter: Rafał Chmielewski >Priority: Major > > I have topic names that have a period in the name: > product.order > product.offering.price > > However, for the metrics issued by JMX by a program that is a consumer of > Kafka messages, the dots are replaced with an underscore: > kafka.consumer client-id=consumer-export-4, topic=product_offering_price, > partition=1><>records-lead > > But for the producer, this problem doesn't occur: > kafka.producer client-id=bss.data.verification.pi_1, > topic=product.offering.price><>record-send-total > > As a consumer I use Akka Alpakka. But I think it's using Apache library to > connect to Kafka and report metrics via JMX. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jolshan commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
jolshan commented on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799770551 Hi @ikdekker. I'm not a committer, but I can answer some of your questions. The report you mentioned was generated to the builds directory. If you copy and paste the file path into a web browser, you should be able to see the result. As for the JIRA ticket, the `KAFKA-12456` at the start of the PR title will automatically link this PR to the ticket. If you'd like access to editing the ticket, you can create an Apache JIRA account. Then simply ask for JIRA permissions by sending your JIRA username to d...@kafka.apache.org Hope this helps!
[GitHub] [kafka] jolshan edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
jolshan edited a comment on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799770551 Hi @ikdekker. I'm not a committer, but I can answer some of your questions. The report you mentioned was generated to the builds directory. If you copy and paste the file path into a web browser, you should be able to see the result. As for the JIRA ticket, the `KAFKA-12456` at the start of the PR title will automatically link this PR to the ticket. If you'd like access to editing the ticket, you can create an Apache JIRA account. Then simply ask for JIRA permissions by sending your JIRA username to d...@kafka.apache.org More info on mailing lists here: http://kafka.apache.org/contact Hope this helps!
[GitHub] [kafka] rondagostino opened a new pull request #10322: KAFKA-12455: OffsetValidationTest.test_broker_rolling_bounce fail: Raft
rondagostino opened a new pull request #10322: URL: https://github.com/apache/kafka/pull/10322 OffsetValidationTest.test_broker_rolling_bounce was failing when used with a Raft-based metadata quorum but succeeding with a ZooKeeper-based quorum. The problem was that in the Raft case the consumer was sometimes receiving a METADATA response with just 1 alive broker, and then when that broker rolled the consumer wouldn't know about any alive nodes. It would have to wait until the broker returned before it could reconnect, and by that time the group coordinator on the second broker would have timed out the client and initiated a group rebalance. The test explicitly checks that no rebalances occur, so the test would fail. It turns out that the reason why the ZooKeeper configuration wasn't seeing rebalances was just plain luck. The brokers' metadata caches in the ZooKeeper configuration show 1 alive broker even more frequently than the Raft configuration does. If we tweak the metadata.max.age.ms value on the consumers we can easily get the ZooKeeper test to fail, and in fact this system test has historically been flaky for the ZooKeeper configuration. We can get the test to pass by setting session.timeout.ms=3 (which is longer than the roll time of any broker), or we can increase the broker count so that the client never sees a METADATA response with just a single alive broker and therefore never loses contact with the cluster for an extended period of time. This patch increases the broker count for this particular system test from 2 to 3. This patch also fixes a minor logging bug in `RaftReplicaManager` that was discovered during the debugging of this issue, and it adds an extra logging statement when a single metadata batch is applied to mirror the same logging statement that occurs when deferred metadata changes are applied. 
### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
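The failure mode described in this PR boils down to a simple timing condition. The toy model below is an assumption-laden sketch, not Kafka code; the function and parameter names are hypothetical, and it ignores retries and partial metadata refreshes:

```python
# Toy model of the race described above: a consumer only knows the brokers
# listed in its last METADATA response. If that response listed a single
# alive broker and that broker then rolls for longer than the group's
# session timeout, the coordinator times the consumer out and triggers the
# rebalance the system test forbids.
def rebalance_expected(alive_in_last_metadata, roll_ms, session_timeout_ms):
    if len(alive_in_last_metadata) > 1:
        return False  # another broker stays reachable during the roll
    return roll_ms > session_timeout_ms

# Single known broker + a roll longer than the session timeout -> rebalance.
print(rebalance_expected({"b1"}, roll_ms=20_000, session_timeout_ms=10_000))
# A second known broker avoids it, which is why raising the broker count
# (or the session timeout) makes the test pass.
print(rebalance_expected({"b1", "b2"}, roll_ms=20_000, session_timeout_ms=10_000))
```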
[GitHub] [kafka] rondagostino commented on pull request #10322: KAFKA-12455: OffsetValidationTest.test_broker_rolling_bounce fail: Raft
rondagostino commented on pull request #10322: URL: https://github.com/apache/kafka/pull/10322#issuecomment-799777289 This patch needs to be cherry-picked to 2.8
[GitHub] [kafka] rondagostino commented on pull request #10322: KAFKA-12455: OffsetValidationTest.test_broker_rolling_bounce fail: Raft
rondagostino commented on pull request #10322: URL: https://github.com/apache/kafka/pull/10322#issuecomment-799781073 As per an offline conversation, since 2 brokers is a supported cluster size, we would prefer that this system test keep 2 brokers instead of bumping it to 3 -- we have lots of tests that run with 3 brokers already. So I will change the test to use the `session.timeout.ms=3` solution instead.
[jira] [Updated] (KAFKA-12469) The topic names in the metrics do not retain their format when extracting through JMX.
[ https://issues.apache.org/jira/browse/KAFKA-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rafał Chmielewski updated KAFKA-12469: -- Description: I have topic names that have a period in the name: product.order product.offering.price However, for the metrics issued by JMX by a program that is a consumer of Kafka messages, the dots are replaced with an underscore: kafka.consumer<>records-lead This creates a problem if I want to calculate the customer's lag in relation to the number of messages on Kafka. But for the producer, this problem doesn't occur: kafka.producer<>record-send-total As a consumer I use Akka Alpakka. But I think it's using Apache library to connect to Kafka and report metrics via JMX. was: I have topic names that have a period in the name: product.order product.offering.price However, for the metrics issued by JMX by a program that is a consumer of Kafka messages, the dots are replaced with an underscore: kafka.consumer<>records-lead But for the producer, this problem doesn't occur: kafka.producer<>record-send-total As a consumer I use Akka Alpakka. But I think it's using Apache library to connect to Kafka and report metrics via JMX. > The topic names in the metrics do not retain their format when extracting > through JMX. > -- > > Key: KAFKA-12469 > URL: https://issues.apache.org/jira/browse/KAFKA-12469 > Project: Kafka > Issue Type: Bug > Components: metrics >Reporter: Rafał Chmielewski >Priority: Major > > I have topic names that have a period in the name: > product.order > product.offering.price > > However, for the metrics issued by JMX by a program that is a consumer of > Kafka messages, the dots are replaced with an underscore: > kafka.consumer client-id=consumer-export-4, topic=product_offering_price, > partition=1><>records-lead > > This creates a problem if I want to calculate the customer's lag in relation > to the number of messages on Kafka. 
> > But for the producer, this problem doesn't occur: > kafka.producer client-id=bss.data.verification.pi_1, > topic=product.offering.price><>record-send-total > > As a consumer I use Akka Alpakka. But I think it's using Apache library to > connect to Kafka and report metrics via JMX. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12469) The topic names in the metrics do not retain their format when extracting through JMX.
[ https://issues.apache.org/jira/browse/KAFKA-12469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rafał Chmielewski updated KAFKA-12469: -- Component/s: consumer > The topic names in the metrics do not retain their format when extracting > through JMX. > -- > > Key: KAFKA-12469 > URL: https://issues.apache.org/jira/browse/KAFKA-12469 > Project: Kafka > Issue Type: Bug > Components: consumer, metrics >Reporter: Rafał Chmielewski >Priority: Major > > I have topic names that have a period in the name: > product.order > product.offering.price > > However, for the metrics issued by JMX by a program that is a consumer of > Kafka messages, the dots are replaced with an underscore: > kafka.consumer client-id=consumer-export-4, topic=product_offering_price, > partition=1><>records-lead > > This creates a problem if I want to calculate the customer's lag in relation > to the number of messages on Kafka. > > But for the producer, this problem doesn't occur: > kafka.producer client-id=bss.data.verification.pi_1, > topic=product.offering.price><>record-send-total > > As a consumer I use Akka Alpakka. But I think it's using Apache library to > connect to Kafka and report metrics via JMX. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)
ijuma commented on pull request #10056: URL: https://github.com/apache/kafka/pull/10056#issuecomment-799785567 @dejan2609 can you please resolve the conflict?
[jira] [Created] (KAFKA-12471) Implement createPartitions in KIP-500 mode
Colin McCabe created KAFKA-12471: Summary: Implement createPartitions in KIP-500 mode Key: KAFKA-12471 URL: https://issues.apache.org/jira/browse/KAFKA-12471 Project: Kafka Issue Type: New Feature Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12471) Implement createPartitions in KIP-500 mode
[ https://issues.apache.org/jira/browse/KAFKA-12471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-12471: Assignee: Colin McCabe Labels: kip-500 (was: ) > Implement createPartitions in KIP-500 mode > -- > > Key: KAFKA-12471 > URL: https://issues.apache.org/jira/browse/KAFKA-12471 > Project: Kafka > Issue Type: New Feature >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Labels: kip-500 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
hachikuji commented on a change in pull request #10309: URL: https://github.com/apache/kafka/pull/10309#discussion_r594725779 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java ## @@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) { } } +@Test +public void checkSingleNodeCommittedDataLossQuorumSizeThree() { +checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0)); +} + +private void checkSingleNodeCommittedDataLoss(QuorumConfig config) { +assertTrue(config.numVoters > 2, +"This test requires the cluster to be able to recover from one failed node"); + +for (int seed = 0; seed < 100; seed++) { +// We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark` +// invariants since the loss of committed data on one node can violate them. + +Cluster cluster = new Cluster(config, seed); +EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time); +scheduler.addInvariant(new MonotonicHighWatermark(cluster)); +scheduler.addInvariant(new SingleLeader(cluster)); +scheduler.addValidation(new ConsistentCommittedData(cluster)); + +MessageRouter router = new MessageRouter(cluster); + +cluster.startAll(); +schedulePolling(scheduler, cluster, 3, 5); +scheduler.schedule(router::deliverAll, 0, 2, 5); +scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); +scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + +RaftNode node = cluster.randomRunning().orElseThrow(() -> +new AssertionError("Failed to find running node") +); + +// Kill a random node and drop all of its persistent state. The Raft +// protocol guarantees should still ensure we lose no committed data +// as long as a new leader is elected before the failed node is restarted. 
+cluster.kill(node.nodeId); +cluster.deletePersistentState(node.nodeId); +scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader()); Review comment: It amounts to the same thing because `votedIdOpt` is only set when the election outcome has not been determined.
[GitHub] [kafka] ikdekker commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
ikdekker commented on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799798100 Hi @jolshan, I have looked into my build directories (the ones that Gradle outputs after running tests). These do not contain the reports. For example, both the clients and streams modules do not contain an html file: ![image](https://user-images.githubusercontent.com/9272424/111229557-63e73080-85e6-11eb-8d2b-0be54a6ffbe8.png)
[GitHub] [kafka] ikdekker edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
ikdekker edited a comment on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799798100 Hi @jolshan, Thanks for the response! I have looked into my build directories (the ones that Gradle outputs after running tests). These do not contain the reports. For example, both the clients and streams modules do not contain an html file: ![image](https://user-images.githubusercontent.com/9272424/111229557-63e73080-85e6-11eb-8d2b-0be54a6ffbe8.png)
[GitHub] [kafka] jolshan commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
jolshan commented on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799800389 @ikdekker Hmm. When you run another test, old files are deleted. Try copying and pasting the file path directly after running a test.
[jira] [Commented] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced
[ https://issues.apache.org/jira/browse/KAFKA-10518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302075#comment-17302075 ] Colin McCabe commented on KAFKA-10518: -- One simple workaround is to increase max.poll.records so that you get more records per fetch for the partition that has a high rate of new records. I do wonder whether there's any useful purpose served by "explicitly exclud[ing] partitions for which the consumer received data in the previous round". It seems a bit like an implementation hack based on how we do buffering in the consumer, but I could be missing something > Consumer fetches could be inefficient when lags are unbalanced > -- > > Key: KAFKA-10518 > URL: https://issues.apache.org/jira/browse/KAFKA-10518 > Project: Kafka > Issue Type: Bug >Reporter: Dhruvil Shah >Priority: Major > > Consumer fetches are inefficient when lags are imbalanced across partitions, > due to head of the line blocking and the behavior of blocking for > `max.wait.ms` until data is available. > When the consumer receives a fetch response, it prepares the next fetch > request and sends it out. The caveat is that the subsequent fetch request > would explicitly exclude partitions for which the consumer received data in > the previous round. This is to allow the consumer application to drain the > data for those partitions, until the consumer fetches the other partitions it > is subscribed to. > This behavior does not play out too well if the consumer is consuming when > the lag is unbalanced, because it would receive data for the partitions it is > lagging on, and then it would send a fetch request for partitions that do not > have any data (or have little data). The latter will end up blocking for > fetch.max.wait.ms on the broker before an empty response is sent back. This > slows down the consumer’s overall consumption throughput since > fetch.max.wait.ms is 500ms by default. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
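The head-of-line effect described in KAFKA-10518 can be approximated with a back-of-the-envelope model. This is an illustrative simplification (hypothetical function, not the consumer's actual bookkeeping), assuming the hot partition alternates in and out of consecutive fetch requests exactly as the ticket describes:

```python
# Sketch of the inefficiency described above: after a fetch that returned
# data for the hot partition, the next request excludes it and asks only the
# idle partitions, which sit at the broker for fetch.max.wait.ms (500 ms by
# default) before returning empty. Roughly every other round stalls.
def wasted_wait_ms(rounds, fetch_max_wait_ms=500):
    wasted = 0
    hot_excluded = False  # was the hot partition excluded this round?
    for _ in range(rounds):
        if hot_excluded:
            wasted += fetch_max_wait_ms  # only empty partitions requested
        hot_excluded = not hot_excluded  # hot partition alternates in/out
    return wasted

print(wasted_wait_ms(10))  # 2500 -> half the rounds stall for 500 ms each
```

Raising `max.poll.records`, as suggested in the comment, reduces the number of rounds needed to drain the hot partition and hence the number of stalled rounds.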
[GitHub] [kafka] abbccdda commented on pull request #10320: MINOR: revert stream logging level back to ERROR
abbccdda commented on pull request #10320: URL: https://github.com/apache/kafka/pull/10320#issuecomment-799811099 Cherry-picked to 2.8
[jira] [Commented] (KAFKA-12453) Guidance on whether a topology is eligible for optimisation
[ https://issues.apache.org/jira/browse/KAFKA-12453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302087#comment-17302087 ] A. Sophie Blee-Goldman commented on KAFKA-12453: [~mjsax] I thought we don't enable the changelog optimization for repartition topics, only "true" source topics? Did we make an exception for the new toTable() operator, or am I just misinterpreting the context of this ticket? > Guidance on whether a topology is eligible for optimisation > --- > > Key: KAFKA-12453 > URL: https://issues.apache.org/jira/browse/KAFKA-12453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Patrick O'Keeffe >Priority: Major > > Since the introduction of KStream.toTable() in Kafka 2.6.x, the decision > about whether a topology is eligible for optimisation is no longer a simple > one, and is related to whether toTable() operations are preceded by key > changing operators. > This decision requires expert level knowledge, and there are serious > implications associated with getting it wrong in terms of fault tolerance > Some ideas spring to mind around how to guide developers to make the correct > decision: > # Topology.describe() could indicate whether this topology is eligible for > optimisation > # Topologies could be automatically optimised - note this may have an impact > at deployment time, in that an application reset may be required. The > developer would need to made aware of this and adjust the deployment plan > accordingly > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ikdekker commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
ikdekker commented on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799813229 Directly after running (all tests), I see only a clients index.html; streams and core do not generate a report, even when running `./gradlew streams:test` or `./gradlew core:test` (which would be relevant for this PR). The streams tests seem to take a very long time and many fail due to connection timeouts. Is it necessary to start ZooKeeper for these tests? The command at https://kafka.apache.org/quickstart throws an error, which may require some ZooKeeper setup: `Error: Could not find or load main class org.apache.zookeeper.server.quorum.QuorumPeerMain `
[GitHub] [kafka] hachikuji opened a new pull request #10323: KAFKA-12459; Use property testing library for raft event simulation tests
hachikuji opened a new pull request #10323: URL: https://github.com/apache/kafka/pull/10323 This patch changes the raft simulation tests to use jqwik, which is a property testing library. This provides two main benefits: - It simplifies the randomization of test parameters. Currently the tests use a fixed set of `Random` seeds, which means that most builds are doing redundant work. We get a bigger benefit from allowing each build to test different parameterizations. - It makes it easier to reproduce failures. Whenever a test fails, jqwik will report the random seed that failed. A developer can then modify the `@Property` annotation to use that specific seed in order to reproduce the failure. Note that I have resisted making logical changes to the tests themselves. The only difference is the way the parameters are specified. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
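The seed-reporting workflow this patch adopts can be sketched outside of jqwik's Java API. The Python sketch below uses hypothetical helper names to show the idea: draw a fresh seed per run, report it alongside the result, and replay a failure by passing the reported seed back in:

```python
import random

# Sketch (hypothetical names; jqwik does the equivalent for Java) of
# seed-based property testing: each run draws a fresh seed instead of
# re-testing the same fixed seeds every build, and the seed is returned
# so any failure can be reproduced deterministically.
def run_property(prop, seed=None):
    if seed is None:
        seed = random.randrange(2**32)  # fresh parameterization each run
    rng = random.Random(seed)
    return prop(rng), seed  # the reported seed makes failures replayable

# Placeholder property that always holds; a real property would drive the
# system under test with rng-derived inputs.
always_true = lambda rng: 0.0 <= rng.random() < 1.0
ok, seed = run_property(always_true)
ok_replay, _ = run_property(always_true, seed=seed)  # deterministic replay
print(ok and ok_replay)  # True
```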
[GitHub] [kafka] jolshan commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
jolshan commented on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799824018 @ikdekker You don't need to follow any of the quickstart steps to run the tests. Simply running `./gradlew test` or any subset of tests should do the trick. As for `core:test` not generating a report, that may be due to the build passing (no failed tests). If it says "BUILD SUCCESSFUL" you are good to go. With streams, I noticed a gradle error in the screenshot above. If the run is incomplete, the report file may not be generated.
[GitHub] [kafka] jolshan edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
jolshan edited a comment on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799824018 @ikdekker You don't need to follow any of the quickstart steps to run the tests. Simply running `./gradlew test` or any subset of tests should do the trick. As for `core:test` not outputting a report, that may be due to the build passing (no failed tests). If it says "BUILD SUCCESSFUL" you are good to go. With streams, I noticed a gradle error in the screenshot above. If the run is incomplete, the report file may not be generated.
[GitHub] [kafka] jolshan edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
jolshan edited a comment on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799824018 @ikdekker You don't need to follow any of the quickstart steps to run the tests. Simply running `./gradlew test` or any subset of tests should do the trick. As for `core:test` not outputting a report, that may be due to the build passing (no failed tests). If it says "BUILD SUCCESSFUL" you are good to go. EDIT: oh hmmm, you are right. It should still output the result to the file. I'm not sure why that is not happening, but it should be. It shows up for me. With streams, I noticed a Gradle error in the screenshot above. If the run is incomplete, the report file may not be generated.
[GitHub] [kafka] ikdekker commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
ikdekker commented on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799830566 Running `core:test` takes around 20 minutes and fails at the end. If it is as simple as running Gradle, as you say, is it expected to have some failures? For example, for tests that need authentication. The `core:test` output is:
```
2479 tests completed, 257 failed, 13 skipped

> Task :core:test FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:test'.
> Process 'Gradle Test Executor 4' finished with non-zero exit value 1
  This problem might be caused by incorrect test process configuration.
  Please refer to the test execution section in the User Manual at
  https://docs.gradle.org/6.8.3/userguide/java_testing.html#sec:test_execution

BUILD FAILED in 22m 13s
28 actionable tasks: 3 executed, 25 up-to-date
```
An example of a failure with auth:
```
kafka.api.DescribeAuthorizedOperationsTest > testConsumerGroupAuthorizedOperations FAILED
    org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
```
[GitHub] [kafka] ikdekker edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
ikdekker edited a comment on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799830566 Running `core:test` takes around 20 minutes and fails at the end. If it is as simple as running Gradle, as you say, is it expected to have some failures? For example, for tests that need authentication. The `core:test` output is:
```
2479 tests completed, 257 failed, 13 skipped

> Task :core:test FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:test'.
> Process 'Gradle Test Executor 4' finished with non-zero exit value 1
  This problem might be caused by incorrect test process configuration.
  Please refer to the test execution section in the User Manual at
  https://docs.gradle.org/6.8.3/userguide/java_testing.html#sec:test_execution

BUILD FAILED in 22m 13s
28 actionable tasks: 3 executed, 25 up-to-date
```
An example of a failure with auth:
```
kafka.api.DescribeAuthorizedOperationsTest > testConsumerGroupAuthorizedOperations FAILED
    org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user
```
By the way, should the trunk branch also have this report generation, or is it unstable? And from which branch do you recommend starting the PR, 2.8?
[GitHub] [kafka] jolshan commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
jolshan commented on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799842000 @ikdekker Hmm. The best advice I can give here is running `./gradlew clean` and trying again. I've seen plenty of green runs with core tests, but yes, it usually takes 20 min or so. Please develop in trunk. This should be the most up-to-date branch and reports should generate.
[GitHub] [kafka] jolshan edited a comment on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
jolshan edited a comment on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799842000 @ikdekker Hmm. The best advice I can give here is running `./gradlew clean` and trying again. I've seen plenty of green runs with core tests, but yes, it usually takes 20 min or so. For running shorter tests, try `core:unitTest`. Please develop in trunk. This should be the most up-to-date branch and reports should generate.
[GitHub] [kafka] dejan2609 commented on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)
dejan2609 commented on pull request #10056: URL: https://github.com/apache/kafka/pull/10056#issuecomment-799842860 @ijuma Done (conflict solved / branch force-pushed).
[GitHub] [kafka] jsancio opened a new pull request #10324: MINOR: Add a few more benchmark for the timeline map
jsancio opened a new pull request #10324: URL: https://github.com/apache/kafka/pull/10324 Improve the benchmark tests for TimelineHashMap by adding tests for adding entries, removing entries and Scala's immutable hash map. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ikdekker commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
ikdekker commented on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799843740 Alright, I will check some things out. This PR was created from trunk. I might try to create some tests myself, which I should be able to get output from as a starting point. Thanks again for your time.
[GitHub] [kafka] jsancio commented on pull request #10324: MINOR: Add a few more benchmark for the timeline map
jsancio commented on pull request #10324: URL: https://github.com/apache/kafka/pull/10324#issuecomment-799843745 Benchmark results:
```
# Run complete. Total time: 00:26:54

Benchmark                                                  Mode  Cnt    Score    Error  Units
TimelineHashMapBenchmark.testAddEntriesInHashMap           avgt   10  238.332 ±  4.554  ms/op
TimelineHashMapBenchmark.testAddEntriesInImmutableMap      avgt   10  366.732 ±  6.463  ms/op
TimelineHashMapBenchmark.testAddEntriesInTimelineMap       avgt   10  277.197 ±  4.699  ms/op
TimelineHashMapBenchmark.testAddEntriesWithSnapshots       avgt   10  302.747 ±  4.959  ms/op
TimelineHashMapBenchmark.testRemoveEntriesInHashMap        avgt   10  201.004 ±  2.675  ms/op
TimelineHashMapBenchmark.testRemoveEntriesInImmutableMap   avgt   10  479.964 ±  7.254  ms/op
TimelineHashMapBenchmark.testRemoveEntriesInTimelineMap    avgt   10  195.382 ±  1.917  ms/op
TimelineHashMapBenchmark.testRemoveEntriesWithSnapshots    avgt   10  427.747 ± 12.865  ms/op
TimelineHashMapBenchmark.testUpdateEntriesInHashMap        avgt   10  267.895 ± 20.143  ms/op
TimelineHashMapBenchmark.testUpdateEntriesInImmutableMap   avgt   10  532.843 ±  5.766  ms/op
TimelineHashMapBenchmark.testUpdateEntriesInTimelineMap    avgt   10  364.766 ± 25.154  ms/op
TimelineHashMapBenchmark.testUpdateEntriesWithSnapshots    avgt   10  488.308 ± 43.992  ms/op

JMH benchmarks done
```
Benchmark configuration:
```
# JMH version: 1.27
# VM version: JDK 11.0.10, OpenJDK 64-Bit Server VM, 11.0.10+9-Ubuntu-0ubuntu1.20.10
# VM invoker: /usr/lib/jvm/java-11-openjdk-amd64/bin/java
# VM options: <none>
# JMH blackhole mode: full blackhole + dont-inline hint
# Warmup: 3 iterations, 10 s each
# Measurement: 10 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
```
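For readers unfamiliar with the `avgt ... ms/op` columns: JMH's average-time mode reports elapsed time divided by operation count. A stripped-down, stdlib-only sketch of that measurement (this is not JMH — there is no warmup, forking, or blackhole; `avgNanosPerPut` is a hypothetical helper):

```java
import java.util.HashMap;
import java.util.Map;

public class MapAddRemoveSketch {
    static final int OPS = 100_000;

    // Average nanoseconds per put over OPS insertions: total elapsed / ops.
    static double avgNanosPerPut(Map<Integer, Integer> map) {
        long start = System.nanoTime();
        for (int i = 0; i < OPS; i++) {
            map.put(i, i);
        }
        return (System.nanoTime() - start) / (double) OPS;
    }

    public static void main(String[] args) {
        double avg = avgNanosPerPut(new HashMap<>());
        // Without JMH's warmup and forking, numbers are only indicative.
        System.out.printf("HashMap put: %.1f ns/op%n", avg);
    }
}
```

JMH exists precisely because naive loops like this are distorted by JIT warmup, dead-code elimination, and GC, which is why the real benchmark above runs 3 warmup and 10 measurement iterations per fork.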
[jira] [Updated] (KAFKA-12428) Add a last-heartbeat-seconds-ago metric to Kafka Consumer
[ https://issues.apache.org/jira/browse/KAFKA-12428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-12428: -- Labels: newbie++ (was: newbie) > Add a last-heartbeat-seconds-ago metric to Kafka Consumer > - > > Key: KAFKA-12428 > URL: https://issues.apache.org/jira/browse/KAFKA-12428 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > I have encountered several issues in the past where heartbeat requests are > not sent [1,2] (either in time, or ever), and today it is a bit hard to get > to that from the logs. I think it is better to add a metric as > "last-heartbeat-seconds-ago" where when rebalances were triggered we can > immediately find out if this is the root cause. > 1. https://issues.apache.org/jira/browse/KAFKA-10793 > 2. https://issues.apache.org/jira/browse/KAFKA-10827 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ikdekker commented on pull request #10313: KAFKA-12456: Log "Listeners are not identical across brokers" message at WARN/INFO instead of ERROR
ikdekker commented on pull request #10313: URL: https://github.com/apache/kafka/pull/10313#issuecomment-799845091 By the way, I sent my JIRA username to the dev mail; not sure if I should have typed some extra lines there. But then I might try again 😄
[GitHub] [kafka] jsancio commented on a change in pull request #10309: KAFKA-12181; Loosen raft fetch offset validation of remote replicas
jsancio commented on a change in pull request #10309: URL: https://github.com/apache/kafka/pull/10309#discussion_r594787145

## File path: raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
```java
@@ -401,6 +401,51 @@ private void checkBackToBackLeaderFailures(QuorumConfig config) {
         }
     }

+    @Test
+    public void checkSingleNodeCommittedDataLossQuorumSizeThree() {
+        checkSingleNodeCommittedDataLoss(new QuorumConfig(3, 0));
+    }
+
+    private void checkSingleNodeCommittedDataLoss(QuorumConfig config) {
+        assertTrue(config.numVoters > 2,
+            "This test requires the cluster to be able to recover from one failed node");
+
+        for (int seed = 0; seed < 100; seed++) {
+            // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark`
+            // invariants since the loss of committed data on one node can violate them.
+
+            Cluster cluster = new Cluster(config, seed);
+            EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time);
+            scheduler.addInvariant(new MonotonicHighWatermark(cluster));
+            scheduler.addInvariant(new SingleLeader(cluster));
+            scheduler.addValidation(new ConsistentCommittedData(cluster));
+
+            MessageRouter router = new MessageRouter(cluster);
+
+            cluster.startAll();
+            schedulePolling(scheduler, cluster, 3, 5);
+            scheduler.schedule(router::deliverAll, 0, 2, 5);
+            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+            RaftNode node = cluster.randomRunning().orElseThrow(() ->
+                new AssertionError("Failed to find running node")
+            );
+
+            // Kill a random node and drop all of its persistent state. The Raft
+            // protocol guarantees should still ensure we lose no committed data
+            // as long as a new leader is elected before the failed node is restarted.
+            cluster.kill(node.nodeId);
+            cluster.deletePersistentState(node.nodeId);
+            scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader());
```
Review comment: Got it. I missed that `votedIdOpt` is set to `empty` by the leader and the followers.
[jira] [Created] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status
Guozhang Wang created KAFKA-12472: Summary: Add a Consumer / Streams metric to indicate the current rebalance status Key: KAFKA-12472 URL: https://issues.apache.org/jira/browse/KAFKA-12472 Project: Kafka Issue Type: Improvement Components: consumer, streams Reporter: Guozhang Wang

Today, to troubleshoot a rebalance issue, operators need to perform many manual steps: locating the problematic members, searching the log entries, and looking for related metrics. It would be great to add a single metric that covers all these manual steps, so operators would only need to check this single signal to find the root cause. A concrete idea is to expose two enum gauge metrics, on the consumer and on streams, respectively:

* Consumer level (the order below is by design; see the Streams level for details):
0. None => there is no rebalance ongoing.
1. CoordinatorRequested => any coordinator response contains a RebalanceInProgress error code.
2. NewMember => the join group response has a MemberIdRequired error code.
3. UnknownMember => any coordinator response contains an UnknownMember error code, indicating this member has already been kicked out of the group.
4. StaleMember => any coordinator response contains an IllegalGeneration error code.
5. DroppedGroup => the heartbeat thread decides to call leaveGroup because the heartbeat expired.
6. UserRequested => leaveGroup upon the shutdown / unsubscribeAll API, as well as upon calling the enforceRebalance API.
7. MetadataChanged => requestRejoin triggered because metadata has changed.
8. SubscriptionChanged => requestRejoin triggered because the subscription has changed.
9. RetryOnError => the join/syncGroup response contains a retriable error that causes the consumer to back off and retry.
10. RevocationNeeded => requestRejoin triggered because the set of revoked partitions is not empty.

The transition rule is that a non-zero status code can only transition to zero or to a higher code, never to a lower code (same for streams; see the rationale below).

* Streams level: today a streams client can have multiple consumers. We introduce some new enum states as well as aggregation rules across consumers: if no streams-layer event below changes its status (i.e. the streams layer thinks it is 0), then we aggregate across all the embedded consumers and take the largest status code as the streams metric; if a streams-layer event determines its status should be 10+, then it overrides all embedded consumer-layer status codes. In addition, when creating an aggregated metric across streams instances within an app, we follow the same aggregation rule; e.g. if there are two streams instances where one instance's status code is 1) and the other's is 10), then the app's status is 10).
10. RevocationNeeded => the definition is extended from 10) in the consumer list above: either that condition holds, OR the leader decides to revoke active/standby tasks and hence schedules follow-ups.
11. AssignmentProbing => the leader schedules follow-ups because the current assignment is unstable.
12. VersionProbing => the leader schedules follow-ups due to version probing.
13. EndpointUpdate => anyone schedules follow-ups due to endpoint updates.

The main motivations for the proposed precedence order are:
1. When a rebalance is triggered by one member, all other members only know it is due to CoordinatorRequested from coordinator error codes, so CoordinatorRequested should be overridden by any other status when aggregating across clients.
2. DroppedGroup could cause unknown/stale members that fail and retry immediately, and hence should take higher precedence.
3. The revocation definition is extended in Streams, so it needs the highest precedence among all consumer-only statuses so that it is not overridden by any of them.
4. In general, rarer events get higher precedence.

Any comments on the precedence rules / categorization are more than welcome!
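The "transition to zero or to a higher code only" rule could be modeled as a small gauge. A hypothetical sketch (the status names come from the proposal; the class and method names are invented for illustration):

```java
public class RebalanceStatusSketch {
    // Status codes 0-10 from the consumer-level list in the proposal.
    enum Status {
        NONE(0), COORDINATOR_REQUESTED(1), NEW_MEMBER(2), UNKNOWN_MEMBER(3),
        STALE_MEMBER(4), DROPPED_GROUP(5), USER_REQUESTED(6), METADATA_CHANGED(7),
        SUBSCRIPTION_CHANGED(8), RETRY_ON_ERROR(9), REVOCATION_NEEDED(10);

        final int code;
        Status(int code) { this.code = code; }
    }

    private Status current = Status.NONE;

    // Accept the new status only if it resets to NONE (rebalance completed)
    // or strictly increases the code; otherwise keep the current status.
    public Status transition(Status next) {
        if (next == Status.NONE || next.code > current.code) {
            current = next;
        }
        return current;
    }

    public static void main(String[] args) {
        RebalanceStatusSketch gauge = new RebalanceStatusSketch();
        gauge.transition(Status.DROPPED_GROUP);          // 0 -> 5: allowed
        Status s = gauge.transition(Status.NEW_MEMBER);  // 5 -> 2: rejected
        System.out.println(s);                           // prints "DROPPED_GROUP"
        System.out.println(gauge.transition(Status.NONE)); // prints "NONE"
    }
}
```

The same max-of-codes comparison would implement the proposed aggregation across embedded consumers: take the largest status code among members unless a streams-layer (10+) status overrides it.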
[jira] [Updated] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced
[ https://issues.apache.org/jira/browse/KAFKA-10518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Lee updated KAFKA-10518:
--
Attachment: kafka-slow-consumer-repro.tar.gz

> Consumer fetches could be inefficient when lags are unbalanced
> --
>
> Key: KAFKA-10518
> URL: https://issues.apache.org/jira/browse/KAFKA-10518
> Project: Kafka
> Issue Type: Bug
> Reporter: Dhruvil Shah
> Priority: Major
> Attachments: kafka-slow-consumer-repro.tar.gz
>
> Consumer fetches are inefficient when lags are imbalanced across partitions, due to head-of-line blocking and the behavior of blocking for `fetch.max.wait.ms` until data is available.
> When the consumer receives a fetch response, it prepares the next fetch request and sends it out. The caveat is that the subsequent fetch request explicitly excludes partitions for which the consumer received data in the previous round. This allows the consumer application to drain the data for those partitions while the consumer fetches the other partitions it is subscribed to.
> This behavior does not play out well when the consumer's lag is unbalanced, because the consumer receives data for the partitions it is lagging on and then sends a fetch request for partitions that have no data (or little data). The latter ends up blocking for fetch.max.wait.ms on the broker before an empty response is sent back. This slows down the consumer's overall consumption throughput, since fetch.max.wait.ms is 500 ms by default.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
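The fetch-session behavior the issue describes — the follow-up fetch excluding partitions that returned data in the previous round — can be sketched roughly as below. The class and method names here are hypothetical, purely for illustration; the real logic lives inside the consumer's fetch session handling, not in any public API.

```java
import java.util.HashSet;
import java.util.Set;

public class NextFetchSketch {
    // Rough sketch: the next fetch request targets the assigned partitions
    // minus those whose data from the previous response is still buffered,
    // so the application can drain them first. If the remaining partitions
    // happen to have no data, the broker may hold the request for up to
    // fetch.max.wait.ms (500 ms by default) before returning an empty response,
    // which is the head-of-line blocking the issue describes.
    static Set<String> nextFetchPartitions(Set<String> assigned, Set<String> buffered) {
        Set<String> next = new HashSet<>(assigned);
        next.removeAll(buffered);
        return next;
    }
}
```

With lags balanced, `buffered` rotates across partitions and every fetch finds data; with one hot partition, the alternating fetches target only the idle partitions and stall.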
[jira] [Updated] (KAFKA-8127) It may need to import scala.io
[ https://issues.apache.org/jira/browse/KAFKA-8127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JieFang.He updated KAFKA-8127:
--
Description:
I get an error when compiling Kafka, which disappears after importing scala.io:
{code:java}
core\src\main\scala\kafka\tools\StateChangeLogMerger.scala:140: object Source is not a member of package io
val lineIterators = files.map(io.Source.fromFile(_).getLines)
^
6 warnings found
one error found
:core:compileScala FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:compileScala'.
> Compilation failed
{code}

> It may need to import scala.io
> --
>
> Key: KAFKA-8127
> URL: https://issues.apache.org/jira/browse/KAFKA-8127
> Project: Kafka
> Issue Type: Improvement
> Reporter: JieFang.He
> Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status
[ https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-12472:
--
Description:
Today, troubleshooting a rebalance issue requires operators to perform many manual steps: locating the problematic members, searching the log entries, and looking at related metrics. It would be great to add a single metric that covers all these manual steps, so operators would only need to check this one signal to find the root cause. A concrete idea is to expose two enum gauge metrics, on the consumer and on streams, respectively:

* Consumer level (the order below is by design; see the Streams level for details):
0. *None* => there is no rebalance ongoing.
1. *CoordinatorRequested* => any coordinator response contains a RebalanceInProgress error code.
2. *NewMember* => the join group response has a MemberIdRequired error code.
3. *UnknownMember* => any coordinator response contains an UnknownMember error code, indicating this member has already been kicked out of the group.
4. *StaleMember* => any coordinator response contains an IllegalGeneration error code.
5. *DroppedGroup* => the heartbeat thread decides to call leaveGroup because the heartbeat expired.
6. *UserRequested* => leaveGroup upon the shutdown / unsubscribeAll APIs, as well as upon calling the enforceRebalance API.
7. *MetadataChanged* => requestRejoin triggered because metadata has changed.
8. *SubscriptionChanged* => requestRejoin triggered because the subscription has changed.
9. *RetryOnError* => the join/syncGroup response contains a retriable error that causes the consumer to back off and retry.
10. *RevocationNeeded* => requestRejoin triggered because the set of revoked partitions is not empty.

The transition rule is that a non-zero status code can only transition to zero or to a higher code, never to a lower one (the same holds for streams; see the rationale below).

* Streams level: today a streams client can have multiple consumers. We introduce some new enum states as well as aggregation rules across consumers: if no streams-layer event below has transitioned its status (i.e. the streams layer considers itself to be at 0), we aggregate across all the embedded consumers and take the largest status code value as the streams metric; if a streams-layer event determines its status should be 10 or above, that status overrides all embedded consumer-layer status codes, since it takes higher precedence. In addition, when creating an aggregated metric across streams instances (i.e. at the app level, which is usually what we would care about and alert on), we follow the same aggregation rule: e.g. if there are two streams instances where one instance's status code is 1 and the other's is 10, then the app's status is 10.
10. *RevocationNeeded* => the definition is extended from 10 as defined for the consumer above: additionally, the leader decides to revoke either active or standby tasks and hence schedules follow-up rebalances.
11. *AssignmentProbing* => the leader decides to schedule follow-ups because the current assignment is unstable.
12. *VersionProbing* => the leader decides to schedule follow-ups due to version probing.
13. *EndpointUpdate* => anyone decides to schedule follow-ups due to endpoint updates.

The main motivations for the proposed precedence order are:
1. When a rebalance is triggered by one member, all other members only know it is due to CoordinatorRequested from the coordinator error codes, so CoordinatorRequested should be overridden by any other status when aggregating across clients.
2. DroppedGroup can cause unknown/stale members that fail and retry immediately, so it should take higher precedence.
3. The revocation definition is extended in Streams, so it needs to take the highest precedence among all consumer-only statuses, ensuring it is not overridden by any of them.
4. In general, rarer events get higher precedence.

This is proposed on top of KAFKA-12352. Any comments on the precedence rules / categorization are more than welcome!
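The proposed transition and aggregation rules could be sketched as follows. `RebalanceStatusSketch`, `canTransition`, and `aggregate` are hypothetical names for illustration only, not part of any actual Kafka API.

```java
import java.util.Arrays;

public class RebalanceStatusSketch {
    // A few of the proposed codes; consumer-level codes are 0-10,
    // streams-level codes are 10-13.
    static final int NONE = 0;
    static final int COORDINATOR_REQUESTED = 1;
    static final int REVOCATION_NEEDED = 10;
    static final int ENDPOINT_UPDATE = 13;

    // A non-zero status code may only transition to zero or to a higher code.
    static boolean canTransition(int from, int to) {
        return from == NONE || to == NONE || to > from;
    }

    // Streams metric: a streams-layer status of 10+ overrides the embedded
    // consumers; otherwise take the largest code across all embedded consumers.
    // The same rule applies when rolling up instances to the app level.
    static int aggregate(int streamsStatus, int... consumerStatuses) {
        if (streamsStatus >= REVOCATION_NEEDED) {
            return streamsStatus;
        }
        return Arrays.stream(consumerStatuses).max().orElse(NONE);
    }
}
```

For example, two instances reporting codes 1 and 10 aggregate to an app-level status of 10, matching the example in the description.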
[GitHub] [kafka] dengziming commented on pull request #10312: MINOR: Fix log statement whose placeholders are inconsistent with arguments
dengziming commented on pull request #10312: URL: https://github.com/apache/kafka/pull/10312#issuecomment-799873639 Similar to #10262, ping @tombentley to have a look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
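The class of bug the PR title describes — SLF4J-style `{}` placeholders out of sync with the argument list — can be illustrated with a tiny standalone checker. This helper is hypothetical, not part of Kafka or SLF4J, and real SLF4J has additional rules (e.g. a trailing Throwable argument is treated specially).

```java
public class PlaceholderCheckSketch {
    // Count "{}" placeholders in an SLF4J-style format string.
    static int placeholderCount(String fmt) {
        int count = 0;
        for (int i = 0; fmt != null && i + 1 < fmt.length(); i++) {
            if (fmt.charAt(i) == '{' && fmt.charAt(i + 1) == '}') {
                count++;
                i++; // skip the '}' just consumed
            }
        }
        return count;
    }

    // A log statement is consistent when the placeholder count matches the
    // argument count; SLF4J does not fail on a mismatch, it just logs
    // something other than what the author intended.
    static boolean consistent(String fmt, Object... args) {
        return placeholderCount(fmt) == args.length;
    }
}
```

A statement like `log.info("Fetched {} records from {}", count)` passes the compiler but leaves the second placeholder unfilled, which is exactly what such MINOR fixes clean up.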
[jira] [Commented] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced
[ https://issues.apache.org/jira/browse/KAFKA-10518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302143#comment-17302143 ] Tom Lee commented on KAFKA-10518:
--
Attaching a contrived repro case. In case it's not obvious: when the repro case is run against a multi-broker cluster, the key to the repro is that the partitions assigned to the consumer are fetched from the same broker. Sample output here: [https://gist.githubusercontent.com/thomaslee/fa13c9a10466dc35792173c2485ad84b/raw/34c02bfc9f756eced8b952530b1b6378760fd7cd/bug-repro-output|https://gist.githubusercontent.com/thomaslee/fa13c9a10466dc35792173c2485ad84b/raw/34c02bfc9f756eced8b952530b1b6378760fd7cd/bug-repro-output]

Note the throughput drop from a ballpark ~2-3M records/sec to less than 200k/sec. This is the point at which the _disable_topic_2_ file is created and the producer stops writing to topic_2. Imagine a scenario where a consumer of topic_2 is downstream of another system producing to topic_2: if conditions are right, an incident impacting the producer could also impact the consumer. The same applies if the producer is decommissioned.

> Consumer fetches could be inefficient when lags are unbalanced
> --
>
> Key: KAFKA-10518
> URL: https://issues.apache.org/jira/browse/KAFKA-10518
> Project: Kafka
> Issue Type: Bug
> Reporter: Dhruvil Shah
> Priority: Major
> Attachments: kafka-slow-consumer-repro.tar.gz

-- This message was sent by Atlassian Jira (v8.3.4#803005)
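Assuming the extra (possibly empty) fetch round-trips are acceptable, one possible mitigation for the blocking described in this issue is lowering `fetch.max.wait.ms` on the affected consumer. A minimal config sketch, where the bootstrap address is a placeholder and the values are illustrative rather than recommendations:

```java
import java.util.Properties;

public class FetchTuningSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        // Default is 500 ms; lowering it shortens the worst-case wait when a
        // fetch request happens to target only partitions that have no data.
        props.put("fetch.max.wait.ms", "100");
        // fetch.min.bytes=1 (the default) lets the broker respond as soon as
        // any data is available rather than waiting to batch more.
        props.put("fetch.min.bytes", "1");
        return props;
    }
}
```

The tradeoff is more frequent fetch requests and smaller batches, so this trades broker/network overhead for reduced head-of-line latency.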