[
https://issues.apache.org/jira/browse/KAFKA-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15828044#comment-15828044
]
ASF GitHub Bot commented on KAFKA-4615:
---------------------------------------
GitHub user Mogztter opened a pull request:
https://github.com/apache/kafka/pull/2395
KAFKA-4615: Use a timeout when polling the request in AdminClient
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/Mogztter/kafka patch-1
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/2395.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2395
----
commit 628429261fc6ece1b78590c9d315037f72fb65ee
Author: Guillaume Grossetie
Date: 2017-01-18T13:31:22Z
KAFKA-4615: Use a timeout when polling the request in AdminClient
----
> AdminClient.send function poll without timeout
> ----------------------------------------------
>
> Key: KAFKA-4615
> URL: https://issues.apache.org/jira/browse/KAFKA-4615
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 0.10.1.1
> Environment: Red Hat Enterprise Linux Server release 7.1
> Reporter: Guillaume Grossetie
>
> I'm using the AdminClient to fetch the consumer offsets of my topics.
> When the Kafka cluster is unavailable, I can see the following messages:
> {code}
> 2017-01-11 05:36:45,432 WARN [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64444 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:45,569 WARN [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64445 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:45,673 WARN [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64446 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:45,776 WARN [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64447 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:45,880 WARN [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64448 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:45,984 WARN [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64449 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:46,091 WARN [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64450 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> 2017-01-11 05:36:46,198 WARN [kafka-producer-network-thread | producer-1] o.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 64451 : {metrics_app_ida_ido=UNKNOWN_TOPIC_OR_PARTITION}
> // ...
> {code}
> The problem is that the request never completes.
> Four hours later, my process is still stuck and the request is still pending.
> Here is a jstack of my Java process:
> {code}
> $ jstack -l 19841
> 2017-01-11 10:32:58
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.91-b14 mixed mode):
> "Attach Listener" #12 daemon prio=9 os_prio=0 tid=0x00007f7c5c001000 nid=0x6c0b waiting on condition [0x0000000000000000]
> java.lang.Thread.State: RUNNABLE
> Locked ownable synchronizers:
> - None
> "DestroyJavaVM" #11 prio=5 os_prio=0 tid=0x00007f7c90008800 nid=0x4d88 waiting on condition [0x0000000000000000]
> java.lang.Thread.State: RUNNABLE
> Locked ownable synchronizers:
> - None
> "pool-1-thread-1" #10 prio=5 os_prio=0 tid=0x00007f7c90468800 nid=0x4d96 runnable [0x00007f7c78adc000]
> java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000000c52cbc40> (a sun.nio.ch.Util$2)
> - locked <0x00000000c52cbc30> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000000c52caf88> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:454)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminClient.scala:49)
> at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:61)
> at kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:58)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:58)
> at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:87)
> at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:96)
> at kafka.admin.AdminClient.listAllConsumerGroups(AdminClient.scala:111)
> at org.indusbox.idatha.kafka.lag.provider.KafkaLagConsumerOffsetsTopicProvider.fetch(KafkaLagConsumerOffsetsTopicProvider.java:65)
> at org.indusbox.idatha.kafka.lag.KafkaLagPollingReporter.run(KafkaLagPollingReporter.java:49)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Locked ownable synchronizers:
> - <0x00000000c51c7090> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "kafka-producer-network-thread | producer-1" #9 daemon prio=5 os_prio=0 tid=0x00007f7c9032f800 nid=0x4d95 runnable [0x00007f7c78dde000]
> java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x00000000c51a2a28> (a sun.nio.ch.Util$2)
> - locked <0x00000000c51a2a18> (a java.util.Collections$UnmodifiableSet)
> - locked <0x00000000c51a1bd0> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:454)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
> at java.lang.Thread.run(Thread.java:745)
> Locked ownable synchronizers:
> - None
> "Service Thread" #7 daemon prio=9 os_prio=0 tid=0x00007f7c90140000 nid=0x4d91 runnable [0x0000000000000000]
> java.lang.Thread.State: RUNNABLE
> Locked ownable synchronizers:
> - None
> "C1 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f7c9013d800 nid=0x4d90 waiting on condition [0x0000000000000000]
> java.lang.Thread.State: RUNNABLE
> Locked ownable synchronizers:
> - None
> "C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f7c9013a800 nid=0x4d8f waiting on condition [0x0000000000000000]
> java.lang.Thread.State: RUNNABLE
> Locked ownable synchronizers:
> - None
> "Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f7c90139000 nid=0x4d8e runnable [0x0000000000000000]
> java.lang.Thread.State: RUNNABLE
> Locked ownable synchronizers:
> - None
> "Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f7c90105800 nid=0x4d8d in Object.wait() [0x00007f7c7a24e000]
> java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> - waiting on <0x00000000c52c5168> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> - locked <0x00000000c52c5168> (a java.lang.ref.ReferenceQueue$Lock)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
> Locked ownable synchronizers:
> - None
> "Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f7c90101000 nid=0x4d8c in Object.wait() [0x00007f7c801f8000]
> java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> - waiting on <0x00000000c52c5198> (a java.lang.ref.Reference$Lock)
> at java.lang.Object.wait(Object.java:502)
> at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
> - locked <0x00000000c52c5198> (a java.lang.ref.Reference$Lock)
> at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
> Locked ownable synchronizers:
> - None
> "VM Thread" os_prio=0 tid=0x00007f7c900f9800 nid=0x4d8b runnable
> "GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f7c9001d800 nid=0x4d89 runnable
> "GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f7c9001f800 nid=0x4d8a runnable
> "VM Periodic Task Thread" os_prio=0 tid=0x00007f7c90142800 nid=0x4d92 waiting on condition
> JNI global references: 287
> {code}
> As you can see, my thread is still running
> {{org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)}},
> called from {{AdminClient.send}} (AdminClient.scala:49).
> I think we should use the {{ConsumerNetworkClient}} poll method that takes a timeout
> and give up after a short period of time:
> {code}public boolean poll(RequestFuture<?> future, long timeout){code}
> Ref:
> https://github.com/apache/kafka/blob/75469a3b602c26ea81d6fc0a409d39d321195ea4/core/src/main/scala/kafka/admin/AdminClient.scala#L46
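The fix proposed in the issue can be sketched in plain Java. This is a minimal, self-contained illustration under stated assumptions, not the code from the patch in PR #2395 and not Kafka's actual implementation: `NetworkLayer`, `SilentBroker`, `pollUntilDone`, `pollWithTimeout` and `send` are hypothetical names, `CompletableFuture` stands in for Kafka's `RequestFuture`, and `SilentBroker` simulates an unreachable cluster that never answers. `pollUntilDone` roughly corresponds to the unbounded loop the stack trace suggests; `pollWithTimeout` follows the shape of the timed `poll(RequestFuture<?> future, long timeout)` overload the reporter suggests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: NetworkLayer, SilentBroker, pollUntilDone, pollWithTimeout and send
// are illustrative names, not Kafka APIs. CompletableFuture stands in for RequestFuture.
public class BoundedPoll {

    /** Hypothetical network layer: one pollOnce call blocks for at most maxWaitMs. */
    public interface NetworkLayer {
        void pollOnce(long maxWaitMs);
    }

    /** Simulates an unreachable cluster: no response ever arrives, each poll just waits. */
    public static final class SilentBroker implements NetworkLayer {
        @Override
        public void pollOnce(long maxWaitMs) {
            try {
                TimeUnit.MILLISECONDS.sleep(maxWaitMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
        }
    }

    /** Unbounded: loops until the future completes, i.e. forever if nobody answers. */
    public static void pollUntilDone(NetworkLayer net, CompletableFuture<?> future) {
        while (!future.isDone()) {
            net.pollOnce(100);
        }
    }

    /** Bounded: stops once timeoutMs has elapsed; returns whether the future completed. */
    public static boolean pollWithTimeout(NetworkLayer net, CompletableFuture<?> future, long timeoutMs) {
        long begin = System.nanoTime();
        long remaining = timeoutMs;
        while (!future.isDone() && remaining > 0) {
            net.pollOnce(remaining);
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
            remaining = timeoutMs - elapsedMs; // recompute the budget after every poll
        }
        return future.isDone();
    }

    /** A send that fails fast with TimeoutException instead of hanging the caller. */
    public static <T> T send(NetworkLayer net, CompletableFuture<T> future, long timeoutMs)
            throws TimeoutException {
        if (!pollWithTimeout(net, future, timeoutMs)) {
            throw new TimeoutException("request not completed within " + timeoutMs + " ms");
        }
        return future.join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverAnswered = new CompletableFuture<>();
        try {
            send(new SilentBroker(), neverAnswered, 300);
        } catch (TimeoutException e) {
            // prints "gave up: request not completed within 300 ms"
            System.out.println("gave up: " + e.getMessage());
        }
    }
}
```

Recomputing the remaining budget after each poll means an unreachable broker costs the caller roughly `timeoutMs` (plus scheduling jitter) rather than blocking the thread indefinitely, and the caller can then decide whether to retry or report the failure.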
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)