[
https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566639#comment-17566639
]
Balaji Rao commented on KAFKA-14070:
------------------------------------
Hi [~guozhang], thank you for your interest. When I was preparing a reply to
your question, I realised that my original description did not touch the right
issue. I've changed the description completely. Please take a look and let me
know if it helps. Thank you!
> Improve documentation for queryMetadataForKey
> ---------------------------------------------
>
> Key: KAFKA-14070
> URL: https://issues.apache.org/jira/browse/KAFKA-14070
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.2.0
> Reporter: Balaji Rao
> Priority: Minor
>
> Using {{queryMetadataForKey}} for state stores with the Processor API is
> tricky. A processor can write to a state store under keys that differ from
> the input record's key, which makes it impossible to use
> {{queryMetadataForKey}} with the store key alone - one has to know the input
> record's key. This could lead to the method being called with incorrect
> expectations. The documentation could be improved around this, and around
> using state stores with the Processor API in general.
> Example snippet:
> {code:scala}
> val input = streamsBuilder.stream(
>   "input-topic",
>   Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
> )
> private val storeBuilder = Stores
>   .keyValueStoreBuilder[String, String](
>     Stores.inMemoryKeyValueStore("store"),
>     Serdes.stringSerde,
>     Serdes.stringSerde
>   )
> streamsBuilder.addStateStore(storeBuilder)
> input.process(
>   new ProcessorSupplier[Int, String, Void, Void] {
>     override def get(): Processor[Int, String, Void, Void] =
>       new Processor[Int, String, Void, Void] {
>         var store: KeyValueStore[String, String] = _
>         override def init(context: ProcessorContext[Void, Void]): Unit = {
>           super.init(context)
>           store = context.getStateStore("store")
>         }
>         override def process(record: Record[Int, String]): Unit = {
>           ('a' to 'j').foreach(x =>
>             store.put(s"${record.key}-$x", record.value)
>           )
>         }
>       }
>   },
>   "store"
> )
> {code}
> In the code sample above, AFAICT there is no way to determine the partition
> of the {{store}} that contains the key {{"1-a"}} by calling
> {{queryMetadataForKey}} with the string {{"1-a"}}. One has to call
> {{queryMetadataForKey}} with the key of the record that produced {{"1-a"}},
> in this case the {{Int}} 1, to find the partition.
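>
> A minimal sketch of such a lookup for this sample, assuming the store keys
> always have the form {{"<record key>-<letter>"}}. The names {{StoreLookup}},
> {{recordKeyOf}} and {{metadataFor}} are hypothetical, and
> {{IntegerSerializer}} is assumed to match the key serde of the input topic:
> {code:scala}
> import org.apache.kafka.common.serialization.IntegerSerializer
> import org.apache.kafka.streams.{KafkaStreams, KeyQueryMetadata}
>
> object StoreLookup {
>   // Hypothetical helper: recover the input record's key from a store key
>   // such as "1-a" by dropping the trailing "-<letter>" suffix.
>   def recordKeyOf(storeKey: String): Int =
>     storeKey.substring(0, storeKey.lastIndexOf('-')).toInt
>
>   // Ask for metadata using the record key and the input topic's key
>   // serializer (assumed here), not the String key held in the store.
>   def metadataFor(streams: KafkaStreams, storeKey: String): KeyQueryMetadata =
>     streams.queryMetadataForKey[Integer](
>       "store",
>       Integer.valueOf(recordKeyOf(storeKey)),
>       new IntegerSerializer
>     )
> }
> {code}
> With this sketch, {{StoreLookup.metadataFor(streams, "1-a")}} queries with
> the {{Int}} 1. A caller would presumably still need to handle
> {{KeyQueryMetadata.NOT_AVAILABLE}}, e.g. while the instance is rebalancing.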
>
> Example 2:
> The same as above, but with a different {{process}} method.
> {code:scala}
> override def process(record: Record[Int, String]): Unit = {
>   ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
> }
> {code}
> In this case the key {{"a"}} could exist in multiple partitions, with
> different values in different partitions. Here too, AFAICT, one must call
> {{queryMetadataForKey}} with an {{Int}} to determine the partition where a
> given {{String}} would be stored.
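>
> The effect can be replayed without Kafka in a toy simulation. This is only a
> sketch: {{Example2Simulation}}, {{partitionOf}} and {{run}} are hypothetical
> names, the partition count of 2 is assumed, and the modulo partitioner is a
> stand-in for the real one, which AFAIK hashes the serialized key bytes.
> {code:scala}
> object Example2Simulation {
>   // Assumed partition count of the input topic.
>   val numPartitions = 2
>
>   // Stand-in partitioner keyed on the input record's key (NOT Kafka's).
>   def partitionOf(recordKey: Int): Int =
>     java.lang.Math.floorMod(recordKey, numPartitions)
>
>   // Replays Example 2's process method, one in-memory map per partition.
>   def run(recordKeys: Seq[Int]): Vector[Map[String, String]] =
>     recordKeys.foldLeft(Vector.fill(numPartitions)(Map.empty[String, String])) {
>       (stores, key) =>
>         val p = partitionOf(key)
>         val updated =
>           ('a' to 'j').foldLeft(stores(p))((m, x) => m.updated(s"$x", s"$key"))
>         stores.updated(p, updated)
>     }
> }
> {code}
> After {{Example2Simulation.run(Seq(1, 2))}}, both per-partition maps contain
> the key {{"a"}}, each with a different value, so the {{String}} key by itself
> does not identify a partition.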
--
This message was sent by Atlassian Jira
(v8.20.10#820010)