[ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006463#comment-17006463 ]

Guozhang Wang commented on KAFKA-7663:
--------------------------------------

I thought about the opt-in optimization if we follow option 3), which basically
needs to look into the Processor and check whether it is an instance of our
internal processor, which applies no custom logic and just writes the upstream
records to the state store. I feel it "might" work, but without fully working it
out I cannot say for sure. If it does work, I'm slightly leaning towards it,
mainly because it does not require API changes, whereas with option 1) we'd best
remove the ProcessorSupplier parameter to completely forbid users from trying to
customize it.
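To make the opt-in idea concrete, here is a minimal plain-Java sketch of the restore decision under option 3). It is not Kafka Streams code: `GlobalRecordProcessor`, `PassThroughProcessor` and `OptInRestoreSketch` are hypothetical names, the store is modeled as a `HashMap` rather than RocksDB, and the source topic as a list of key/value pairs. It assumes the check is a plain `instanceof` test against the internal pass-through processor: raw records are bulk-written only in that case, and otherwise every record is replayed through the user's processor.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical callback (not a Kafka interface): handles one source-topic record.
interface GlobalRecordProcessor {
    void process(String key, String value, Map<String, String> store);
}

// Hypothetical stand-in for the internal processor that copies records unchanged.
final class PassThroughProcessor implements GlobalRecordProcessor {
    @Override
    public void process(String key, String value, Map<String, String> store) {
        store.put(key, value);
    }
}

final class OptInRestoreSketch {
    // Rebuilds the store from source-topic records; returns true if the bulk path was used.
    static boolean restore(List<String[]> topicRecords, GlobalRecordProcessor processor,
                           Map<String, String> store) {
        if (processor instanceof PassThroughProcessor) {
            // No custom logic: raw records can be written directly (bulk restore).
            for (String[] record : topicRecords) {
                store.put(record[0], record[1]);
            }
            return true;
        }
        // Custom logic: replay every record through the user's processor.
        for (String[] record : topicRecords) {
            processor.process(record[0], record[1], store);
        }
        return false;
    }

    public static void main(String[] args) {
        List<String[]> topic = List.of(new String[] {"k1", "v1"}, new String[] {"k2", "v2"});
        Map<String, String> store = new HashMap<>();
        // A custom processor that inverts K,V into V,K, as in the issue description.
        boolean bulk = restore(topic, (k, v, s) -> s.put(v, k), store);
        System.out.println(bulk);            // false
        System.out.println(store.get("v1")); // k1
    }
}
```

The `instanceof` test is what makes this opt-in: applications that never supply custom logic keep the fast bulk path, while a custom processor pays the cost of replaying the topic. Whether such a check is sufficient in the real code base is the part the comment leaves open.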

If we want to fix it asap (which I think we should, as we are touching this
piece right now anyway), then maybe we can first check whether the above opt-in
can indeed work nicely, and if not, consider option 1).

> Custom Processor supplied on addGlobalStore is not used when restoring state from topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7663
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7663
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Frederic Tardif
>            Priority: Major
>         Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have implemented a StreamsBuilder#{{addGlobalStore}} supplying a custom
> processor responsible for transforming a K,V record from the input stream into
> a V,K record. It works fine and my {{store.all()}} does print the correct
> persisted V,K records. However, if I clean the local store and restart the
> stream app, the global table is reloaded without going through the supplied
> processor; instead, it calls {{GlobalStateManagerImpl#restoreState}}, which
> simply stores the input topic K,V records into RocksDB (hence bypassing the
> mapping function of my custom processor). Surely this is not the expected
> behavior?
> This is a follow-up to a Stack Overflow discussion about storing a K,V topic
> as a global table with some stateless transformations based on a "custom"
> processor added on the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
> If we address this issue, we should also apply
> `default.deserialization.exception.handler` during restore (cf. KAFKA-8037).
>  
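The divergence the reporter describes can be reproduced with a small plain-Java model. This is a hypothetical simulation, not Kafka code: `RestoreDivergenceDemo` and its methods are illustrative names, a `HashMap` stands in for the RocksDB store, and the input topic is a list of K,V pairs. The first method mimics normal processing through a custom processor that inverts K,V into V,K; the second mimics the restore path as reported, which writes the raw topic records directly.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the behavior reported in KAFKA-7663 (not Kafka code).
final class RestoreDivergenceDemo {
    // Normal processing: the custom processor inverts each K,V record into V,K.
    static Map<String, String> processWithCustomProcessor(List<String[]> topic) {
        Map<String, String> store = new HashMap<>();
        for (String[] record : topic) {
            store.put(record[1], record[0]); // the value becomes the key
        }
        return store;
    }

    // Restore as reported: raw topic records go straight into the store.
    static Map<String, String> restoreBypassingProcessor(List<String[]> topic) {
        Map<String, String> store = new HashMap<>();
        for (String[] record : topic) {
            store.put(record[0], record[1]); // the custom processor is never invoked
        }
        return store;
    }

    public static void main(String[] args) {
        List<String[]> topic = List.of(new String[] {"k1", "v1"}, new String[] {"k2", "v2"});
        Map<String, String> processed = processWithCustomProcessor(topic);
        Map<String, String> restored = restoreBypassingProcessor(topic);
        System.out.println(processed.get("v1"));        // k1
        System.out.println(restored.get("v1"));         // null: the V,K mapping is lost
        System.out.println(processed.equals(restored)); // false
    }
}
```

After wiping the local store, lookups by the inverted key stop working, which matches the symptom described for {{store.all()}} above.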



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
