[ 
https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16999963#comment-16999963
 ] 

Levani Kokhreidze edited comment on KAFKA-7663 at 12/19/19 11:03 AM:
---------------------------------------------------------------------

Hello, I've stumbled upon this issue. I have a use case where I don't want to 
restore all the records from the topic for a global store and would like to 
define some "filtering" logic for the restoration process. If I am not 
mistaken (and as was already pointed out), 
`GlobalStateManagerImpl#restoreState` is called well before the processor's 
`init` method, so invoking the user-defined processor during restoration may 
be neither feasible nor intuitive for the user, since one would always expect 
`init` to be called before `process`. I was thinking: what if we add one more 
parameter to the `addGlobalStore` method where users can define record 
restoration filtering logic, something like: 

 

 
{code:java}
public synchronized void addGlobalStore(
 final StoreBuilder<KeyValueStore> storeBuilder,
 final String sourceName,
 final String topic,
 final ConsumedInternal consumed,
 final String processorName,
 final ProcessorSupplier stateUpdateSupplier,
 // new method parameter
 final GlobalStateRestoreFilter globalStateRestoreFilter
) {
...
}{code}
The signature of the interface might look something like this:

 
{code:java}
public interface GlobalStateRestoreFilter<K, V> {

  boolean shouldRestore(String topic, K key, V value);

} 
{code}
 

This change will require a KIP. I'm happy to work on it, but would like to 
hear initial thoughts on the idea first.

With this, end users would have a clear contract if they would like to filter 
out some records during the restoration process.
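
To make the intended contract concrete, here is a minimal, self-contained 
sketch of how such a filter might be applied while restoring. Everything in it 
is hypothetical: `RestoreFilterSketch`, `TopicRecord`, `restore`, and the 
in-memory map are stand-ins for illustration and are not Kafka Streams APIs; 
only the `GlobalStateRestoreFilter` shape is taken from the proposal above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types exist in Kafka Streams.
class RestoreFilterSketch {

    // The interface proposed above, repeated so the sketch compiles on its own.
    interface GlobalStateRestoreFilter<K, V> {
        boolean shouldRestore(String topic, K key, V value);
    }

    // Stand-in for a record read back from the global store's source topic.
    static final class TopicRecord<K, V> {
        final String topic;
        final K key;
        final V value;

        TopicRecord(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Simplified stand-in for the restore loop: a record is written to the
    // store only if the user-supplied filter accepts it.
    static <K, V> Map<K, V> restore(List<TopicRecord<K, V>> records,
                                    GlobalStateRestoreFilter<K, V> filter) {
        Map<K, V> store = new LinkedHashMap<>();
        for (TopicRecord<K, V> r : records) {
            if (filter.shouldRestore(r.topic, r.key, r.value)) {
                store.put(r.key, r.value);
            }
        }
        return store;
    }

    public static void main(String[] args) {
        // Example filter (an assumption for illustration): skip keys with a "tmp-" prefix.
        GlobalStateRestoreFilter<String, String> filter =
            (topic, key, value) -> !key.startsWith("tmp-");

        List<TopicRecord<String, String>> records = List.of(
            new TopicRecord<>("config", "a", "1"),
            new TopicRecord<>("config", "tmp-b", "2"),
            new TopicRecord<>("config", "c", "3"));

        System.out.println(restore(records, filter)); // prints {a=1, c=3}
    }
}
```

Because the filter is a plain predicate with no lifecycle, it does not depend 
on the processor's `init` having been called, which is the main reason for 
keeping it separate from the `ProcessorSupplier`.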

 

cc [~vvcephei] [~ableegoldman] [~pkleindl] [~twbecker]



> Custom Processor supplied on addGlobalStore is not used when restoring state 
> from topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7663
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7663
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Frederic Tardif
>            Priority: Major
>         Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have implemented a StreamsBuilder#{{addGlobalStore}} call supplying a custom 
> processor responsible for transforming each K,V record from the input stream 
> into a V,K record. It works fine, and my {{store.all()}} does print the correct 
> persisted V,K records. However, if I clean the local store and restart the 
> stream app, the global table is reloaded without going through the supplied 
> processor; instead, it calls {{GlobalStateManagerImpl#restoreState}}, 
> which simply stores the input topic's K,V records in RocksDB (hence bypassing 
> the mapping function of my custom processor). I believe this is not the 
> expected behavior?
>  This is a follow-up to a Stack Overflow discussion about storing a K,V topic 
> as a global table with some stateless transformations based on a "custom" 
> processor added on the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
> If we address this issue, we should also apply 
> `default.deserialization.exception.handler` during restore (cf. KAFKA-8037)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
