[ https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16984797#comment-16984797 ]
Patrik Kleindl commented on KAFKA-7663:
---------------------------------------

[~vvcephei] [~ableegoldman] See my comment above. I tried to get there while working on another issue (deserialize on restore to catch malformed records), but the order of initialization prevented me from calling process() as well.

I think it would be worth allowing three strategies for the restore:
* restore only (current functionality)
* restore and deserialize (to catch malformed records in the source topic, but still put them 1:1 in the store)
* restore and process (including deserialization, to run everything through a custom processor)

> Custom Processor supplied on addGlobalStore is not used when restoring state from topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7663
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7663
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Frederic Tardif
>            Priority: Major
>         Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have implemented a {{StreamsBuilder#addGlobalStore}} call supplying a custom
> processor responsible for transforming a K,V record from the input stream into
> a V,K record. It works fine and my {{store.all()}} does print the correct
> persisted V,K records. However, if I clean the local store and restart the
> stream app, the global table is reloaded without going through the supplied
> processor; instead, it calls {{GlobalStateManagerImpl#restoreState}},
> which simply stores the input topic K,V records into RocksDB (hence bypassing
> the mapping function of my custom processor). I believe this is not the
> expected result?
> This is a follow-up to a Stack Overflow discussion about storing a K,V topic
> as a global table with some stateless transformations based on a "custom"
> processor added on the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
>
> If we address this issue, we should also apply
> `default.deserialization.exception.handler` during restore (cf. KAFKA-8037)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
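
To make the difference between the three restore strategies proposed in the comment concrete, here is a minimal, self-contained sketch. It does not use the Kafka Streams API: `RestoreStrategySketch`, `RestoreStrategy`, `deserialize`, `process` and `restore` are hypothetical names, the "topic" is a plain list of string pairs, the "store" is a map, and the stand-in processor performs the K,V to V,K swap described in the issue. Skipping malformed records is also an assumption (roughly what a log-and-continue handler would do); the comment only says such records should be caught.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RestoreStrategySketch {

    // Hypothetical names for the three strategies; not a Kafka Streams config.
    enum RestoreStrategy { RESTORE_ONLY, RESTORE_AND_DESERIALIZE, RESTORE_AND_PROCESS }

    // Stand-in for a value deserializer: throws NumberFormatException on malformed input.
    static int deserialize(String rawValue) {
        return Integer.parseInt(rawValue);
    }

    // Stand-in for the custom processor from the issue: stores V,K instead of K,V.
    static void process(String key, int value, Map<String, String> store) {
        store.put(String.valueOf(value), key);
    }

    // Replays the topic into the store and returns the keys of skipped (malformed) records.
    static List<String> restore(List<String[]> topic,
                                RestoreStrategy strategy,
                                Map<String, String> store) {
        List<String> skippedKeys = new ArrayList<>();
        for (String[] rec : topic) {
            String key = rec[0];
            String rawValue = rec[1];

            if (strategy == RestoreStrategy.RESTORE_ONLY) {
                // Current behaviour: copy the raw record without looking at it.
                store.put(key, rawValue);
                continue;
            }

            int value;
            try {
                value = deserialize(rawValue);
            } catch (NumberFormatException e) {
                // Assumption: malformed records are skipped and reported.
                skippedKeys.add(key);
                continue;
            }

            if (strategy == RestoreStrategy.RESTORE_AND_DESERIALIZE) {
                // Validated, but still stored 1:1 as read from the topic.
                store.put(key, rawValue);
            } else {
                // RESTORE_AND_PROCESS: the processor decides what ends up in the store.
                process(key, value, store);
            }
        }
        return skippedKeys;
    }

    public static void main(String[] args) {
        List<String[]> topic = List.of(
                new String[] {"a", "1"},
                new String[] {"b", "oops"},   // malformed value
                new String[] {"c", "3"});

        for (RestoreStrategy strategy : RestoreStrategy.values()) {
            Map<String, String> store = new LinkedHashMap<>();
            List<String> skipped = restore(topic, strategy, store);
            System.out.println(strategy + " store=" + store + " skipped=" + skipped);
        }
    }
}
```

In this sketch only the "restore and process" path leaves the store in the V,K layout that the processor would have produced during normal operation, which is the mismatch the issue reports for the restore-only path.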