[ https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566639#comment-17566639 ]

Balaji Rao commented on KAFKA-14070:
------------------------------------

Hi [~guozhang], thank you for your interest. While preparing a reply to your
question, I realised that my original description did not address the right
issue. I've rewritten the description completely. Please take a look and let
me know if it helps. Thank you!

> Improve documentation for queryMetadataForKey
> ---------------------------------------------
>
>                 Key: KAFKA-14070
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14070
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.2.0
>            Reporter: Balaji Rao
>            Priority: Minor
>
> Using {{queryMetadataForKey}} for state stores with the Processor API is
> tricky. One can use state stores in the Processor API in ways that make it
> impossible to use {{queryMetadataForKey}} with the store key alone; one would
> have to know the input record's key. This can lead to the method being called
> with incorrect expectations. The documentation could be improved around this,
> and around using state stores with the Processor API in general.
> Example snippet:
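> {code:scala}
> import org.apache.kafka.streams.StreamsBuilder
> import org.apache.kafka.streams.kstream.Consumed
> import org.apache.kafka.streams.processor.api.{Processor, ProcessorContext, ProcessorSupplier, Record}
> import org.apache.kafka.streams.scala.serialization.Serdes
> import org.apache.kafka.streams.state.{KeyValueStore, Stores}
>
> val streamsBuilder = new StreamsBuilder()
>
> // Input records are keyed by Int.
> val input = streamsBuilder.stream(
>   "input-topic",
>   Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
> )
>
> // The store is keyed by String, not by the input record's Int key.
> val storeBuilder = Stores
>   .keyValueStoreBuilder[String, String](
>     Stores.inMemoryKeyValueStore("store"),
>     Serdes.stringSerde,
>     Serdes.stringSerde
>   )
> streamsBuilder.addStateStore(storeBuilder)
>
> input.process(
>   new ProcessorSupplier[Int, String, Void, Void] {
>     override def get(): Processor[Int, String, Void, Void] =
>       new Processor[Int, String, Void, Void] {
>         var store: KeyValueStore[String, String] = _
>         override def init(context: ProcessorContext[Void, Void]): Unit = {
>           super.init(context)
>           store = context.getStateStore("store")
>         }
>         // Each input record with key k writes ten store keys "k-a" .. "k-j".
>         override def process(record: Record[Int, String]): Unit = {
>           ('a' to 'j').foreach(x =>
>             store.put(s"${record.key}-$x", record.value)
>           )
>         }
>       }
>   },
>   "store"
> )
> {code}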
> In the code sample above, AFAICT there is no way to determine which partition
> of {{store}} contains the key {{"1-a"}} by calling {{queryMetadataForKey}}
> with the string {{"1-a"}}. One has to call {{queryMetadataForKey}} with the
> input record's key that produced {{"1-a"}}, in this case the {{Int}} 1, to
> find the partition.
>  
> Example 2:
> The same as above, but with a different {{process}} method.
> {code:scala}
> override def process(record: Record[Int, String]): Unit = {
>   // Every input record overwrites the same ten store keys "a" .. "j".
>   ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
> }
> {code}
> Here the key {{"a"}} could exist in multiple partitions, with different
> values in different partitions. AFAICT, one must then call
> {{queryMetadataForKey}} with an {{Int}} to determine the partition in which a
> given {{String}} key would be stored.

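For Example 2, the toy model below (pure Scala, no Kafka dependency) replays the {{process}} method over a few input keys and keeps one local store per partition. The partitioner {{toyPartition}} is a deliberate simplification ({{Math.floorMod}} of the {{Int}} key), not Kafka's murmur2-based one. {{Example2Model}}, {{replay}}, {{partitionsHolding}} and {{lookup}} are hypothetical names. The model shows that the store key {{"a"}} ends up in every partition that received records, with a different value in each. A point lookup only works once the {{Int}} input key has selected the partition.

```scala
object Example2Model {
  type Store = Map[String, String]

  // Hypothetical stand-in for the partitioner; Kafka's default partitioner
  // hashes the serialized key with murmur2 instead.
  def toyPartition(inputKey: Int, numPartitions: Int): Int =
    Math.floorMod(inputKey, numPartitions)

  // Replays Example 2's process() for each input key, one local store per partition.
  def replay(inputKeys: Seq[Int], numPartitions: Int): Map[Int, Store] =
    inputKeys.foldLeft(Map.empty[Int, Store]) { (stores, k) =>
      val p = toyPartition(k, numPartitions)
      val current = stores.getOrElse(p, Map.empty[String, String])
      // Same writes as the snippet: store keys "a".."j" all get the value k.
      val updated = ('a' to 'j').foldLeft(current)((s, x) => s.updated(x.toString, k.toString))
      stores.updated(p, updated)
    }

  // Every partition whose local store holds the given store key.
  def partitionsHolding(stores: Map[Int, Store], storeKey: String): Set[Int] =
    stores.filter { case (_, s) => s.contains(storeKey) }.keySet

  // A point lookup: the Int input key picks the partition, then the String key is read there.
  def lookup(stores: Map[Int, Store], inputKey: Int, storeKey: String, numPartitions: Int): Option[String] =
    stores.get(toyPartition(inputKey, numPartitions)).flatMap(_.get(storeKey))
}
```

In this model, {{replay(Seq(1, 2, 3), 2)}} places keys 1 and 3 in partition 1 and key 2 in partition 0. As a result, {{lookup(stores, 1, "a", 2)}} returns {{"3"}}, not {{"1"}}: the {{Int}} key identifies which partition to ask, but keys 1 and 3 share that partition and the later write wins. Finding every copy of {{"a"}} would mean asking all instances that host {{store}} (for example, those returned by {{KafkaStreams#streamsMetadataForStore("store")}}) rather than calling {{queryMetadataForKey}}.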


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
