Di Campo created KAFKA-8612:
-------------------------------
Summary: Broker removes consumers from CG, Streams app gets stuck
Key: KAFKA-8612
URL: https://issues.apache.org/jira/browse/KAFKA-8612
Project: Kafka
Issue Type: Bug
Components: clients, streams
Affects Versions: 2.1.1
Reporter: Di Campo
Attachments: full-thread-dump-kafka-streams-stuck.log
Cluster of 5 brokers, `Kafka 2.1.1`, on m5.large (2 CPU, 8 GB RAM) instances.
Kafka Streams application (`stream-processor`): a cluster of 3 instances, 2
stream threads each, on Kafka Streams `2.1.0`.
Consumer: a ClickHouse Kafka Engine consumer group (from `ClickHouse
19.5.3.8`), with several tables, each consuming from a different topic.
The `stream-processor` consumes from a source topic and runs a topology
spanning 26 topics (64 partitions each) with 5 state stores: 1 session store
and 4 key-value stores.
Infrastructure runs on Docker on AWS ECS.
Consumption rate is 300-1000 events per second; each event generates an
average of ~20 extra messages.
Timestamps are kept in the log excerpts below for better analysis.
`stream-processor` tasks at some point fail to produce to any partition due to
timeouts:
{noformat}
[2019-06-28 10:04:21,113] ERROR task [1_48] Error sending record (...) to topic
(...) due to org.apache.kafka.common.errors.TimeoutException: Expiring 44
record(s) for (...)-48:120002 ms has passed since batch creation; No more
records will be sent and no more offsets will be recorded for this task.
{noformat}
and "Offset commit failed" errors in all partitions:
{noformat}
[2019-06-28 10:04:27,705] ERROR [Consumer
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-1-consumer,
groupId=stream-processor-0.0.1] Offset commit failed on partition
events-raw-63 at offset 4858803: The request timed out.
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
{noformat}
_At this point we begin seeing error messages in one of the brokers (see below,
Broker logs section)._
More error messages are shown on the `stream-processor`:
{noformat}
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired
before successfully committing offsets
{(topic)=OffsetAndMetadata{offset=4858803, leaderEpoch=null, metadata=''}}
{noformat}
then hundreds of messages of the following types (one per topic-partition),
interleaved:
{noformat}
[2019-06-28 10:05:23,608] WARN [Producer
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
Got error produce response with correlation id 39946 on topic-partition
(topic)-63, retrying (2 attempts left). Error: NETWORK_EXCEPTION
(org.apache.kafka.clients.producer.internals.Sender)
{noformat}
{noformat}
[2019-06-28 10:05:23,609] WARN [Producer
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
Received invalid metadata error in produce request on partition (topic)1-59
due to org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.. Going to request metadata update now
(org.apache.kafka.clients.producer.internals.Sender)
{noformat}
And then:
{noformat}
[2019-06-28 10:05:47,986] ERROR stream-thread
[stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-4]
Failed to commit stream task 1_57 due to the following error:
(org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
org.apache.kafka.streams.errors.StreamsException: task
[1_57] Abort sending since an error caught with a previous record (...) to
topic (...) due to org.apache.kafka.common.errors.NetworkException: The server
disconnected before a response was received.
You can increase producer parameter `retries` and
`retry.backoff.ms` to avoid this error.
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
{noformat}
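The exception above suggests raising the producer's `retries` and `retry.backoff.ms`. In a Kafka Streams app these can be passed through the Streams configuration by prefixing the keys with `producer.`. A minimal sketch; the class name and the values are illustrative assumptions, not tuned recommendations:

```java
import java.util.Properties;

public class ProducerRetryConfig {
    // Builds producer-related overrides for a Kafka Streams app.
    // Keys prefixed with "producer." are forwarded to the embedded producer
    // (the prefix is equivalent to StreamsConfig.producerPrefix()).
    // All values below are illustrative assumptions.
    public static Properties build() {
        Properties props = new Properties();
        props.put("producer.retries", "10");            // retry failed sends a few more times
        props.put("producer.retry.backoff.ms", "500");  // back off longer between retries
        // Also relevant to the "ms has passed since batch creation" timeouts
        // above (delivery.timeout.ms exists since Kafka 2.1 / KIP-91):
        props.put("producer.delivery.timeout.ms", "180000");
        return props;
    }
}
```

These entries would be merged into the same `Properties` object that is passed to the `KafkaStreams` constructor.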
...and eventually we get to error messages like these:
{noformat}
[2019-06-28 10:05:51,198] ERROR [Producer
clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer]
Uncaught error in kafka producer I/O thread:
(org.apache.kafka.clients.producer.internals.Sender)
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
{noformat}
{noformat}
[2019-06-28 10:07:18,735] ERROR task [1_63] Failed to flush state store
orderStore:
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)
org.apache.kafka.streams.errors.StreamsException: task [1_63] Abort sending
since an error caught with a previous record (...) timestamp 1561664080389) to
topic (...) due to org.apache.kafka.common.errors.TimeoutException: Expiring 44
record(s) for pageview-sessions-0.0.1-63:120007 ms has passed since batch
creation
{noformat}
...and eventually, after many messages like the above, the KafkaStreams
instance is closed and the task dies. Our own logging shows when it finally
dies:
{noformat}
[2019-06-28 10:08:23,334] ERROR Streams sent to close.
{noformat}
----------------
*One* (not all) of the *brokers* shows several messages like this:
{noformat}
[2019-06-28 10:04:42,192] WARN Attempting to send response via channel for
which there is no open connection, connection id
172.17.0.3:9092-(client-IP):47760-24314 (kafka.network.Processor)
...
[2019-06-28 10:07:38,128] WARN Attempting to send response via channel for
which there is no open connection, connection id
172.17.0.3:9092-(client-IP):49038-24810 (kafka.network.Processor)
{noformat}
and several messages like this, also from the same broker:
{noformat}
[2019-06-28 10:06:51,235] INFO [GroupCoordinator 3]: Member
stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-4-consumer-f0c7d7b0-7f3b-465b-bf68-e55df2d783ed
in group stream-processor-0.0.1 has failed, removing it from the group
(kafka.coordinator.group.GroupCoordinator)
{noformat}
At other points in time, there are also membership errors for the ClickHouse
consumer groups, from the same broker:
{noformat}
[2019-06-28 10:10:31,243] INFO [GroupCoordinator 3]: Member ClickHouse
19.5.3.8-c095f8ec-efc8-4b3a-93c5-6cd2aa9ee0ef in group (chgroup2) has failed,
removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-06-28 10:10:30,752] INFO [GroupCoordinator 3]: Member
stream-processor-0.0.1-af495b81-7572-4dc0-a5c2-952916d8e41d-StreamThread-2-consumer-f6166b97-ef38-4d21-9f39-c47bf13d794b
in group stream-processor-0.0.1 has failed, removing it from the group
(kafka.coordinator.group.GroupCoordinator)
[2019-06-28 10:10:29,495] INFO [GroupCoordinator 3]: Member ClickHouse
19.5.3.8-127af48f-b50c-4af1-a3a3-4ebd0ebeeeab in group (chgroup1) has failed,
removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2019-06-28 10:10:29,495] INFO [GroupCoordinator 3]: Member ClickHouse
19.5.3.8-8f98f7b7-eb41-41c0-a37a-f42e54218a47 in group (chgroup1) has failed,
removing it from the group (kafka.coordinator.group.GroupCoordinator)
{noformat}
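Coordinator-side removals like the ones above are governed by the consumer's heartbeat and poll settings. A minimal sketch of the knobs involved; the class name and values are illustrative assumptions (in a Streams app, `max.poll.interval.ms` can be set directly or with a `consumer.` prefix on the Streams configuration):

```java
import java.util.Properties;

public class ConsumerSessionConfig {
    // Consumer-side settings that control when the group coordinator
    // declares a member failed and removes it from the group.
    // All values below are illustrative assumptions, not recommendations.
    public static Properties build() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "30000");     // removed if no heartbeat arrives within this window
        props.put("heartbeat.interval.ms", "10000");  // keep well below session.timeout.ms
        props.put("max.poll.interval.ms", "600000");  // max time between poll() calls before eviction
        return props;
    }
}
```

Raising these can mask the real problem (a stalled thread), so they are mainly useful for confirming whether the removals are heartbeat-driven or poll-driven.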
----------------
After this, `stream-processor` tasks are restarted as they die.
They then start but remain idle and do nothing. They are stuck, yet show
nothing in the logs.
The consumer groups (via the `kafka-consumer-groups` tool) show lag, but the
tasks neither consume nor die - they remain idle indefinitely.
After a full restart of the whole service (not rolling: 3 instances down to 0,
then back up to 3), tasks can rejoin the consumer groups and resume processing
as normal. Some hours later, the whole cycle begins again, and eventually it
all got stuck for good.
A thread dump taken via `kill -QUIT` while the instances are stuck shows
threads like this:
{noformat}
"stream-processor-0.0.1-2b95a3b1-fd4d-4ef5-ad31-8914145e7b7f-StreamThread-4"
#33 prio=5 os_prio=0 tid=0x00007fa761a25000 nid=0x2b runnable
[0x00007fa707a5e000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000006f1113120> (a sun.nio.ch.Util$3)
- locked <0x00000006f1113110> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000006f1112f68> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:752)
at org.apache.kafka.common.network.Selector.poll(Selector.java:451)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
{noformat}
(see the rest of the trace in the attached thread dump).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)