Hi Jeff, We currently does not expose the TimestampExtractor, as it will always be applied for all records polled from consumer automatically.
As for your case, do you have the JSON-formatted along with non-JSON messages on the same topic? In that case, I agree with you that you could do the filtering on a first topology and pipe to another topic with pure JSON formatted messages. Guozhang On Fri, Apr 15, 2016 at 7:56 AM, Jeff Klukas <jklu...@simple.com> wrote: > The only hook I see for specifying a TimestampExtractor is in the > Properties that you pass when creating a KafkaStreams instance. Is it > possible to modify the timestamp while processing a stream, or does the > timestamp need to be extracted immediately upon entry into the topology? > > I have a case where I'm creating a KStream from a topic with mostly > JSON-formatted messages. I need to deserialize as byte array, filter out > non-JSON messages, call .map on the stream to deserialize those objects > into desired POJOs, and only then reach into the objects to extract the > desired timestamp. > > Workarounds I've imagined are either to define a TimestampExtractor that > attempts to do some partial deserialization of the payload to get at the > timestamp field; or, to create two separate topologies, with the second one > reading a topic that's already filtered. > -- -- Guozhang