Hi Jeff,

We currently do not expose the TimestampExtractor inside the topology, as
it is always applied automatically to every record polled from the consumer.

As for your case, do you have JSON-formatted and non-JSON messages mixed on
the same topic? In that case, I agree with you that you could do the
filtering in a first topology and pipe the result to another topic that
contains only JSON-formatted messages.
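
As a rough illustration of the per-record check such a filtering step needs,
here is a minimal sketch in plain Java with no Kafka dependencies. Everything
in it is an assumption made for illustration: the class and method names are
hypothetical, the payload is assumed to be UTF-8, and the timestamp is assumed
to be an integer in a field called "timestamp". The first method is only a
cheap shape check, not a JSON validator, and the second is a regex-based
partial read rather than real deserialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; not part of Kafka Streams.
final class JsonPayloadSketch {

    // Assumed field name "timestamp" holding an integer of at most 18 digits,
    // so Long.parseLong cannot overflow. Longer digit runs are rejected.
    private static final Pattern TS_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*(\\d{1,18})(?!\\d)");

    private JsonPayloadSketch() {}

    // Cheap shape check: after trimming ASCII whitespace, the payload starts
    // with '{' and ends with '}'. This does NOT validate the JSON in between.
    static boolean looksLikeJsonObject(byte[] value) {
        if (value == null) {
            return false;
        }
        int start = 0;
        int end = value.length - 1;
        while (start <= end && isWhitespace(value[start])) {
            start++;
        }
        while (end >= start && isWhitespace(value[end])) {
            end--;
        }
        return start < end && value[start] == '{' && value[end] == '}';
    }

    // Partial read of the timestamp without building a POJO. Returns empty
    // when the payload fails the shape check or has no matching field.
    // Caveat: the regex takes the first textual match, so it can also pick up
    // a nested field or one that appears inside a string value.
    static OptionalLong extractTimestamp(byte[] value) {
        if (!looksLikeJsonObject(value)) {
            return OptionalLong.empty();
        }
        String text = new String(value, StandardCharsets.UTF_8);
        Matcher m = TS_FIELD.matcher(text);
        return m.find()
                ? OptionalLong.of(Long.parseLong(m.group(1)))
                : OptionalLong.empty();
    }

    private static boolean isWhitespace(byte b) {
        return b == ' ' || b == '\t' || b == '\n' || b == '\r';
    }
}
```

In the first topology, a check like looksLikeJsonObject could serve as the
predicate passed to KStream#filter before writing to the JSON-only topic. A
method like extractTimestamp would be the core of the other workaround, a
custom TimestampExtractor that partially reads the payload; the wiring into
Kafka Streams is deliberately left out here.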


Guozhang


On Fri, Apr 15, 2016 at 7:56 AM, Jeff Klukas <jklu...@simple.com> wrote:

> The only hook I see for specifying a TimestampExtractor is in the
> Properties that you pass when creating a KafkaStreams instance. Is it
> possible to modify the timestamp while processing a stream, or does the
> timestamp need to be extracted immediately upon entry into the topology?
>
> I have a case where I'm creating a KStream from a topic with mostly
> JSON-formatted messages. I need to deserialize as byte array, filter out
> non-JSON messages, call .map on the stream to deserialize those objects
> into desired POJOs, and only then reach into the objects to extract the
> desired timestamp.
>
> Workarounds I've imagined are either to define a TimestampExtractor that
> attempts to do some partial deserialization of the payload to get at the
> timestamp field; or, to create two separate topologies, with the second one
> reading a topic that's already filtered.
>



-- 
-- Guozhang
