The only hook I see for specifying a TimestampExtractor is in the
Properties that you pass when creating a KafkaStreams instance. Is it
possible to modify the timestamp while processing a stream, or does the
timestamp need to be extracted immediately upon entry into the topology?

I have a case where I'm creating a KStream from a topic with mostly
JSON-formatted messages. I need to read each message as a byte array,
filter out the non-JSON messages, call .map on the stream to deserialize
the remaining messages into the desired POJOs, and only then reach into
those objects to extract the timestamp.

Workarounds I've imagined are either to define a TimestampExtractor that
partially deserializes the payload to get at the timestamp field, or to
create two separate topologies, with the second one reading a topic that
has already been filtered.
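
For the first workaround, here is a rough sketch of the partial
deserialization I have in mind. It is not tied to the Kafka Streams API;
the idea is that a TimestampExtractor's extract method would hand the raw
record value to a helper like this. The class name, the "timestamp" field
name, and the assumption that the field holds epoch milliseconds as a bare
JSON number are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: pulls a numeric timestamp out of a raw payload
// without fully parsing it as JSON.
final class PartialTimestampParser {

    // Assumed field name and format: "timestamp": <epoch millis>.
    // Caveat: a plain regex also matches the same key inside a nested
    // object or a string value, so this only suits simple payloads.
    private static final Pattern TS_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    private PartialTimestampParser() { }

    // Returns the first matching timestamp, or the fallback when the
    // payload is null, has no such field, or the number overflows a long.
    static long extractTimestamp(byte[] payload, long fallback) {
        if (payload == null) {
            return fallback;
        }
        String text = new String(payload, StandardCharsets.UTF_8);
        Matcher m = TS_FIELD.matcher(text);
        if (!m.find()) {
            return fallback;
        }
        try {
            return Long.parseLong(m.group(1));
        } catch (NumberFormatException e) {
            return fallback;
        }
    }
}
```

Non-JSON messages simply get the fallback value, so the extractor never
has to throw; what fallback makes sense (for example, the record's own
timestamp) is a separate decision.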
