[
https://issues.apache.org/jira/browse/KAFKA-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Damian Guy updated KAFKA-4311:
------------------------------
Description:
The two exceptions below were reported by Frank on the dev mailing list. After
investigation, the root cause is multiple cache evictions happening in the same
topology.
{code}
final KTable<String, String> one = builder.table(Serdes.String(),
Serdes.String(), tableOne, tableOne);
final KTable<Long, String> two = builder.table(Serdes.Long(),
Serdes.String(), tableTwo, tableTwo);
final KTable<String, Long> reduce = two.groupBy(new
KeyValueMapper<Long, String, KeyValue<String, Long>>() {
@Override
public KeyValue<String, Long> apply(final Long key, final String
value) {
return new KeyValue<>(value, key);
}
}, Serdes.String(), Serdes.Long())
.reduce(new Reducer<Long>() {
@Override
public Long apply(final Long value1, final Long value2) {
return value1 + value2;
}
}, new Reducer<Long>() {
@Override
public Long apply(final Long value1, final Long value2) {
return value1 - value2;
}
}, "reducer-store");
one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
@Override
public String apply(final String value1, final Long value2) {
return value1 + ":" + value2;
}
})
.mapValues(new ValueMapper<String, String>() {
@Override
public String apply(final String value) {
return value;
}
});
{code}
Reported on the mailing list. Needs looking into how it could get in this state.
[StreamThread-1] ERROR
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[StreamThread-1] Failed to close state manager for StreamTask 0_0:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
to close state store addr-organization
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
at
org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
at
org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
at
org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
at
org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
at
org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
at
org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
entry is null
at
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
at
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
... 7 more
was:
Reported on the mailing list. Needs looking into how it could get in this state.
[StreamThread-1] ERROR
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[StreamThread-1] Failed to close state manager for StreamTask 0_0:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
to close state store addr-organization
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
at
org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
at
org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
at
org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
at
org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
at
org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
at
org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
entry is null
at
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
at
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
... 7 more
> Multi layer cache eviction causes forwarding to incorrect Processor Node
> -------------------------------------------------------------------------
>
> Key: KAFKA-4311
> URL: https://issues.apache.org/jira/browse/KAFKA-4311
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Reporter: Damian Guy
> Assignee: Damian Guy
> Fix For: 0.10.1.1
>
>
> The two exceptions below were reported by Frank on the dev mailing list.
> After investigation, the root cause is multiple cache evictions happening in
> the same topology.
> {code}
> final KTable<String, String> one = builder.table(Serdes.String(),
> Serdes.String(), tableOne, tableOne);
> final KTable<Long, String> two = builder.table(Serdes.Long(),
> Serdes.String(), tableTwo, tableTwo);
> final KTable<String, Long> reduce = two.groupBy(new
> KeyValueMapper<Long, String, KeyValue<String, Long>>() {
> @Override
> public KeyValue<String, Long> apply(final Long key, final String
> value) {
> return new KeyValue<>(value, key);
> }
> }, Serdes.String(), Serdes.Long())
> .reduce(new Reducer<Long>() {
> @Override
> public Long apply(final Long value1, final Long value2) {
> return value1 + value2;
> }
> }, new Reducer<Long>() {
> @Override
> public Long apply(final Long value1, final Long value2) {
> return value1 - value2;
> }
> }, "reducer-store");
> one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
> @Override
> public String apply(final String value1, final Long value2) {
> return value1 + ":" + value2;
> }
> })
> .mapValues(new ValueMapper<String, String>() {
> @Override
> public String apply(final String value) {
> return value;
> }
> });
> {code}
> Reported on the mailing list. Needs looking into how it could get in this
> state.
> [StreamThread-1] ERROR
> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed
> to close state store addr-organization
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but
> entry is null
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
> ... 7 more
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)