[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1
[ https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523303#comment-16523303 ]

Dong Lin commented on KAFKA-7012:
---------------------------------

Great to know that the fix works!

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -------------------------------------------------
>
>                 Key: KAFKA-7012
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7012
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: rajadayalan perumalsamy
>            Assignee: Rajini Sivaram
>            Priority: Critical
>              Labels: regression
>             Fix For: 2.0.0, 1.0.2, 1.1.1
>
>         Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
> We are trying to upgrade the Kafka cluster from 0.11.0.1 to 1.0.1. After upgrading one node of the cluster, we noticed that the network threads use most of the CPU. It is a 3-node cluster with 15k messages/sec on each node. With Kafka 0.11.0.1, typical usage is around 50 to 60% of one vCPU. After the upgrade, CPU usage is high and depends on the number of network threads: with 8 network threads it is around 850% (9 vCPUs), and with 4 it is around 450% (5 vCPUs), using the same server.properties in both cases.
> Further analysis with git bisect and a couple of builds and deploys traced the issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine at commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d, but increases with commit 47ee8e954df62b9a79099e944ec4be29afe046f6. Screenshots of profiling runs with both commits are attached: Commit-f15cdbc91b-profile shows low CPU usage by the network threads, while Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show the network threads consuming almost all of the CPU. We also noticed that kafka.network.Processor.poll() is invoked 10 times more often with commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need this issue resolved to upgrade the cluster. Please let me know if you need any additional information.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
[ https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523297#comment-16523297 ]

Dong Lin commented on KAFKA-6949:
---------------------------------

[~rsivaram] The patch has been merged into trunk. I opened PR [https://github.com/apache/kafka/pull/5293] to merge it into 2.0. Do we typically need review from another committer in order to cherry-pick a patch from trunk into a release branch? I have also added fix version 2.0.0 to this JIRA; please feel free to change it as you see appropriate. Thanks.

> alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6949
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6949
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Dong Lin
>            Priority: Blocker
>             Fix For: 2.0.0
>
> I found this in a failed execution of kafka.admin.ReassignPartitionsClusterTest.shouldExpandCluster. Looks like we're missing some option checking.
> {code}
> [2018-05-25 08:03:53,310] ERROR [ReplicaManager broker=100] Error while changing replica dir for partition my-topic-2 (kafka.server.ReplicaManager:76)
> java.util.NoSuchElementException: None.get
>         at scala.None$.get(Option.scala:347)
>         at scala.None$.get(Option.scala:345)
>         at kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:584)
>         at kafka.server.ReplicaManager$$anonfun$alterReplicaLogDirs$1.apply(ReplicaManager.scala:576)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at kafka.server.ReplicaManager.alterReplicaLogDirs(ReplicaManager.scala:576)
>         at kafka.server.KafkaApis.handleAlterReplicaLogDirsRequest(KafkaApis.scala:2037)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:138)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
[jira] [Updated] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
[ https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-6949:
----------------------------
    Fix Version/s: 2.0.0
[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
[ https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523295#comment-16523295 ]

ASF GitHub Bot commented on KAFKA-6949:
---------------------------------------

lindong28 opened a new pull request #5293: KAFKA-6949; alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
URL: https://github.com/apache/kafka/pull/5293

A NoSuchElementException will be thrown if ReplicaAlterDirThread replaces the current replica with the future replica right before the request handler thread executes `futureReplica.log.get.dir.getParent` in ReplicaManager.alterReplicaLogDirs(). The solution is to grab the partition lock when the request handler thread attempts to check the destination log directory of the future replica.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-6949) alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
[ https://issues.apache.org/jira/browse/KAFKA-6949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523292#comment-16523292 ]

ASF GitHub Bot commented on KAFKA-6949:
---------------------------------------

lindong28 closed pull request #5081: KAFKA-6949; alterReplicaLogDirs() should grab partition lock when accessing log of the future replica
URL: https://github.com/apache/kafka/pull/5081

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 9ab1ec47af8..b80c34475d3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -149,10 +149,10 @@ class Partition(val topic: String,
    * @return true iff the future replica is created
    */
   def maybeCreateFutureReplica(logDir: String): Boolean = {
-    // The readLock is needed to make sure that while the caller checks the log directory of the
+    // The writeLock is needed to make sure that while the caller checks the log directory of the
     // current replica and the existence of the future replica, no other thread can update the log directory of the
     // current replica or remove the future replica.
-    inReadLock(leaderIsrUpdateLock) {
+    inWriteLock(leaderIsrUpdateLock) {
       val currentReplica = getReplica().get
       if (currentReplica.log.get.dir.getParent == logDir)
         false
@@ -207,29 +207,52 @@ class Partition(val topic: String,
     allReplicasMap.remove(replicaId)
   }

-  def removeFutureLocalReplica() {
+  def futureReplicaDirChanged(newDestinationDir: String): Boolean = {
+    inReadLock(leaderIsrUpdateLock) {
+      getReplica(Request.FutureLocalReplicaId) match {
+        case Some(futureReplica) =>
+          if (futureReplica.log.get.dir.getParent != newDestinationDir)
+            true
+          else
+            false
+        case None => false
+      }
+    }
+  }
+
+  def removeFutureLocalReplica(deleteFromLogDir: Boolean = true) {
     inWriteLock(leaderIsrUpdateLock) {
       allReplicasMap.remove(Request.FutureLocalReplicaId)
+      if (deleteFromLogDir)
+        logManager.asyncDelete(topicPartition, isFuture = true)
     }
   }

-  // Return true iff the future log has caught up with the current log for this partition
+  // Return true iff the future replica exists and it has caught up with the current replica for this partition
   // Only ReplicaAlterDirThread will call this method and ReplicaAlterDirThread should remove the partition
   // from its partitionStates if this method returns true
   def maybeReplaceCurrentWithFutureReplica(): Boolean = {
     val replica = getReplica().get
-    val futureReplica = getReplica(Request.FutureLocalReplicaId).get
-    if (replica.logEndOffset == futureReplica.logEndOffset) {
+    val futureReplicaLEO = getReplica(Request.FutureLocalReplicaId).map(_.logEndOffset)
+    if (futureReplicaLEO.contains(replica.logEndOffset)) {
       // The write lock is needed to make sure that while ReplicaAlterDirThread checks the LEO of the
       // current replica, no other thread can update LEO of the current replica via log truncation or log append operation.
       inWriteLock(leaderIsrUpdateLock) {
-        if (replica.logEndOffset == futureReplica.logEndOffset) {
-          logManager.replaceCurrentWithFutureLog(topicPartition)
-          replica.log = futureReplica.log
-          futureReplica.log = None
-          allReplicasMap.remove(Request.FutureLocalReplicaId)
-          true
-        } else false
+        getReplica(Request.FutureLocalReplicaId) match {
+          case Some(futureReplica) =>
+            if (replica.logEndOffset == futureReplica.logEndOffset) {
+              logManager.replaceCurrentWithFutureLog(topicPartition)
+              replica.log = futureReplica.log
+              futureReplica.log = None
+              allReplicasMap.remove(Request.FutureLocalReplicaId)
+              true
+            } else false
+          case None =>
+            // Future replica is removed by a non-ReplicaAlterLogDirsThread before this method is called
+            // In this case the partition should have been removed from state of the ReplicaAlterLogDirsThread
+            // Return false so that ReplicaAlterLogDirsThread does not have to remove this partition from the state again to avoid race condition
+            false
+        }
       }
     } else false
   }
@@ -550,15 +573,22 @@ class Partition(val topic: String,
   }

   private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
-    if (isFuture)
-      getReplicaOrException(Request.Futur
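The diff above moves the check-then-act on the future replica under the partition lock and replaces bare `Option.get` with a pattern match on absence. A minimal Java sketch of that same pattern, using a hypothetical `PartitionState` class (not Kafka's actual code):

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for Partition: the NoSuchElementException race is
// avoided by re-reading the future replica state *inside* the lock and
// handling "already removed" explicitly instead of calling Optional.get()
// on a value observed outside the lock.
class PartitionState {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private String futureReplicaDir; // null means "no future replica"

    void setFutureReplicaDir(String dir) {
        lock.writeLock().lock();
        try { futureReplicaDir = dir; } finally { lock.writeLock().unlock(); }
    }

    // Mirrors futureReplicaDirChanged(): read under the read lock and return
    // false when the future replica has been removed, rather than throwing.
    boolean futureReplicaDirChanged(String newDestinationDir) {
        lock.readLock().lock();
        try {
            return Optional.ofNullable(futureReplicaDir)
                    .map(dir -> !dir.equals(newDestinationDir))
                    .orElse(false);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

The key design point matches the PR: any thread may remove the future replica concurrently, so its existence must be rechecked under the same lock that protects removal.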
[jira] [Comment Edited] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group
[ https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523095#comment-16523095 ]

Narayan Periwal edited comment on KAFKA-6681 at 6/26/18 4:02 AM:
-----------------------------------------------------------------

[~steven.aerts], We are using the RangeAssignor (which is the default), not the StickyAssignor that KAFKA-7026 mentions. One observation is that there is a spike in the number of under-replicated partitions, after which multiple consumer instances start consuming the same topic partition. Our Kafka brokers and consumers are both on version 0.10.2.1.

was (Author: nperiwal):
[~steven.aerts], We are using the RangeAssignor (which is the default), not the StickyAssignor that KAFKA-7026 mentions.

> Two instances of kafka consumer reading the same partition within a consumer group
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6681
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6681
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 0.10.2.1
>            Reporter: Narayan Periwal
>            Priority: Critical
>         Attachments: server-1.log, server-2.log
>
> We have seen this issue with the Kafka consumer, the new client library introduced in 0.9. With this client, group management is done by the Kafka coordinator, which is one of the Kafka brokers. We are using Kafka broker 0.10.2.1, and the consumer client version is also 0.10.2.1.
> The issue we have faced is that, after rebalancing, some of the partitions get consumed by two instances within a consumer group, leading to duplication of the entire partition's data. Both instances continue to read until the next rebalancing or the restart of those clients. It looks like a particular consumer keeps fetching data from a partition, but the broker is not able to identify this "stale" consumer instance.
> We have hit this twice in production. Please look at it at the earliest.
[jira] [Comment Edited] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523146#comment-16523146 ]

Narayan Periwal edited comment on KAFKA-7026 at 6/26/18 4:01 AM:
-----------------------------------------------------------------

[~vahid], Can this issue also occur with the RangeAssignor? We have seen it happen multiple times with our Kafka consumers (0.10.2.1) using the RangeAssignor. Jira - KAFKA-6681. One observation is that there is a spike in the number of under-replicated partitions in our Kafka cluster, after which multiple consumer instances start consuming the same topic partition. The Kafka broker is also at version 0.10.2.1.

was (Author: nperiwal):
[~vahid], Can this issue also occur with the RangeAssignor? We have seen it happen multiple times with our Kafka consumers (0.10.2.1) using the RangeAssignor. Jira - KAFKA-6681. One observation is that there is a spike in the number of under-replicated partitions, after which multiple consumer instances start consuming the same topic partition.

> Sticky assignor could assign a partition to multiple consumers
> --------------------------------------------------------------
>
>                 Key: KAFKA-7026
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7026
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>            Reporter: Vahid Hashemian
>            Assignee: Vahid Hashemian
>            Priority: Major
>
> In the following scenario, the sticky assignor assigns a topic partition to two consumers in the group:
> # Create a topic {{test}} with a single partition
> # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group leader and gets {{test-0}})
> # Start consumer {{c2}} in group {{sticky-group}} ({{c1}} holds onto {{test-0}}, {{c2}} does not get any partition)
> # Pause {{c1}} (e.g. using a Java debugger) ({{c2}} becomes leader and takes over {{test-0}}, {{c1}} leaves the group)
> # Resume {{c1}}
> At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them.
>
> The reason is that {{c1}} has kept its previous assignment ({{test-0}}) from the last assignment it received from the leader (itself) and did not get the next round of assignments (when {{c2}} became leader) because it was paused. Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their existing assignment, and the sticky assignor code does not currently check for and avoid this duplication.
>
> Note: This issue was originally reported on [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group].
[jira] [Comment Edited] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523146#comment-16523146 ]

Narayan Periwal edited comment on KAFKA-7026 at 6/26/18 3:59 AM:
-----------------------------------------------------------------

[~vahid], Can this issue also occur with the RangeAssignor? We have seen it happen multiple times with our Kafka consumers (0.10.2.1) using the RangeAssignor. Jira - KAFKA-6681. One observation is that there is a spike in the number of under-replicated partitions, after which multiple consumer instances start consuming the same topic partition.

was (Author: nperiwal):
[~vahid], Can this issue also occur with the RangeAssignor? We have seen it happen multiple times with our Kafka consumers (0.10.2.1) using the RangeAssignor. Jira - KAFKA-6681
[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523146#comment-16523146 ]

Narayan Periwal commented on KAFKA-7026:
----------------------------------------

[~vahid], Can this issue also occur with the RangeAssignor? We have seen it happen multiple times with our Kafka consumers (0.10.2.1) using the RangeAssignor. Jira - KAFKA-6681
[jira] [Commented] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group
[ https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523095#comment-16523095 ]

Narayan Periwal commented on KAFKA-6681:
----------------------------------------

[~steven.aerts], We are using the RangeAssignor (which is the default), not the StickyAssignor that KAFKA-7026 mentions.
[jira] [Commented] (KAFKA-7097) VerifiableProducer does not work properly with --message-create-time argument
[ https://issues.apache.org/jira/browse/KAFKA-7097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523074#comment-16523074 ]

ASF GitHub Bot commented on KAFKA-7097:
---------------------------------------

tedyu opened a new pull request #5292: KAFKA-7097 VerifiableProducer does not work properly with --message-create-time argument
URL: https://github.com/apache/kafka/pull/5292

Currently the create time is interpreted as an integer. This PR makes the tool accept long values.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

> VerifiableProducer does not work properly with --message-create-time argument
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-7097
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7097
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.0.0
>            Reporter: Jasper Knulst
>            Priority: Major
>
> If you run:
> ./bin/kafka-verifiable-producer.sh --broker-list <broker-list> --topic test_topic_increasing_p2 --message-create-time <create-time> --acks -1 --max-messages 100
> the "<create-time>" for --message-create-time doesn't take a 13-digit long like 1529656934000. The error message:
> verifiable-producer: error: argument --message-create-time: could not convert '1529656934000' to Integer (For input string: "1529656934000")
> When you provide a 10-digit epoch (1529656934) for the argument, it does work, but this leads to your topic being cleaned up in a few minutes since the retention time is expired.
> The error seems to be obvious, since VerifiableProducer.java has:
> {code}
> Long createTime = (long) res.getInt("createTime");
> {code}
> when parsing the argument. This should be read as a Long instead.
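The int/long distinction matters here because a millisecond epoch does not fit in 32 bits. A hedged sketch (a hypothetical helper, not VerifiableProducer's actual argparse4j code) demonstrating why only the 10-digit epoch-seconds form worked before the fix:

```java
// Hypothetical helper: epoch milliseconds (13 digits, e.g. 1529656934000)
// overflow a 32-bit int (max 2147483647), while epoch seconds (10 digits,
// e.g. 1529656934) happen to fit.
class CreateTimeArg {
    // The fix in spirit: parse the argument as a 64-bit long, not an int.
    static long parse(String value) {
        return Long.parseLong(value);
    }

    // Shows why Integer-based parsing rejected 13-digit values with
    // "could not convert ... to Integer".
    static boolean fitsInInt(String value) {
        try {
            Integer.parseInt(value);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```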
[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16523060#comment-16523060 ]

ASF GitHub Bot commented on KAFKA-7026:
---------------------------------------

vahidhashemian opened a new pull request #5291: KAFKA-7026: Sticky Assignor Partition Assignment Improvement
URL: https://github.com/apache/kafka/pull/5291

In the current implementation of the sticky assignor, the leader does not cache the most recently calculated assignment. It relies on the fact that each consumer in the group sends its subscribed topics and its current assignment when a rebalance occurs. This can lead to the issue described in [KAFKA-7026](https://issues.apache.org/jira/browse/KAFKA-7026), in which the current assignment of a consumer is no longer valid and should be ignored. The solution implemented in this PR has the leader cache the most recent assignment of each consumer, so that the assignment reported by a consumer can be properly ignored when necessary.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
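The caching idea described in the PR can be sketched as follows. This is a hypothetical illustration (class and method names are invented; the real change lives in the StickyAssignor leader logic):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the leader remembers the last assignment it computed
// for each member and ignores a reported assignment that contradicts it,
// e.g. a claim from a consumer that was paused during the previous rebalance.
class StickyLeaderCache {
    private final Map<String, List<String>> lastComputed = new HashMap<>();

    void recordComputedAssignment(String memberId, List<String> partitions) {
        lastComputed.put(memberId, new ArrayList<>(partitions));
    }

    // Trust a member's claimed assignment only if it matches what the
    // leader last handed out; otherwise treat the member as owning nothing.
    List<String> effectiveReportedAssignment(String memberId, List<String> reported) {
        List<String> cached = lastComputed.get(memberId);
        if (cached != null && !cached.equals(reported))
            return Collections.emptyList(); // stale claim: ignore it
        return reported;
    }
}
```

In the KAFKA-7026 scenario, the leader would have recorded that {{c1}} owns nothing after it left the group, so a resumed {{c1}} reporting {{test-0}} would be ignored instead of producing a duplicate assignment.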
[jira] [Created] (KAFKA-7098) Improve accuracy of the log cleaner throttle rate
Dong Lin created KAFKA-7098:
-------------------------------

             Summary: Improve accuracy of the log cleaner throttle rate
                 Key: KAFKA-7098
                 URL: https://issues.apache.org/jira/browse/KAFKA-7098
             Project: Kafka
          Issue Type: Improvement
            Reporter: Dong Lin
            Assignee: Dong Lin

LogCleaner uses the Throttler class to throttle the log cleaning rate to the user-specified limit, i.e. log.cleaner.io.max.bytes.per.second. However, in Throttler.maybeThrottle(), after sleep() is called, periodStartNs is set to the time taken before the sleep, which artificially increases the measurement window and under-estimates the actual log cleaning rate. This causes the log cleaning IO to be higher than the user-specified limit.
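The effect of the window-reset placement can be seen in a minimal throttler sketch (greatly simplified from Kafka's Throttler; the class below is illustrative, not the actual implementation):

```java
// Minimal rate throttler: if the next window were started from a timestamp
// taken *before* the sleep, the sleep time would count as time during which
// the observed bytes were produced, under-estimating the rate and letting
// actual IO exceed the configured limit. Resetting the window *after*
// waking (as below) keeps the accounting honest.
class Throttler {
    private final double maxBytesPerSec;
    private long periodStartNs = System.nanoTime();
    private long observedBytes = 0;

    Throttler(double maxBytesPerSec) {
        this.maxBytesPerSec = maxBytesPerSec;
    }

    void maybeThrottle(long bytes) throws InterruptedException {
        observedBytes += bytes;
        double elapsedSec = (System.nanoTime() - periodStartNs) / 1e9;
        double rate = observedBytes / Math.max(elapsedSec, 1e-9);
        if (rate > maxBytesPerSec) {
            // Sleep long enough that observedBytes over the full window
            // would match the target rate.
            long sleepMs = (long) (1000 * (observedBytes / maxBytesPerSec - elapsedSec));
            if (sleepMs > 0)
                Thread.sleep(sleepMs);
        }
        // Start the next window only after any sleep has completed.
        periodStartNs = System.nanoTime();
        observedBytes = 0;
    }
}
```

At a 1 MB/s limit, pushing 100 KB through in a burst should force a sleep on the order of 100 ms before the next window begins.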
[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522967#comment-16522967 ] Vahid Hashemian commented on KAFKA-7026: Thanks! Yes, this is a valid bug. Working on a PR and should submit one soon. > Sticky assignor could assign a partition to multiple consumers > -- > > Key: KAFKA-7026 > URL: https://issues.apache.org/jira/browse/KAFKA-7026 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Major > > In the following scenario sticky assignor assigns a topic partition to two > consumers in the group: > # Create a topic {{test}} with a single partition > # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group > leader and gets {{test-0}}) > # Start consumer {{c2}} in group {{sticky-group}} ({{c1}} holds onto > {{test-0}}, {{c2}} does not get any partition) > # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes > over {{test-0}}, {{c1}} leaves the group) > # Resume {{c1}} > At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them. > > The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from > the last assignment it received from the leader (itself) and did not get the > next round of assignments (when {{c2}} became leader) because it was paused. > Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their > existing assignment. The sticky assignor code does not currently check and > avoid this duplication. > > Note: This issue was originally reported on > [StackOverflow|https://stackoverflow.com/questions/50761842/kafka-stickyassignor-breaking-delivery-to-single-consumer-in-the-group]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
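The missing check can be sketched in plain Java. This is an illustrative model only, not the actual StickyAssignor code: before honoring the assignments that rejoining members claim to own, deduplicate so that each partition stays with at most one consumer.

```java
import java.util.*;

public class ClaimedPartitions {
    // Given each member's claimed partitions, keep a partition only for the
    // first member that claims it; later claimants lose the stale claim.
    static Map<String, List<String>> dedupe(Map<String, List<String>> claimed) {
        Set<String> seen = new HashSet<>();
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : claimed.entrySet()) {
            List<String> kept = new ArrayList<>();
            for (String tp : e.getValue()) {
                if (seen.add(tp)) {   // first claim wins
                    kept.add(tp);
                }
            }
            result.put(e.getKey(), kept);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> claimed = new LinkedHashMap<>();
        claimed.put("c1", List.of("test-0")); // stale claim from before the pause
        claimed.put("c2", List.of("test-0")); // current owner after the rebalance
        System.out.println(dedupe(claimed));  // {c1=[test-0], c2=[]}
    }
}
```

Which claimant should win (prior owner vs. current leader's view) is a policy choice for the real fix; the point is only that the assignor must detect the duplicate instead of emitting it.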
[jira] [Commented] (KAFKA-7089) Extend `kafka-consumer-groups.sh` to show "beginning offsets"
[ https://issues.apache.org/jira/browse/KAFKA-7089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522963#comment-16522963 ] Vahid Hashemian commented on KAFKA-7089: Thanks [~hachikuji]. I'll create a short KIP. > Extend `kafka-consumer-groups.sh` to show "beginning offsets" > - > > Key: KAFKA-7089 > URL: https://issues.apache.org/jira/browse/KAFKA-7089 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Matthias J. Sax >Assignee: Vahid Hashemian >Priority: Minor > > Currently, `kafka-consumer-groups.sh` only shows "current offset", "end > offset" and "lag". It would be helpful to extend the tool to also show > "beginning/earliest offset". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6098) Delete and Re-create topic operation could result in race condition
[ https://issues.apache.org/jira/browse/KAFKA-6098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522943#comment-16522943 ] Dhruvil Shah commented on KAFKA-6098: - I don't think we can provide any formal guarantees for these APIs if topics are being created and deleted concurrently. From the discussion so far, it looks like we want to be able to define the semantics for what happens in a single-threaded application trying to delete, list, and create topics; is this correct? One way to fix this problem could be to have deleteTopics return success only after the topic has been completely deleted (i.e. the topic znode has been deleted). listTopics could continue returning the topic information for this duration. Would this address the issue? > Delete and Re-create topic operation could result in race condition > --- > > Key: KAFKA-6098 > URL: https://issues.apache.org/jira/browse/KAFKA-6098 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Dhruvil Shah >Priority: Major > Labels: reliability > > Here is the process to reproduce this issue: > 1. Delete a topic using the delete topic request. > 2. Confirm the topic is deleted using the list topics request. > 3. Create the topic using the create topic request. > In step 3) a race condition can occur where the response returns a > {{TOPIC_ALREADY_EXISTS}} error code, indicating the topic already exists. 
> The root cause of the above issue is in the {{TopicDeletionManager}} class: > {code} > controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq, > OfflinePartition) > controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq, > NonExistentPartition) > topicsToBeDeleted -= topic > partitionsToBeDeleted.retain(_.topic != topic) > kafkaControllerZkUtils.deleteTopicZNode(topic) > kafkaControllerZkUtils.deleteTopicConfigs(Seq(topic)) > kafkaControllerZkUtils.deleteTopicDeletions(Seq(topic)) > controllerContext.removeTopic(topic) > {code} > I.e. it first updates the brokers' metadata caches through the ISR and metadata > update requests, then deletes the topic zk path, and then deletes the > topic-deletion zk path. However, upon handling the create topic request, the > broker will simply try to write to the topic zk path directly. Hence there is > a race window between the brokers updating their metadata caches (so the list > topics request no longer returns this topic) and the zk path for the topic > being deleted (so the create topic request succeeds). > The reason this problem can be exposed is the current handling logic > of the create topic response, most of which takes {{TOPIC_ALREADY_EXISTS}} as > "OK" and moves on, while the zk path gets deleted later, hence leaving the > topic not created at all: > https://github.com/apache/kafka/blob/249e398bf84cdd475af6529e163e78486b43c570/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java#L221 > https://github.com/apache/kafka/blob/1a653c813c842c0b67f26fb119d7727e272cf834/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L232 > Looking at the code history, it seems this race condition has always existed, but > testing on trunk / 1.0 with the above steps it is more likely to happen than > before. I wonder if the ZK async calls have an effect here. 
cc [~junrao] > [~onurkaraman] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
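Until the server-side semantics are tightened, a client can defend itself with a bounded retry: when createTopics reports TOPIC_ALREADY_EXISTS for a topic it just deleted, back off and retry until the stale znode disappears. A minimal plain-Java sketch of that retry shape; the Supplier stands in for the real AdminClient createTopics call and its error handling, purely illustrative:

```java
import java.util.function.Supplier;

public class CreateWithRetry {
    enum Result { CREATED, TOPIC_ALREADY_EXISTS }

    // Retry create while the server still reports the old topic as existing.
    // attemptCreate stands in for AdminClient.createTopics + error mapping.
    static boolean createWithRetry(Supplier<Result> attemptCreate, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            if (attemptCreate.get() == Result.CREATED) {
                return true;
            }
            // In real code: sleep with backoff here before retrying.
        }
        return false; // give up after maxAttempts
    }

    public static void main(String[] args) {
        // Simulate a deletion whose znode disappears after two failed creates.
        int[] calls = {0};
        Supplier<Result> flaky = () ->
            ++calls[0] <= 2 ? Result.TOPIC_ALREADY_EXISTS : Result.CREATED;
        System.out.println(createWithRetry(flaky, 5)); // true (third attempt wins)
    }
}
```

This is the inverse of the StreamsKafkaClient/TopicAdmin behavior linked above, which treats TOPIC_ALREADY_EXISTS as success and can therefore end up with no topic at all.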
[jira] [Commented] (KAFKA-6591) Move check for super user in SimpleAclProvider before ACL evaluation
[ https://issues.apache.org/jira/browse/KAFKA-6591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522905#comment-16522905 ] ASF GitHub Bot commented on KAFKA-6591: --- hachikuji closed pull request #4618: KAFKA-6591: Move super user check before ACL matching URL: https://github.com/apache/kafka/pull/4618 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 504d71ad63c..1dc3a1fb412 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -112,38 +112,46 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
     val principal = session.principal
     val host = session.clientAddress.getHostAddress
-    val acls = getMatchingAcls(resource.resourceType, resource.name)
-
-    // Check if there is any Deny acl match that would disallow this operation.
-    val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)
-
-    // Check if there are any Allow ACLs which would allow this operation.
-    // Allowing read, write, delete, or alter implies allowing describe.
-    // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
-    val allowOps = operation match {
-      case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
-      case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
-      case _ => Set[Operation](operation)
+
+    def isEmptyAclAndAuthorized(acls: Set[Acl]): Boolean = {
+      if (acls.isEmpty) {
+        // No ACLs found for this resource, permission is determined by value of config allow.everyone.if.no.acl.found
+        authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
+        shouldAllowEveryoneIfNoAclIsFound
+      } else false
+    }
+
+    def denyAclExists(acls: Set[Acl]): Boolean = {
+      // Check if there are any Deny ACLs which would forbid this operation.
+      aclMatch(operation, resource, principal, host, Deny, acls)
     }
-    val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
-    // we allow an operation if a user is a super user or if no acls are found and user has configured to allow all users
-    // when no acls are found or if no deny acls are found and at least one allow acls matches.
-    val authorized = isSuperUser(operation, resource, principal, host) ||
-      isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
-      (!denyMatch && allowMatch)
+    def allowAclExists(acls: Set[Acl]): Boolean = {
+      // Check if there are any Allow ACLs which would allow this operation.
+      // Allowing read, write, delete, or alter implies allowing describe.
+      // See #{org.apache.kafka.common.acl.AclOperation} for more details about ACL inheritance.
+      val allowOps = operation match {
+        case Describe => Set[Operation](Describe, Read, Write, Delete, Alter)
+        case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
+        case _ => Set[Operation](operation)
+      }
+      allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls))
+    }
+
+    def aclsAllowAccess = {
+      // we allow an operation if no acls are found and user has configured to allow all users
+      // when no acls are found or if no deny acls are found and at least one allow acls matches.
+      val acls = getMatchingAcls(resource.resourceType, resource.name)
+      isEmptyAclAndAuthorized(acls) || (!denyAclExists(acls) && allowAclExists(acls))
+    }
+
+    // Evaluate if operation is allowed
+    val authorized = isSuperUser(operation, resource, principal, host) || aclsAllowAccess
     logAuditMessage(principal, authorized, operation, resource, host)
     authorized
   }
-  def isEmptyAclAndAuthorized(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String, acls: Set[Acl]): Boolean = {
-    if (acls.isEmpty) {
-      authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
-      shouldAllowEveryoneIfNoAclIsFound
-    } else false
-  }
-
   def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
     if (superUsers.contains(principal)) {
       authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.")
---
[jira] [Closed] (KAFKA-6978) Make Streams Window retention time strict
[ https://issues.apache.org/jira/browse/KAFKA-6978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler closed KAFKA-6978. --- > Make Streams Window retention time strict > - > > Key: KAFKA-6978 > URL: https://issues.apache.org/jira/browse/KAFKA-6978 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.1.0 > > > Currently, the configured retention time for windows is a lower bound. We > actually keep the window around until it's time to roll a new segment. At > that time, we drop all windows in the oldest segment. > As long as a window is still in a segment, we will continue to add > late-arriving records to it and also serve IQ queries from it. This is sort > of nice, because it makes optimistic use of the fact that the windows live > for some time after their retention expires. However, it is also a source of > (apparent) non-determinism, and it's arguably better for programability if we > adhere strictly to the configured constraints. > Therefore, the new behavior will be: > * once the retention time for a window passes, Streams will drop any > later-arriving records (with a warning log and a metric) > * likewise, IQ will first check whether the window is younger than its > retention time before answering queries. > No changes need to be made to the underlying segment management, this is > purely to make the behavior more strict wrt the configuration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
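The stricter behavior described above boils down to one comparison applied on both the write path (late records) and the IQ path (queries). A plain-Java sketch of that check; the method and parameter names are illustrative, not the Streams API:

```java
public class WindowRetention {
    // A window is expired once stream time has advanced past its end by more
    // than the retention period; expired windows reject late-arriving records
    // and are skipped by interactive queries.
    static boolean isExpired(long windowEndMs, long retentionMs, long streamTimeMs) {
        return streamTimeMs > windowEndMs + retentionMs;
    }

    public static void main(String[] args) {
        long windowEnd = 1_000L, retention = 60_000L;
        System.out.println(isExpired(windowEnd, retention, 30_000L)); // false: still retained
        System.out.println(isExpired(windowEnd, retention, 61_001L)); // true: drop the late record
    }
}
```

As the ticket notes, the segment management underneath is unchanged; the check simply stops relying on when the oldest segment happens to roll.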
[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1
[ https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522861#comment-16522861 ] rajadayalan perumalsamy commented on KAFKA-7012: Tried 1.1.2 version in our kafka cluster, performance looks good. Thank you all again!!! > Performance issue upgrading to kafka 1.0.1 or 1.1 > - > > Key: KAFKA-7012 > URL: https://issues.apache.org/jira/browse/KAFKA-7012 > Project: Kafka > Issue Type: Bug > Affects Versions: 1.1.0, 1.0.1 > Reporter: rajadayalan perumalsamy > Assignee: Rajini Sivaram > Priority: Critical > Labels: regression > Fix For: 2.0.0, 1.0.2, 1.1.1 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6978) Make Streams Window retention time strict
[ https://issues.apache.org/jira/browse/KAFKA-6978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-6978. - Resolution: Fixed Fix Version/s: 2.1.0 This feature was merged in https://github.com/apache/kafka/commit/954be11bf2d3dc9fa11a69830d2ef5ff580ff533 > Make Streams Window retention time strict > - > > Key: KAFKA-6978 > URL: https://issues.apache.org/jira/browse/KAFKA-6978 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: John Roesler > Assignee: John Roesler > Priority: Major > Fix For: 2.1.0 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions
[ https://issues.apache.org/jira/browse/KAFKA-7096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522768#comment-16522768 ] Mayuresh Gharat commented on KAFKA-7096: [~lindong] : [https://github.com/apache/kafka/pull/5289] > Consumer should drop the data for unassigned topic partitions > - > > Key: KAFKA-7096 > URL: https://issues.apache.org/jira/browse/KAFKA-7096 > Project: Kafka > Issue Type: Improvement > Reporter: Mayuresh Gharat > Assignee: Mayuresh Gharat > Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7097) VerifiableProducer does not work properly with --message-create-time argument
Jasper Knulst created KAFKA-7097: Summary: VerifiableProducer does not work properly with --message-create-time argument Key: KAFKA-7097 URL: https://issues.apache.org/jira/browse/KAFKA-7097 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.0.0 Reporter: Jasper Knulst If you run: ./bin/kafka-verifiable-producer.sh --broker-list <broker-list> --topic test_topic_increasing_p2 --message-create-time <create-time> --acks -1 --max-messages 100 the <create-time> value for --message-create-time doesn't take a 13-digit long like 1529656934000. The error message: verifiable-producer: error: argument --message-create-time: could not convert '1529656934000' to Integer (For input string: "1529656934000") When you provide a 10-digit epoch (1529656934) for the argument it does work, but this leads to your topic being cleaned up in a few minutes since the retention time has expired. The error seems to be obvious, since VerifiableProducer.java has: Long createTime = (long) res.getInt("createTime"); when parsing the argument. This should be taken as a Long instead. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
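The overflow is easy to reproduce in plain Java: a millisecond epoch needs 13 digits, which exceeds the Integer range, so the argument has to be parsed as a long. The class and helper below are illustrative, not the actual VerifiableProducer code:

```java
public class CreateTimeParse {
    // Mirrors what parsing the argument with getInt effectively does:
    // true if the string fits in a 32-bit int, false otherwise.
    static boolean fitsInInt(String s) {
        try {
            Integer.parseInt(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String createTime = "1529656934000"; // 13-digit ms epoch

        // 1529656934000 > Integer.MAX_VALUE (2147483647), hence the
        // "could not convert ... to Integer" error from the tool.
        System.out.println("fits in int: " + fitsInInt(createTime));

        // Parsing as a long works fine.
        System.out.println("as long: " + Long.parseLong(createTime));
    }
}
```

A 10-digit value like 1529656934 parses as an int, but it is then interpreted as milliseconds, i.e. a timestamp in January 1970, which is why the records immediately fall outside retention.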
[jira] [Comment Edited] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps
[ https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518766#comment-16518766 ] Collin Scangarella edited comment on KAFKA-6817 at 6/25/18 6:21 PM: Thank you Matthias. In case anyone arrives with the same issue - we had to implement ValueTransformer and use it with the transformValues step as follows: `streamsBuilder.stream(...).transformValues(...).filter((k, v) -> v != null)...`. The reason why the transform is after the source (instead of before the sink) is because our state stores were also throwing the same UnknownProducerIdException. Additionally, we had to use transformValues instead of transform because the state store was unable to identify the correct serdes for the message. Finally, we had to filter out the null values as transformValues sends messages even if they are null. was (Author: col...@scangarella.com): Thank you Matthias. In case anyone arrives with the same issue - we had to implement ValueTransformer and us it with the transformValues step as follows `streamsBuilder.stream(...).transformValues(...).filter((k, v) -> v != null)...`. The reason why we had to put the transform after the source (instead of before the sync) is because our state stores where also throwing the same UnknownProducerIdException. Additionally, we had to use transformValues instead of transform because the state store was unable to identify the correct serdes for the message. Finally, we had to filter out the null values as transformValues sends messages even if they are null. 
> UnknownProducerIdException when writing messages with old timestamps > > > Key: KAFKA-6817 > URL: https://issues.apache.org/jira/browse/KAFKA-6817 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.1.0 >Reporter: Odin Standal >Priority: Major > > We are seeing the following exception in our Kafka application: > {code:java} > ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer > due to the following error: org.apache.kafka.streams.errors.StreamsException: > task [0_0] Abort sending since an error caught with a previous record (key > 22 value some-value timestamp 1519200902670) to topic > exactly-once-test-topic- v2 due to This exception is raised by the broker if > it could not locate the producer metadata associated with the producerId in > question. This could happen if, for instance, the producer's records were > deleted because their retention time had elapsed. Once the last records of > the producerId are removed, the producer's metadata is removed from the > broker, and future appends by the producer will return this exception. 
at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at > java.lang.Thread.run(Thread.java:748) Caused by: > org.apache.kafka.common.errors.UnknownProducerIdException > {code} > We discovered this error when we had the need to reprocess old messages. 
See > more details on > [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827] > We have reprod
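The ordering in the workaround above matters: values are transformed right after the source, and the nulls the transformer emits are filtered out before anything downstream sees them. A plain-Java model of that pipeline stage, standing in for transformValues(...).filter((k, v) -> v != null) and purely illustrative:

```java
import java.util.*;
import java.util.function.Function;

public class TransformThenFilter {
    // Apply a value transformer that may return null (e.g. to drop a record),
    // then filter the nulls out, mirroring the
    // stream.transformValues(...).filter((k, v) -> v != null) shape.
    static <V, R> List<R> pipeline(List<V> values, Function<V, R> transformer) {
        List<R> out = new ArrayList<>();
        for (V v : values) {
            R r = transformer.apply(v);   // transformValues step
            if (r != null) {              // filter step: drop forwarded nulls
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The transformer drops odd numbers by mapping them to null.
        List<Integer> result = pipeline(List.of(1, 2, 3, 4),
                n -> n % 2 == 0 ? n * 10 : null);
        System.out.println(result); // [20, 40]
    }
}
```

Without the trailing filter, the nulls the transformer returns are forwarded downstream, which is exactly the behavior the comment warns about.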
[jira] [Created] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions
Mayuresh Gharat created KAFKA-7096: -- Summary: Consumer should drop the data for unassigned topic partitions Key: KAFKA-7096 URL: https://issues.apache.org/jira/browse/KAFKA-7096 Project: Kafka Issue Type: Improvement Reporter: Mayuresh Gharat Assignee: Mayuresh Gharat Currently, if a client has assigned topics T1, T2, T3 and calls poll(), the poll might fetch data for partitions of all 3 topics T1, T2, T3. Now if the client unassigns some topics (for example T3) and calls poll(), we still hold the data (for T3) in the completedFetches queue until we actually reach the buffered data for the unassigned topics (T3 in our example) on subsequent poll() calls, at which point we drop that data. This process of holding the data is unnecessary. When a client creates a topic, it takes time for the broker to fetch ACLs for the topic. But during this time, the client will issue fetch requests for the topic and will get responses for the partitions of this topic. The response consists of a TopicAuthorizationException for each of the partitions. This response for each partition is wrapped with a completedFetch and added to the completedFetches queue. Now when the client calls the next poll() it sees the TopicAuthorizationException from the first buffered CompletedFetch. At this point the client chooses to sleep for 1.5 min as a backoff (as per the design), hoping that the Broker fetches the ACL from the ACL store in the meantime. Actually the Broker has already fetched the ACL by this time. When the client calls poll() after the sleep, it again sees the TopicAuthorizationException from the second completedFetch and it sleeps again. So it takes (1.5 * 60 * partitions) seconds before the client can see any data. With this patch, when the client sees the first TopicAuthorizationException, it can call assign(EmptySet), which will get rid of the buffered completedFetches (those with TopicAuthorizationException), and it can again call assign(TopicPartitions) before calling poll(). With this patch we found that the client was able to get the records as soon as the Broker fetched the ACLs from the ACL store. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
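The improvement can be modeled in plain Java: on each poll, buffered fetches whose partition is no longer in the current assignment are dropped up front, instead of being drained one poll (and one backoff sleep) at a time. This is a simplified stand-in for the consumer's completedFetches queue, not the real client code:

```java
import java.util.*;

public class FetchBuffer {
    // Drop buffered fetch data for partitions that are no longer assigned, so
    // a stale TopicAuthorizationException (or plain data) for an unassigned
    // partition never reaches the caller on a later poll().
    static List<String> prune(List<String> completedFetches, Set<String> assigned) {
        List<String> kept = new ArrayList<>();
        for (String tp : completedFetches) {
            if (assigned.contains(tp)) {
                kept.add(tp);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> buffered = List.of("T1-0", "T2-0", "T3-0");
        Set<String> assigned = Set.of("T1-0", "T2-0"); // T3 was unassigned
        System.out.println(prune(buffered, assigned)); // [T1-0, T2-0]
    }
}
```

With this pruning, the assign(EmptySet)-then-reassign dance described above becomes a single pass over the buffer rather than a sequence of 1.5-minute sleeps.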
[jira] [Commented] (KAFKA-7089) Extend `kafka-consumer-groups.sh` to show "beginning offsets"
[ https://issues.apache.org/jira/browse/KAFKA-7089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522556#comment-16522556 ] Jason Gustafson commented on KAFKA-7089: Yeah, I think we should have a short KIP. Seems noncontroversial though, so probably won't need much discussion. > Extend `kafka-consumer-groups.sh` to show "beginning offsets" > - > > Key: KAFKA-7089 > URL: https://issues.apache.org/jira/browse/KAFKA-7089 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Matthias J. Sax >Assignee: Vahid Hashemian >Priority: Minor > > Currently, `kafka-consumer-groups.sh` only shows "current offset", "end > offset" and "lag". It would be helpful to extend the tool to also show > "beginning/earliest offset". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7092) Multiple topics (including __consumer_offsets) have not been cleaned for a few months
[ https://issues.apache.org/jira/browse/KAFKA-7092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522545#comment-16522545 ] Jason Gustafson commented on KAFKA-7092: Yes, you are probably hitting one of those bugs. The most likely one is KAFKA-5413, which is fixed in all versions greater than 0.11. The fix for KAFKA-6264 will be in 2.0.0, which is in the process of being released. We will also be releasing 0.10.2.2 shortly, which will contain the fix for KAFKA-5413. > Multiple topics (including __consumer_offsets) have not been cleaned for a > few months > - > > Key: KAFKA-7092 > URL: https://issues.apache.org/jira/browse/KAFKA-7092 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.2.1 > Environment: linux > kafka 0.10.2.1 >Reporter: Noam Berman >Priority: Critical > > Hi, > We stumbled upon a scenario - our 8-node cluster (0.10.2.1) hasn't been > cleaning logs for many topics since 26/3/2018 (during which the brokers were > restarted). This includes __consumer_offsets, which has since grown to > enormous proportions. > The cluster is an upgraded cluster from 0.10.1.0. > I believe this is a reproduction of either > https://issues.apache.org/jira/browse/KAFKA-5413 or > https://issues.apache.org/jira/browse/KAFKA-6264, but I haven't found any > workaround for this. 
> output of one of __consumer_offsets partition data folders: > {noformat} > user@kafka-broker0a:/var/lib/kafka/data/__consumer_offsets-100$ ls -ltr > total 132832 > -rw-r--r-- 1 kafka kafka 0 Mar 26 07:10 .log > -rw-r--r-- 1 kafka kafka 320648 Mar 26 07:11 004729570626.log > -rw-r--r-- 1 kafka kafka 2697871 Mar 26 07:11 006877054274.log > -rw-r--r-- 1 kafka kafka 3792861 Mar 26 07:11 006877066658.log > -rw-r--r-- 1 kafka kafka 931064 Mar 26 07:11 006877084831.log > -rw-r--r-- 1 kafka kafka 118282 Mar 26 07:11 006877089209.log > -rw-r--r-- 1 kafka kafka 1807647 Mar 26 07:11 006877089458.log > -rw-r--r-- 1 kafka kafka 239104 Mar 26 07:11 006877096136.log > -rw-r--r-- 1 kafka kafka 1835988 Mar 26 07:11 006877097398.log > -rw-r--r-- 1 kafka kafka 575040 Mar 26 07:11 006877104257.log > -rw-r--r-- 1 kafka kafka 390255 Mar 26 07:11 006877106774.log > -rw-r--r-- 1 kafka kafka 3105807 Mar 26 07:11 006877108341.log > -rw-r--r-- 1 kafka kafka 2086353 Mar 26 07:11 006877120354.log > -rw-r--r-- 1 kafka kafka 2434773 Mar 26 07:12 006877128121.log > -rw-r--r-- 1 kafka kafka 1432500 Mar 26 07:12 006877137370.log > -rw-r--r-- 1 kafka kafka 1901742 Mar 26 07:12 006877142384.log > -rw-r--r-- 1 kafka kafka 2310319 Mar 26 07:12 006877149921.log > -rw-r--r-- 1 kafka kafka 2084367 Mar 26 07:12 006877157735.log > -rw-r--r-- 1 kafka kafka 23317 Mar 26 07:12 006877165836.log > -rw-r--r-- 1 kafka kafka 3715029 Mar 26 07:12 006877165946.log > -rw-r--r-- 1 kafka kafka 2217586 Mar 26 07:12 006877179092.log > -rw-r--r-- 1 kafka kafka 1133491 Mar 26 07:12 006877187739.log > -rw-r--r-- 1 kafka kafka 1351694 Mar 26 07:12 006877191615.log > -rw-r--r-- 1 kafka kafka 1397573 Mar 26 19:11 006877195811.log > -rw-r--r-- 1 kafka kafka 1439494 Mar 28 00:37 006877201824.log > -rw-r--r-- 1 kafka kafka 1679347 Mar 29 07:10 006877207157.log > -rw-r--r-- 1 kafka kafka 0 Mar 29 09:35 .timeindex > -rw-r--r-- 1 kafka kafka 0 Mar 29 09:35 .index > -rw-r--r-- 1 kafka kafka 12 Mar 29 09:35 006877089458.timeindex > 
-rw-r--r-- 1 kafka kafka 8 Mar 29 09:35 006877089458.index > -rw-r--r-- 1 kafka kafka 12 Mar 29 09:35 006877089209.timeindex > -rw-r--r-- 1 kafka kafka 0 Mar 29 09:35 006877089209.index > -rw-r--r-- 1 kafka kafka 12 Mar 29 09:35 006877084831.timeindex > -rw-r--r-- 1 kafka kafka 0 Mar 29 09:35 006877084831.index > -rw-r--r-- 1 kafka kafka 36 Mar 29 09:35 006877066658.timeindex > -rw-r--r-- 1 kafka kafka 24 Mar 29 09:35 006877066658.index > . > . > -rw-r--r-- 1 kafka kafka 10485760 Jun 24 14:52 006877558794.index > -rw-r--r-- 1 kafka kafka 1235857 Jun 24 14:52 > 006877558794.log{noformat} > As you can see, the oldest log file has a 0..0 file name. > Is there any version that we can upgrade to that will solve this issue for > us? > > Thanks > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
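A directory listing like the one above can be scanned mechanically: a partition whose oldest `.log` segment stays at an all-zero base offset for months is a hint that the log cleaner has stopped (as in KAFKA-5413/KAFKA-6264). The sketch below fabricates a demo layout in a temp directory — the paths and offsets are invented for illustration; in practice the loop would run over the broker's real log directory, e.g. `/var/lib/kafka/data`.

```shell
# Demo layout standing in for the real Kafka log dir; offsets are invented.
DATA_DIR=$(mktemp -d)
mkdir -p "$DATA_DIR/__consumer_offsets-100"
touch "$DATA_DIR/__consumer_offsets-100/00000000000000000000.log" \
      "$DATA_DIR/__consumer_offsets-100/00000006877558794000.log"

# Report the oldest segment per partition directory. Segment file names are
# zero-padded base offsets, so a lexicographic sort is also a numeric sort.
for d in "$DATA_DIR"/__consumer_offsets-*/; do
  oldest=$(ls "$d" | grep '\.log$' | sort | head -n 1)
  echo "$(basename "$d"): oldest segment is $oldest"
done
```

Running this periodically and diffing the output makes a stalled cleaner visible: the reported oldest segment for an affected partition never advances.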
[jira] [Commented] (KAFKA-6986) Export Admin Client metrics through Stream Threads
[ https://issues.apache.org/jira/browse/KAFKA-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522514#comment-16522514 ] ASF GitHub Bot commented on KAFKA-6986: --- guozhangwang closed pull request #5210: KAFKA-6986: Export Admin Client metrics through Stream Threads URL: https://github.com/apache/kafka/pull/5210 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 0171b617e7c..75c93b63228 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionReplica; import org.apache.kafka.common.acl.AclBinding; @@ -768,4 +770,11 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds) { return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()); } + +/** + * Get the metrics kept by the adminClient + * + * @return + */ +public abstract Map metrics(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 495095a9276..016195308b0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -35,6 
+35,8 @@ import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -2734,4 +2736,9 @@ void handleFailure(Throwable throwable) { return new DeleteConsumerGroupsResult(new HashMap>(futures)); } + +@Override +public Map metrics() { +return Collections.unmodifiableMap(this.metrics.metrics()); +} } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 2fc7048b759..6fbdca265c1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; @@ -47,6 +49,8 @@ private Node controller; private int timeoutNextRequests = 0; +private Map mockMetrics = new HashMap<>(); + /** * Creates MockAdminClient for a cluster with the given brokers. The Kafka cluster ID uses the default value from * DEFAULT_CLUSTER_ID. 
@@ -391,4 +395,10 @@ public void close(long duration, TimeUnit unit) {} } } +public void setMockMetrics(MetricName name, Metric metric) { mockMetrics.put(name, metric); } + +@Override +public Map metrics() { +return mockMetrics; +} } diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 6a707ff986d..cef8116e880 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -382,12 +382,12 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState * * @return Map of all metrics. */ -// TODO: we can add metrics for admin client as well public Map metrics() { final Map result = new LinkedHashMap<>(); for (final StreamThread thread : threads) { result.putAll(thread.producerMetrics()); result.putAll(thread.consumerMetrics()); +result.putAll(thread.adminClientMetrics()); } if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics()); result.putAll(metrics.metr
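With the patch above applied, the admin client's metrics flow through `KafkaStreams.metrics()` alongside producer and consumer metrics, and callers can separate them by metric group. The sketch below only illustrates that filtering pattern: it mocks metric names as `group:name` strings instead of the real `MetricName`/`Metric` types from `org.apache.kafka.common`, and the `admin-client-metrics` group string is an assumption that should be verified against the running client's registered metrics.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class StreamsMetricsFilterSketch {
    // Keys stand in for MetricName ("group:name"); values stand in for metric values.
    static Map<String, Double> byGroup(Map<String, Double> allMetrics, String group) {
        return allMetrics.entrySet().stream()
                .filter(e -> e.getKey().startsWith(group + ":"))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        // In real code this map would come from streams.metrics().
        Map<String, Double> all = new LinkedHashMap<>();
        all.put("producer-metrics:record-send-rate", 120.0);
        all.put("consumer-metrics:records-consumed-rate", 118.0);
        all.put("admin-client-metrics:request-rate", 0.5);
        System.out.println(byGroup(all, "admin-client-metrics"));
    }
}
```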
[jira] [Commented] (KAFKA-6986) Export Admin Client metrics through Stream Threads
[ https://issues.apache.org/jira/browse/KAFKA-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522515#comment-16522515 ] ASF GitHub Bot commented on KAFKA-6986: --- shunge opened a new pull request #5210: KAFKA-6986: Export Admin Client metrics through Stream Threads URL: https://github.com/apache/kafka/pull/5210 KAFKA-6986:Export Admin Client metrics through Stream Threads We already exported producer and consumer metrics through KafkaStreams class: https://github.com/apache/kafka/pull/4998 It makes sense to also export the Admin client metrics. I didn't add a separate unittest case for this. Let me know if it's needed. This is my first contribution, feel free to point out any mistakes that I did. @abbccdda This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Export Admin Client metrics through Stream Threads > -- > > Key: KAFKA-6986 > URL: https://issues.apache.org/jira/browse/KAFKA-6986 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Assignee: Shun Guan >Priority: Minor > Labels: Newcomer, beginner, newbie > > We already exported producer and consumer metrics through KafkaStreams class: > [https://github.com/apache/kafka/pull/4998] > It makes sense to also export the Admin client metrics. > If any new contributor wishes to take over this one, please let me know. I > will revisit and close this ticket in one or two months later in case no one > claims it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka
[ https://issues.apache.org/jira/browse/KAFKA-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522504#comment-16522504 ] Jason Gustafson commented on KAFKA-7095: See [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets] which will go into the next release. You shouldn't need to restart your application in order to consume the next offset even if the committed offset is lost. What `auto.offset.reset` policy are you using? > Low traffic consumer is not consuming messages after the offsets is deleted > by Kafka > > > Key: KAFKA-7095 > URL: https://issues.apache.org/jira/browse/KAFKA-7095 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 >Reporter: Aldo Sinanaj >Priority: Minor > > Hello guys. > I have a low traffic consumer for a given consumer group and I have the > default broker setting for this property *offsets.retention.minutes*. So if a > message comes after 2 days and Kafka has deleted the offset for that > given consumer, then the consumer will not consume the new incoming messages. > If I restart the application it will consume from the earliest which is > obvious since the offset is deleted. > My question is why it doesn't consume the new messages if I don't restart the > application? And how does this version of Kafka understand if a consumer is > active or inactive? Is my consumer considered inactive in this case? > Thanks, > Aldo -- This message was sent by Atlassian JIRA (v7.6.3#76005)
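The interaction being discussed involves one broker setting and one consumer setting. A minimal sketch of both follows; the values are illustrative, not recommendations (in 0.10.x the broker default for offset retention was 1440 minutes, i.e. 24 hours):

```properties
# broker (server.properties): how long committed offsets of a group with no
# active members are kept before Kafka deletes them
offsets.retention.minutes=10080

# consumer: where to start when no committed offset exists (or it was deleted);
# "earliest" replays from the log start, "latest" skips to new messages,
# "none" raises an error so the application can decide
auto.offset.reset=earliest
```

With `auto.offset.reset=latest` (the default), a consumer whose committed offset was expired can appear to silently skip or stall on old data, which matches the symptom described here.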
[jira] [Created] (KAFKA-7095) Low traffic consumer is not consuming messages after the offsets is deleted by Kafka
Aldo Sinanaj created KAFKA-7095: --- Summary: Low traffic consumer is not consuming messages after the offsets is deleted by Kafka Key: KAFKA-7095 URL: https://issues.apache.org/jira/browse/KAFKA-7095 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.10.1.1 Reporter: Aldo Sinanaj Hello guys. I have a low traffic consumer for a given consumer group and I have the default broker setting for this property *offsets.retention.minutes*. So if a message comes after 2 days and Kafka has deleted the offset for that given consumer, then the consumer will not consume the new incoming messages. If I restart the application it will consume from the earliest which is obvious since the offset is deleted. My question is why it doesn't consume the new messages if I don't restart the application? And how does this version of Kafka understand if a consumer is active or inactive? Is my consumer considered inactive in this case? Thanks, Aldo -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522404#comment-16522404 ] Steven Aerts edited comment on KAFKA-7026 at 6/25/18 3:06 PM: -- I found three issues in kafka which are I think all the same. This issue, KAFKA-6681 and KAFKA-6717. I will comment on this one as I think it gives the best description. We were able to see this issue both on 0.11.0 as on 1.1.0. When we are in this state, the consumer group is marked as stable: {code:java} $./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group c-group --describe --state --verbose COORDINATOR (ID)ASSIGNMENT-STRATEGY STATE#MEMBERS broker3:9092 (1003) stickyStable 6 {code} While the assignment is clearly broken: {code:java} $./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group wfei-aggregator-product-ap-1-2-PT6H --describe --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer-1-63f5550e-fd12-4a1f-be13-fb33ac82d9d9 /host1 consumer-1 70 coll-v2-events-beta3(7,12,17,20,24,29,38,39,45,48,49,51,55,61,64,66,69,73,80,83,94,97,99,101,111,122,128,133,134,136,139,144,149,153,160,161,168,178,179,184,188,196,210,213,215,224,243,252,254,255,258,262,281,283,285,293,294,297,302,303,304,305,311,316,319,326,331,337,342,343) consumer-2-6490433c-f181-4d37-adb8-e3e8679bc960 /host1 consumer-2 70 coll-v2-events-beta3(4,6,16,19,30,34,41,43,44,49,52,54,72,76,85,86,92,93,97,105,108,113,123,124,126,131,133,138,143,147,156,159,169,174,191,197,198,204,208,215,217,230,231,242,252,257,264,267,272,273,275,277,279,284,287,291,294,300,303,305,316,326,333,337,338,340,342,348,350,358) consumer-1-c4fd0a50-456a-4994-9c85-d843b8bc4319 /host2 consumer-1 70 
coll-v2-events-beta3(1,3,5,7,13,18,22,23,24,32,33,35,36,37,40,55,68,74,77,84,87,94,98,102,122,127,135,137,141,142,148,152,154,157,158,165,177,178,193,194,199,220,221,222,228,236,238,239,249,259,262,270,281,283,285,293,295,299,301,314,320,329,331,341,344,345,346,347,352,353) consumer-1-517ab012-fa10-4d4e-9465-861f4912b013 /host3 consumer-1 70 coll-v2-events-beta3(2,8,9,14,17,26,28,45,46,48,53,57,58,59,60,63,66,70,73,75,78,80,82,88,91,95,103,107,109,115,119,120,134,136,151,155,160,166,170,172,182,183,186,188,189,203,205,226,232,233,237,244,246,247,248,260,263,278,282,286,292,296,308,312,313,319,328,332,343,357) consumer-2-e4e4ab60-e94f-4242-93fa-99aa39cafc9f /host2 consumer-2 70 coll-v2-events-beta3(0,10,11,15,20,25,27,29,31,38,47,50,79,83,96,99,101,111,112,114,125,139,140,145,150,162,168,171,175,179,187,190,192,195,200,202,206,207,209,211,213,214,218,219,223,227,229,234,240,241,245,250,251,253,254,256,261,269,271,276,288,290,298,315,323,325,334,349,354,355) consumer-2-8bbc1a53-a626-4e5b-825e-57d71dc4658c /host3 consumer-2 70 coll-v2-events-beta3(21,39,42,51,56,62,64,65,67,71,81,89,90,100,104,106,110,116,117,118,121,129,130,132,146,149,161,163,164,167,173,176,180,181,184,185,201,210,212,216,224,225,235,265,266,268,274,280,289,297,302,304,306,307,309,310,311,317,318,321,322,324,327,330,335,336,339,351,356,359) {code} So we have 360 partitions but there are 420 assigned. You clearly see that the partitions assigned to the first consumer are also assigned to other consumers. (7, 17, 20, ...). This issue is typically triggered when consumers lose (temporarily) their connection with the broker. After restarting this consumer, everything is rebalanced correctly. was (Author: steven.aerts): I found three issues in kafka which are I think all the same. This issue, [KAFKA-6681] and [KAFKA-6717]. I will comment on this one as I think it gives the best description. We were able to see this issue both on 0.11.0 as on 1.1.0. 
When we are in this state, the consumer group is marked as stable: {code} $./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group c-group --describe --state --verbose COORDINATOR (ID)ASSIGNMENT-STRATEGY STATE#MEMBERS broker3:9092 (1003) stickyStable 6 {code} While the assignment is clearly broken: {code} $./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group wfei-aggregator-product-ap-1-2-PT6H --describe --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer-1-63f5550e-fd12-4a1f-be13-fb33ac82d9d9 /host1 consumer-1 70 coll-v2-events-beta3(7,12,17,20,24,29,38,39,45,48,49,51,55,61,64,66,69,73,80,83,94,97,99,101,111,122,128,133,134,136,139,144,149,153,160,161,168,178,179,184,188,196,210,213,215,224,243,252,254,255,258,262,281,283,285,293,294,297,302,303,304,305,311,316,319,326,331,337,342,343) consumer-2-6490433c-f181-4d37-adb8-e3e8679bc960 /host1 consumer-2 70 coll-v2
[jira] [Commented] (KAFKA-7026) Sticky assignor could assign a partition to multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-7026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522404#comment-16522404 ] Steven Aerts commented on KAFKA-7026: - I found three issues in kafka which are I think all the same. This issue, [KAFKA-6681] and [KAFKA-6717]. I will comment on this one as I think it gives the best description. We were able to see this issue both on 0.11.0 as on 1.1.0. When we are in this state, the consumer group is marked as stable: {code} $./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group c-group --describe --state --verbose COORDINATOR (ID)ASSIGNMENT-STRATEGY STATE#MEMBERS broker3:9092 (1003) stickyStable 6 {code} While the assignment is clearly broken: {code} $./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group wfei-aggregator-product-ap-1-2-PT6H --describe --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer-1-63f5550e-fd12-4a1f-be13-fb33ac82d9d9 /host1 consumer-1 70 coll-v2-events-beta3(7,12,17,20,24,29,38,39,45,48,49,51,55,61,64,66,69,73,80,83,94,97,99,101,111,122,128,133,134,136,139,144,149,153,160,161,168,178,179,184,188,196,210,213,215,224,243,252,254,255,258,262,281,283,285,293,294,297,302,303,304,305,311,316,319,326,331,337,342,343) consumer-2-6490433c-f181-4d37-adb8-e3e8679bc960 /host1 consumer-2 70 coll-v2-events-beta3(4,6,16,19,30,34,41,43,44,49,52,54,72,76,85,86,92,93,97,105,108,113,123,124,126,131,133,138,143,147,156,159,169,174,191,197,198,204,208,215,217,230,231,242,252,257,264,267,272,273,275,277,279,284,287,291,294,300,303,305,316,326,333,337,338,340,342,348,350,358) consumer-1-c4fd0a50-456a-4994-9c85-d843b8bc4319 /host2 consumer-1 70 coll-v2-events-beta3(1,3,5,7,13,18,22,23,24,32,33,35,36,37,40,55,68,74,77,84,87,94,98,102,122,127,135,137,141,142,148,152,154,157,158,165,177,178,193,194,199,220,221,222,228,236,238,239,249,259,262,270,281,283,285,293,295,299,301,314,320,329,331,341,344,345,346,347,352,353) 
consumer-1-517ab012-fa10-4d4e-9465-861f4912b013 /host3 consumer-1 70 coll-v2-events-beta3(2,8,9,14,17,26,28,45,46,48,53,57,58,59,60,63,66,70,73,75,78,80,82,88,91,95,103,107,109,115,119,120,134,136,151,155,160,166,170,172,182,183,186,188,189,203,205,226,232,233,237,244,246,247,248,260,263,278,282,286,292,296,308,312,313,319,328,332,343,357) consumer-2-e4e4ab60-e94f-4242-93fa-99aa39cafc9f /host2 consumer-2 70 coll-v2-events-beta3(0,10,11,15,20,25,27,29,31,38,47,50,79,83,96,99,101,111,112,114,125,139,140,145,150,162,168,171,175,179,187,190,192,195,200,202,206,207,209,211,213,214,218,219,223,227,229,234,240,241,245,250,251,253,254,256,261,269,271,276,288,290,298,315,323,325,334,349,354,355) consumer-2-8bbc1a53-a626-4e5b-825e-57d71dc4658c /host3 consumer-2 70 coll-v2-events-beta3(21,39,42,51,56,62,64,65,67,71,81,89,90,100,104,106,110,116,117,118,121,129,130,132,146,149,161,163,164,167,173,176,180,181,184,185,201,210,212,216,224,225,235,265,266,268,274,280,289,297,302,304,306,307,309,310,311,317,318,321,322,324,327,330,335,336,339,351,356,359) {code} So we have 360 partitions but there are 420 assigned. You clearly see that the partitions assigned to the first consumer are also assigned to other consumers. (7, 17, 20, ...). This issue is typically triggered when consumers lose (temporarily) their connection with the broker. 
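The double assignment can be confirmed mechanically by parsing the ASSIGNMENT column of `kafka-consumer-groups.sh --describe --members --verbose` and flagging any partition claimed by more than one member. A self-contained sketch follows; the `topic(p1,p2,…)` input format is assumed to match the output shown above, and the sample data is invented:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DuplicateAssignmentCheck {
    // Matches assignment columns of the form topicName(1,2,3)
    private static final Pattern ASSIGNMENT = Pattern.compile("([\\w.-]+)\\(([\\d,]+)\\)");

    // Returns topic-partition -> list of consumer ids that claim it.
    static Map<String, List<String>> partitionOwners(Map<String, String> memberAssignments) {
        Map<String, List<String>> owners = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : memberAssignments.entrySet()) {
            Matcher m = ASSIGNMENT.matcher(e.getValue());
            while (m.find()) {
                String topic = m.group(1);
                for (String p : m.group(2).split(",")) {
                    owners.computeIfAbsent(topic + "-" + p, k -> new ArrayList<>()).add(e.getKey());
                }
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        Map<String, String> members = new LinkedHashMap<>();
        members.put("consumer-1", "coll-v2-events-beta3(7,12,17)");
        members.put("consumer-2", "coll-v2-events-beta3(4,7,17)");
        // Flags partitions 7 and 17, which both consumers claim.
        partitionOwners(members).forEach((tp, cs) -> {
            if (cs.size() > 1) System.out.println("DUPLICATE " + tp + " -> " + cs);
        });
    }
}
```

In a healthy group every partition should map to exactly one owner, so any `DUPLICATE` line indicates the broken state described in this ticket.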
> Sticky assignor could assign a partition to multiple consumers > -- > > Key: KAFKA-7026 > URL: https://issues.apache.org/jira/browse/KAFKA-7026 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Major > > In the following scenario sticky assignor assigns a topic partition to two > consumers in the group: > # Create a topic {{test}} with a single partition > # Start consumer {{c1}} in group {{sticky-group}} ({{c1}} becomes group > leader and gets {{test-0}}) > # Start consumer {{c2}} in group {{sticky-group}} ({{c1}} holds onto > {{test-0}}, {{c2}} does not get any partition) > # Pause {{c1}} (e.g. using Java debugger) ({{c2}} becomes leader and takes > over {{test-0}}, {{c1}} leaves the group) > # Resume {{c1}} > At this point both {{c1}} and {{c2}} will have {{test-0}} assigned to them. > > The reason is {{c1}} still has kept its previous assignment ({{test-0}}) from > the last assignment it received from the leader (itself) and did not get the > next round of assignments (when {{c2}} became leader) because it was paused. > Both {{c1}} and {{c2}} enter the rebalance supplying {{test-0}} as their > existing assignment. The sticky assignor code does not currently check and > avoi
[jira] [Commented] (KAFKA-6717) TopicPartition Assigned twice to a consumer group for 2 consumer instances
[ https://issues.apache.org/jira/browse/KAFKA-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522397#comment-16522397 ] Yuancheng PENG commented on KAFKA-6717: --- Hi [~steven.aerts], I confirm that it's the same issue as KAFKA-7026. I discovered this issue in production. Sorry for not being able to reproduce it. > TopicPartition Assigned twice to a consumer group for 2 consumer instances > -- > > Key: KAFKA-6717 > URL: https://issues.apache.org/jira/browse/KAFKA-6717 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.1 >Reporter: Yuancheng PENG >Priority: Major > > I'm using {{StickyAssignor}} for consuming more than 100 topics with a certain > pattern. > There are 10 consumers with the same group id. > I expected each topic-partition to be assigned to only one consumer instance. > However some topic partitions are assigned twice to 2 different > instances, hence the consumer group processes duplicate messages. > {code:java} > props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, > Collections.singletonList(StickyAssignor.class)); > KafkaConsumer c = new KafkaConsumer<>(props); > c.subscribe(Pattern.compile(TOPIC_PATTERN), new > NoOpConsumerRebalanceListener()); > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group
[ https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522373#comment-16522373 ] Steven Aerts commented on KAFKA-6681: - I think this problem is a duplicate of [KAFKA-7026]. > Two instances of kafka consumer reading the same partition within a consumer > group > -- > > Key: KAFKA-6681 > URL: https://issues.apache.org/jira/browse/KAFKA-6681 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 0.10.2.1 >Reporter: Narayan Periwal >Priority: Critical > Attachments: server-1.log, server-2.log > > > We have seen this issue with the Kafka consumer, the new library that got > introduced in 0.9. > With this new client, the group management is done by the Kafka coordinator, > which is one of the Kafka brokers. > We are using Kafka broker 0.10.2.1 and the consumer client version is also > 0.10.2.1. > The issue that we have faced is that, after rebalancing, some of the > partitions get consumed by 2 instances within a consumer group, leading to > duplication of the entire partition data. Both the instances continue to read > until the next rebalancing, or the restart of those clients. > It looks like a particular consumer goes on fetching the data from a > partition, but the broker is not able to identify this "stale" consumer > instance. > We have hit this twice in production. Please look into it at the earliest. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6717) TopicPartition Assigned twice to a consumer group for 2 consumer instances
[ https://issues.apache.org/jira/browse/KAFKA-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522372#comment-16522372 ] Steven Aerts commented on KAFKA-6717: - I think this problem is a duplicate of [KAFKA-7026]. > TopicPartition Assigned twice to a consumer group for 2 consumer instances > -- > > Key: KAFKA-6717 > URL: https://issues.apache.org/jira/browse/KAFKA-6717 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.1 >Reporter: Yuancheng PENG >Priority: Major > > I'm using {{StickyAssignor}} for consuming more than 100 topics with a certain > pattern. > There are 10 consumers with the same group id. > I expected each topic-partition to be assigned to only one consumer instance. > However some topic partitions are assigned twice to 2 different > instances, hence the consumer group processes duplicate messages. > {code:java} > props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, > Collections.singletonList(StickyAssignor.class)); > KafkaConsumer c = new KafkaConsumer<>(props); > c.subscribe(Pattern.compile(TOPIC_PATTERN), new > NoOpConsumerRebalanceListener()); > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7094) Variate should unify code style in one method, and use camel name
[ https://issues.apache.org/jira/browse/KAFKA-7094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matt Wang updated KAFKA-7094: - Description: In one method there are two variables, partitionsTobeLeader and partitionsToBeFollower, which should follow a unified camel-case naming style; that will help code maintenance. https://github.com/apache/kafka/pull/5287 was: In one method there are two variables, partitionsTobeLeader and partitionsToBeFollower, which should follow a unified camel-case naming style; that will help code maintenance. > Variate should unify code style in one method, and use camel name > -- > > Key: KAFKA-7094 > URL: https://issues.apache.org/jira/browse/KAFKA-7094 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.1 >Reporter: Matt Wang >Priority: Trivial > > In one method there are two variables, partitionsTobeLeader and > partitionsToBeFollower, which should follow a unified camel-case naming style; > that will help code maintenance. > > https://github.com/apache/kafka/pull/5287 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7094) Variate should unify code style in one method, and use camel name
[ https://issues.apache.org/jira/browse/KAFKA-7094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522114#comment-16522114 ] ASF GitHub Bot commented on KAFKA-7094: --- wangzzu opened a new pull request #5287: KAFKA-7094: Variate should unify code style in one method, and use camel name URL: https://github.com/apache/kafka/pull/5287 In one method there are two variables, partitionsTobeLeader and partitionsToBeFollower, which should follow a unified camel-case naming style; that will help code maintenance. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Variate should unify code style in one method, and use camel name > -- > > Key: KAFKA-7094 > URL: https://issues.apache.org/jira/browse/KAFKA-7094 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.1 >Reporter: Matt Wang >Priority: Trivial > > In one method there are two variables, partitionsTobeLeader and > partitionsToBeFollower, which should follow a unified camel-case naming style; > that will help code maintenance. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7094) Variate should unify code style in one method, and use camel name
Matt Wang created KAFKA-7094: Summary: Variate should unify code style in one method, and use camel name Key: KAFKA-7094 URL: https://issues.apache.org/jira/browse/KAFKA-7094 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.0.1 Reporter: Matt Wang In one method there are two variables, partitionsTobeLeader and partitionsToBeFollower, which should follow a unified camel-case naming style; that will help code maintenance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init
[ https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522031#comment-16522031 ] Ted Yu commented on KAFKA-7088: --- Maybe set log level to DEBUG when you have a chance. Thanks > Kafka streams thread waits infinitely on transaction init > - > > Key: KAFKA-7088 > URL: https://issues.apache.org/jira/browse/KAFKA-7088 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 > Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 > 20:07:43 UTC 2018 > kafka-streams (client) 1.0.1 > kafka broker 1.1.0 > Java version: > OpenJDK Runtime Environment (build 1.8.0_171-b10) > OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode) > kafka config overrides: > num.stream.threads: 6 > session.timeout.ms: 1 > request.timeout.ms: 11000 > fetch.max.wait.ms: 500 > max.poll.records: 1000 > topic has 24 partitions >Reporter: Lukasz Gluchowski >Priority: Major > Labels: eos > > A kafka stream application thread stops processing without any feedback. The > topic has 24 partitions and I noticed that processing stopped only for some > partitions. I will describe what happened to partition:10. The application is > still running (now for about 8 hours) and that thread is hanging there and no > rebalancing took place. > There is no error (we have a custom `Thread.UncaughtExceptionHandler` which > was not called). I noticed that after a couple of minutes the stream stopped > processing (at offset 32606948 where log-end-offset is 33472402). 
> Broker itself is not reporting any active consumer in that consumer group and > the only info I was able to gather was from thread dump: > {code:java} > "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33" > #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition > [0x7fe0215d4000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0xfec6a2f8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50) > at > org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554) > at > org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350) > at > org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88) > at > 
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744){code} > > I tried restarting application once but the situation repeated. Thread read > some data, committed offset and stopped processing, leaving that th
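If DEBUG on everything is too noisy (or TRACE fills the disk, as reported later in this thread), the logging can be scoped to the transaction machinery that `initTransactions()` blocks on. A log4j.properties sketch follows; the logger (class) names are assumed from the 1.0.x kafka-clients package layout and should be verified against the client jar in use:

```properties
# keep everything else at the usual level
log4j.rootLogger=INFO, stdout

# debug only the producer-side transaction handshake (assumed class names;
# verify against the kafka-clients version actually deployed)
log4j.logger.org.apache.kafka.clients.producer.internals.TransactionManager=DEBUG
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=DEBUG
```

Scoping the level per logger keeps log volume manageable in production while still showing why the FindCoordinator/InitProducerId exchange never completes.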
[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init
[ https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522004#comment-16522004 ] Lukasz Gluchowski commented on KAFKA-7088:
--
[~yuzhih...@gmail.com] I turned TRACE logging on in a non-production environment to try it out, and it was filling the disk very quickly. Sorry, but I can't turn it on in production.

> Kafka streams thread waits infinitely on transaction init
> -
>
> Key: KAFKA-7088
> URL: https://issues.apache.org/jira/browse/KAFKA-7088
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 1.0.1
> Environment: Linux 4.14.33-51.37.amzn1.x86_64 #1 SMP Thu May 3 20:07:43 UTC 2018
> kafka-streams (client) 1.0.1
> kafka broker 1.1.0
> Java version:
> OpenJDK Runtime Environment (build 1.8.0_171-b10)
> OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)
> kafka config overrides:
> num.stream.threads: 6
> session.timeout.ms: 1
> request.timeout.ms: 11000
> fetch.max.wait.ms: 500
> max.poll.records: 1000
> topic has 24 partitions
> Reporter: Lukasz Gluchowski
> Priority: Major
> Labels: eos
>
> A Kafka Streams application thread stops processing without any feedback. The topic has 24 partitions, and I noticed that processing stopped only for some partitions. I will describe what happened to partition 10. The application is still running (now for about 8 hours), that thread is hanging there, and no rebalancing has taken place.
> There is no error (we have a custom `Thread.UncaughtExceptionHandler` which was not called). I noticed that after a couple of minutes the stream stopped processing (at offset 32606948, where log-end-offset is 33472402).
> Broker itself is not reporting any active consumer in that consumer group, and the only info I was able to gather was from a thread dump:
> {code:java}
> "mp_ads_publisher_pro_madstorage-web-corotos-prod-9db804ae-2a7a-431f-be09-392ab38cd8a2-StreamThread-33" #113 prio=5 os_prio=0 tid=0x7fe07c6349b0 nid=0xf7a waiting on condition [0x7fe0215d4000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0xfec6a2f8> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>         at org.apache.kafka.clients.producer.internals.TransactionalRequestResult.await(TransactionalRequestResult.java:50)
>         at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:554)
>         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:151)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
>         at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
>         at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
>         at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
>         at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
>         at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:)
>         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> {code}
>
> I tried restarting the application once, but the situation repeated. The thread read some data, committed the offset and stopped processing, leaving that th
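The parked frame in the dump above is TransactionalRequestResult.await, which in this client version blocks on a plain CountDownLatch.await() with no timeout, so a lost or unanswered InitProducerId response leaves the stream thread waiting forever. A minimal, self-contained sketch of that failure mode, using only plain java.util.concurrent (the class and method names here are hypothetical stand-ins, not the actual Kafka internals):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {
    // Hypothetical stand-in for TransactionalRequestResult: the latch is only
    // counted down when the broker's InitProducerId response arrives. If the
    // response never arrives, an unbounded await() parks the thread forever,
    // exactly as in the WAITING (parking) frame in the thread dump.
    static boolean awaitBounded(CountDownLatch latch, long timeoutMs)
            throws InterruptedException {
        // A bounded wait returns false on timeout instead of hanging, which is
        // what the unbounded CountDownLatch.await() in the stack trace cannot do.
        return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch neverCompleted = new CountDownLatch(1); // response never arrives
        boolean completed = awaitBounded(neverCompleted, 100);
        System.out.println("completed=" + completed); // times out instead of parking
    }
}
```

With the unbounded form (`latch.await()` and no timeout argument), the main thread in this sketch would never return, mirroring the hung StreamThread.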
[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps
[ https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521948#comment-16521948 ] Wouter Bancken commented on KAFKA-6817:
---
Is there any news on a solution/workaround for clients not using Kafka Streams? We have tried setting transactional.id.expiration.ms to its maximum value (24 days) as mentioned earlier in the comments, and this greatly reduces the number of occurrences, but it does not eliminate the error. In our use cases it is possible that a producer will remain active for a long period without writing to a certain topic.

> UnknownProducerIdException when writing messages with old timestamps
>
> Key: KAFKA-6817
> URL: https://issues.apache.org/jira/browse/KAFKA-6817
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 1.1.0
> Reporter: Odin Standal
> Priority: Major
>
> We are seeing the following exception in our Kafka application:
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer due to the following error: org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key 22 value some-value timestamp 1519200902670) to topic exactly-once-test-topic-v2 due to This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
> at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
> at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
> at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
> at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
> at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
> at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
> at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
> at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
> at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
> at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we had the need to reprocess old messages.
> See more details on [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error with a smaller example application. The error occurs after 10 minutes of producing messages that have old timestamps (typically 1 year old). The topic we are writing to has retention.ms set to 1 year, so we are expecting the messages to stay there.
> After digging through the ProducerStateManager code in the Kafka source code, we have a theory of what might be wrong.
> ProducerStateManager.removeExpiredProducers() seems to remove producers from memory erroneously when processing records which are older than maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` configuration), which is set by default to 7 days.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
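The reporter's theory is that producer-state expiry keys off the record timestamp rather than the wall-clock append time, so batches written with year-old timestamps look immediately expired and the producer's metadata is dropped, triggering UnknownProducerIdException on the next append. A self-contained sketch of that suspected mechanism in plain Java (the class, method, and map layout here are hypothetical illustrations, not the actual ProducerStateManager code):

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerExpirySketch {
    // Hypothetical model: producerId -> timestamp of its last appended batch.
    // Mirrors the theory that expiry compares the record's own timestamp
    // (not the wall-clock append time) against the expiration window.
    static void removeExpired(Map<Long, Long> lastTimestampByPid,
                              long nowMs, long expirationMs) {
        // Drop every producer whose last *record* timestamp is outside the window,
        // even if the record was physically appended moments ago.
        lastTimestampByPid.values().removeIf(lastTs -> nowMs - lastTs >= expirationMs);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long sevenDaysMs = 7L * 24 * 60 * 60 * 1000;   // default transactional.id.expiration.ms
        long oneYearMs = 365L * 24 * 60 * 60 * 1000;

        Map<Long, Long> state = new HashMap<>();
        state.put(1L, now);              // fresh record timestamp: survives
        state.put(2L, now - oneYearMs);  // reprocessed year-old timestamp: dropped
        removeExpired(state, now, sevenDaysMs);

        // Producer 2's state is gone even though it just wrote, so its next
        // append would fail with UnknownProducerIdException.
        System.out.println(state.keySet());
    }
}
```

Under this model, raising transactional.id.expiration.ms (as tried in the comment above) widens the window but cannot help once record timestamps are older than the configured maximum, which matches the reduced-but-not-eliminated occurrences reported.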