[ https://issues.apache.org/jira/browse/KAFKA-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836065#comment-16836065 ]
tdp commented on KAFKA-8339:
Hi Matthias,
Thanks for the quick reply! After digging into this a bit more, I think the
issue is in our aggregation logic. We aggregate records and put the
sub-aggregation result into the local state store. When the aggregation is
"full", KSTREAM-TRANSFORMVALUES-NODE1 returns a non-null value, and the
transform logic also deletes the sub-aggregation from the local state store.
That is the flaw in our logic: only the last record from topic T1 is consumed
by the new stream thread, and that thread does not find the sub-aggregation in
the local state store because it was already deleted. I had incorrectly
assumed that the earlier records would be re-consumed. The logs below explain
this in more detail.
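To make the failure mode concrete, here is a minimal, Kafka-free sketch of the pattern described above. It is a simplified model, not our actual topology: the `HashMap` stands in for the local state store, and the class name, the aggregation ID `"agg"` and the record values are hypothetical. The transformer returns `null` (which the downstream filter drops) until the aggregation is full, then emits the result and deletes the sub-aggregation. Replaying only the last record afterwards finds no sub-aggregation, so the aggregation restarts at 1 of 3.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the aggregation logic; nothing here is Kafka API.
public class AggregationReplayDemo {

    // Stand-in for the local state store: aggregation ID -> parts seen so far.
    private final Map<String, List<String>> store = new HashMap<>();

    // Adds one part to the sub-aggregation for aggId. Returns the complete
    // aggregate (and deletes the sub-aggregation) once `expected` parts have
    // arrived; otherwise returns null, which a downstream filter would drop.
    public List<String> process(String aggId, String part, int expected) {
        List<String> parts = store.computeIfAbsent(aggId, k -> new ArrayList<>());
        parts.add(part);
        if (parts.size() < expected) {
            return null; // not "full" yet: the sub-aggregation stays in the store
        }
        store.remove(aggId); // full: emit and delete, as our transformer does
        return parts;
    }

    public static void main(String[] args) {
        AggregationReplayDemo node = new AggregationReplayDemo();
        node.process("agg", "r1", 3);
        node.process("agg", "r2", 3);
        System.out.println(node.process("agg", "r3", 3)); // [r1, r2, r3]

        // Replay only the last record, as the new stream thread does: r1 and r2
        // are gone from the store, so the aggregation never completes.
        System.out.println(node.process("agg", "r3", 3)); // null
    }
}
```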
I should also have mentioned that we are using a custom exception handler
that looks like this:
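{code:java}
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class CustomLogAndFailExceptionHandler implements
        ProductionExceptionHandler, DeserializationExceptionHandler {
    // ...
    @Override
    public ProductionExceptionHandlerResponse handle(
            final ProducerRecord<byte[], byte[]> record, final Exception exception) {
        // ...
        return ProductionExceptionHandlerResponse.FAIL; // stop processing on send errors
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
            final ConsumerRecord<byte[], byte[]> record, final Exception exception) {
        // ...
        return DeserializationHandlerResponse.FAIL; // stop processing on undeserializable records
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // Required: both handler interfaces extend Configurable.
    }
}
{code}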
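For context, a handler like this is normally registered through the Streams configuration. The sketch below assumes the 2.x property names `default.deserialization.exception.handler` and `default.production.exception.handler` (the values of `StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG` and `StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG`); the `com.example` package and the `HandlerConfigSketch` class are hypothetical. Plain string keys are used so the snippet runs without the Kafka jars.

```java
import java.util.Properties;

// Hypothetical sketch: wiring one class in as both exception handlers.
public class HandlerConfigSketch {

    // Fully qualified handler class name; the "com.example" package is hypothetical.
    static final String HANDLER = "com.example.CustomLogAndFailExceptionHandler";

    static Properties streamsProps() {
        Properties props = new Properties();
        // Same keys as the StreamsConfig constants named above (Kafka 2.x).
        props.put("default.deserialization.exception.handler", HANDLER);
        props.put("default.production.exception.handler", HANDLER);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("default.production.exception.handler"));
    }
}
```

Because both `handle` methods return `FAIL`, an error in either path stops the stream thread rather than skipping the record, which is why the task ends up on a new stream thread in the first place.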
For clarity, here are some selected logs (with additional custom debug/info
logging added):
Record 1 (of 3) aggregated and sub-aggregation written to local state store but
filtered out by KSTREAM-FILTER-NODE1:
{noformat}
07 May 2019 14:03:10,951 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [StreamThread-18] Polled record for topic TOPIC-T1 and partition 10 with key !0d81fc45-f485-4676-901f-6c1ced7042b0 and offset 5 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,951 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Adding record to task 1_10 for topic TOPIC-T1 and partition 10 with key !0d81fc45-f485-4676-901f-6c1ced7042b0 and offset 5 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,951 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.SourceNode: KSTREAM-SOURCE-NODE1 consuming key !0d81fc45-f485-4676-901f-6c1ced7042b0 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,953 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.ProcessorNode: KSTREAM-TRANSFORMVALUES-NODE1 processing key !0d81fc45-f485-4676-901f-6c1ced7042b0 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,954 [INFO] 554a65bc-e0bd-486d-abfa-ed3a3ac75af1 10 (StreamThread-15) Aggregator: Processed '1' of '3' messages for 'c72d609c-2d8d-420f-99d0-b11593e32c981466642757'.
07 May 2019 14:03:10,955 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.ProcessorNode: KSTREAM-FILTER-NODE1 processing key !0d81fc45-f485-4676-901f-6c1ced7042b0 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
{noformat}
Record 2 (of 3) aggregated and sub-aggregation written to local state store but
filtered out by KSTREAM-FILTER-NODE1:
{noformat}
07 May 2019 14:03:10,969 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [StreamThread-18] Polled record for topic TOPIC-T1 and partition 10 with key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c and offset 6 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,969 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Adding record to task 1_10 for topic TOPIC-T1 and partition 10 with key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c and offset 6 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,971 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.SourceNode: KSTREAM-SOURCE-NODE1 consuming key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,974 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.ProcessorNode: KSTREAM-TRANSFORMVALUES-NODE1 processing key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,978 [INFO] 554a65bc-e0bd-486d-abfa-ed3a3ac75af1 10 (StreamThread-15) Aggregator: Processed '2' of '3' messages for 'c72d609c-2d8d-420f-99d0-b11593e32c981466642757'.
07 May 2019 14:03:10,981 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.ProcessorNode: KSTREAM-FILTER-NODE1 processing key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c for request ID