That depends on the user's pipeline and on how watermark advancement of the
source may cause elements to become droppably late if they are emitted with
the minimum timestamp.
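
For reference, the approach discussed in the quoted thread below (return the
minimum timestamp for unparsable records instead of throwing, then route those
records to a dead-letter output in a later step) can be sketched roughly as
follows. This is plain Java with no Beam dependency, so it only illustrates the
control flow. `DeadLetterSketch`, `MIN_TIMESTAMP`, `extractTimestamp`, `route`
and `Routed` are hypothetical names, `MIN_TIMESTAMP` is only a stand-in for
Beam's min timestamp, and each record is assumed to be just an ISO-8601
timestamp string for brevity.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

public class DeadLetterSketch {
    // Stand-in sentinel for Beam's minimum timestamp (assumption: the real
    // constant comes from the Beam SDK and is not Instant.MIN).
    static final Instant MIN_TIMESTAMP = Instant.MIN;

    // Timestamp function that never throws: unparsable input maps to the sentinel.
    static Instant extractTimestamp(String raw) {
        try {
            return Instant.parse(raw);
        } catch (DateTimeParseException e) {
            return MIN_TIMESTAMP;
        }
    }

    // Holds the two outputs of the routing step (main output and dead letters).
    static final class Routed {
        final List<String> parsed = new ArrayList<>();
        final List<String> deadLetter = new ArrayList<>();
    }

    // Plays the role of the downstream ParDo: records carrying the sentinel
    // timestamp go to the dead-letter output, everything else to the main output.
    static Routed route(List<String> records) {
        Routed out = new Routed();
        for (String record : records) {
            if (extractTimestamp(record).equals(MIN_TIMESTAMP)) {
                out.deadLetter.add(record);
            } else {
                out.parsed.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Routed out = route(List.of("2018-10-24T16:42:00Z", "not-a-timestamp"));
        System.out.println("parsed=" + out.parsed + " deadLetter=" + out.deadLetter);
    }
}
```

In an actual pipeline the routing step would be a ParDo with tagged outputs, as
suggested further down the thread, and whether the parsable records end up
droppably late still depends on the watermark behaviour mentioned above.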

On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <> wrote:

> I see.
> What I meant was to return min_timestamp for bad records in the timestamp
> handler passed to KafkaIO itself, and the correct timestamp for parsable
> records. That should work too, right?
> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <> wrote:
>> Yes, that would be fine.
>> The user could then use a ParDo which outputs to a DLQ for things it
>> can't parse the timestamp for and use outputWithTimestamp[1] for everything
>> else.
>> 1:
>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <> wrote:
>>> Thanks. So returning the min timestamp is OK, right (assuming the
>>> application is fine with what it means)?
>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <> wrote:
>>>> All records in Apache Beam have a timestamp. The default timestamp is
>>>> the min timestamp defined here:
>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <>
>>>> wrote:
>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <> wrote:
>>>>>> You would have to return the min timestamp for all records; otherwise
>>>>>> the watermark may have advanced and you would be outputting records that
>>>>>> are droppably late.
>>>>> That would be fine I guess. What’s the timestamp for a record that
>>>>> doesn’t have one?
>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <>
>>>>>> wrote:
>>>>>>> To be clear, returning min_timestamp for unparsable records should
>>>>>>> not affect the watermark.
>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <>
>>>>>>> wrote:
>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>> redirected by the ParDo after that.
>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API.
>>>>>>>> Is this pipeline defined under the kafkaio package?
>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <>
>>>>>>>> wrote:
>>>>>>>>> In this case, the user is attempting to handle errors when parsing
>>>>>>>>> the timestamp. The timestamp controls the watermark for the
>>>>>>>>> UnboundedSource. How would they control the watermark in a
>>>>>>>>> downstream ParDo?
>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <>
>>>>>>>>> wrote:
>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>> wrote:
>>>>>>>>>>> Ah nice. Yeah, if the user can return the full bytes instead of
>>>>>>>>>>> applying a function that would result in an exception, this can
>>>>>>>>>>> be extracted by a ParDo down the line.
>>>>>>>>>> KafkaIO does return bytes, and I think most sources should,
>>>>>>>>>> unless there is a good reason not to.
>>>>>>>>>> Given that, do we think Beam should provide a transform that makes
>>>>>>>>>> it simpler to handle dead-letter output? I think there was a thread
>>>>>>>>>> about it in the past.
>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>> just apply a regular ParDo and return a PCollectionTuple. After
>>>>>>>>>>>> that you can extract your success records (TupleTag) and your
>>>>>>>>>>>> dead-letter records (TupleTag) and do whatever you want with
>>>>>>>>>>>> them.
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 05:18, Raghu Angadi <> wrote:
>>>>>>>>>>>>> The user can read serialized bytes from KafkaIO and deserialize
>>>>>>>>>>>>> them explicitly in a ParDo, which gives complete control over
>>>>>>>>>>>>> how to handle record errors. This is what I would do if I
>>>>>>>>>>>>> needed to in my pipeline.
>>>>>>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>>>>>>> convenient for users in many such scenarios. This is simpler
>>>>>>>>>>>>> than each source supporting it explicitly.
>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>>>>>>>> probably not something that can easily be supported. We might
>>>>>>>>>>>>>> be able to support similar features when we have Kafka on top
>>>>>>>>>>>>>> of Splittable DoFn though.
>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> This is a great question. I've added the dev list to be sure
>>>>>>>>>>>>>>> it gets noticed by whoever may know best.
>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a pipeline
>>>>>>>>>>>>>>>> that uses a KafkaIO connector for its input? As
>>>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able
>>>>>>>>>>>>>>>> to produce a dead-letter output from it?
>>>>>>>>>>>>>>>> I have the following pipeline defined that reads from a
>>>>>>>>>>>>>>>> KafkaIO input:
>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("", "beam-consumers"))
>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("", "true"))
>>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>> and I would like to get dead-letter outputs when my
>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Tobi
