[
https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax resolved KAFKA-18355.
-------------------------------------
Resolution: Fixed
> Stream thread blocks indefinitely for acquiring state directory lock
> --------------------------------------------------------------------
>
> Key: KAFKA-18355
> URL: https://issues.apache.org/jira/browse/KAFKA-18355
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.8.1
> Reporter: Ravi Gupta
> Priority: Major
>
> We are running a Kafka Streams based application in production and have noticed
> a couple of times that *lag on the source topic partitions starts increasing*.
> Based on our investigation, we found the following happening:
> * The thread responsible for the partition task gets an authentication exception
> (MSK IAM authentication throws a transient exception) while producing a record
> in the sink:
> {code:java}
> {
>   "level":"ERROR",
>   "logger_name":"org.apache.kafka.clients.NetworkClient",
>   "message":"[Producer clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer, transactionalId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3] Connection to node 1 (b-1.xxxxxxx.yyyyyy.c2.kafka.xx-yyyyy.amazonaws.com/xx.xx.xxx.xxxx:yyyy) failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status Code: 401; Error Code: null; Request ID: null; Proxy: null)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.",
>   "thread_name":"kafka-producer-network-thread | xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
>   "time":"2024-12-26T07:40:45.113067247Z"
> }
> {code}
> * In some cases, the system recovers when the next record is polled and the
> sink node (RecordCollectorImpl) rethrows the exception from the previous
> message while processing it.
> * However, in a couple of cases the following log appears approximately 5
> minutes after the producer failure. (_No additional log statement explains why
> the thread stopped polling; however, it seems the heartbeat thread got the same
> exception as the producer._)
> {code:java}
> {
>   "level":"WARN",
>   "logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
>   "message":"[Consumer clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer, groupId=xxxxx-xxxx-lall-lio-step-executor_lio-se] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.",
>   "thread_name":"kafka-coordinator-heartbeat-thread | xxxxx-xxxx-lall-lio-step-executor_lio-se",
>   "time":"2024-12-26T07:45:43.286428901Z"
> }
> {code}
> * In such cases, the partition gets assigned to a new thread (Thread 5);
> however, the new thread keeps throwing the following exception:
> {code:java}
> {
>   "level":"INFO",
>   "logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
>   "message":"stream-thread [xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5] Encountered lock exception. Reattempting locking the state in the next iteration.",
>   "stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread [xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5] task [8_0] Failed to lock the state directory for task 8_0\n\tat org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
>   "thread_name":"xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
>   "time":"2024-12-26T07:50:53.904374419Z"
> }
> {code}
> * We are using an exception handler; however, in these failure cases it is not
> called for either the producer or the consumer exception. For some other
> authentication exceptions during consume/produce we do see the handler being
> called.
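> For reference, a minimal sketch (not our actual handler; class name, logging, and
> return value are illustrative assumptions) of how such a production exception
> handler is wired into a Kafka Streams application. In the failure described
> above this callback was never invoked:
> {code:java}
> // Illustrative sketch only: a custom production exception handler for
> // Kafka Streams. In the reported scenario, handle() was not called for the
> // authentication failure.
> import java.util.Map;
> 
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.streams.errors.ProductionExceptionHandler;
> 
> public class LoggingProductionExceptionHandler implements ProductionExceptionHandler {
> 
>     @Override
>     public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
>                                                      final Exception exception) {
>         // Log the failed record's topic and the exception, then fail the task.
>         System.err.println("Production failure for topic " + record.topic() + ": " + exception);
>         return ProductionExceptionHandlerResponse.FAIL;
>     }
> 
>     @Override
>     public void configure(final Map<String, ?> configs) { }
> }
> {code}
> The handler is registered via {{StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG}}
> ({{default.production.exception.handler}}) in the application properties.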
> It seems the old thread never cleaned up its state: producer failures are only
> cleaned up when the next event is processed (which never happened because of
> the consumer exception), and the consumer failure did not release the state
> directory lock either.
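> Since no log line indicated why StreamThread-3 stopped polling, something like
> the following monitoring sketch (illustrative only; assumes access to the
> KafkaStreams instance, and the interval is arbitrary) would have made the
> stalled thread and its assigned tasks visible:
> {code:java}
> // Illustrative sketch only: periodically log stream-thread states and their
> // active tasks so a thread that stops making progress becomes visible.
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
> 
> import org.apache.kafka.streams.KafkaStreams;
> 
> public final class StreamThreadStateLogger {
> 
>     public static void schedule(final KafkaStreams streams) {
>         final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
>         executor.scheduleAtFixedRate(
>                 () -> streams.metadataForLocalThreads().forEach(thread ->
>                         System.out.println(thread.threadName()
>                                 + " state=" + thread.threadState()
>                                 + " activeTasks=" + thread.activeTasks())),
>                 0, 60, TimeUnit.SECONDS);
>     }
> }
> {code}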
--
This message was sent by Atlassian Jira
(v8.20.10#820010)