[ https://issues.apache.org/jira/browse/KAFKA-10848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268141#comment-17268141 ]
Guozhang Wang commented on KAFKA-10848:
---------------------------------------

I'm thinking about maybe exposing a `RecordChooser` interface that allows user customization, in some form like this:

    Record choose(Map<TopicPartition, Record>)

I.e., given a task that has multiple input topic partitions, we pass in a map containing, for each partition, the next available record (partitions that do not have a next record available can still be included in the map, with a sentinel as the value). The returned record is the one, among those in the passed-in map, to process next.

Even further, we can allow the impl to return a sentinel value as the returned record, indicating "do not process any record yet". In practice this would be used when some of the partitions do not have an available record and users want to avoid enforced processing, i.e., to go even beyond our current hard-coded behavior from KAFKA-10867.

The default impl would be based on the record timestamps and on the task-idling config.

cc [~vvcephei] what do you think?

> Allow fine grained control over cross-partition processing order
> ----------------------------------------------------------------
>
>                 Key: KAFKA-10848
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10848
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>              Labels: needs-kip
>
> Currently, KafkaStreams implements a hard-coded, timestamp-based strategy to
> pick the next record to process for a task that has multiple input
> partitions.
> In general, this strategy works well for the DSL, but for PAPI users, there
> might be cases where the strategy should be customized.
> And even for the DSL, there is one corner case (a stream-table join) in which
> the table-side record should be processed first if two records have the same
> timestamp (this gap exists at least as long as we don't have multi-version
> KTables), but we cannot enforce this behavior because at runtime we don't know
> anything about KStream vs. KTable or about an existing downstream join.
> Thus, we might want to allow users to plug in a custom strategy to pick the
> next record for processing.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
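
The `RecordChooser` idea from the comment, combined with the stream-table tie-break from the issue description, might look roughly like the Java sketch below. It is a minimal illustration under stated assumptions, not a Kafka Streams API: `Partition`, `PendingRecord`, `TimestampChooser`, the `idlingExpired` flag (a simplified stand-in for the task-idling config), and the `preferOnTie` topic set are all hypothetical names. `Optional.empty()` plays the role of both sentinels from the comment, and the method returns the chosen partition rather than the record itself, an assumption made so the caller knows which partition to advance.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for Kafka's TopicPartition (not the real class).
record Partition(String topic, int partition) {}

// Hypothetical stand-in for the next buffered record of a partition.
record PendingRecord(String key, String value, long timestamp) {}

// Hypothetical plug-in interface modeled on the proposed RecordChooser.
// Each map value is the partition's next record, or Optional.empty() as the
// sentinel for "nothing buffered yet". Returning Optional.empty() means
// "do not process any record yet"; otherwise the caller processes the head
// record of the returned partition.
interface RecordChooser {
    Optional<Partition> choose(Map<Partition, Optional<PendingRecord>> heads);
}

// Timestamp-based chooser with a simplified notion of task idling and an
// optional tie-break that favors certain topics (e.g. the table side of a join).
final class TimestampChooser implements RecordChooser {
    private final boolean idlingExpired;   // assumption: stands in for the task-idling decision
    private final Set<String> preferOnTie; // topics to pick first when timestamps are equal

    TimestampChooser(boolean idlingExpired, Set<String> preferOnTie) {
        this.idlingExpired = idlingExpired;
        this.preferOnTie = Set.copyOf(preferOnTie);
    }

    @Override
    public Optional<Partition> choose(Map<Partition, Optional<PendingRecord>> heads) {
        boolean someEmpty = heads.values().stream().anyMatch(Optional::isEmpty);
        if (someEmpty && !idlingExpired) {
            // Wait for the empty partitions instead of forcing processing.
            return Optional.empty();
        }
        // Smallest timestamp first; on equal timestamps, preferred topics first.
        Comparator<Map.Entry<Partition, Optional<PendingRecord>>> order =
                Comparator.<Map.Entry<Partition, Optional<PendingRecord>>>comparingLong(
                                e -> e.getValue().orElseThrow().timestamp())
                        .thenComparingInt(e -> preferOnTie.contains(e.getKey().topic()) ? 0 : 1);
        return heads.entrySet().stream()
                .filter(e -> e.getValue().isPresent())
                .min(order)
                .map(Map.Entry::getKey);
    }
}
```

For the stream-table case, a user who knows which input topic backs the table could construct `new TimestampChooser(false, Set.of("customers"))` (topic name hypothetical), so that on equal timestamps the table-side record is processed first. That is exactly the knowledge the runtime lacks today. Passing the tie-break as plain configuration keeps the default behavior unchanged when the set is empty.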