[ 
https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983797#comment-16983797
 ] 

Tommy Becker commented on KAFKA-7663:
-------------------------------------

Thanks for the comment, [~vvcephei]. In the reporter's defense, I'm pretty sure
that javadoc was added before this bug was opened ;) This is a pretty
important use case for us. The topic we build the global store from is
heterogeneous; we only need some types of records, and we need to rekey others
prior to joining to our event stream. Personally, I feel like your second
option (i.e. maintaining a real changelog topic) is probably overkill, and
simply running the records through the processors during restoration is
adequate. It seems like we could even keep the current "fast path" restoration
that bypasses serde when there is no custom processor configured.
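
To make the idea concrete, here is a rough plain-Java sketch of the restore
behavior I have in mind. It does not use any Kafka Streams APIs, and every name
in it (StoreUpdater, FILTER_AND_REKEY, restore) is hypothetical, as is the
"user:" key prefix standing in for "the record types we care about". The real
fast path copies raw bytes; strings are used here only to keep the example
short.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GlobalStoreRestoreSketch {

    /** Hypothetical stand-in for the custom processor supplied to addGlobalStore. */
    interface StoreUpdater {
        void process(String key, String value, Map<String, String> store);
    }

    /** Example processor: keeps only "user:" records and rekeys them (K,V -> V,K). */
    static final StoreUpdater FILTER_AND_REKEY = (key, value, store) -> {
        if (key.startsWith("user:")) {
            store.put(value, key);
        }
    };

    /**
     * Rebuilds the store from the source topic. With no custom processor the
     * records are copied as-is (the "fast path"); otherwise every record is
     * run through the processor, exactly as during normal processing.
     */
    static Map<String, String> restore(List<Map.Entry<String, String>> topic,
                                       StoreUpdater processor) {
        Map<String, String> store = new TreeMap<>(); // sorted for stable printing
        for (Map.Entry<String, String> record : topic) {
            if (processor == null) {
                store.put(record.getKey(), record.getValue());
            } else {
                processor.process(record.getKey(), record.getValue(), store);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic = List.of(
                Map.entry("user:1", "alice"),
                Map.entry("order:9", "widget"));

        // Fast path, which is what restoration effectively does today:
        System.out.println(restore(topic, null));             // {order:9=widget, user:1=alice}
        // Restoration routed through the custom processor:
        System.out.println(restore(topic, FILTER_AND_REKEY)); // {alice=user:1}
    }
}
```

The first result is what we see after wiping local state and restarting; the
second is what the store contains after normal processing, and what we would
expect restoration to reproduce.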

The problem with your recommended workaround is that, if I'm not mistaken,
you lose the critical property of the global store being fully populated when
the rest of the topology starts up. Although the store will wait for the
"my-global-changelog" topic to be caught up, that doesn't mean much, since that
topic is itself fed from the main topology.

> Custom Processor supplied on addGlobalStore is not used when restoring state 
> from topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7663
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7663
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Frederic Tardif
>            Priority: Major
>         Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have implemented a StreamsBuilder#{{addGlobalStore}} supplying a custom
> processor responsible for transforming a K,V record from the input stream
> into a V,K record. It works fine and my {{store.all()}} does print the correct
> persisted V,K records. However, if I clean the local store and restart the
> stream app, the global table is reloaded but without going through the
> processor supplied; instead, it calls {{GlobalStateManagerImpl#restoreState}},
> which simply stores the input topic K,V records into RocksDB (hence bypassing
> the mapping function of my custom processor). I believe this is not the
> expected result?
> This is a follow-up to a Stack Overflow discussion about storing a K,V topic
> as a global table with some stateless transformations based on a "custom"
> processor added on the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
> If we address this issue, we should also apply 
> `default.deserialization.exception.handler` during restore (cf. KAFKA-8037)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
