[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17179535#comment-17179535
 ] 

Bruno Cadonna commented on KAFKA-10357:
---------------------------------------

bq. Can we leverage the committed offsets somehow? It seems like if the 
repartition topics don't exist but the group has committed offsets for them, 
then they must have been deleted

If I understand it correctly, when a topic is deleted also its committed 
offsets are deleted. That means, the situation you describe is not possible. 
Please let me know, if I missed anything.

bq.  It seems preferable to me to have streams be able to detect when its 
internal state is invalid

As far as I understand, that is exactly what we try to do. We want to detect 
the deletion of a repartition topic and notify the user about it through an 
exception. How the users react to the exception is their business. Admittedly, 
we need to provide some additional functionality to better react on such 
situations.

I am not against a manual initialization step, but I have two concerns:

1) worse out-of-the-box experience because manual steps are required before you 
can play around with Streams
2) exposure of internals like the repartition topics

To solve 1) we could introduce a config that tells Streams to assume that the 
internal topics (or just some of them) are pre-created and therefore not to 
setup them. To avoid the exposure of internal topics we could abstract the 
manual initialization to hide internals. However, what would then happen when a 
repartition topic is deleted? What should Streams do when it can assume that 
internal topics are pre-created and it does not find a repartition topic? 
Either it silently shuts down or it throws an exception on which users can 
react upon. I am in favor of the second.

Instead to the manual initialization step, I would prefer a way to persist a 
flag that indicates that an automatic initialization was performed. If 
something unexpected happens the application could then be reset to a valid 
state with the application reset tool and the application reset tool would also 
reset the flag. But currently, I do not know where we could persist such a 
flag. Maybe somewhere on the brokers and let the flag be managed by the group 
coordinator? WDYT?       

> Handle accidental deletion of repartition-topics as exceptional failure
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-10357
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10357
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Bruno Cadonna
>            Priority: Major
>
> Repartition topics are both written by Stream's producer and read by Stream's 
> consumer, so when they are accidentally deleted both clients may be notified. 
> But in practice the consumer would react to it much quicker than producer 
> since the latter has a delivery timeout expiration period (see 
> https://issues.apache.org/jira/browse/KAFKA-10356). When consumer reacts to 
> it, it will re-join the group since metadata changed and during the triggered 
> rebalance it would auto-recreate the topic silently and continue, causing 
> data lost silently. 
> One idea, is to only create all repartition topics *once* in the first 
> rebalance and not auto-create them any more in future rebalances, instead it 
> would be treated similar as INCOMPLETE_SOURCE_TOPIC_METADATA error code 
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenge part would be, how to determine if it is the first-ever 
> rebalance, and there are several wild ideas I'd like to throw out here:
> 1) change the thread state transition diagram so that STARTING state would 
> not transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the 
> assign function we can check if the state is still in CREATED and not RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the first 
> time ever rebalance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to