[jira] [Commented] (KAFKA-14070) Improve documentation for queryMetadataForKey for state stores with Processor API

2022-08-09 Thread Guozhang Wang (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577633#comment-17577633 ]

Guozhang Wang commented on KAFKA-14070:
---

Hello [~balajirrao], thanks for the updated description! That's much clearer 
now. Yes, the `queryMetadataForKey` overload without the 
{{StreamPartitioner}} does not work well with the Processor API, since it 
assumes that the store's key is inherited from the key of the input (or 
repartition) topic. If a user stores keys in a different form, they need to 
use the other overload, which takes a {{StreamPartitioner}}. But as you 
pointed out in the second example, if the partitioning scheme does not depend 
on the key (e.g. if it depends on the value), then that overload does not help 
either. In the near term, we definitely need to improve our docs to clarify 
this for PAPI users. Would you like to file a PR?

In the long run, we should consider generalizing this function to allow users 
to provide any form of partitioning scheme.
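For the first example in the quoted description (store keys such as "1-a" derived from an {{Int}} input key), the overload that takes a {{StreamPartitioner}} can work as long as the store key can be mapped back to the input key. Below is a minimal, dependency-free sketch under stated assumptions: the input topic uses Kafka's default key-hash partitioning, input keys are serialized as 4-byte big-endian ints (as IntegerSerializer does), and store keys always have the form "<int>-<char>". `murmur2` is our own port of what we believe `org.apache.kafka.common.utils.Utils.murmur2` computes; `inputKeyOf`, `partitionForStoreKey` and `naivePartition` are hypothetical helper names.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object PrefixPartitionSketch {
  // Port of the murmur2 hash Kafka applies to serialized keys.
  // In real code prefer org.apache.kafka.common.utils.Utils.murmur2.
  def murmur2(data: Array[Byte]): Int = {
    val length = data.length
    val seed: Int = 0x9747b28cL.toInt
    val m = 0x5bd1e995
    val r = 24
    var h = seed ^ length
    for (i <- 0 until length / 4) {
      val i4 = i * 4
      var k = (data(i4) & 0xff) + ((data(i4 + 1) & 0xff) << 8) +
        ((data(i4 + 2) & 0xff) << 16) + ((data(i4 + 3) & 0xff) << 24)
      k *= m
      k ^= k >>> r
      k *= m
      h *= m
      h ^= k
    }
    // Remaining 1 to 3 bytes (mirrors the fall-through switch in the Java original)
    val tail = length & ~3
    val rem = length % 4
    if (rem == 3) h ^= (data(tail + 2) & 0xff) << 16
    if (rem >= 2) h ^= (data(tail + 1) & 0xff) << 8
    if (rem >= 1) { h ^= data(tail) & 0xff; h *= m }
    h ^= h >>> 13
    h *= m
    h ^= h >>> 15
    h
  }

  // Same arithmetic as Utils.toPositive(murmur2(bytes)) % numPartitions
  def defaultPartition(keyBytes: Array[Byte], numPartitions: Int): Int =
    (murmur2(keyBytes) & 0x7fffffff) % numPartitions

  // 4-byte big-endian encoding; ByteBuffer is big-endian by default
  def intBytes(k: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(k).array()

  // Hypothetical: recover the Int input key from "1-a" (or "-7-c" for negative keys)
  def inputKeyOf(storeKey: String): Int =
    storeKey.substring(0, storeKey.lastIndexOf('-')).toInt

  // Logic a custom StreamPartitioner could run: route a store key to the
  // partition its originating input key was read from
  def partitionForStoreKey(storeKey: String, numPartitions: Int): Int =
    defaultPartition(intBytes(inputKeyOf(storeKey)), numPartitions)

  // What the serializer-based lookup would hash for the raw store key (UTF-8 string bytes)
  def naivePartition(storeKey: String, numPartitions: Int): Int =
    defaultPartition(storeKey.getBytes(StandardCharsets.UTF_8), numPartitions)
}
```

In a real topology, the body of `partitionForStoreKey` would go into the `partition(topic, key, value, numPartitions)` method of a {{StreamPartitioner}} passed to `KafkaStreams#queryMetadataForKey(storeName, key, partitioner)`. `naivePartition` shows what a plain string lookup of "1-a" hashes instead, which need not match.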

> Improve documentation for queryMetadataForKey for state stores with Processor API
> -
>
> Key: KAFKA-14070
> URL: https://issues.apache.org/jira/browse/KAFKA-14070
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Balaji Rao
>Priority: Minor
>
> Using {{queryMetadataForKey}} for state stores with the Processor API is 
> tricky. One can use state stores in the Processor API in ways that make it 
> impossible to use {{queryMetadataForKey}} with the store key alone; one would 
> have to know the input record's key. This can lead to the method being 
> called with incorrect expectations. The documentation could be improved 
> around this, and around using state stores with the Processor API in general.
> Example Scala snippet:
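> {code:scala}
> import org.apache.kafka.streams.kstream.Consumed
> import org.apache.kafka.streams.processor.api.{Processor, ProcessorContext, ProcessorSupplier, Record}
> import org.apache.kafka.streams.scala.serialization.Serdes
> import org.apache.kafka.streams.state.{KeyValueStore, Stores}
>
> // Fragment of a class body; streamsBuilder is an org.apache.kafka.streams.StreamsBuilder
> val input = streamsBuilder.stream(
>   "input-topic",
>   Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
> )
> private val storeBuilder = Stores
>   .keyValueStoreBuilder[String, String](
>     Stores.inMemoryKeyValueStore("store"),
>     Serdes.stringSerde,
>     Serdes.stringSerde
>   )
> streamsBuilder.addStateStore(storeBuilder)
> input.process(
>   new ProcessorSupplier[Int, String, Void, Void] {
>     override def get(): Processor[Int, String, Void, Void] =
>       new Processor[Int, String, Void, Void] {
>         var store: KeyValueStore[String, String] = _
>         override def init(context: ProcessorContext[Void, Void]): Unit = {
>           super.init(context)
>           store = context.getStateStore("store")
>         }
>         // Each Int input key writes ten String store keys: "<key>-a" .. "<key>-j"
>         override def process(record: Record[Int, String]): Unit = {
>           ('a' to 'j').foreach(x =>
>             store.put(s"${record.key}-$x", record.value)
>           )
>         }
>       }
>   },
>   "store"
> )
> {code}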
> In the code sample above, AFAICT there is no way to determine which partition 
> of {{store}} holds the key {{"1-a"}} by calling {{queryMetadataForKey}} with 
> the string {{"1-a"}}. One has to call {{queryMetadataForKey}} with the key of 
> the input record that produced {{"1-a"}}, in this case the {{Int}} 1, to find 
> the partition.
>  
> Example 2:
> The same as above, but with a different {{process}} method:
> {code:scala}
> override def process(record: Record[Int, String]): Unit = {
>   ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
> }
> {code}
> In this case the key {{"a"}} can exist in multiple partitions, with 
> different values in different partitions. AFAICT, one must call 
> {{queryMetadataForKey}} with the {{Int}} input key to determine the partition 
> in which a given {{String}} key was stored.
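Example 2 can be simulated without Kafka to show why no partitioner that looks only at the store key can work. This is a sketch under loud assumptions: `toyPartition` is a stand-in using `Math.floorMod` of the input key (not Kafka's murmur2-based hashing), each store shard is modelled as a mutable map, and `Example2Sketch`, `toyPartition` and `run` are hypothetical names.

```scala
import scala.collection.mutable

object Example2Sketch {
  // Toy stand-in for hash partitioning, chosen only to keep the illustration deterministic
  def toyPartition(inputKey: Int, numPartitions: Int): Int =
    Math.floorMod(inputKey, numPartitions)

  // One in-memory map per partition, standing in for the shards of "store"
  def run(inputKeys: Seq[Int], numPartitions: Int): Map[Int, Map[String, String]] = {
    val shards = Array.fill(numPartitions)(mutable.Map.empty[String, String])
    for (key <- inputKeys) {
      val shard = shards(toyPartition(key, numPartitions))
      // Same body as Example 2's process(): store keys "a".."j", value = input key as a string
      ('a' to 'j').foreach(x => shard.put(s"$x", s"$key"))
    }
    shards.zipWithIndex.map { case (s, p) => p -> s.toMap }.toMap
  }
}
```

With input keys 1 and 2 over two partitions, store key "a" maps to "2" in shard 0 and to "1" in shard 1, so the string "a" alone cannot identify a single shard; only the {{Int}} input key does.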



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14070) Improve documentation for queryMetadataForKey

2022-07-13 Thread Balaji Rao (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566639#comment-17566639 ]

Balaji Rao commented on KAFKA-14070:


Hi [~guozhang], thank you for your interest. While preparing a reply to your 
question, I realised that my original description did not address the right 
issue. I've rewritten the description completely. Please take a look and let 
me know if it helps. Thank you!

> Improve documentation for queryMetadataForKey
> -
>
> Key: KAFKA-14070
> URL: https://issues.apache.org/jira/browse/KAFKA-14070
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Balaji Rao
>Priority: Minor
>





[jira] [Commented] (KAFKA-14070) Improve documentation for queryMetadataForKey

2022-07-13 Thread Guozhang Wang (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566414#comment-17566414 ]

Guozhang Wang commented on KAFKA-14070:
---

Hello [~balajirrao], thanks for filing this ticket. Could you provide a 
specific example of key types that would break `queryMetadataForKey`? Note 
that there is still an overload of the function that takes a partitioner, so 
that users can provide one, where possible, to determine which partition 
contains a specific key.

> Improve documentation for queryMetadataForKey
> -
>
> Key: KAFKA-14070
> URL: https://issues.apache.org/jira/browse/KAFKA-14070
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Balaji Rao
>Priority: Minor
>
> When using key-value state stores with the Processor API, one can add 
> key-value state stores of arbitrary key types to a topology. This could lead 
> to the method `queryMetadataForKey` in `KafkaStreams` being used with 
> incorrect expectations.
> In my understanding, `queryMetadataForKey` uses the source topics of the 
> processor connected to the store to return the `KeyQueryMetadata`. This means 
> that it could provide "incorrect" answers when used with key-value stores of 
> arbitrary key types. The description of the method should be improved to make 
> users aware of this pitfall.
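The pitfall comes down to bytes: as we read it, the overload `queryMetadataForKey(storeName, key, keySerializer)` hashes whatever bytes the supplied serializer produces against the partitions of the store's source topic. A tiny sketch, assuming the stock StringSerializer (UTF-8 by default) and IntegerSerializer (4-byte big-endian), shows that the "same" key 1 yields different bytes depending on its type, so a lookup using the store's key type need not land on the partition the input record was read from. `KeyBytesSketch` and its methods are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object KeyBytesSketch {
  // Bytes a String-keyed store lookup would hash for "1" (UTF-8)
  def stringKeyBytes(k: String): Array[Byte] = k.getBytes(StandardCharsets.UTF_8)

  // Bytes an Int-keyed input topic carries for 1 (4 bytes, big-endian)
  def intKeyBytes(k: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(k).array()
}
```

Since the partition is a function of these bytes, `"1"` (one byte, 49) and `1` (bytes 0, 0, 0, 1) are hashed independently, and agreement between the two partitions would be a coincidence.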


