John Roesler created KAFKA-10247:
------------------------------------
Summary: Streams may attempt to process after closing a task
Key: KAFKA-10247
URL: https://issues.apache.org/jira/browse/KAFKA-10247
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.6.0
Reporter: John Roesler
Assignee: John Roesler
Observed in a system test. A corrupted task was detected, and Streams properly
closed it as dirty:
{code:java}
[2020-07-08 17:08:09,345] WARN stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered
org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records
from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it
is likely that the consumer's position has fallen out of the topic partition
offset range because the topic was truncated or compacted on the broker,
marking the corresponding tasks as corrupted and re-initializing it later.
(org.apache.kafka.streams.processor.internals.StoreChangelogReader)
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position
FetchPosition{offset=1, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)],
epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
    at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
    at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
[2020-07-08 17:08:09,345] WARN stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the
states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. Will
close the task as dirty and re-create and bootstrap from scratch.
(org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs
{2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to be
re-initialized
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch
position FetchPosition{offset=1, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)],
epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
    at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
    at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
... 3 more
[2020-07-08 17:08:09,346] INFO stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1]
Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
[2020-07-08 17:08:09,346] DEBUG stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1]
Closing its state manager and all the registered state stores:
{sum-STATE-STORE-0000000050=StateStoreMetadata (sum-STATE-STORE-0000000050 :
SmokeTest-sum-STATE-STORE-0000000050-changelog-1 @ null,
cntStoreName=StateStoreMetadata (cntStoreName :
SmokeTest-cntStoreName-changelog-1 @ 0}
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)
[2020-07-08 17:08:09,346] INFO [Consumer
clientId=SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2-restore-consumer,
groupId=null] Subscribed to partition(s): SmokeTest-minStoreName-changelog-1,
SmokeTest-minStoreName-changelog-2,
SmokeTest-sum-STATE-STORE-0000000050-changelog-0,
SmokeTest-minStoreName-changelog-3,
SmokeTest-sum-STATE-STORE-0000000050-changelog-2,
SmokeTest-maxStoreName-changelog-1, SmokeTest-cntStoreName-changelog-0,
SmokeTest-maxStoreName-changelog-2, SmokeTest-cntStoreName-changelog-2,
SmokeTest-maxStoreName-changelog-3, SmokeTest-cntByCnt-changelog-4
(org.apache.kafka.clients.consumer.KafkaConsumer)
[2020-07-08 17:08:09,348] DEBUG stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Released state
dir lock for task 2_1
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2020-07-08 17:08:09,348] INFO stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1]
Closing record collector dirty
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
[2020-07-08 17:08:09,348] INFO stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1]
Closed dirty (org.apache.kafka.streams.processor.internals.StreamTask){code}
However, records were already buffered for the task, so later in the same
processing loop, Streams attempted to process it anyway, resulting in an
InvalidStateStoreException:
{code:java}
[2020-07-08 17:08:09,352] ERROR stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Failed to
process stream task 2_1 due to the following error:
(org.apache.kafka.streams.processor.internals.TaskManager)
org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName
is currently closed.
    at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
    at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
    at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
[2020-07-08 17:08:09,352] ERROR stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered the
following exception during processing and the thread is going to shut down:
(org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName
is currently closed.
    at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
    at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
    at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
[2020-07-08 17:08:09,352] INFO stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] State
transition from RUNNING to PENDING_SHUTDOWN
(org.apache.kafka.streams.processor.internals.StreamThread)
[2020-07-08 17:08:09,352] INFO stream-thread
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Shutting down
(org.apache.kafka.streams.processor.internals.StreamThread){code}
This caused the entire thread to shut down.
Instead, Streams should not attempt to process tasks that are no longer running.
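The shape of the fix can be sketched as follows. This is a simplified, hypothetical model (the names {{TaskState}}, {{processAll}}, and the classes below are illustrative, not the actual {{TaskManager#process}} code): the processing loop checks each task's state and skips any task that was closed earlier in the same loop iteration, even if records are still buffered for it.

```java
import java.util.ArrayList;
import java.util.List;

class SkipClosedTasks {
    // Hypothetical states; the real task lifecycle has more of them.
    enum TaskState { RUNNING, CLOSED }

    static class Task {
        final String id;
        TaskState state;
        Task(String id, TaskState state) { this.id = id; this.state = state; }
    }

    // Process only tasks that are still RUNNING; a task closed dirty
    // earlier in the loop is skipped instead of throwing from a closed store.
    static List<String> processAll(List<Task> tasks) {
        List<String> processed = new ArrayList<>();
        for (Task task : tasks) {
            if (task.state != TaskState.RUNNING) {
                continue; // e.g. task 2_1, closed dirty after corruption
            }
            processed.add(task.id); // stand-in for task.process(...)
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(
            new Task("2_1", TaskState.CLOSED),   // corrupted, closed dirty
            new Task("2_2", TaskState.RUNNING));
        System.out.println(processAll(tasks)); // prints [2_2]
    }
}
```

With a guard like this, the buffered records for the closed task are simply dropped for this cycle; the task is re-created and restored from scratch, so no data is lost.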
--
This message was sent by Atlassian Jira
(v8.3.4#803005)