[ https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833913#comment-15833913 ]
Elias Levy commented on KAFKA-4144: ----------------------------------- Jeyhun, sorry if I was not clear. My comment about not being able to configure {{TimestampExtractor}} is only applicable to the current implementation, as it is instantiated by {{StreamTask}} and the current interface of {{TimestampExtractor}} provides not access to configuration data. So right now if you are consuming multiple distinct topics as different streams and/or tables, you have to create a case statement in your {{TimestampExtractor}} to handle all the topics with a single extractor, as that is all that you can configure right now via the {{timestamp.extractor}} configuration property. And because {{TimestampExtractor}} is instantiated by {{StreamTask}} and the extractor does not have access to configuration data, you must hardcode the topic names in the extractor. That means you can't change input topic names dynamically. You can to recompile to change them. Obviously these observation is are longer applicable if the application can instantiate it's own {{TimestampExtractor}}s and pass them as an argument to {{TopologyBuilder.addSource}}, {{KStreamBuilder.source}}, and {{KStreamBuilder.table}}. As for Matthias' comments, I agree with #1 and #3. Not quite sure what he means by #2. Surely I could create a {{TimestampExtractor}} passing the constructor a topic name read from a config file while defining the topology, just like I can create SerDes that support the Confluent schema registry, where the registry endpoint is read from the config and use those while defining the topology. > Allow per stream/table timestamp extractor > ------------------------------------------ > > Key: KAFKA-4144 > URL: https://issues.apache.org/jira/browse/KAFKA-4144 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 0.10.0.1 > Reporter: Elias Levy > Assignee: Jeyhun Karimov > Labels: api > > At the moment the timestamp extractor is configured via a StreamConfig value > to KafkaStreams. That means you can only have a single timestamp extractor > per app, even though you may be joining multiple streams/tables that require > different timestamp extraction methods. > You should be able to specify a timestamp extractor via > KStreamBuilder.stream/table, just like you can specify key and value serdes > that override the StreamConfig defaults. -- This message was sent by Atlassian JIRA (v6.3.4#6332)