[jira] [Updated] (KAFKA-17825) ByteBufferDeserializer's array size can be inconsistent with the older version
[ https://issues.apache.org/jira/browse/KAFKA-17825?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-17825: --- Description: We've noticed that using the ByteBufferDeserializer can yield a different byte array length compared to the deserializer in 3.5.2. This is attributable to KIP-863: the old deserializer truncated the byte array starting from `buffer.position() + buffer.arrayOffset() + offset` using `Utils.toArray`, whereas the current implementation is a passthrough. This can be reproduced using the {code:java} KafkaConsumerProducerDemo.java{code} by changing the type to and performing a print after poll. For example, the producer produces a record [0, test0] (key is an int, "test0" is a 5-byte string converted to a byte buffer using {code:java} ByteBuffer.wrap(value.getBytes()){code}). Prior to KIP-863 we see the following after polling the record from the consumer: 3.5.2: test0 3.6.0: {code:java} ?$���y�NNޅ�-p�=�NAc�8D���8D��� test0{code} And if you analyze the ByteBuffer post 3.6.0, we can see the current offset is at 140 with an array length of 149. [~LSK] - since you wrote the KIP and did the implementation, can you address this? was: We've noticed that using the ByteBufferDeserializer can yield a different byte array length compared to the deserializer in 3.5.2. This is attributable to KIP-863: the old deserializer truncated the byte array starting from `buffer.position() + buffer.arrayOffset() + offset` using `Utils.toArray`, whereas the current implementation is a passthrough. This can be reproduced using the {code:java} KafkaConsumerProducerDemo.java{code} by changing the type to and performing a print after poll.
For example, the producer produces a record [0, test0] (key is an int, "test0" is a 5-byte string converted to a byte buffer using {code:java} ByteBuffer.wrap(value.getBytes()){code}). Prior to KIP-863 we see the following after polling the record from the consumer: 3.5.2: test0 3.6.0: {code:java} ?$���y�NNޅ�-p�=�NAc�8D���8D��� test0{code} And if you analyze the ByteBuffer post 3.6.0, we can see the current position is at 140 with an array length of 149. [~LSK] - since you wrote the KIP and did the implementation, can you address this? > ByteBufferDeserializer's array size can be inconsistent with the older > version > --- > > Key: KAFKA-17825 > URL: https://issues.apache.org/jira/browse/KAFKA-17825 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2, 3.8.0, 3.7.1, 3.9.0, 3.8.1, > 3.9.1 > Reporter: Philip Nee > Assignee: LinShunkang > Priority: Blocker > > We've noticed that using the ByteBufferDeserializer can yield a different > byte array length compared to the deserializer in 3.5.2. This is attributable > to KIP-863: the old deserializer truncated the byte array starting from > `buffer.position() + buffer.arrayOffset() + offset` using `Utils.toArray`, > > whereas the current implementation is a passthrough. > > This can be reproduced using the > {code:java} > KafkaConsumerProducerDemo.java{code} > by changing the type to and performing a print after poll. > For example, the producer produces a record [0, test0] (key is an int, > "test0" is a 5-byte string converted to a byte buffer using > {code:java} > ByteBuffer.wrap(value.getBytes()){code} > ). Prior to KIP-863 we see the following after polling the record from the consumer: > 3.5.2: test0 > 3.6.0: > {code:java} > ?$���y�NNޅ�-p�=�NAc�8D���8D��� > test0{code} > > And if you analyze the ByteBuffer post 3.6.0, we can see the current offset > is at 140 with an array length of 149.
> > [~LSK] - since you wrote the KIP and did the implementation, can you address > this? -- This message was sent by Atlassian Jira (v8.20.10#820010)
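The difference described in the ticket can be illustrated outside Kafka. The sketch below is not Kafka's actual deserializer code; `toArray` mirrors the pre-KIP-863 truncating copy (starting at `buffer.position() + buffer.arrayOffset()`, with the old method's extra `offset` parameter assumed to be zero), while `buffer.array()` shows what a passthrough exposes when the value sits at a non-zero position inside a larger fetch buffer:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferTruncationDemo {
    // Pre-KIP-863-style behavior: copy only the readable bytes, starting at
    // buffer.position() + buffer.arrayOffset(), into a right-sized array.
    static byte[] toArray(ByteBuffer buffer) {
        byte[] dest = new byte[buffer.remaining()];
        if (buffer.hasArray()) {
            System.arraycopy(buffer.array(), buffer.position() + buffer.arrayOffset(),
                             dest, 0, dest.length);
        } else {
            buffer.duplicate().get(dest); // duplicate() leaves position untouched
        }
        return dest;
    }

    public static void main(String[] args) {
        // Simulate a 5-byte value at position 140 of a 149-byte backing array,
        // like a record value embedded in a larger fetch response buffer.
        byte[] backing = new byte[149];
        byte[] value = "test0".getBytes(StandardCharsets.UTF_8);
        System.arraycopy(value, 0, backing, 140, value.length);
        ByteBuffer buffer = ByteBuffer.wrap(backing, 140, value.length);

        // Passthrough (post-KIP-863): array() still exposes all 149 bytes,
        // 140 of which are junk from the value's point of view.
        System.out.println(buffer.array().length);   // 149
        // Truncating copy (pre-KIP-863): only the 5 value bytes survive.
        byte[] truncated = toArray(buffer);
        System.out.println(truncated.length);        // 5
        System.out.println(new String(truncated, StandardCharsets.UTF_8)); // test0
    }
}
```

An application seeing the 3.6.0 behavior can apply the same truncating copy itself before converting the value to a string.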
[jira] [Created] (KAFKA-17825) ByteBufferDeserializer's array size can be inconsistent with the older version
Philip Nee created KAFKA-17825: -- Summary: ByteBufferDeserializer's array size can be inconsistent with the older version Key: KAFKA-17825 URL: https://issues.apache.org/jira/browse/KAFKA-17825 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.7.1, 3.8.0, 3.6.2, 3.6.1, 3.7.0, 3.6.0, 3.9.0, 3.8.1, 3.9.1 Reporter: Philip Nee Assignee: LinShunkang We've noticed that using the ByteBufferDeserializer can yield a different byte array length compared to the deserializer in 3.5.2. This is attributable to KIP-863: the old deserializer truncated the byte array starting from `buffer.position() + buffer.arrayOffset() + offset` using `Utils.toArray`, whereas the current implementation is a passthrough. This can be reproduced using the {code:java} KafkaConsumerProducerDemo.java{code} by changing the type to and performing a print after poll. For example, the producer produces a record [0, test0] (key is an int, "test0" is a 5-byte string converted to a byte buffer using {code:java} ByteBuffer.wrap(value.getBytes()){code}). Prior to KIP-863 we see the following after polling the record from the consumer: 3.5.2: test0 3.6.0: {code:java} ?$���y�NNޅ�-p�=�NAc�8D���8D��� test0{code} And if you analyze the ByteBuffer post 3.6.0, we can see the current position is at 140 with an array length of 149. [~LSK] - since you wrote the KIP and did the implementation, can you address this? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery
[ https://issues.apache.org/jira/browse/KAFKA-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16819. Resolution: Fixed > CoordinatorRequestManager seems to return 0ms during the coordinator discovery > -- > > Key: KAFKA-16819 > URL: https://issues.apache.org/jira/browse/KAFKA-16819 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Philip Nee > Assignee: Philip Nee > Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.9.0 > > > In KAFKA-15250 we discovered the ConsumerNetworkThread is looping without > much backoff. The in-flight check PR fixed a lot of it; however, during the > coordinator discovery phase, CoordinatorRequestManager would keep > returning 0 until the coordinator node was found. > > The impact is minor, but we should expect the coordinator manager to > back off until the exponential backoff expires (so it should return around 100ms). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery
[ https://issues.apache.org/jira/browse/KAFKA-16819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872996#comment-17872996 ] Philip Nee commented on KAFKA-16819: This was resolved via the request state fix. > CoordinatorRequestManager seems to return 0ms during the coordinator discovery > -- > > Key: KAFKA-16819 > URL: https://issues.apache.org/jira/browse/KAFKA-16819 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Philip Nee > Assignee: Philip Nee > Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.9.0 > > > In KAFKA-15250 we discovered the ConsumerNetworkThread is looping without > much backoff. The in-flight check PR fixed a lot of it; however, during the > coordinator discovery phase, CoordinatorRequestManager would keep > returning 0 until the coordinator node was found. > > The impact is minor, but we should expect the coordinator manager to > back off until the exponential backoff expires (so it should return around 100ms). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17188) LoginManager ctor might throw an exception causing login and loginCallbackHandler not being closed properly
[ https://issues.apache.org/jira/browse/KAFKA-17188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-17188: --- Summary: LoginManager ctor might throw an exception causing login and loginCallbackHandler not being closed properly (was: LoginManager ctro might throw an exception causing login and loginCallbackHandler not being closed properly) > LoginManager ctor might throw an exception causing login and > loginCallbackHandler not being closed properly > --- > > Key: KAFKA-17188 > URL: https://issues.apache.org/jira/browse/KAFKA-17188 > Project: Kafka > Issue Type: Bug > Components: security > Reporter: Philip Nee > Assignee: Philip Nee > Priority: Major > > When using MDS login, loginManager.login() may throw an exception, causing > login and loginCallbackHandler not to be closed properly. We should catch > the exception and close login and loginCallbackHandler. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17188) LoginManager ctro might throw an exception causing login and loginCallbackHandler not being closed properly
Philip Nee created KAFKA-17188: -- Summary: LoginManager ctro might throw an exception causing login and loginCallbackHandler not being closed properly Key: KAFKA-17188 URL: https://issues.apache.org/jira/browse/KAFKA-17188 Project: Kafka Issue Type: Bug Components: security Reporter: Philip Nee Assignee: Philip Nee When using MDS login, loginManager.login() may throw an exception, causing login and loginCallbackHandler not to be closed properly. We should catch the exception and close login and loginCallbackHandler. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14648) Do not fail clients if bootstrap servers is not immediately resolvable
[ https://issues.apache.org/jira/browse/KAFKA-14648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-14648: -- Assignee: Brenden DeLuna (was: Philip Nee) > Do not fail clients if bootstrap servers is not immediately resolvable > -- > > Key: KAFKA-14648 > URL: https://issues.apache.org/jira/browse/KAFKA-14648 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Brenden DeLuna >Priority: Major > > In dynamic environments, such as system tests, there is sometimes a delay > between when a client is initialized and when the configured bootstrap > servers become available in DNS. Currently clients will fail immediately if > none of the bootstrap servers can resolve. It would be more convenient for > these environments to provide a grace period to give more time for > initialization. -- This message was sent by Atlassian Jira (v8.20.10#820010)
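The grace period suggested above can be sketched in plain Java. This is a hypothetical helper, not the actual client change or its configuration name: it retries DNS resolution until a deadline instead of failing on the first UnknownHostException.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;

public class BootstrapResolveGrace {
    // Hypothetical sketch: keep retrying DNS resolution of a bootstrap host
    // until the grace period expires, then rethrow the last failure.
    static InetAddress resolveWithGrace(String host, Duration grace, Duration retryInterval)
            throws UnknownHostException, InterruptedException {
        long deadlineNanos = System.nanoTime() + grace.toNanos();
        while (true) {
            try {
                return InetAddress.getByName(host);
            } catch (UnknownHostException e) {
                if (System.nanoTime() >= deadlineNanos)
                    throw e; // grace period exhausted: fail as clients do today
                Thread.sleep(retryInterval.toMillis()); // wait for DNS to catch up
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // localhost resolves immediately, so the grace period is never used here.
        System.out.println(resolveWithGrace("localhost",
                Duration.ofSeconds(5), Duration.ofMillis(200)).getHostAddress());
    }
}
```

In a dynamic environment (e.g. a system test spinning up brokers), the loop simply absorbs the window between client startup and the bootstrap hostnames appearing in DNS.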
[jira] [Updated] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-13421: --- Priority: Major (was: Blocker) > Fix > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > - > > Key: KAFKA-13421 > URL: https://issues.apache.org/jira/browse/KAFKA-13421 > Project: Kafka > Issue Type: Bug > Reporter: Colin McCabe > Assignee: Philip Nee > Priority: Major > Labels: clients, consumer, flaky-test, unit-test > Fix For: 3.8.0 > > > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > is failing with this error: > {code} > ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() > failed, log available in > /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout > > > ConsumerBounceTest > > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() > FAILED > org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists > at > org.apache.zookeeper.KeeperException.create(KeeperException.java:126) > at > kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904) > at > kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842) > at > kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809) > at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96) > at kafka.server.KafkaServer.startup(KafkaServer.scala:320) > at > kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:212) > at > scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) > at scala.collection.AbstractIterable.foreach(Iterable.scala:919) > at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889) > at > kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:327) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) > at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13421) Fix ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-13421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854404#comment-17854404 ] Philip Nee commented on KAFKA-13421: The test is disabled; can we make this a non-blocker? > Fix > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > - > > Key: KAFKA-13421 > URL: https://issues.apache.org/jira/browse/KAFKA-13421 > Project: Kafka > Issue Type: Bug > Reporter: Colin McCabe > Assignee: Philip Nee > Priority: Major > Labels: clients, consumer, flaky-test, unit-test > Fix For: 3.8.0 > > > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > is failing with this error: > {code} > ConsumerBounceTest > testSubscribeWhenTopicUnavailable() PASSED > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() > failed, log available in > /home/cmccabe/src/kafka9/core/build/reports/testOutput/kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup().test.stdout > > > ConsumerBounceTest > > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup() > FAILED > org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists > at > org.apache.zookeeper.KeeperException.create(KeeperException.java:126) > at > kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904) > at > kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842) > at > kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809) > at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96) > at kafka.server.KafkaServer.startup(KafkaServer.scala:320) > at > kafka.integration.KafkaServerTestHarness.$anonfun$restartDeadBrokers$2(KafkaServerTestHarness.scala:212) > at > scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) > at scala.collection.AbstractIterable.foreach(Iterable.scala:919) > at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889) > at > kafka.integration.KafkaServerTestHarness.restartDeadBrokers(KafkaServerTestHarness.scala:203) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:327) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) > at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16160. Resolution: Cannot Reproduce > AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop > -- > > Key: KAFKA-16160 > URL: https://issues.apache.org/jira/browse/KAFKA-16160 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Philip Nee > Assignee: Phuc Hong Tran > Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Running AsyncKafkaConsumer, we are observing excessive logging of: > {code:java} > 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, > groupId=concurrent_consumer] Node is not ready, handle the request in the > next event loop: node=worker4:9092 (id: 2147483644 rack: null), > request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', > memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, > rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], > serverAssignor=null, topicPartitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, > node=Optional[worker4:9092 (id: 2147483644 rack: null)], > timer=org.apache.kafka.common.utils.Timer@55ed4733} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} > This seems to be triggered by a tight poll loop of the network thread. The > right thing to do is to back off a bit for that given node and retry later. > This should be a blocker for the 3.8 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848748#comment-17848748 ] Philip Nee commented on KAFKA-16160: Hey [~phuctran] - I'm going to close this issue because I haven't been seeing this in test logs, given that we fixed a few relevant issues like the in-flight logic... > AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop > -- > > Key: KAFKA-16160 > URL: https://issues.apache.org/jira/browse/KAFKA-16160 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Philip Nee > Assignee: Phuc Hong Tran > Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Running AsyncKafkaConsumer, we are observing excessive logging of: > {code:java} > 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, > groupId=concurrent_consumer] Node is not ready, handle the request in the > next event loop: node=worker4:9092 (id: 2147483644 rack: null), > request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', > memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, > rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], > serverAssignor=null, topicPartitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, > node=Optional[worker4:9092 (id: 2147483644 rack: null)], > timer=org.apache.kafka.common.utils.Timer@55ed4733} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} > This seems to be triggered by a tight poll loop of the network thread. The > right thing to do is to back off a bit for that given node and retry later. > This should be a blocker for the 3.8 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15250) ConsumerNetworkThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848703#comment-17848703 ] Philip Nee commented on KAFKA-15250: After the in-flight check fix, I think most request managers are backing off correctly. Closing this issue. > ConsumerNetworkThread is running tight loop > --- > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Philip Nee > Assignee: Philip Nee > Priority: Critical > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout passed to NetworkClientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15250) ConsumerNetworkThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15250. Resolution: Fixed > ConsumerNetworkThread is running tight loop > --- > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Reporter: Philip Nee > Assignee: Philip Nee > Priority: Critical > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout passed to NetworkClientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16819) CoordinatorRequestManager seems to return 0ms during the coordinator discovery
Philip Nee created KAFKA-16819: -- Summary: CoordinatorRequestManager seems to return 0ms during the coordinator discovery Key: KAFKA-16819 URL: https://issues.apache.org/jira/browse/KAFKA-16819 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee In KAFKA-15250 we discovered the ConsumerNetworkThread is looping without much backoff. The in-flight check PR fixed a lot of it; however, during the coordinator discovery phase, CoordinatorRequestManager would keep returning 0 until the coordinator node was found. The impact is minor, but we should expect the coordinator manager to back off until the exponential backoff expires (so it should return around 100ms). -- This message was sent by Atlassian Jira (v8.20.10#820010)
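The expected shape of the fix — returning a growing, capped wait (starting around 100ms) instead of 0 while the coordinator is unknown — can be sketched as follows. The constants are illustrative, not Kafka's actual defaults, and this is a simplified stand-in for the client's internal exponential-backoff utility:

```java
import java.util.concurrent.ThreadLocalRandom;

public class RetryBackoff {
    // Illustrative constants, not Kafka's actual configuration values.
    static final long INITIAL_MS = 100;     // first retry waits ~100ms
    static final long MAX_MS = 1000;        // cap on the backoff
    static final double MULTIPLIER = 2.0;   // growth per failed attempt
    static final double JITTER = 0.2;       // +/-20% randomization

    // Wait before the next FindCoordinator-style attempt. Never returns 0,
    // which is the behavior this ticket says the manager should have.
    static long backoffMs(int attempts) {
        double exp = INITIAL_MS * Math.pow(MULTIPLIER, attempts);
        double capped = Math.min(exp, MAX_MS);
        double jitterFactor = 1.0 + JITTER * (2.0 * ThreadLocalRandom.current().nextDouble() - 1.0);
        return (long) (capped * jitterFactor);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++)
            System.out.println("attempt " + attempt + ": wait ~" + backoffMs(attempt) + "ms");
    }
}
```

The jitter keeps many consumers from retrying the coordinator lookup in lockstep after a broker restart.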
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848688#comment-17848688 ] Philip Nee commented on KAFKA-16687: Hey [~fortherightous] - I was wondering if what I saw is in line with your observation. It seems like there are a lot of FetchMetricsManager.partitionRecordLag sensors being allocated, but the memory increase wasn't as fast as you described. Sorry for the bother - trying to identify whether this needs to be hotfixed for the 3.8 release. > Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0 > > > Key: KAFKA-16687 > URL: https://issues.apache.org/jira/browse/KAFKA-16687 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Affects Versions: 3.7.0 > Reporter: FTR > Assignee: Philip Nee > Priority: Major > Attachments: diff > > > I am building a Java project which uses the Maven dependency kafka-clients, > version 3.7.0. > My Java application's logic is to use a Kafka consumer to poll a Kafka broker topic > continuously. > I have configured my Java application with the JVM options -Xms8G -Xmx8G > -XX:MaxMetaspaceSize=4G, and then run it. > Also, there is 16G of physical memory on my virtual machine. > After my Java application had been running a long time, I found that the resident > memory of the Java process had grown to more than 14G. > In the end, the Java process ate swap space. > I checked it with jmap -heap pid and found that heap memory usage is OK. > Also, with Native Memory Tracking [jcmd pid Native.memory detail.diff], I > found that it's caused by [NMT Internal] memory, which is created by > Unsafe_allocatememory xxx. > In my Java application, I don't use any NIO DirectByteBuffer to allocate > memory. > And I checked the kafka-clients source code; it has code that uses > "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. > > Could you help to check it? How can I stop this growing native memory to > avoid my system hanging?
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16001) Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder
[ https://issues.apache.org/jira/browse/KAFKA-16001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16001: --- Description: We should: # Remove spy calls to the dependencies # Remove ConsumerNetworkThreadTest > Migrate ConsumerNetworkThreadTest away from ConsumerTestBuilder > --- > > Key: KAFKA-16001 > URL: https://issues.apache.org/jira/browse/KAFKA-16001 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lucas Brutschy >Assignee: Brenden DeLuna >Priority: Minor > Labels: consumer-threading-refactor, unit-tests > Fix For: 3.8.0 > > > We should: > # Remove spy calls to the dependencies > # Remove ConsumerNetworkThreadTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16799) NetworkClientDelegate is not backing off if the node is not found
Philip Nee created KAFKA-16799: -- Summary: NetworkClientDelegate is not backing off if the node is not found Key: KAFKA-16799 URL: https://issues.apache.org/jira/browse/KAFKA-16799 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee When performing stress testing, I found that AsyncKafkaConsumer's network client delegate isn't backing off if the node is not ready, causing a large number of: {code:java} 358 [2024-05-20 22:59:02,591] DEBUG [Consumer clientId=consumer.7136899e-0c20-4ccb-8ba3-497e9e683594-0, groupId=consumer-groups-test-5] Node is not ready, handle the request in the next event loop: node=b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483643 rack: null), request=UnsentRequest{requestBuilder=ConsumerGroupHeartbeatRequestData(groupId='consumer-groups-test-5', memberId='', memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=10, subscribedTopicNames=[_kengine-565-test-topic8081], serverAssignor=null, topicPartitions=[]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@139a8761, node=Optional[b4-pkc-devcmkz697.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483643 rack: null)], timer=org.apache.kafka.common.utils.Timer@649fffad} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate:169) {code} messages showing up in the log. What should happen is: 1. the node is not ready, 2. exponential backoff, 3. retry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16778) AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked partition
Philip Nee created KAFKA-16778: -- Summary: AsyncKafkaConsumer fetcher might occasionally try to fetch to a revoked partition Key: KAFKA-16778 URL: https://issues.apache.org/jira/browse/KAFKA-16778 Project: Kafka Issue Type: Bug Reporter: Philip Nee {code:java} java.lang.IllegalStateException: No current assignment for partition output-topic-26 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369) at org.apache.kafka.clients.consumer.internals.SubscriptionState.position(SubscriptionState.java:542) at org.apache.kafka.clients.consumer.internals.AbstractFetch.prepareFetchRequests(AbstractFetch.java:411) at org.apache.kafka.clients.consumer.internals.FetchRequestManager.poll(FetchRequestManager.java:74) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$new$2(ConsumerNetworkThread.java:159) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:143) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:145) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:94) {code} The 
setup is: running 30 consumers consuming from a topic with 300 partitions. We can occasionally get an IllegalStateException from the consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846107#comment-17846107 ] Philip Nee commented on KAFKA-16687: Also, looking at the heap, it seems like we've been getting this leak since earlier than 3.6 - I see the sensor class just keeps on getting more allocations, i.e. allocated objects == live objects. > Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0 > > > Key: KAFKA-16687 > URL: https://issues.apache.org/jira/browse/KAFKA-16687 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Affects Versions: 3.7.0 > Reporter: FTR > Assignee: Philip Nee > Priority: Major > Attachments: diff > > > I am building a Java project which uses the Maven dependency kafka-clients, > version 3.7.0. > My Java application's logic is to use a Kafka consumer to poll a Kafka broker topic > continuously. > I have configured my Java application with the JVM options -Xms8G -Xmx8G > -XX:MaxMetaspaceSize=4G, and then run it. > Also, there is 16G of physical memory on my virtual machine. > After my Java application had been running a long time, I found that the resident > memory of the Java process had grown to more than 14G. > In the end, the Java process ate swap space. > I checked it with jmap -heap pid and found that heap memory usage is OK. > Also, with Native Memory Tracking [jcmd pid Native.memory detail.diff], I > found that it's caused by [NMT Internal] memory, which is created by > Unsafe_allocatememory xxx. > In my Java application, I don't use any NIO DirectByteBuffer to allocate > memory. > And I checked the kafka-clients source code; it has code that uses > "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it. > > Could you help to check it? How can I stop this growing native memory to > avoid my system hanging? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16687:
Attachment: diff
[jira] [Comment Edited] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846042#comment-17846042 ] Philip Nee edited comment on KAFKA-16687 at 5/13/24 6:51 PM:

Hey [~fortherightous] - I did a memory profile and found a constant increase in some sensors in FetchMetricsManager. Is this in line with what you are seeing? I actually don't see much difference in unsafe allocation. Could you post a snapshot of your jcmd diff here? I attached my jcmd diff here.

was (Author: JIRAUSER283568):
Hey [~fortherightous] - I did a memory profile and found a constant increase in some sensors in FetchMetricsManager. Is this in line with what you are seeing? I actually don't see much difference in unsafe allocation. Could you post a snapshot of your jcmd diff here?
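The sensor growth described in the comment above - every allocated sensor staying live - is the classic pattern of a registry keyed by an unbounded name space with no removal path. A hypothetical sketch of that pattern (not the actual FetchMetricsManager code; the per-node name component is an assumption for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not the actual FetchMetricsManager code): if sensor
// names include an unbounded component (e.g. a per-node or per-session id)
// and entries are never removed, every sensor ever allocated stays live,
// matching the "allocated objects == live objects" observation.
public class SensorRegistrySketch {
    static final class Sensor {
        final String name;
        Sensor(String name) { this.name = name; }
    }

    private final Map<String, Sensor> sensors = new HashMap<>();

    // Returns the existing sensor for `name`, creating it on first use.
    // Nothing ever expires or removes entries, so the map only grows.
    Sensor sensor(String name) {
        return sensors.computeIfAbsent(name, Sensor::new);
    }

    int size() { return sensors.size(); }
}
```

With a fixed set of names the registry stays bounded; the leak only appears once the key space grows per request or per connection.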
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846042#comment-17846042 ] Philip Nee commented on KAFKA-16687:

Hey [~fortherightous] - I did a memory profile and found a constant increase in some sensors in FetchMetricsManager. Is this in line with what you are seeing? I actually don't see much difference in unsafe allocation. Could you post a snapshot of your jcmd diff here?
[jira] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687 ] Philip Nee deleted comment on KAFKA-16687:

was (Author: JIRAUSER283568):
[~fortherightous] - Not sure if this is what you are seeing:
{code:java}
Native Memory Tracking (reserved=3835KB +481KB, committed=3835KB +481KB)
55   (malloc=458KB +234KB #6808 +3631)
56   (tracking overhead=3376KB +247KB)
{code}
and
{code:java}
290 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
291 [0x000147f01034]
292     (malloc=43KB type=Other +43KB #4 +4)
293
294 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
295 [0x00014753d8ac]
296     (malloc=996KB type=Other -1249KB #94 -102)
{code}
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845464#comment-17845464 ] Philip Nee commented on KAFKA-16687:

[~fortherightous] - Not sure if this is what you are seeing:
{code:java}
Native Memory Tracking (reserved=3835KB +481KB, committed=3835KB +481KB)
55   (malloc=458KB +234KB #6808 +3631)
56   (tracking overhead=3376KB +247KB)
{code}
and
{code:java}
290 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
291 [0x000147f01034]
292     (malloc=43KB type=Other +43KB #4 +4)
293
294 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
295 [0x00014753d8ac]
296     (malloc=996KB type=Other -1249KB #94 -102)
{code}
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845421#comment-17845421 ] Philip Nee commented on KAFKA-16687:

"Looks like it's caused by this" - mind elaborating?
[jira] [Assigned] (KAFKA-7300) Add KafkaConsumer fetch-error-rate and fetch-error-total metrics
[ https://issues.apache.org/jira/browse/KAFKA-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-7300:
Assignee: Philip Nee (was: Kevin Lu)

> Add KafkaConsumer fetch-error-rate and fetch-error-total metrics
>
>                 Key: KAFKA-7300
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7300
>             Project: Kafka
>          Issue Type: New Feature
>          Components: clients, consumer, metrics
>            Reporter: Kevin Lu
>            Assignee: Philip Nee
>            Priority: Minor
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-356%3A+Add+KafkaConsumer+fetch-error-rate+and+fetch-error-total+metrics]
>
> The KafkaConsumer is a complex client that requires many different components to function properly. When a consumer is not operating properly, it can be difficult to identify the root cause and which component is causing issues (ConsumerCoordinator, Fetcher, ConsumerNetworkClient, etc.).
>
> This aims to improve the monitoring and detection of the KafkaConsumer's Fetcher component.
>
> Fetcher sends a fetch request for each node that the consumer has assigned partitions for. This fetch request may fail in the following cases:
> * Intermittent network issues (goes to onFailure)
> * Node sent an invalid full/incremental fetch response (FetchSessionHandler's handleResponse returns false)
> * FetchSessionIdNotFound
> * InvalidFetchSessionEpochException
>
> These cases are logged, but it would be valuable to provide a corresponding metric that allows for monitoring and alerting.
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845045#comment-17845045 ] Philip Nee commented on KAFKA-16687:

Since you mentioned that disabling the JMX reporter resolves the issue, I was wondering if this is caused by leaking network connections - in particular, by the sticky node used by the telemetry sender: [https://github.com/apache/kafka/commit/2b99d0e45027c88ae2347fa8f7d1ff4b2b919089]
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845042#comment-17845042 ] Philip Nee commented on KAFKA-16687:

Thank you.
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844742#comment-17844742 ] Philip Nee commented on KAFKA-16687:

Hey @FTR - I wrote a simple consume-produce application and probed it using jcmd, but didn't find much useful info. However, I observed that the application's memory footprint increased quite fast (in GB). How did you determine that it's "caused by [NMT Internal] memory, which created by Unsafe_allocatememory xxx"? Thanks.
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844400#comment-17844400 ] Philip Nee commented on KAFKA-16687:

Thanks for the information - this is massively helpful. How do you stress test your application? Could you add a bit of information here?
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844395#comment-17844395 ] Philip Nee commented on KAFKA-16687:

Your application uses only the KafkaConsumer, right?
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844387#comment-17844387 ] Philip Nee commented on KAFKA-16687:

But this is a native memory leak, so I suspect it's a different issue than a heap leak. Would you mind experimenting with 3.6? We didn't see the leak using 3.6.
[jira] [Assigned] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-16687:
Assignee: Philip Nee
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844379#comment-17844379 ] Philip Nee commented on KAFKA-16687:

Hey [~fortherightous] - we also observed a (possible) memory leak. The symptom is particularly obvious when working with a large number of partitions. Have you tried kafka-clients version 3.6 or earlier?
[jira] [Resolved] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16474.
Resolution: Fixed

Ran the test several times; the client log also looks fine.

> AsyncKafkaConsumer might send out heartbeat request without waiting for its response
>
>                 Key: KAFKA-16474
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16474
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Philip Nee
>            Priority: Critical
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>         Attachments: failing_results.zip
>
> KAFKA-16389
> We've discovered that in some uncommon cases the consumer can send out successive heartbeats without waiting for the response to come back. This might cause the consumer to revoke its just-assigned partitions in some cases. For example:
>
> The consumer first sends out a heartbeat with epoch=0 and memberId=''.
> The consumer then rapidly sends out another heartbeat with epoch=0 and memberId='', because it has not gotten any response and thus has not updated its local state.
> The consumer receives the assignment from the first heartbeat and reconciles it.
> Since the second heartbeat has epoch=0 and memberId='', the server thinks this is a new member joining and therefore sends out an empty assignment.
> The consumer receives the response to the second heartbeat and revokes all of its partitions.
>
> There are 2 issues associated with this bug:
> # inflight logic
> # rapid poll: in KAFKA-16389 we've observed the consumer polling interval to be a few ms.
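The inflight problem described above boils down to sending a second heartbeat before the first response has updated the local epoch/memberId. A minimal, hypothetical sketch of the guard (not the actual AsyncKafkaConsumer implementation; class and method names are invented for illustration):

```java
// Hypothetical sketch of the inflight guard (not the actual
// AsyncKafkaConsumer code): a heartbeat may only be sent when no response
// is outstanding, so a stale epoch=0/memberId='' request cannot go out twice.
public class HeartbeatGate {
    private boolean inFlight = false;
    private int epoch = 0;
    private String memberId = "";

    // Called by the network thread before sending; returns false while a
    // previous heartbeat is still awaiting its response.
    synchronized boolean trySend() {
        if (inFlight) {
            return false;
        }
        inFlight = true;
        return true;
    }

    // Apply the server's response, updating local state before permitting
    // the next heartbeat to be sent.
    synchronized void onResponse(int newEpoch, String newMemberId) {
        this.epoch = newEpoch;
        this.memberId = newMemberId;
        inFlight = false;
    }

    synchronized int epoch() { return epoch; }
    synchronized String memberId() { return memberId; }
}
```

With this gate, the second send in the failure scenario is simply suppressed until the first response arrives, so the server never sees a duplicate join with epoch=0.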
[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843326#comment-17843326 ] Philip Nee commented on KAFKA-16474:

Yeah, I think we still need to figure out why the network thread isn't backing off for most of the polls.
[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843320#comment-17843320 ] Philip Nee commented on KAFKA-16474: Hey [~lianetm] - I'm planning to verify the fix and close out the issue. > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: failing_results.zip > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. this > might result in causing the consumer to revoke its just assigned assignments > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus not updating its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat. Revoke all of > its partitions. > > There are 2 issues associate with this bug: > # inflight logic > # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-16474: -- Assignee: Philip Nee (was: Lianet Magrans) > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: failing_results.zip > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. this > might result in causing the consumer to revoke its just assigned assignments > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus not updating its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat. Revoke all of > its partitions. > > There are 2 issues associate with this bug: > # inflight logic > # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16022) AsyncKafkaConsumer sometimes complains “No current assignment for partition {}”
[ https://issues.apache.org/jira/browse/KAFKA-16022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842730#comment-17842730 ] Philip Nee commented on KAFKA-16022: hi [~phuctran] - I believe this came up during integration testing. You can try to see if the fetch request manager test also emits this error. > AsyncKafkaConsumer sometimes complains “No current assignment for partition > {}” > --- > > Key: KAFKA-16022 > URL: https://issues.apache.org/jira/browse/KAFKA-16022 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > This seems to be a timing issue that before the member receives any > assignment from the coordinator, the fetcher will try to find the current > position causing "No current assignment for partition {}". This creates a > small amount of noise to the log. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-13447) Consumer should not reuse committed offset after topic recreation
[ https://issues.apache.org/jira/browse/KAFKA-13447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-13447: -- Assignee: Philip Nee > Consumer should not reuse committed offset after topic recreation > - > > Key: KAFKA-13447 > URL: https://issues.apache.org/jira/browse/KAFKA-13447 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Philip Nee >Priority: Major > Labels: consumer, needs-kip > > KAFKA-12257 fixes an issue in which the consumer is unable to make progress > after a topic has been recreated. The problem was that the client could not > distinguish between stale metadata with a lower leader epoch and a recreated > topic with a lower leader epoch. With TopicId support in KIP-516, the client > is able to tell when a topic has been recreated since the new topic will have > a different ID. > However, what the patch did not fix is the potential reuse of the current > offset position on the recreated topic. For example, say that the consumer is > at offset N when the topic gets recreated. Currently, the consumer will > continue fetching from offset N after detecting the recreation. The most > likely result of this is either an offset out of range error or a log > truncation error, but it is also possible for the offset position to remain > valid on the recreated topic (say for a low-volume topic where the offsets is > already low, or a case where the consumer was down for a while). > To fix this issue completely, we need to store the topicId along with the > committed offset in __consumer_offsets. This would allow the consumer to > detect when the offset is no longer relevant for the current topic. We also > need to decide how to raise this case to the user. If the user has enabled > automatic offset reset, we can probably use that. Otherwise, we might need a > new exception type to signal the user that the position needs to be reset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16110) Document and publicize performance test results for AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16110: --- Issue Type: Task (was: New Feature) > Document and publicize performance test results for AsyncKafkaConsumer > -- > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, performance-benchmark > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16110) Document and publicize performance test results for AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16110: --- Summary: Document and publicize performance test results for AsyncKafkaConsumer (was: Implement consumer performance tests) > Document and publicize performance test results for AsyncKafkaConsumer > -- > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, performance-benchmark > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842187#comment-17842187 ] Philip Nee edited comment on KAFKA-16639 at 4/29/24 11:55 PM: -- hey - sorry. I took this because I wasn't sure if you wanted me to look into it. Please go ahead assign it back. We would love to review it. Thanks. [~chia7712] was (Author: JIRAUSER283568): hey - sorry. I took this because I wasn't sure if you wanted me to look into it. Please go ahead assign it back. We would love to review it. THanks. > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: Philip Nee >Priority: Major > Labels: kip-848-client-support > > This bug can be reproduced by immediately closing a consumer which is just > created. > The root cause is that we skip the new heartbeat used to leave group when > there is a in-flight heartbeat request > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).] > It seems to me the simple solution is that we create a heartbeat request when > meeting above situation and then send it by pollOnClose > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842187#comment-17842187 ] Philip Nee commented on KAFKA-16639: Hey - sorry. I took this because I wasn't sure if you wanted me to look into it. Please go ahead and assign it back. We would love to review it. Thanks. > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: Philip Nee >Priority: Major > Labels: kip-848-client-support > > This bug can be reproduced by immediately closing a consumer which is just > created. > The root cause is that we skip the new heartbeat used to leave group when > there is a in-flight heartbeat request > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).] > It seems to me the simple solution is that we create a heartbeat request when > meeting above situation and then send it by pollOnClose > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-16639: -- Assignee: Philip Nee (was: Chia-Ping Tsai) > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: Philip Nee >Priority: Major > Labels: kip-848-client-support > > This bug can be reproduced by immediately closing a consumer which is just > created. > The root cause is that we skip the new heartbeat used to leave group when > there is a in-flight heartbeat request > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).] > It seems to me the simple solution is that we create a heartbeat request when > meeting above situation and then send it by pollOnClose > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841678#comment-17841678 ] Philip Nee commented on KAFKA-16639: thanks for reporting. will do. > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > Labels: kip-848-client-support > > This bug can be reproduced by immediately closing a consumer which is just > created. > The root cause is that we skip the new heartbeat used to leave group when > there is a in-flight heartbeat request > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).] > It seems to me the simple solution is that we create a heartbeat request when > meeting above situation and then send it by pollOnClose > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
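The fix proposed in this ticket amounts to: when close() wants to leave the group but a heartbeat is already in flight, queue the leave-group heartbeat instead of skipping it, and drain it from pollOnClose. A rough sketch of that queuing idea, under assumed names (this is not the real RequestManager interface):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Rough sketch: a leave-group heartbeat requested while another heartbeat is
// in flight is queued rather than dropped, and drained by pollOnClose().
public class LeaveGroupQueue {
    private final Deque<String> pending = new ArrayDeque<>();

    public void requestLeaveGroup() {
        // Previously the leave-group heartbeat was skipped when a heartbeat
        // was in flight; here it is always queued so close() can still leave.
        pending.add("LEAVE_GROUP_HEARTBEAT");
    }

    // Called during close: return the pending request (or null) so the broker
    // learns the member is leaving, even if an earlier heartbeat never completed.
    public String pollOnClose() {
        return pending.poll();
    }
}
```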
[jira] [Resolved] (KAFKA-16579) Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16579. Resolution: Fixed > Revert changes to consumer_rolling_upgrade_test.py for the new async Consumer > - > > Key: KAFKA-16579 > URL: https://issues.apache.org/jira/browse/KAFKA-16579 > Project: Kafka > Issue Type: Task > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Critical > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated > a slew of system tests to run both the "old" and "new" implementations. > KAFKA-16271 updated the system tests in {{consumer_rolling_upgrade_test.py}} > so it could test the new consumer. However, the test is tailored specifically > to the "old" Consumer's protocol and assignment strategy upgrade. > Unsurprisingly, when we run those system tests with the new > {{AsyncKafkaConsumer}}, we get errors like the following: > {code} > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 29.634 seconds > AssertionError("Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1)})}") > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File > 
"/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py", > line 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", > line 77, in rolling_update_test > self._verify_range_assignment(consumer) > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", > line 41, in _verify_range_assignment > "Mismatched assignment: %s" % assignment > AssertionError: Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1)})} > {code} > The task here is to revert the changes made in KAFKA-16271. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16389. Resolution: Fixed > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch, consumer.log > > > The following error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when > num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])] > {code} > To reproduce, create a system test suite file named > {{test_valid_assignment.yml}} with these contents: > {code:yaml} > failures: > - > 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}' > {code} > Then set the {{TC_PATHS}} environment variable to include that test suite > file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15250) ConsumerNetworkThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837836#comment-17837836 ] Philip Nee commented on KAFKA-15250: Added a testing branch under my fork: network-thread-mbean-metrics I ran the testAsyncCommit in the integration test (because it actually polls). These are the stats: backoff time avg:0.29459381324082107 (backoff time max,92.0) poll time avg:9.2161365261735107E18 (poll time max,9.223372036854776E18) The loop doesn't really back off. > ConsumerNetworkThread is running tight loop > --- > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, events > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout passed to NetworkClientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
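The RequestState log lines elsewhere in this thread show ExponentialBackoff{multiplier=2, expMax=3.32, initialInterval=100, jitter=30.0}, i.e. the retry delay should roughly double per attempt; a tight loop means the computed delay is not actually bounding the poll. The deterministic core of that computation can be sketched as follows (illustrative only; the real implementation also applies random jitter and an exponent cap):

```java
public class Backoff {
    // Core of exponential backoff: initialInterval * multiplier^attempts,
    // capped at maxInterval. Jitter is omitted for determinism.
    public static long delayMs(long initialIntervalMs, int multiplier,
                               int attempts, long maxIntervalMs) {
        double delay = initialIntervalMs * Math.pow(multiplier, attempts);
        return (long) Math.min(delay, maxIntervalMs);
    }
}
```

A backoff-time average of ~0.29 ms, as measured above, indicates the loop is polling far more often than even the first 100 ms delay would allow.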
[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837342#comment-17837342 ] Philip Nee commented on KAFKA-16474: The log was attached: See line 7481 and 7492 in : {code:java} AssignmentValidationTest/test_valid_assignment/metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer.group_remote_assignor=uniform/2/VerifiableConsumer-0-281473320420544/ducker11/verifiable_consumer.log {code} {code:java} 7480 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Found least loaded node ducker10:9092 (id: 2 rack: null) connected with no in-flight requests (org.apache.kafka.clients.NetworkClient) 7481 [2024-04-15 16:03:35,964] DEBUG [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Sending CONSUMER_GROUP_HEARTBEAT request with header RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, clientId=consumer-test_gro up_id-1, correlationId=108, headerVersion=2) and timeout 3 to node 2147483646: ConsumerGroupHeartbeatRequestData(groupId='test_group_id', memberId='', memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=30, subscribedTop icNames=[test_topic], serverAssignor='uniform', topicPartitions=[]) (org.apache.kafka.clients.NetworkClient) 7482 [2024-04-15 16:03:35,964] TRACE For telemetry state SUBSCRIPTION_NEEDED, returning the value 299843 ms; (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) 7483 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Starting processing of 1 event (org.apache.kafka.clients.consumer.internals.events.EventProcessor) 7484 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Processing event: ValidatePositionsEvent{type=VALIDATE_POSITIONS, id=nMQf3YgtRU6vaJu-oVpiQg, future=java.util.concurrent.CompletableFuture@5 7bc27f5[Not completed, 1 
dependents], deadlineMs=9223372036854775807} (org.apache.kafka.clients.consumer.internals.events.EventProcessor) 7485 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Completed processing (org.apache.kafka.clients.consumer.internals.events.EventProcessor) 7486 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] 999 ms remain before another request should be sent for RequestState{owner='org.apache.kafka.clients.consumer.internals.HeartbeatRequestMana ger$HeartbeatRequestState', exponentialBackoff=ExponentialBackoff{multiplier=2, expMax=3.3219280948873626, initialInterval=100, jitter=30.0}, lastSentMs=1713197015963, lastReceivedMs=1713197015963, numAttempts=1, backoffMs=1000} (org.ap ache.kafka.clients.consumer.internals.RequestState) 7487 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Polling for fetches with timeout 0 (org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer) 7488 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Found least loaded node ducker10:9092 (id: 2 rack: null) connected with no in-flight requests (org.apache.kafka.clients.NetworkClient) 7489 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Enqueued event: PollEvent{type=POLL, id=T9IwUDeHQoGrnqJJIyzb8Q, pollTimeMs=1713197015964} (org.apache.kafka.clients.consumer.internals.event s.ApplicationEventHandler) 7490 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] No events to process (org.apache.kafka.clients.consumer.internals.events.EventProcessor) 7491 [2024-04-15 16:03:35,964] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Enqueued event: ValidatePositionsEvent{type=VALIDATE_POSITIONS, id=otKgqakkS9COd01SkJ3btQ, future=java.util.concurrent.CompletableFuture@5fb 
759d6[Not completed], deadlineMs=9223372036854775807} (org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler) 7492 [2024-04-15 16:03:35,964] DEBUG [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Sending CONSUMER_GROUP_HEARTBEAT request with header RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, clientId=consumer-test_gro up_id-1, correlationId=109, headerVersion=2) and timeout 3 to node 2147483646: ConsumerGroupHeartbeatRequestData(groupId='test_group_id', memberId='', memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=30, subscribedTop icNames=[test_topic], serverAssignor='uniform', topicPartitions=[]) (org.apache.kafka.clients.NetworkClient) {code} > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > -
[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16474: --- Attachment: failing_results.zip > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > Attachments: failing_results.zip > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. this > might result in causing the consumer to revoke its just assigned assignments > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus not updating its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat. Revoke all of > its partitions. > > There are 2 issues associate with this bug: > # inflight logic > # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16474: --- Summary: AsyncKafkaConsumer might send out heartbeat request without waiting for its response (was: AsyncKafkaConsumer might rapidly send out successive heartbeat without waiting for the response the come back) > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. this > might result in causing the consumer to revoke its just assigned assignments > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus not updating its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat. Revoke all of > its partitions. > > There are 2 issues associate with this bug: > # inflight logic > # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might rapidly send out successive heartbeat without waiting for the response the come back
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16474: --- Summary: AsyncKafkaConsumer might rapidly send out successive heartbeat without waiting for the response the come back (was: AsyncKafkaConsumer might rapidly send out successive heartbeat causing partitions getting revoked) > AsyncKafkaConsumer might rapidly send out successive heartbeat without > waiting for the response the come back > - > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. this > might result in causing the consumer to revoke its just assigned assignments > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not gotten any response and thus not updating its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat. Revoke all of > its partitions. > > There are 2 issues associate with this bug: > # inflight logic > # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to > be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16474) AsyncKafkaConsumer might rapidly send out successive heartbeat causing partitions getting revoked
Philip Nee created KAFKA-16474: -- Summary: AsyncKafkaConsumer might rapidly send out successive heartbeat causing partitions getting revoked Key: KAFKA-16474 URL: https://issues.apache.org/jira/browse/KAFKA-16474 Project: Kafka Issue Type: Bug Components: consumer Reporter: Philip Nee Assignee: Philip Nee KAFKA-16389 We've discovered that in some uncommon cases, the consumer can send out successive heartbeats without waiting for the response to come back. This can cause the consumer to revoke its just-assigned partitions. For example: The consumer first sends out a heartbeat with epoch=0 and memberId='' The consumer then rapidly sends out another heartbeat with epoch=0 and memberId='' because it has not gotten any response and thus has not updated its local state The consumer receives assignments from the first heartbeat and reconciles its assignment. Since the second heartbeat has epoch=0 and memberId='', the server will think this is a new member joining and therefore send out an empty assignment. The consumer receives the response from the second heartbeat and revokes all of its partitions. There are two issues associated with this bug: # in-flight logic # rapid poll: in KAFKA-16389 we've observed the consumer polling interval to be a few ms. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833744#comment-17833744 ] Philip Nee commented on KAFKA-16389: [^consumer.log] It seems like the consumer receives empty topicPartitions after the assignment. One suspicious thing that I see is that there's no send in between the successive Receives. I wonder if this is a race condition: First: Assignments received {code:java} 7997 [2024-04-03 19:48:49,445] DEBUG [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Received CONSUMER_GROUP_HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, clientId=consumer-test_group_id-1, correlationId=35, headerVersion=2): ConsumerGroupHeartbeatResponseData(throttleTimeMs=0, errorCode=0, errorMessage=null, memberId='pBC-jWhKQ7yr0y9MXysT2g', memberEpoch=1, heartbeatIntervalMs=5000, assignment=Assignment(topicPartitions=[TopicPartitions(topicId=TcUeldqLQae7xsWQo2WjPA, partitions=[0, 1, 2, 3, 4, 5])])) (org.apache.kafka.clients.NetworkClient) 7999 [2024-04-03 19:48:49,450] TRACE [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Member pBC-jWhKQ7yr0y9MXysT2g with epoch 1 transitioned from JOINING to RECONCILING.
(org.apache.kafka.clients.consumer.internals.MembershipManagerImpl){code} Assignments completed: {code:java} 8005 [2024-04-03 19:48:49,454] INFO [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Updating assignment with local epoch 0 8006 Assigned partitions: [test_topic-0, test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5] 8007 Current owned partitions: [] 8008 Added partitions (assigned - owned): [test_topic-0, test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5] 8009 Revoked partitions (owned - assigned): [] 8010 (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl) {code} Then receive another heartbeat: {code:java} 8021 [2024-04-03 19:48:49,486] DEBUG [Consumer clientId=consumer-test_group_id-1, groupId=test_group_id] Received CONSUMER_GROUP_HEARTBEAT response from node 2147483646 for request with header RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, clientId=consumer-test_group_id-1, correlationId=36, headerVersion=2): ConsumerGroupHeartbeatResponseData(throttleTimeMs=0, errorCode=0, errorMessage=null, memberId='HhILLGoPQ3i7Rt6IINJbRA', memberEpoch=2, heartbeatIntervalMs=5000, assignment=Assignment(topicPartitions=[])) (org.apache.kafka.clients.NetworkClient) Which causes revocation Updating assignment with local epoch 1 8223 Assigned partitions: [] 8224 Current owned partitions: [test_topic-0, test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5] 8225 Added partitions (assigned - owned): [] 8226 Revoked partitions (owned - assigned): [test_topic-0, test_topic-1, test_topic-2, test_topic-3, test_topic-4, test_topic-5] 8227 (org.apache.kafka.clients.consumer.internals.MembershipManagerImpl) {code} > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk
True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch, consumer.log > > > The following error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception >
[jira] [Updated] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16389: --- Attachment: consumer.log > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch, consumer.log > > > The following error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when > num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])] > {code} > To reproduce, create a system test suite file named > {{test_valid_assignment.yml}} with these contents: > {code:yaml} > failures: > - > 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}' > {code} > Then set the {{TC_PATHS}} environment variable to include that test suite > file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17833704#comment-17833704 ] Philip Nee commented on KAFKA-16389: Hi [~kirktrue] Thanks for the initial investigation. I think your approach makes sense, but I do think we need to rewrite verifiable_consumer.py's event handler, as the state transitions don't necessarily match the behavior of the current consumer. I think that's why there's still some flakiness in the patch you submitted. See my notes below: I'm still occasionally getting errors like: "ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when num_started 1: [('ducker@ducker11', [])]" This seems to be caused by some weird reconciliation state. For example, here we can see consumer1 got assigned 6 partitions and then immediately gave up all of them. It is unclear why onPartitionsRevoked is triggered. {code:java} 1 node wait for member idx 1 partiton assigned [{'topic': 'test_topic', 'partition': 0}, {'topic': 'test_topic', 'partition': 1}, {'topic': 'test_topic', 'partition': 2}, {'topic': 'test_topic', 'partition': 3}, {'topic': 'test_topic', 'partition': 4}, {'topic': 'test_topic', 'partition': 5}] idx 1 partiton revoked [{'topic': 'test_topic', 'partition': 0}, {'topic': 'test_topic', 'partition': 1}, {'topic': 'test_topic', 'partition': 2}, {'topic': 'test_topic', 'partition': 3}, {'topic': 'test_topic', 'partition': 4}, {'topic': 'test_topic', 'partition': 5}] node: ducker11 Current assignment: {: []} idx 1 partiton assigned [] [WARNING - 2024-04-03 11:05:34,587 - service_registry - stop_all - lineno:53]: Error stopping service : [WARNING - 2024-04-03 11:06:09,128 - service_registry - clean_all - lineno:67]: Error cleaning service : [INFO:2024-04-03 11:06:09,134]: RunnerClient:
kafkatest.tests.client.consumer_test.AssignmentValidationTest.test_valid_assignment.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer.group_remote_assignor=range: FAIL: TimeoutError("expected valid assignments of 6 partitions when num_started 1: [('ducker@ducker11', [])]") Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line 583, in test_valid_assignment wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()), File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when num_started 1: [('ducker@ducker11', [])] {code} > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch > > > The following error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: expected valid assignments of 6 partitions w
[jira] [Updated] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout
[ https://issues.apache.org/jira/browse/KAFKA-16433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16433: --- Labels: consumer-threading-refactor (was: ) > beginningOffsets and offsetsForTimes don't behave consistently when providing > a zero timeout > > > Key: KAFKA-16433 > URL: https://issues.apache.org/jira/browse/KAFKA-16433 > Project: Kafka > Issue Type: Task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor > > As documented here: [https://github.com/apache/kafka/pull/15525] > > Both APIs should send out at least one request even when a zero timeout is provided. > > This is corrected in the PR above. However, we still need to fix the > implementation of the offsetsForTimes API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout
Philip Nee created KAFKA-16433: -- Summary: beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout Key: KAFKA-16433 URL: https://issues.apache.org/jira/browse/KAFKA-16433 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee Assignee: Philip Nee As documented here: [https://github.com/apache/kafka/pull/15525] Both APIs should send out at least one request even when a zero timeout is provided. This is corrected in the PR above. However, we still need to fix the implementation of the offsetsForTimes API. -- This message was sent by Atlassian Jira (v8.20.10#820010)
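The intended contract — at least one request goes out even with a zero timeout — can be modeled roughly as below. This is a toy sketch contrasting the buggy and corrected behavior; the function names are illustrative and are not the real KafkaConsumer internals.

```python
# Toy contrast of buggy vs. corrected zero-timeout behavior; illustrative
# only, not actual KafkaConsumer code.
def buggy_fetch(send_request, timeout_ms):
    """Old behavior: bail out before any network activity on a zero timeout."""
    if timeout_ms <= 0:
        return False
    send_request()
    return True

def fixed_fetch(send_request, timeout_ms):
    """Corrected behavior: the request always goes out; only the wait for
    the response is bounded by the timeout."""
    send_request()
    return True

sent = []
buggy_fetch(lambda: sent.append('list_offsets'), 0)   # nothing is sent
fixed_fetch(lambda: sent.append('list_offsets'), 0)   # the request is issued
```

The point of the fix is the reordering: issue the ListOffsets request unconditionally, then let the timeout bound only how long the caller waits for the response.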
[jira] [Comment Edited] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831016#comment-17831016 ] Philip Nee edited comment on KAFKA-16389 at 3/26/24 5:27 PM: - Hey [~chia7712] - We are currently setting up the system tests using the AsyncKafkaConsumer so that we can get system test reports to address the shortcoming of the async consumer. Our goal is to bring the async consumer as close to the current one as possible. I understand the concern about the broken tests, but this consumer has not been released yet... was (Author: JIRAUSER283568): Hey [~chia7712] - We are currently setting up the system tests using the AsyncKafkaConsumer so that we can get system test reports to address the shortcoming of the async consumer. Our goal is to bring the async consumer as close to the current one as possible. > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch > > > The following error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in
test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when > num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])] > {code} > To reproduce, create a system test suite file named > {{test_valid_assignment.yml}} with these contents: > {code:yaml} > failures: > - > 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}' > {code} > Then set the {{TC_PATHS}} environment variable to include that test suite > file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831016#comment-17831016 ] Philip Nee commented on KAFKA-16389: Hey [~chia7712] - We are currently setting up the system tests using the AsyncKafkaConsumer so that we can get system test reports to address the shortcoming of the async consumer. Our goal is to bring the async consumer as close to the current one as possible. > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch > > > The following error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when > num_started 2: 
[('ducker@ducker05', []), ('ducker@ducker06', [])] > {code} > To reproduce, create a system test suite file named > {{test_valid_assignment.yml}} with these contents: > {code:yaml} > failures: > - > 'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}' > {code} > Then set the {{TC_PATHS}} environment variable to include that test suite > file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16389) consumer_test.py’s test_valid_assignment fails with new consumer
[ https://issues.apache.org/jira/browse/KAFKA-16389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-16389: -- Assignee: Philip Nee > consumer_test.py’s test_valid_assignment fails with new consumer > > > Key: KAFKA-16389 > URL: https://issues.apache.org/jira/browse/KAFKA-16389 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Philip Nee >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > Attachments: KAFKA-16389.patch > > > The following error is reported when running the {{test_valid_assignment}} > test from {{consumer_test.py}}: > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_test.py", line > 584, in test_valid_assignment > wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, > consumer.current_assignment()), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: expected valid assignments of 6 partitions when > num_started 2: [('ducker@ducker05', []), ('ducker@ducker06', [])] > {code} > To reproduce, create a system test suite file named > {{test_valid_assignment.yml}} with these contents: > {code:yaml} > failures: > - > 
'kafkatest/tests/client/consumer_test.py::AssignmentValidationTest.test_valid_assignment@{"metadata_quorum":"ISOLATED_KRAFT","use_new_coordinator":true,"group_protocol":"consumer","group_remote_assignor":"range"}' > {code} > Then set the {{TC_PATHS}} environment variable to include that test suite > file. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests
[ https://issues.apache.org/jira/browse/KAFKA-16405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-16405: -- Assignee: Philip Nee > Mismatch assignment error when running consumer rolling upgrade system tests > > > Key: KAFKA-16405 > URL: https://issues.apache.org/jira/browse/KAFKA-16405 > Project: Kafka > Issue Type: Task > Components: consumer, system tests >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: kip-848-client-support > > relevant to [https://github.com/apache/kafka/pull/15578] > > We are seeing: > {code:java} > > SESSION REPORT (ALL TESTS) > ducktape version: 0.11.4 > session_id: 2024-03-21--001 > run time: 3 minutes 24.632 seconds > tests run:7 > passed: 5 > flaky:0 > failed: 2 > ignored: 0 > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic > status: PASS > run time: 24.599 seconds > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 26.638 seconds > AssertionError("Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=3), > TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1), > TopicPartition(topic='test_topic', partition=2)})}") > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, 
**kwargs)(*w_args, **w_kwargs) > File > "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", > line 77, in rolling_update_test > self._verify_range_assignment(consumer) > File > "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", > line 38, in _verify_range_assignment > assert assignment == set([ > AssertionError: Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=3), > TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1), > TopicPartition(topic='test_topic', partition=2)})} > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False > status: PASS > run time: 29.815 seconds > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True > status: PASS > run time: 29.766 seconds > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic > status: PASS > run time: 30.086 seconds > > test_id: > kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 35.965 seconds > AssertionError("Mismatched assignment: {frozenset(), > frozenset({TopicPartition(topic='test_topic', partition=3), > TopicPartition(topic='test_topic', partition=0), > TopicPartition(topic='test_topic', partition=1), > TopicPartition(topic='test_topic', partition=2)})}") > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > 
"/usr/local/lib/python3.9/dist-packages
[jira] [Created] (KAFKA-16405) Mismatch assignment error when running consumer rolling upgrade system tests
Philip Nee created KAFKA-16405: -- Summary: Mismatch assignment error when running consumer rolling upgrade system tests Key: KAFKA-16405 URL: https://issues.apache.org/jira/browse/KAFKA-16405 Project: Kafka Issue Type: Task Components: consumer, system tests Reporter: Philip Nee relevant to [https://github.com/apache/kafka/pull/15578] We are seeing: {code:java} SESSION REPORT (ALL TESTS) ducktape version: 0.11.4 session_id: 2024-03-21--001 run time: 3 minutes 24.632 seconds tests run:7 passed: 5 flaky:0 failed: 2 ignored: 0 test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=classic status: PASS run time: 24.599 seconds test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=COMBINED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 26.638 seconds AssertionError("Mismatched assignment: {frozenset(), frozenset({TopicPartition(topic='test_topic', partition=3), TopicPartition(topic='test_topic', partition=0), TopicPartition(topic='test_topic', partition=1), TopicPartition(topic='test_topic', partition=2)})}") Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", line 77, in rolling_update_test self._verify_range_assignment(consumer) File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", line 38, in _verify_range_assignment assert 
assignment == set([ AssertionError: Mismatched assignment: {frozenset(), frozenset({TopicPartition(topic='test_topic', partition=3), TopicPartition(topic='test_topic', partition=0), TopicPartition(topic='test_topic', partition=1), TopicPartition(topic='test_topic', partition=2)})} test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=False status: PASS run time: 29.815 seconds test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True status: PASS run time: 29.766 seconds test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=classic status: PASS run time: 30.086 seconds test_id: kafkatest.tests.client.consumer_rolling_upgrade_test.ConsumerRollingUpgradeTest.rolling_update_test.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 35.965 seconds AssertionError("Mismatched assignment: {frozenset(), frozenset({TopicPartition(topic='test_topic', partition=3), TopicPartition(topic='test_topic', partition=0), TopicPartition(topic='test_topic', partition=1), TopicPartition(topic='test_topic', partition=2)})}") Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_rolling_upgrade_test.py", line 77, in rolling_update_test 
self._verify_range_assignment(consumer) File "/opt/kafka-dev/tests/kafkatest/tests/client/consumer_r
[jira] [Created] (KAFKA-16390) consumer_bench_test.py failed using AsyncKafkaConsumer
Philip Nee created KAFKA-16390: -- Summary: consumer_bench_test.py failed using AsyncKafkaConsumer Key: KAFKA-16390 URL: https://issues.apache.org/jira/browse/KAFKA-16390 Project: Kafka Issue Type: Task Components: consumer, system tests Reporter: Philip Nee Ran the system test based on KAFKA-16273 The following tests failed using the consumer group protocol {code:java} kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer {code} Because of {code:java} TimeoutError('consume_workload failed to finish in the expected amount of time.') Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", line 146, in test_single_partition consume_workload.wait_for_done(timeout_sec=180) File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 352, in wait_for_done wait_until(lambda: self.done(), File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception ducktape.errors.TimeoutError: consume_workload failed to 
finish in the expected amount of time. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16116. Resolution: Fixed > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: consumer-threading-refactor, metrics > Fix For: 3.8.0 > > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816239#comment-17816239 ] Philip Nee commented on KAFKA-16156: There seems to be a subtle difference in behavior between the async and the legacy consumer. The legacy one catches the error without doing anything, while the async client doesn't seem to handle the exception. The fix should be easy, but I'll run the same test first. > System test failing for new consumer on endOffsets with negative timestamps > --- > > Key: KAFKA-16156 > URL: https://issues.apache.org/jira/browse/KAFKA-16156 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, system tests >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid > negative timestamp". > Trace: > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Received ListOffsetResponse > ListOffsetsResponseData(throttleTimeMs=0, > topics=[ListOffsetsTopicResponse(name='input-topic', > partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, > oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from > broker worker2:9092 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,932] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Handling ListOffsetResponse > response for input-topic-0.
Fetched offset 42804, timestamp -1 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Updating last stable offset for > partition input-topic-0 to 42804 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,933] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Fetch offsets completed > successfully for partitions and timestamps {input-topic-0=-1}. Result > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862 > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,933] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] No events to process > (org.apache.kafka.clients.consumer.internals.events.EventProcessor) > [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event > loop (org.apache.kafka.tools.TransactionalMessageCopier) > org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: > Invalid negative timestamp > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) > at > org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) > at > 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) > at > org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) > at > org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) > at > org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) > Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp > at > org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:39) > at > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253) > at > org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181) >
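The trace above shows the root cause: the broker returns timestamp=-1 as a "no timestamp" sentinel, and the new code path feeds that sentinel straight into a constructor that rejects negative values. Below is a minimal hedged sketch of one possible guard; the class and method names here are hypothetical stand-ins, not Kafka's actual internals.

```java
import java.util.Optional;

// Hypothetical sketch, not Kafka's actual classes: the broker uses
// timestamp = -1 as a "no timestamp" sentinel, but the OffsetAndTimestamp
// constructor rejects negative timestamps. One possible fix is to filter
// the sentinel out before constructing the result, instead of letting the
// IllegalArgumentException escape and kill the event loop.
public class NegativeTimestampGuard {

    static final class OffsetAndTimestamp {
        final long offset;
        final long timestamp;

        OffsetAndTimestamp(long offset, long timestamp) {
            if (timestamp < 0) {
                throw new IllegalArgumentException("Invalid negative timestamp");
            }
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Returns empty instead of throwing when the broker sent the -1 sentinel.
    static Optional<OffsetAndTimestamp> buildResult(long offset, long timestamp) {
        if (timestamp < 0) {
            return Optional.empty();
        }
        return Optional.of(new OffsetAndTimestamp(offset, timestamp));
    }

    public static void main(String[] args) {
        // Mirrors the failing response above: offset 42804, timestamp -1.
        System.out.println(buildResult(42804L, -1L).isPresent());            // false
        System.out.println(buildResult(42804L, 1705304553932L).isPresent()); // true
    }
}
```

Whether the right fix is to drop the entry or to propagate the sentinel differently is exactly what the legacy/async behavioral comparison in the comments needs to settle.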
[jira] [Commented] (KAFKA-16156) System test failing for new consumer on endOffsets with negative timestamps
[ https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815820#comment-17815820 ] Philip Nee commented on KAFKA-16156: This issue seems easy to reproduce by simply calling endOffsets. Here is an example integration test: {code:java} def testEndOffsets(quorum: String, groupProtocol: String): Unit = { val producer = createProducer() (0 until 1).foreach { i => producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes)) } // This test ensures that the member ID is propagated from the group coordinator when the // assignment is received into a subsequent offset commit val consumer = createConsumer() assertEquals(0, consumer.assignment.size) consumer.subscribe(List(topic).asJava) awaitAssignment(consumer, Set(tp, tp2)) print("listing offsets") print(consumer.endOffsets(Set(tp, tp2).asJava)) } {code} > System test failing for new consumer on endOffsets with negative timestamps > --- > > Key: KAFKA-16156 > URL: https://issues.apache.org/jira/browse/KAFKA-16156 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, system tests >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid > negative timestamp". 
> Trace: > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Received ListOffsetResponse > ListOffsetsResponseData(throttleTimeMs=0, > topics=[ListOffsetsTopicResponse(name='input-topic', > partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, > oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from > broker worker2:9092 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,932] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Handling ListOffsetResponse > response for input-topic-0. Fetched offset 42804, timestamp -1 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,932] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Updating last stable offset for > partition input-topic-0 to 42804 > (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils) > [2024-01-15 07:42:33,933] DEBUG [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] Fetch offsets completed > successfully for partitions and timestamps {input-topic-0=-1}. 
Result > org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862 > (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager) > [2024-01-15 07:42:33,933] TRACE [Consumer > clientId=consumer-transactions-test-consumer-group-1, > groupId=transactions-test-consumer-group] No events to process > (org.apache.kafka.clients.consumer.internals.events.EventProcessor) > [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event > loop (org.apache.kafka.tools.TransactionalMessageCopier) > org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: > Invalid negative timestamp > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) > at > org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) > at > org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) > at > org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) > at > org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) > at > org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) > at > org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) >
[jira] [Resolved] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
[ https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16115. Resolution: Fixed > AsyncKafkaConsumer: Add missing heartbeat metrics > - > > Key: KAFKA-16115 > URL: https://issues.apache.org/jira/browse/KAFKA-16115 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: consumer-threading-refactor, metrics > Fix For: 3.8.0 > > > The following metrics are missing: > |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| > |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| > |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| > |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| > |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16178) AsyncKafkaConsumer doesn't retry joining the group after rediscovering group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-16178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17808801#comment-17808801 ] Philip Nee commented on KAFKA-16178: Seems to be an issue with the RequestState - I wonder if we've forgotten to update the lastReceivedMs when receiving these errors. > AsyncKafkaConsumer doesn't retry joining the group after rediscovering group > coordinator > > > Key: KAFKA-16178 > URL: https://issues.apache.org/jira/browse/KAFKA-16178 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Dongnuo Lyu >Priority: Critical > Labels: consumer-threading-refactor > Attachments: pkc-devc63jwnj_jan19_0_debug > > > {code:java} > [2024-01-17 21:34:59,500] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Discovered group coordinator > Coordinator(key='consumer-groups-test-0', nodeId=3, > host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, > errorCode=0, errorMessage='') > (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162) > [2024-01-17 21:34:59,681] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] GroupHeartbeatRequest failed because the > group coordinator > Optional[b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: > 2147483644 rack: null)] is incorrect. Will attempt to find the coordinator > again and retry in 0ms: This is not the correct coordinator. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:407) > [2024-01-17 21:34:59,681] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Group coordinator > b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483644 rack: > null) is unavailable or invalid due to cause: This is not the correct > coordinator.. Rediscovery will be attempted. 
> (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:136) > [2024-01-17 21:34:59,882] INFO [Consumer > clientId=consumer.7e26597f-0285-4e13-88d6-31500a500275-0, > groupId=consumer-groups-test-0] Discovered group coordinator > Coordinator(key='consumer-groups-test-0', nodeId=3, > host='b3-pkc-devc63jwnj.us-west-2.aws.devel.cpdev.cloud', port=9092, > errorCode=0, errorMessage='') > (org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager:162){code} > Some of the consumers don't consume any messages. The logs show that after the > consumer starts up and successfully logs in, > # The consumer discovers the group coordinator. > # The heartbeat to join the group fails because "This is not the correct > coordinator". > # The consumer rediscovers the group coordinator. > Another heartbeat should follow the rediscovery of the group coordinator, but > there are no logs showing any sign of a heartbeat request. > On the server side, there are no logs at all for the group id. A > suspicion is that the consumer doesn't send a heartbeat request after > rediscovering the group coordinator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
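The suspicion raised in the comment (a forgotten lastReceivedMs update) can be illustrated with a stripped-down sketch. The names below are hypothetical stand-ins for Kafka's RequestState, not its actual API: if only successful responses update the received timestamp, an error response leaves the state looking "in flight" forever, so the send gate never reopens and no follow-up heartbeat is ever sent.

```java
// Hypothetical sketch of the suspected bug, not Kafka's actual RequestState:
// if an error response never updates lastReceivedMs, canSendRequest() keeps
// returning false and the heartbeat is never retried after the coordinator
// is rediscovered.
public class RequestStateSketch {
    private final long retryBackoffMs;
    private long lastSentMs = -1L;
    private long lastReceivedMs = -1L;

    public RequestStateSketch(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    public void onSendAttempt(long nowMs) {
        lastSentMs = nowMs;
    }

    // Must be invoked for error responses too; skipping it on the
    // "this is not the correct coordinator" path reproduces the stuck state.
    public void onResponseReceived(long nowMs) {
        lastReceivedMs = nowMs;
    }

    public boolean canSendRequest(long nowMs) {
        if (lastSentMs == -1L) {
            return true;                     // nothing sent yet
        }
        if (lastReceivedMs < lastSentMs) {
            return false;                    // still waiting on a response
        }
        return nowMs - lastReceivedMs >= retryBackoffMs;
    }

    public static void main(String[] args) {
        RequestStateSketch state = new RequestStateSketch(100L);
        state.onSendAttempt(0L);
        // Error response arrives but is never recorded -> stuck forever:
        System.out.println(state.canSendRequest(10_000L)); // false
        // Recording the (failed) response unblocks the retry:
        state.onResponseReceived(50L);
        System.out.println(state.canSendRequest(10_000L)); // true
    }
}
```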
[jira] [Resolved] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
[ https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16113. Resolution: Fixed > AsyncKafkaConsumer: Add missing offset commit metrics > - > > Key: KAFKA-16113 > URL: https://issues.apache.org/jira/browse/KAFKA-16113 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The following metrics are missing from the AsyncKafkaConsumer: > commit-latency-avg > commit-latency-max > commit-rate > commit-total > committed-time-ns-total -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
Philip Nee created KAFKA-16160: -- Summary: AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop Key: KAFKA-16160 URL: https://issues.apache.org/jira/browse/KAFKA-16160 Project: Kafka Issue Type: Task Components: consumer Reporter: Philip Nee While running AsyncKafkaConsumer, I observed excessive logging of: {code:java} 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, groupId=concurrent_consumer] Node is not ready, handle the request in the next event loop: node=worker4:9092 (id: 2147483644 rack: null), request=UnsentRequest{requestBuil der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], serverAssignor=null, topicP artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, node=Optional[worker4:9092 (id: 2147483644 rack: null)] , timer=org.apache.kafka.common.utils.Timer@55ed4733} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} This seems to be triggered by a tight poll loop of the network thread. The right thing to do is to back off a bit for the given node and retry later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16160: --- Description: While running AsyncKafkaConsumer, I observed excessive logging of: {code:java} 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, groupId=concurrent_consumer] Node is not ready, handle the request in the next event loop: node=worker4:9092 (id: 2147483644 rack: null), request=UnsentRequest{requestBuil der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], serverAssignor=null, topicP artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, node=Optional[worker4:9092 (id: 2147483644 rack: null)] , timer=org.apache.kafka.common.utils.Timer@55ed4733} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} This seems to be triggered by a tight poll loop of the network thread. The right thing to do is to back off a bit for the given node and retry later. This should be a blocker for the 3.8 release. 
was: Observing some excessive logging running AsyncKafkaConsumer and observing excessive logging of : {code:java} 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, groupId=concurrent_consumer] Node is not ready, handle the request in the next event loop: node=worker4:9092 (id: 2147483644 rack: null), request=UnsentRequest{requestBuil der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], serverAssignor=null, topicP artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, node=Optional[worker4:9092 (id: 2147483644 rack: null)] , timer=org.apache.kafka.common.utils.Timer@55ed4733} (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} This seems to be triggered by a tight poll loop of the network thread. The right thing to do is to backoff a bit for that given node and retry later. 
> AsyncKafkaConsumer is trying to connect to a disconnected note in a tight loop > -- > > Key: KAFKA-16160 > URL: https://issues.apache.org/jira/browse/KAFKA-16160 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > Observing some excessive logging running AsyncKafkaConsumer and observing > excessive logging of : > {code:java} > 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, > groupId=concurrent_consumer] Node is not ready, handle the request in the > next event loop: node=worker4:9092 (id: 2147483644 rack: null), > request=UnsentRequest{requestBuil > der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', > memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, > rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], > serverAssignor=null, topicP > artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, > node=Optional[worker4:9092 (id: 2147483644 rack: null)] , > timer=org.apache.kafka.common.utils.Timer@55ed4733} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} > This seems to be triggered by a tight poll loop of the network thread. The > right thing to do is to backoff a bit for that given node and retry later. > This should be a blocker for 3.8 release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
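The fix proposed in the issue ("back off a bit for that given node and retry later") can be sketched as a small per-node backoff tracker. The helper below is a hypothetical illustration, not the actual NetworkClientDelegate: when a node reports "not ready", record a retry deadline for it instead of re-queuing the request on the very next pass of the event loop.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the actual NetworkClientDelegate: track a
// per-node retry deadline so an unready node is not re-polled in a tight
// loop on every pass of the network thread.
public class NodeBackoff {
    private final long backoffMs;
    private final Map<String, Long> retryDeadlineMs = new HashMap<>();

    public NodeBackoff(long backoffMs) {
        this.backoffMs = backoffMs;
    }

    // Called when the network client reports the node is not ready.
    public void recordNotReady(String nodeId, long nowMs) {
        retryDeadlineMs.put(nodeId, nowMs + backoffMs);
    }

    // The event loop only re-attempts the node after the backoff elapses.
    public boolean shouldAttempt(String nodeId, long nowMs) {
        return nowMs >= retryDeadlineMs.getOrDefault(nodeId, 0L);
    }

    public static void main(String[] args) {
        NodeBackoff backoff = new NodeBackoff(50L);
        backoff.recordNotReady("worker4:9092", 1_000L);
        System.out.println(backoff.shouldAttempt("worker4:9092", 1_010L)); // false
        System.out.println(backoff.shouldAttempt("worker4:9092", 1_050L)); // true
    }
}
```

A production version would likely use exponential backoff with jitter rather than a fixed delay, but the shape of the fix is the same.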
[jira] [Updated] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16160: --- Labels: consumer-threading-refactor (was: ) > AsyncKafkaConsumer is trying to connect to a disconnected note in a tight loop > -- > > Key: KAFKA-16160 > URL: https://issues.apache.org/jira/browse/KAFKA-16160 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > Observing some excessive logging running AsyncKafkaConsumer and observing > excessive logging of : > {code:java} > 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, > groupId=concurrent_consumer] Node is not ready, handle the request in the > next event loop: node=worker4:9092 (id: 2147483644 rack: null), > request=UnsentRequest{requestBuil > der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', > memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, > rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], > serverAssignor=null, topicP > artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, > node=Optional[worker4:9092 (id: 2147483644 rack: null)] , > timer=org.apache.kafka.common.utils.Timer@55ed4733} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} > This seems to be triggered by a tight poll loop of the network thread. The > right thing to do is to backoff a bit for that given node and retry later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16160) AsyncKafkaConsumer is trying to connect to a disconnected node in a tight loop
[ https://issues.apache.org/jira/browse/KAFKA-16160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16160: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Task) > AsyncKafkaConsumer is trying to connect to a disconnected note in a tight loop > -- > > Key: KAFKA-16160 > URL: https://issues.apache.org/jira/browse/KAFKA-16160 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Priority: Major > > Observing some excessive logging running AsyncKafkaConsumer and observing > excessive logging of : > {code:java} > 1271 [2024-01-15 09:43:36,627] DEBUG [Consumer clientId=console-consumer, > groupId=concurrent_consumer] Node is not ready, handle the request in the > next event loop: node=worker4:9092 (id: 2147483644 rack: null), > request=UnsentRequest{requestBuil > der=ConsumerGroupHeartbeatRequestData(groupId='concurrent_consumer', > memberId='laIqS789StuhXFpTwjh6hA', memberEpoch=1, instanceId=null, > rackId=null, rebalanceTimeoutMs=30, subscribedTopicNames=[output-topic], > serverAssignor=null, topicP > artitions=[TopicPartitions(topicId=I5P5lIXvR1Cjc8hfoJg5bg, partitions=[0])]), > handler=org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler@918925b, > node=Optional[worker4:9092 (id: 2147483644 rack: null)] , > timer=org.apache.kafka.common.utils.Timer@55ed4733} > (org.apache.kafka.clients.consumer.internals.NetworkClientDelegate) {code} > This seems to be triggered by a tight poll loop of the network thread. The > right thing to do is to backoff a bit for that given node and retry later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
[ https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16159: --- Description: While running system tests locally, I've noticed excessive logging from the Telemetry Reporter. I believe this was introduced in KIP-714. {code:java} [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, returning the value 224678 ms; the client will wait before submitting the next GetTelemetrySubscriptions network API request (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} This is logged several times per millisecond. Also, given the volume of logs being emitted, can we also check the CPU profile to see if a process is running a tight loop? Update --- Looking from the beginning of the log, is this caused by the following? {code:java} DEBUG The broker generated an error for the get telemetry network API request (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) 146 org.apache.kafka.common.errors.UnsupportedVersionException: The node does not support GET_TELEMETRY_SUBSCRIPTIONS {code} was: While running system tests locally, I've noticed excessive logging of the Telemtry Reporter. This I believe was introduced in KIP-714. {code:java} [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, returning the value 224678 ms; the client will wait before submitting the next GetTelemetrySubscriptions network API request (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} This is logged several times per ms - Also, given the amount of log being emitted, can we also check the CPU profile to see if there's a process running a tight loop? 
> Prune excessive logging from Telemetry Reporter > --- > > Key: KAFKA-16159 > URL: https://issues.apache.org/jira/browse/KAFKA-16159 > Project: Kafka > Issue Type: Task > Components: consumer, log >Reporter: Philip Nee >Assignee: Apoorv Mittal >Priority: Major > Labels: consumer, logging > Fix For: 3.8.0 > > > While running system tests locally, I've noticed excessive logging of the > Telemtry Reporter. This I believe was introduced in KIP-714. > {code:java} > [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, > returning the value 224678 ms; the client will wait before submitting the > next GetTelemetrySubscriptions network API request > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} > This is logged several times per ms - Also, given the amount of log being > emitted, can we also check the CPU profile to see if there's a process > running a tight loop? > > Update > --- > Looking from the beginning, is this caused by the following? > {code:java} > DEBUG The broker generated an error for the get telemetry network API request > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) > 146 org.apache.kafka.common.errors.UnsupportedVersionException: The node > does not support GET_TELEMETRY_SUBSCRIPTIONS {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
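Independent of the root cause, a hot DEBUG line like the one quoted above can be guarded with a simple time-based throttle. The helper below is a hedged sketch with hypothetical names, not part of ClientTelemetryReporter: it emits at most one message per interval and counts what was suppressed in between.

```java
// Hypothetical sketch, not part of ClientTelemetryReporter: emit a hot
// DEBUG line at most once per interval instead of several times per
// millisecond, counting how many messages were suppressed in between.
public class ThrottledLog {
    private final long intervalMs;
    private long lastEmitMs = Long.MIN_VALUE; // sentinel: never emitted
    private long suppressed = 0;

    public ThrottledLog(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Returns true when the caller should actually log; otherwise the
    // message is counted and dropped. The explicit sentinel check avoids
    // overflow when subtracting Long.MIN_VALUE.
    public boolean shouldEmit(long nowMs) {
        if (lastEmitMs == Long.MIN_VALUE || nowMs - lastEmitMs >= intervalMs) {
            lastEmitMs = nowMs;
            suppressed = 0;
            return true;
        }
        suppressed++;
        return false;
    }

    public long suppressedCount() {
        return suppressed;
    }

    public static void main(String[] args) {
        ThrottledLog log = new ThrottledLog(1_000L);
        System.out.println(log.shouldEmit(0L));     // true - first message
        System.out.println(log.shouldEmit(1L));     // false - within interval
        System.out.println(log.shouldEmit(2L));     // false
        System.out.println(log.suppressedCount());  // 2
        System.out.println(log.shouldEmit(1_000L)); // true - interval elapsed
    }
}
```

Logging the suppressed count alongside the next emitted line preserves the signal (the loop is still hot) without flooding the log.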
[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
[ https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16159: --- Fix Version/s: 3.8.0 > Prune excessive logging from Telemetry Reporter > --- > > Key: KAFKA-16159 > URL: https://issues.apache.org/jira/browse/KAFKA-16159 > Project: Kafka > Issue Type: Task > Components: consumer, log >Reporter: Philip Nee >Assignee: Apoorv Mittal >Priority: Major > Fix For: 3.8.0 > > > While running system tests locally, I've noticed excessive logging of the > Telemtry Reporter. This I believe was introduced in KIP-714. > {code:java} > [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, > returning the value 224678 ms; the client will wait before submitting the > next GetTelemetrySubscriptions network API request > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} > This is logged several times per ms - Also, given the amount of log being > emitted, can we also check the CPU profile to see if there's a process > running a tight loop? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
[ https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16159: --- Labels: consumer logging (was: logging) > Prune excessive logging from Telemetry Reporter > --- > > Key: KAFKA-16159 > URL: https://issues.apache.org/jira/browse/KAFKA-16159 > Project: Kafka > Issue Type: Task > Components: consumer, log >Reporter: Philip Nee >Assignee: Apoorv Mittal >Priority: Major > Labels: consumer, logging > Fix For: 3.8.0 > > > While running system tests locally, I've noticed excessive logging of the > Telemtry Reporter. This I believe was introduced in KIP-714. > {code:java} > [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, > returning the value 224678 ms; the client will wait before submitting the > next GetTelemetrySubscriptions network API request > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} > This is logged several times per ms - Also, given the amount of log being > emitted, can we also check the CPU profile to see if there's a process > running a tight loop? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
[ https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16159: --- Labels: logging (was: ) > Prune excessive logging from Telemetry Reporter > --- > > Key: KAFKA-16159 > URL: https://issues.apache.org/jira/browse/KAFKA-16159 > Project: Kafka > Issue Type: Task > Components: consumer, log >Reporter: Philip Nee >Assignee: Apoorv Mittal >Priority: Major > Labels: logging > Fix For: 3.8.0 > > > While running system tests locally, I've noticed excessive logging of the > Telemtry Reporter. This I believe was introduced in KIP-714. > {code:java} > [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, > returning the value 224678 ms; the client will wait before submitting the > next GetTelemetrySubscriptions network API request > (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} > This is logged several times per ms - Also, given the amount of log being > emitted, can we also check the CPU profile to see if there's a process > running a tight loop? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16159) Prune excessive logging from Telemetry Reporter
Philip Nee created KAFKA-16159: -- Summary: Prune excessive logging from Telemetry Reporter Key: KAFKA-16159 URL: https://issues.apache.org/jira/browse/KAFKA-16159 Project: Kafka Issue Type: Task Components: consumer, log Reporter: Philip Nee Assignee: Apoorv Mittal While running system tests locally, I've noticed excessive logging from the Telemetry Reporter. I believe this was introduced in KIP-714. {code:java} [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, returning the value 224678 ms; the client will wait before submitting the next GetTelemetrySubscriptions network API request (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code} This is logged several times per ms. Also, given the amount of logs being emitted, can we also check the CPU profile to see if there's a process running a tight loop? -- This message was sent by Atlassian Jira (v8.20.10#820010)
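[Editor's note] Pruning the hot DEBUG line above usually means rate-limiting it rather than deleting it. A minimal sketch of such a throttle, with invented names (this is not Kafka's actual ClientTelemetryReporter code):

```java
// Hypothetical throttle: emit a hot log line at most once per interval,
// instead of several times per millisecond. Names are illustrative only.
public final class ThrottledLog {
    private final long intervalMs;
    private long lastLoggedMs;

    public ThrottledLog(long intervalMs) {
        this.intervalMs = intervalMs;
        // Start "one interval in the past" so the very first call logs.
        this.lastLoggedMs = -intervalMs;
    }

    /** Returns true if the caller should emit the log line now. */
    public synchronized boolean shouldLog(long nowMs) {
        if (nowMs - lastLoggedMs >= intervalMs) {
            lastLoggedMs = nowMs;
            return true;
        }
        return false;
    }
}
```

The caller would wrap the DEBUG statement in `if (throttle.shouldLog(time.milliseconds()))`, keeping the first occurrence of each burst while suppressing the repeats.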
[jira] [Resolved] (KAFKA-16112) Review JMX metrics in Async Consumer and determine the missing ones
[ https://issues.apache.org/jira/browse/KAFKA-16112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-16112. Resolution: Fixed These are the results of this ticket |KAFKA-16113| |KAFKA-16116| |KAFKA-16115| > Review JMX metrics in Async Consumer and determine the missing ones > --- > > Key: KAFKA-16112 > URL: https://issues.apache.org/jira/browse/KAFKA-16112 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16116: --- Labels: consumer-threading-refactor (was: ) > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Sub-task > Components: consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16116: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Improvement) > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Sub-task > Components: consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
[ https://issues.apache.org/jira/browse/KAFKA-16116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16116: --- Component/s: consumer metrics > AsyncKafkaConsumer: Add missing rebalance metrics > - > > Key: KAFKA-16116 > URL: https://issues.apache.org/jira/browse/KAFKA-16116 > Project: Kafka > Issue Type: Improvement > Components: consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > The following metrics are missing: > |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| > |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| > |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| > |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| > |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| > |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| > |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16116) AsyncKafkaConsumer: Add missing rebalance metrics
Philip Nee created KAFKA-16116: -- Summary: AsyncKafkaConsumer: Add missing rebalance metrics Key: KAFKA-16116 URL: https://issues.apache.org/jira/browse/KAFKA-16116 Project: Kafka Issue Type: Improvement Reporter: Philip Nee Assignee: Philip Nee The following metrics are missing: |[rebalance-latency-avg|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-avg]| |[rebalance-latency-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-max]| |[rebalance-latency-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-latency-total]| |[rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-rate-per-hour]| |[rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#rebalance-total]| |[failed-rebalance-rate-per-hour|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-rate-per-hour]| |[failed-rebalance-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#failed-rebalance-total]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
Philip Nee created KAFKA-16115: -- Summary: AsyncKafkaConsumer: Add missing heartbeat metrics Key: KAFKA-16115 URL: https://issues.apache.org/jira/browse/KAFKA-16115 Project: Kafka Issue Type: Improvement Components: consumer, metrics Reporter: Philip Nee Assignee: Philip Nee The following metrics are missing: |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16115) AsyncKafkaConsumer: Add missing heartbeat metrics
[ https://issues.apache.org/jira/browse/KAFKA-16115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16115: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Improvement) > AsyncKafkaConsumer: Add missing heartbeat metrics > - > > Key: KAFKA-16115 > URL: https://issues.apache.org/jira/browse/KAFKA-16115 > Project: Kafka > Issue Type: Sub-task > Components: consumer, metrics >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > The following metrics are missing: > |[heartbeat-rate|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-rate]| > |[heartbeat-response-time-max|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-response-time-max]| > |[heartbeat-total|https://docs.confluent.io/platform/current/kafka/monitoring.html#heartbeat-total]| > |[last-heartbeat-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-heartbeat-seconds-ago]| > |[last-rebalance-seconds-ago|https://docs.confluent.io/platform/current/kafka/monitoring.html#last-rebalance-seconds-ago]| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
[ https://issues.apache.org/jira/browse/KAFKA-16113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-16113: --- Parent: KAFKA-14246 Issue Type: Sub-task (was: Improvement) > AsyncKafkaConsumer: Add missing offset commit metrics > - > > Key: KAFKA-16113 > URL: https://issues.apache.org/jira/browse/KAFKA-16113 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > > The following metrics are missing from the AsyncKafkaConsumer: > commit-latency-avg > commit-latency-max > commit-rate > commit-total > committed-time-ns-total -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16113) AsyncKafkaConsumer: Add missing offset commit metrics
Philip Nee created KAFKA-16113: -- Summary: AsyncKafkaConsumer: Add missing offset commit metrics Key: KAFKA-16113 URL: https://issues.apache.org/jira/browse/KAFKA-16113 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Philip Nee Assignee: Philip Nee The following metrics are missing from the AsyncKafkaConsumer: commit-latency-avg commit-latency-max commit-rate commit-total committed-time-ns-total -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16110) Implement consumer performance tests
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805272#comment-17805272 ] Philip Nee commented on KAFKA-16110: My proposal here is - Let's run trogdor to see what we can get out of it. If the current settings are not satisfactory, we can add more "specs" to the repo and see if we can get to the point we want. - We might also want to monitor the performance of the head of trunk as we are putting code in. I wonder if it is easy to achieve using trogdor. > Implement consumer performance tests > > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16110) Implement consumer performance tests
[ https://issues.apache.org/jira/browse/KAFKA-16110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805271#comment-17805271 ] Philip Nee commented on KAFKA-16110: Hi [~kirktrue] - Thanks for filing this JIRA. There are two paths forward for the performance testing. One is using trogdor, which does more than performance testing; it also allows us to test different fault scenarios. The second is implementing our own benchmarking test; I started working on it some time ago but left it in a limbo state given trogdor does a lot of it already. WDYT? FWIW: this is the repo: https://github.com/philipnee/kafka-benchmarker > Implement consumer performance tests > > > Key: KAFKA-16110 > URL: https://issues.apache.org/jira/browse/KAFKA-16110 > Project: Kafka > Issue Type: New Feature > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15250) DefaultBackgroundThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15250. Fix Version/s: (was: 3.8.0) Resolution: Fixed > DefaultBackgroundThread is running tight loop > - > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout passed to networkClientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15250) DefaultBackgroundThread is running tight loop
[ https://issues.apache.org/jira/browse/KAFKA-15250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804576#comment-17804576 ] Philip Nee commented on KAFKA-15250: I believe this is resolved. > DefaultBackgroundThread is running tight loop > - > > Key: KAFKA-15250 > URL: https://issues.apache.org/jira/browse/KAFKA-15250 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > The DefaultBackgroundThread is running tight loops and wasting CPU cycles. I > think we need to reexamine the timeout passed to networkClientDelegate.poll. -- This message was sent by Atlassian Jira (v8.20.10#820010)
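[Editor's note] The tight loop in KAFKA-15250 comes from polling the network with a zero timeout; the usual fix is to derive the poll timeout from the nearest upcoming deadline so the background thread blocks until there is work. A minimal sketch, with invented names (this is not Kafka's actual networkClientDelegate API):

```java
import java.util.Collection;

// Illustrative only: compute how long a network poll may block, as the
// time until the earliest deadline, clamped to [0, maxBlockMs].
public final class PollTimeoutCalculator {
    private PollTimeoutCalculator() {}

    public static long timeToNextDeadline(long nowMs, Collection<Long> deadlinesMs, long maxBlockMs) {
        long timeout = maxBlockMs;
        for (long deadline : deadlinesMs) {
            // A deadline already in the past yields a zero (non-blocking) poll.
            timeout = Math.min(timeout, Math.max(0, deadline - nowMs));
        }
        return timeout;
    }
}
```

The background thread would then call something like `networkClientDelegate.poll(timeToNextDeadline(...))` instead of `poll(0)`, waking early only when a new event arrives.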
[jira] [Commented] (KAFKA-15872) Investigate autocommit retry logic
[ https://issues.apache.org/jira/browse/KAFKA-15872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804574#comment-17804574 ] Philip Nee commented on KAFKA-15872: Related: https://issues.apache.org/jira/browse/KAFKA-15475 > Investigate autocommit retry logic > -- > > Key: KAFKA-15872 > URL: https://issues.apache.org/jira/browse/KAFKA-15872 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > This is purely an investigation ticket. > Currently, we send an autocommit only if there isn't an inflight one; > however, this logic might not be correct because I think we should: > # expire the request if it is not completed in time > # always send an autocommit on the clock -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (KAFKA-15872) Investigate autocommit retry logic
[ https://issues.apache.org/jira/browse/KAFKA-15872 ] Philip Nee deleted comment on KAFKA-15872: was (Author: JIRAUSER283568): this can be closed as duplicate of KAFKA-15946 > Investigate autocommit retry logic > -- > > Key: KAFKA-15872 > URL: https://issues.apache.org/jira/browse/KAFKA-15872 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > This is purely an investigation ticket. > Currently, we send an autocommit only if there isn't an inflight one; > however, this logic might not be correct because I think we should: > # expire the request if it is not completed in time > # always send an autocommit on the clock -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15872) Investigate autocommit retry logic
[ https://issues.apache.org/jira/browse/KAFKA-15872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17804573#comment-17804573 ] Philip Nee commented on KAFKA-15872: This can be closed as a duplicate of KAFKA-15946. > Investigate autocommit retry logic > -- > > Key: KAFKA-15872 > URL: https://issues.apache.org/jira/browse/KAFKA-15872 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > This is purely an investigation ticket. > Currently, we send an autocommit only if there isn't an inflight one; > however, this logic might not be correct because I think we should: > # expire the request if it is not completed in time > # always send an autocommit on the clock -- This message was sent by Atlassian Jira (v8.20.10#820010)
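[Editor's note] The two points in KAFKA-15872 (expire an inflight commit that overruns its deadline; send strictly on the interval clock) can be sketched as a small state holder. All names here are invented for illustration; this is not Kafka's actual autocommit implementation:

```java
// Hedged sketch: interval-driven autocommit with inflight-request expiry.
public final class AutoCommitState {
    private final long intervalMs;
    private final long requestTimeoutMs;
    private long nextSendMs;
    private long inflightDeadlineMs = -1; // -1 means no inflight request

    public AutoCommitState(long nowMs, long intervalMs, long requestTimeoutMs) {
        this.intervalMs = intervalMs;
        this.requestTimeoutMs = requestTimeoutMs;
        this.nextSendMs = nowMs + intervalMs;
    }

    /** True if a commit should be sent now; marks the request inflight. */
    public boolean maybeSend(long nowMs) {
        // Point 1: expire an inflight request that outlived its deadline.
        if (inflightDeadlineMs >= 0 && nowMs >= inflightDeadlineMs)
            inflightDeadlineMs = -1;
        // Point 2: send on the clock, still allowing at most one inflight.
        if (nowMs >= nextSendMs && inflightDeadlineMs < 0) {
            inflightDeadlineMs = nowMs + requestTimeoutMs;
            nextSendMs = nowMs + intervalMs;
            return true;
        }
        return false;
    }

    /** Called when the broker responds to the inflight commit. */
    public void onResponse() {
        inflightDeadlineMs = -1;
    }
}
```

With this shape, a commit that never receives a response no longer blocks all future autocommits; it simply ages out after `requestTimeoutMs`.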
[jira] [Commented] (KAFKA-16034) AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on fenced/unknown member Id
[ https://issues.apache.org/jira/browse/KAFKA-16034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801921#comment-17801921 ] Philip Nee commented on KAFKA-16034: Hi [~phuctran] - I think the issue might already have been fixed. Please try to reproduce it using PlainTextConsumerTest. > AsyncKafkaConsumer will get Invalid Request error when trying to rejoin on > fenced/unknown member Id > --- > > Key: KAFKA-16034 > URL: https://issues.apache.org/jira/browse/KAFKA-16034 > Project: Kafka > Issue Type: Sub-task >Reporter: Philip Nee >Assignee: Phuc Hong Tran >Priority: Major > Fix For: 3.8.0 > > > The consumer will log an invalid request error when joining from fenced/unknown > member id because we didn't reset the HeartbeatState and we won't send the > needed fields (rebalanceTimeoutMs for example) when joining. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15948) Refactor AsyncKafkaConsumer shutdown
[ https://issues.apache.org/jira/browse/KAFKA-15948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15948. Assignee: Philip Nee (was: Phuc Hong Tran) Resolution: Fixed > Refactor AsyncKafkaConsumer shutdown > > > Key: KAFKA-15948 > URL: https://issues.apache.org/jira/browse/KAFKA-15948 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.8.0 > > > Upon closing we need a round trip from the network thread to the application > thread and then back to the network thread to complete the callback > invocation. Currently, we don't have any of that. I think we need to > refactor our closing mechanism. There are a few points to the refactor: > # The network thread should know if there's a custom user callback to > trigger or not. If there is, it should wait for the callback completion to > send a leave group. If not, it should proceed with the shutdown. > # The application thread sends a closing signal to the network thread and > continuously polls the background event handler until time runs out. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
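[Editor's note] The closing handshake described in KAFKA-15948 (the application thread signals close and drains background events until the timer runs out; the network thread sends the leave group only after any user callback completes) can be sketched as a small state machine. All names are invented; this is not Kafka's actual shutdown code:

```java
// Hedged sketch of the two-phase close: callback first, then leave group.
public final class CloseSequence {
    public enum State { RUNNING, CALLBACK_PENDING, LEAVING, CLOSED }

    private State state = State.RUNNING;
    private final boolean hasUserCallback;

    public CloseSequence(boolean hasUserCallback) {
        this.hasUserCallback = hasUserCallback;
    }

    /** Application thread: request shutdown. */
    public void signalClose() {
        state = hasUserCallback ? State.CALLBACK_PENDING : State.LEAVING;
    }

    /** Application thread: the user rebalance callback has finished. */
    public void callbackCompleted() {
        if (state == State.CALLBACK_PENDING)
            state = State.LEAVING;
    }

    /** Network thread: may the leave-group request be sent yet? */
    public boolean readyToLeaveGroup() {
        return state == State.LEAVING;
    }

    public void leaveGroupSent() {
        state = State.CLOSED;
    }

    public State state() { return state; }
}
```

The application thread would loop on the background event handler while the close timer runs, feeding `callbackCompleted()` back so the network thread can progress from CALLBACK_PENDING to LEAVING.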
[jira] [Commented] (KAFKA-15818) Implement max poll interval
[ https://issues.apache.org/jira/browse/KAFKA-15818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800506#comment-17800506 ] Philip Nee commented on KAFKA-15818: Thank you, Stan. It was merged. > Implement max poll interval > --- > > Key: KAFKA-15818 > URL: https://issues.apache.org/jira/browse/KAFKA-15818 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-e2e, kip-848-preview > Fix For: 3.7.0 > > > The consumer needs to be polled at a cadence shorter than > max.poll.interval.ms; otherwise the consumer should try to leave the group. > Currently, we send an acknowledgment event to the network thread per poll. > The event only triggers update on autocommit state; we need to implement > updating the poll timer so that the consumer can leave the group when the > timer expires. > > The current logic looks like this: > {code:java} > if (heartbeat.pollTimeoutExpired(now)) { > // the poll timeout has expired, which means that the foreground thread > has stalled > // in between calls to poll(). > log.warn("consumer poll timeout has expired. This means the time between > subsequent calls to poll() " + > "was longer than the configured max.poll.interval.ms, which typically > implies that " + > "the poll loop is spending too much time processing messages. You can > address this " + > "either by increasing max.poll.interval.ms or by reducing the maximum > size of batches " + > "returned in poll() with max.poll.records."); > maybeLeaveGroup("consumer poll timeout has expired."); > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15818) Implement max poll interval
[ https://issues.apache.org/jira/browse/KAFKA-15818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799043#comment-17799043 ] Philip Nee commented on KAFKA-15818: Hi Stan - [KAFKA-16026|https://github.com/apache/kafka/pull/15035] is also part of it. I will close the issue after 16026 is merged. > Implement max poll interval > --- > > Key: KAFKA-15818 > URL: https://issues.apache.org/jira/browse/KAFKA-15818 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: consumer-threading-refactor, kip-848-client-support, > kip-848-e2e, kip-848-preview > Fix For: 3.7.0 > > > The consumer needs to be polled at a cadence shorter than > max.poll.interval.ms; otherwise the consumer should try to leave the group. > Currently, we send an acknowledgment event to the network thread per poll. > The event only triggers update on autocommit state; we need to implement > updating the poll timer so that the consumer can leave the group when the > timer expires. > > The current logic looks like this: > {code:java} > if (heartbeat.pollTimeoutExpired(now)) { > // the poll timeout has expired, which means that the foreground thread > has stalled > // in between calls to poll(). > log.warn("consumer poll timeout has expired. This means the time between > subsequent calls to poll() " + > "was longer than the configured max.poll.interval.ms, which typically > implies that " + > "the poll loop is spending too much time processing messages. You can > address this " + > "either by increasing max.poll.interval.ms or by reducing the maximum > size of batches " + > "returned in poll() with max.poll.records."); > maybeLeaveGroup("consumer poll timeout has expired."); > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
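[Editor's note] The poll timer KAFKA-15818 asks for reduces to a last-poll timestamp that the application thread refreshes on every poll() and the network thread checks against max.poll.interval.ms. A minimal sketch, with invented names (not Kafka's actual Heartbeat/membership code):

```java
// Hedged sketch: track liveness of the application's poll loop.
public final class PollTimer {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    public PollTimer(long nowMs, long maxPollIntervalMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    /** Application thread: called on every Consumer.poll(). */
    public void onPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    /** Network thread: has max.poll.interval.ms elapsed without a poll? */
    public boolean expired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }
}
```

The acknowledgment event the ticket mentions would carry the poll timestamp into `onPoll`, and the network thread's check of `expired(now)` plays the role of `heartbeat.pollTimeoutExpired(now)` in the quoted snippet, triggering the leave group.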