This is a great question. I've added the dev list to be sure it gets noticed by whoever may know best.
Kenn On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote: > > Hi, > > Is there a way to get a Deadletter Output from a pipeline that uses a > KafkaIO > connector for it's input? As `TimestampPolicyFactory.withTimestampFn()` > takes > only a SerializableFunction and not a ParDo, how would I be able to > produce a > Deadletter output from it? > > I have the following pipeline defined that reads from a KafkaIO input: > > pipeline.apply( > KafkaIO.<String, String>read() > .withBootstrapServers(bootstrap) > .withTopics(topics) > .withKeyDeserializer(StringDeserializer.class) > .withValueDeserializer(ConfigurableDeserializer.class) > .updateConsumerProperties( > ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, > inputMessagesConfig)) > .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", > "earliest")) > .updateConsumerProperties(ImmutableMap.of("group.id", > "beam-consumers")) > .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", > "true")) > .withTimestampPolicyFactory( > TimestampPolicyFactory.withTimestampFn( > new MessageTimestampExtractor(inputMessagesConfig))) > .withReadCommitted() > .commitOffsetsInFinalize()) > > > and I like to get deadletter outputs when my timestamp extraction fails. > > Best, > Tobi > >