Lucas Brutschy created KAFKA-18986:
--------------------------------------
Summary: Properly handle configured topology soft-state using
KIP-1101
Key: KAFKA-18986
URL: https://issues.apache.org/jira/browse/KAFKA-18986
Project: Kafka
Issue Type: Sub-task
Reporter: Lucas Brutschy
In KIP-1101, the number of partitions of the input topics and of some internal
topics is left parametric, so that a single application can be deployed with a
varying number of partitions. The process of turning a 'parametric' topology
into a topology
where the number of partitions is defined for all topics and the presence of
all internal topics is validated is called "configuring the topology". The
result of configuring the topology is either a configured topology, or a
validation error with potentially a set of internal topics that need to be
created.
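As a rough illustration of the two possible outcomes, here is a minimal Java
sketch. All names (TopologyConfigurer, Result, configure) are hypothetical and
are not the actual Kafka classes. It also assumes, for simplicity, that
partition counts are read directly from the partition metadata, whereas the
real logic derives some of them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch; not the group coordinator's real API.
final class TopologyConfigurer {

    // Either a configured topology (error == null), or a validation error
    // with a possibly empty set of internal topics that need to be created.
    record Result(Map<String, Integer> partitionsByTopic,
                  String error,
                  Set<String> internalTopicsToCreate) {
        boolean isConfigured() {
            return error == null;
        }
    }

    static Result configure(Set<String> inputTopics,
                            Set<String> internalTopics,
                            Map<String, Integer> partitionMetadata) {
        // A missing input topic is a validation error that topic creation
        // cannot fix, so no topics are reported for creation.
        for (String topic : new TreeSet<>(inputTopics)) {
            if (!partitionMetadata.containsKey(topic)) {
                return new Result(Map.of(), "Missing input topic: " + topic, Set.of());
            }
        }

        // Missing internal topics are reported so the caller can create them.
        Set<String> missing = new TreeSet<>(internalTopics);
        missing.removeAll(partitionMetadata.keySet());
        if (!missing.isEmpty()) {
            return new Result(Map.of(), "Missing internal topics: " + missing, missing);
        }

        // Every topic now has a defined number of partitions.
        Map<String, Integer> resolved = new HashMap<>();
        for (String topic : inputTopics) {
            resolved.put(topic, partitionMetadata.get(topic));
        }
        for (String topic : internalTopics) {
            resolved.put(topic, partitionMetadata.get(topic));
        }
        return new Result(Map.copyOf(resolved), null, Set.of());
    }
}
```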
The configured topology in streams groups is a result of taking the "partition
metadata" (current state of all the topics involved in a topology) and the
topology, both of which are persisted in the consumer offset topic and can be
restored on fail-over. However, the configured topology itself is not stored
(since it can be derived). We still want to keep it in memory, to avoid
recomputing it all the time. Right now, during a heartbeat RPC that either
detects a change in topic metadata or initializes a topology, we configure the
topology and store the result as "soft state". Also, when partition metadata
/ topology records are replayed from the offset topic, the configured topology
is derived again.
This is improper for the following reasons:
- When a heartbeat is handled and the configured topology is initialized, it
may be that the write operations of topology and partition metadata records
fail, which means that we'll end up in an inconsistent state, with an outdated
topology / partition metadata but a new configured topology.
- The topology may be configured several times: once in a heartbeat handler,
and then again when replaying the new records from the consumer offset topic.
- The topology configuration is a non-trivial operation, both in time
complexity and generally in the amount of logic that is being executed.
Ideally, we should not execute this on the replay-path.
To solve these problems, the idea is to key the configured topology with a
hash of the partition metadata and a topology epoch or topology hash. That
means:
1. When a heartbeat handler initializes a new topology or detects changed
partition metadata, it computes a new partition metadata hash (if necessary),
configures the topology, and stores the configured topology as soft state in
the in-memory representation of the streams group, keyed by the topology epoch
and the partition metadata hash.
2. On the replay path, the configured topology is not touched at all.
3. When using the configured topology, whether in the describe handler or the
heartbeat handler, we compare the topology epoch and the partition metadata
hash to check whether the current configured topology (soft state) matches the
partition metadata and topology epoch (hard state). If it does not, we
configure the topology again.
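The three steps above could be sketched in Java roughly as follows. This is an
illustration under assumptions, not the actual streams group implementation:
the class StreamsGroupSoftState, its methods, and the use of a long for the
partition metadata hash are all hypothetical.

```java
import java.util.function.Supplier;

// Hypothetical sketch; T stands in for the configured topology type.
final class StreamsGroupSoftState<T> {
    // Hard state: persisted in the offset topic and restored by replay.
    private int topologyEpoch = -1;
    private long partitionMetadataHash = 0L;

    // Soft state: the configured topology plus the key it was computed for.
    private T configuredTopology;
    private int configuredForEpoch = -1;
    private long configuredForHash = 0L;

    // Step 2: the replay path only updates hard state and never touches
    // the configured topology.
    void replay(int newTopologyEpoch, long newPartitionMetadataHash) {
        this.topologyEpoch = newTopologyEpoch;
        this.partitionMetadataHash = newPartitionMetadataHash;
    }

    // Step 3: the soft state is valid only if its key matches the hard state.
    boolean isConfiguredTopologyCurrent() {
        return configuredTopology != null
            && configuredForEpoch == topologyEpoch
            && configuredForHash == partitionMetadataHash;
    }

    // Steps 1 and 3: handlers call this; the (expensive) configure step runs
    // only when the stored key no longer matches the hard state.
    T configuredTopology(Supplier<T> configure) {
        if (!isConfiguredTopologyCurrent()) {
            configuredTopology = configure.get();
            configuredForEpoch = topologyEpoch;
            configuredForHash = partitionMetadataHash;
        }
        return configuredTopology;
    }
}
```

With this shape, a failed write of topology or partition metadata records
leaves the hard state unchanged, so a configured topology stored under the new
key no longer matches it and is recomputed on the next use.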
Fortunately, KIP-1101 introduces the means to generate partition metadata
hashes, and proposes saving only the hash to the consumer offset topic. We can
leverage these changes to fix, rather trivially, the handling of configured
topologies in streams groups.