[ 
https://issues.apache.org/jira/browse/KAFKA-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268141#comment-17268141
 ] 

Guozhang Wang commented on KAFKA-10848:
---------------------------------------

I'm thinking about exposing a `RecordChooser` interface that allows user 
customization, in some form like this:

Record choose(Map<TopicPartition, Record>)

-------

I.e., given a task that has multiple input topic partitions, pass in a map from 
each partition to its next available record (for partitions that do not have a 
next record available, we can still include an entry in the map, with a 
sentinel as the value).

The returned record is the one, among those in the passed-in map, to process 
next. Going even further, we can allow the impl to return a sentinel value 
indicating "do not process any record yet"; in practice this would be used 
when some of the partitions do not have an available record and users want to 
avoid enforced processing, i.e. to go even beyond our current hard-coded 
behavior of KAFKA-10867.
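
To make the shape of the idea concrete, here is a rough sketch, not a proposed 
API. It assumes Java 16+ and models both sentinels with `Optional`. The 
`TopicPartition` and `StreamRecord` types are hypothetical stand-ins for the 
real Kafka classes, and `WaitForAllChooser` is an invented example of a chooser 
that refuses to process anything until every partition has a record.

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class RecordChooserSketch {
    // Hypothetical stand-ins for the real Kafka types (assumptions, not Kafka's classes).
    record TopicPartition(String topic, int partition) {}
    record StreamRecord(long timestamp, String value) {}

    // Sketch of the proposed interface. Optional.empty() plays the role of the
    // sentinel, both for "no next record on this partition" in the input map
    // and for "do not process any record yet" in the return value.
    interface RecordChooser {
        Optional<StreamRecord> choose(Map<TopicPartition, Optional<StreamRecord>> heads);
    }

    // Example impl: never process while any partition lacks a record;
    // otherwise pick the record with the smallest timestamp.
    static final class WaitForAllChooser implements RecordChooser {
        @Override
        public Optional<StreamRecord> choose(Map<TopicPartition, Optional<StreamRecord>> heads) {
            if (heads.values().stream().anyMatch(Optional::isEmpty)) {
                return Optional.empty(); // "do not process any record yet"
            }
            return heads.values().stream()
                    .flatMap(Optional::stream)
                    .min(Comparator.comparingLong(StreamRecord::timestamp));
        }
    }

    public static void main(String[] args) {
        Map<TopicPartition, Optional<StreamRecord>> heads = new LinkedHashMap<>();
        heads.put(new TopicPartition("orders", 0), Optional.of(new StreamRecord(5L, "order")));
        heads.put(new TopicPartition("prices", 0), Optional.empty());

        RecordChooser chooser = new WaitForAllChooser();
        System.out.println(chooser.choose(heads)); // one partition is empty, so nothing is chosen

        heads.put(new TopicPartition("prices", 0), Optional.of(new StreamRecord(3L, "price")));
        System.out.println(chooser.choose(heads)); // all partitions have data now
    }
}
```

Whether the sentinel should be an `Optional`, a `null`, or a dedicated constant 
is an open design question; `Optional` is used here only to keep the sketch 
explicit.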

The default impl would be based on the record timestamps and on the 
task-idling config.
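
A default along those lines might look roughly like the following sketch 
(again Java 16+, and again only an illustration). All names are hypothetical: 
`maxIdleMs` stands in for the task-idling config, the injected `clock` exists 
only to make the sketch testable, and the idling logic is a simplification 
rather than what Kafka Streams actually does internally.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongSupplier;

public class TimestampChooserSketch {
    // Hypothetical stand-ins for the real Kafka types (assumptions, not Kafka's classes).
    record TopicPartition(String topic, int partition) {}
    record StreamRecord(long timestamp, String value) {}

    interface RecordChooser {
        Optional<StreamRecord> choose(Map<TopicPartition, Optional<StreamRecord>> heads);
    }

    // Timestamp-based chooser with simplified idling: while some partition is
    // empty, wait up to maxIdleMs before processing the remaining records anyway.
    static final class TimestampChooser implements RecordChooser {
        private final long maxIdleMs;      // stand-in for the task-idling config
        private final LongSupplier clock;  // current time in ms, injected for testing
        private long idleSince = -1L;      // -1 means "not currently idling"

        TimestampChooser(long maxIdleMs, LongSupplier clock) {
            this.maxIdleMs = maxIdleMs;
            this.clock = clock;
        }

        @Override
        public Optional<StreamRecord> choose(Map<TopicPartition, Optional<StreamRecord>> heads) {
            boolean anyEmpty = heads.values().stream().anyMatch(Optional::isEmpty);
            if (anyEmpty) {
                long now = clock.getAsLong();
                if (idleSince < 0) {
                    idleSince = now; // start idling on the first incomplete call
                }
                if (now - idleSince < maxIdleMs) {
                    return Optional.empty(); // still idling: process nothing yet
                }
            } else {
                idleSince = -1L; // every partition has data: reset the idle timer
            }
            // Pick the record with the smallest timestamp among the available ones.
            return heads.values().stream()
                    .flatMap(Optional::stream)
                    .min(Comparator.comparingLong(StreamRecord::timestamp));
        }
    }
}
```

With `maxIdleMs` set to 0 this degenerates to plain "smallest timestamp among 
whatever is available", i.e. it never waits for empty partitions.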

cc [~vvcephei] what do you think?

> Allow fine grained control over cross-partition processing order
> ----------------------------------------------------------------
>
>                 Key: KAFKA-10848
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10848
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>              Labels: needs-kip
>
> Currently, KafkaStreams implements a hard-coded timestamp-based strategy to 
> pick the next record to process for a task, given that a task has multiple 
> partitions.
> In general, this strategy works well for the DSL, but for PAPI users, there 
> might be cases when the strategy should be customized. And even for the DSL, 
> there is one corner case (for a stream-table join) for which the table-side 
> record should be processed first if two records have the same timestamp (at 
> least, this gap exists as long as we don't have multi-version KTables), while 
> we cannot enforce this behavior because at runtime we don't know anything 
> about KStream vs KTable or an existing downstream join.
> Thus, we might want to allow users to plug in a custom strategy to pick the 
> next record for processing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
