[ https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566639#comment-17566639 ]
Balaji Rao commented on KAFKA-14070:
------------------------------------

Hi [~guozhang], thank you for your interest. When I was preparing a reply to your question, I realised that my original description did not touch the right issue. I've changed the description completely. Please take a look and let me know if it helps. Thank you!

> Improve documentation for queryMetadataForKey
> ---------------------------------------------
>
>                 Key: KAFKA-14070
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14070
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.2.0
>            Reporter: Balaji Rao
>            Priority: Minor
>
> Using {{queryMetadataForKey}} for state stores with Processor API is tricky.
> One could use state stores in Processor API in ways that would make it
> impossible to use {{queryMetadataForKey}} with just a key alone - one would
> have to know the input record's key. This could lead to the method being
> called with incorrect expectations. The documentation could be improved
> around this, and around using state stores with the Processor API in general.
>
> Example snippet:
> {code:scala}
> val input = streamsBuilder.stream(
>   "input-topic",
>   Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
> )
> private val storeBuilder = Stores
>   .keyValueStoreBuilder[String, String](
>     Stores.inMemoryKeyValueStore("store"),
>     Serdes.stringSerde,
>     Serdes.stringSerde
>   )
> streamsBuilder.addStateStore(storeBuilder)
> input.process(
>   new ProcessorSupplier[Int, String, Void, Void] {
>     override def get(): Processor[Int, String, Void, Void] =
>       new Processor[Int, String, Void, Void] {
>         var store: KeyValueStore[String, String] = _
>         override def init(context: ProcessorContext[Void, Void]): Unit = {
>           super.init(context)
>           store = context.getStateStore("store")
>         }
>         override def process(record: Record[Int, String]): Unit = {
>           // Store keys are derived from the record key but are not equal to it.
>           ('a' to 'j').foreach(x =>
>             store.put(s"${record.key}-$x", record.value)
>           )
>         }
>       }
>   },
>   "store"
> )
> {code}
> In the code sample above, AFAICT there is no way to determine which partition
> of the {{store}} contains the key {{"1-a"}} by calling
> {{queryMetadataForKey}} with the string {{"1-a"}}. One has to call
> {{queryMetadataForKey}} with the key of the record that produced {{"1-a"}}, in
> this case the {{Int}} 1, to find the partition.
>
> Example 2:
> The same as above, but with a different {{process}} method.
> {code:scala}
> override def process(record: Record[Int, String]): Unit = {
>   // Every input record writes the same ten store keys.
>   ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
> }
> {code}
> In this case the key {{"a"}} could exist in multiple partitions, with
> different values in different partitions. Here too, AFAICT, one must call
> {{queryMetadataForKey}} with an {{Int}} to determine the partition where a
> given {{String}} would be stored.
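
The routing problem in the first example can be reproduced without Kafka. The following is a minimal sketch under stated assumptions, not the Kafka Streams implementation: {{StorePartitionSketch}}, {{partitionFor}}, {{shards}} and both lookup helpers are hypothetical names, the partition count of 4 is made up, and {{Arrays.hashCode}} stands in for the hash used by Kafka's default partitioner. It models one local store per input partition and compares a lookup routed by the input record's key with a lookup routed by the store key.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Arrays
import scala.collection.mutable

object StorePartitionSketch {
  // Assumed partition count of the input topic (hypothetical).
  val numPartitions = 4

  // Placeholder for the default partitioner: hash the serialized key bytes.
  // Kafka uses a different hash function; only the principle matters here.
  def partitionFor(keyBytes: Array[Byte]): Int =
    Math.floorMod(Arrays.hashCode(keyBytes), numPartitions)

  def intBytes(key: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(key).array()
  def stringBytes(key: String): Array[Byte] = key.getBytes(StandardCharsets.UTF_8)

  // One local store shard per input partition, loosely modelling one store per task.
  val shards: Vector[mutable.Map[String, String]] =
    Vector.fill(numPartitions)(mutable.Map.empty[String, String])

  // Mirrors the first process method: the shard is chosen by the Int record key,
  // while the keys written to the store are derived Strings.
  def process(recordKey: Int, value: String): Unit = {
    val shard = shards(partitionFor(intBytes(recordKey)))
    ('a' to 'j').foreach(x => shard.put(s"$recordKey-$x", value))
  }

  // Routes the lookup by the input record's key, then reads the store key.
  def lookupByRecordKey(recordKey: Int, storeKey: String): Option[String] =
    shards(partitionFor(intBytes(recordKey))).get(storeKey)

  // Routes the lookup by the store key itself, which may pick the wrong shard.
  def lookupByStoreKey(storeKey: String): Option[String] =
    shards(partitionFor(stringBytes(storeKey))).get(storeKey)

  def main(args: Array[String]): Unit = {
    (1 to 8).foreach(k => process(k, s"value-$k"))
    println(lookupByRecordKey(1, "1-a"))
    val misses = for {
      k <- 1 to 8
      x <- 'a' to 'j'
      if lookupByStoreKey(s"$k-$x").isEmpty
    } yield s"$k-$x"
    println(s"${misses.size} of 80 store keys are missed when routing by the store key")
  }
}
```

Routing by the record key always reaches the shard that was written to, whereas routing by the store key only succeeds when the two hashes happen to agree. In a real application the corresponding call would presumably be {{queryMetadataForKey("store", 1, Serdes.intSerde.serializer)}} on the {{KafkaStreams}} instance, assuming the input topic was partitioned by the default partitioner.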