[
https://issues.apache.org/jira/browse/KAFKA-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16900693#comment-16900693
]
Bruno Cadonna commented on KAFKA-8755:
--------------------------------------
In a first analysis, I narrowed down the issue to the setting of the offset
limits in the following two code snippets.
{{AbstractTask#updateOffsetLimits()}}
{code:java}
final OffsetAndMetadata metadata = consumer.committed(partition);
final long offset = metadata != null ? metadata.offset() : 0L;
{code}
{{ProcessorStateManager#offsetLimit}}
{code:java}
final Long limit = offsetLimits.get(partition);
return limit != null ? limit : Long.MAX_VALUE;
{code}
Currently, my guess is that {{AbstractTask#updateOffsetLimits()}} is called
during initialization and closing of the stand-by task but it is not called
during normal processing. The reason is that during normal processing
{{StandByTask#commit()}} is never called because the flag {{commitNeeded}} is
never set to true. This occurs only for stand-by tasks for optimized source
tables. Further investigation is needed to confirm my guess.
> Stand-by Task of an Optimized Source Table Does Not Write Anything to its
> State Store
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-8755
> URL: https://issues.apache.org/jira/browse/KAFKA-8755
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0
> Reporter: Bruno Cadonna
> Priority: Major
> Labels: newbie
> Attachments: StandbyTaskTest.java
>
>
> With the following topology:
> {code:java}
> builder.table(
> INPUT_TOPIC,
> Consumed.with(Serdes.Integer(), Serdes.Integer()),
> Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as(stateName)
> )
> {code}
> and with topology optimization turned on, Kafka Streams uses the input topic
> {{INPUT_TOPIC}} as the change log topic for state store {{stateName}}. A
> stand-by task for such a topology should read from {{INPUT_TOPIC}} and should
> write the records to its state store so that the streams client that runs the
> stand-by task can take over the execution of the topology in case of a
> failure with an up-to-date replica of the state.
> Currently, the stand-by task described above reads from the input topic but
> does not write the records to its state store. Thus, after a failure the
> stand-by task cannot provide any up-to-date state store and the streams
> client needs to construct the state from scratch before it can take over the
> execution.
> The described behaviour can be reproduced with the attached test.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)