[
https://issues.apache.org/jira/browse/KAFKA-4569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877598#comment-15877598
]
Armin Braun commented on KAFKA-4569:
------------------------------------
Just for future reference. Tried to debug a few issues around this that I found
and figured out that this is coming from the heartbeat thread.
Basically it's getting enabled without any delay in
`org.apache.kafka.clients.consumer.internals.AbstractCoordinator#initiateJoinGroup`
and then in some cases polls quickly enough to make
`org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce` break out here:
{code}
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long
timeout) {
coordinator.poll(time.milliseconds());
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records =
fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
{code}
Not quite sure how to fix this, but you can reproduce it by running it in an
endless loop (like 2ms per run and dies after 500 or so on average :)) -> red
... goes green once you outcomment
this enable section
{code}
if (heartbeatThread != null)
heartbeatThread.enable();
}
}
{code}
in
`org.apache.kafka.clients.consumer.internals.AbstractCoordinator#initiateJoinGroup`
> Transient failure in
> org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable
> ---------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-4569
> URL: https://issues.apache.org/jira/browse/KAFKA-4569
> Project: Kafka
> Issue Type: Sub-task
> Components: unit tests
> Reporter: Guozhang Wang
> Assignee: Umesh Chaudhary
> Labels: newbie
> Fix For: 0.10.3.0
>
>
> One example is:
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/370/testReport/junit/org.apache.kafka.clients.consumer/KafkaConsumerTest/testWakeupWithFetchDataAvailable/
> {code}
> Stacktrace
> java.lang.AssertionError
> at org.junit.Assert.fail(Assert.java:86)
> at org.junit.Assert.fail(Assert.java:95)
> at
> org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable(KafkaConsumerTest.java:679)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
> at
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
> at
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
> at
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
> at
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
> at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
> at
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
> at
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:377)
> at
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
> at
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)