[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833708#comment-15833708
 ] 

Jeyhun Karimov commented on KAFKA-4144:
---------------------------------------

[~elevy] I am not sure I got your point. Please correct me if I am wrong in my 
assumptions. Lets look at different possibilities:

1. We create separate {{builder.stream/table(topicName)}} instances in 
application. So we clearly separate {{KTable}}/{{KStream}}s, one per topic. In 
this case, we can develop a solution described above. So, create different 
{{TimestampExtractor}} and as a result, assign one {{timeStampExtractor} per 
topic.

2. If we create {{builder.stream/table(topicName1, topicName2 ...)}} this can 
be challenging, as there is no clear separation of topics within stream 
application. 

I think you pointed out the second case. As a solution, we can provide 
{{Map<String, TimestampExtractor >}} to topology, indicating which 
{{TimestampExtractor}} is related to which topic. As a result, if the topics 
can change in runtime, the stream application can handle it.

Please correct me if I am wrong in my assumptions.

> Allow per stream/table timestamp extractor
> ------------------------------------------
>
>                 Key: KAFKA-4144
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4144
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.1
>            Reporter: Elias Levy
>            Assignee: Jeyhun Karimov
>              Labels: api
>
> At the moment the timestamp extractor is configured via a StreamConfig value 
> to KafkaStreams.  That means you can only have a single timestamp extractor 
> per app, even though you may be joining multiple streams/tables that require 
> different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> KStreamBuilder.stream/table, just like you can specify key and value serdes 
> that override the StreamConfig defaults.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to