To be clear, returning min_timestamp for unparsable records should not
affect the watermark.
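A standalone sketch of that fallback (pure JDK; the class name and sentinel are hypothetical stand-ins — in Beam the extractor would be the SerializableFunction passed to TimestampPolicyFactory.withTimestampFn(), returning BoundedWindow.TIMESTAMP_MIN_VALUE on failure):

```java
import java.time.Instant;

// Hypothetical helper sketching the fallback logic described above:
// parse the record's timestamp, and on failure return a sentinel
// "minimum" timestamp (Beam's BoundedWindow.TIMESTAMP_MIN_VALUE plays
// this role) so the record can be filtered or redirected downstream
// without holding back the watermark.
class SafeTimestampExtractor {
    // Stand-in for Beam's minimum timestamp constant.
    static final Instant MIN_TIMESTAMP = Instant.MIN;

    // Parses an epoch-millis string; unparsable input maps to MIN_TIMESTAMP.
    static Instant extract(String rawTimestamp) {
        try {
            return Instant.ofEpochMilli(Long.parseLong(rawTimestamp));
        } catch (NumberFormatException e) {
            return MIN_TIMESTAMP;
        }
    }
}
```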

On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <rang...@google.com> wrote:

> How about returning min_timestamp? Those records would then be dropped or
> redirected by the ParDo after that.
> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is
> this pipeline defined under the kafkaio package?
>
> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> In this case, the user is attempting to handle errors when parsing the
>> timestamp. The timestamp controls the watermark for the UnboundedSource,
>> how would they control the watermark in a downstream ParDo?
>>
>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:
>>
>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>> Ah nice. Yeah, if the user can return full bytes instead of applying a
>>>> function that might throw an exception, the payload can be extracted by
>>>> a ParDo down the line.
>>>>
>>>
>>> KafkaIO does return bytes, and I think most sources should, unless there
>>> is a good reason not to.
>>> Given that, do we think Beam should provide a transform that makes it
>>> simpler to handle dead-letter output? I think there was a thread about
>>> it in the past.
>>>
>>>
>>>>
>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>> jcgarc...@gmail.com> wrote:
>>>>
>>>>> As Raghu said,
>>>>>
>>>>> Just apply a regular ParDo and return a PCollectionTuple; after that
>>>>> you can extract your success records (TupleTag) and your dead-letter
>>>>> records (TupleTag) and do whatever you want with them.
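A minimal standalone sketch of that routing (names hypothetical; in Beam the two branches would be the two TupleTags of a multi-output ParDo, with the DoFn emitting via MultiOutputReceiver — plain lists are used here so the sketch runs without the Beam SDK):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the success/dead-letter split that a
// multi-output ParDo would perform: records that parse go to the
// success output, records that throw go to the dead-letter output.
class DeadLetterRouter {
    final List<Integer> success = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    // In a Beam DoFn this body would live in @ProcessElement and emit
    // to one of two TupleTags instead of appending to lists.
    void process(String raw) {
        try {
            success.add(Integer.parseInt(raw)); // the "parse" step
        } catch (NumberFormatException e) {
            deadLetters.add(raw); // keep the raw record for inspection or replay
        }
    }
}
```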
>>>>>
>>>>>
>>>>> Raghu Angadi <rang...@google.com> schrieb am Mi., 24. Okt. 2018,
>>>>> 05:18:
>>>>>
>>>>>> Users can read the serialized bytes from KafkaIO and deserialize them
>>>>>> explicitly in a ParDo, which gives complete control over how to handle
>>>>>> record errors. This is what I would do if I needed to in my pipeline.
>>>>>>
>>>>>> If there is a transform in Beam that does this, it could be
>>>>>> convenient for users in many such scenarios. This is simpler than each
>>>>>> source supporting it explicitly.
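A standalone sketch of that explicit deserialization step (class name hypothetical; in Beam this logic would sit inside the downstream ParDo, with empty results routed to the dead-letter output). A strict UTF-8 decoder is used here as a stand-in for whatever deserialization the pipeline needs:

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical sketch of reading raw byte[] values from the source and
// deserializing them explicitly, so a malformed record yields an empty
// result (to be dead-lettered) instead of failing inside the source.
class StrictDeserializer {
    // Strict UTF-8 decode: CharsetDecoder reports malformed bytes by
    // throwing, rather than silently substituting replacement chars,
    // which gives the caller full control over error handling.
    static Optional<String> decode(byte[] payload) {
        try {
            return Optional.of(
                StandardCharsets.UTF_8.newDecoder()
                    .decode(ByteBuffer.wrap(payload))
                    .toString());
        } catch (CharacterCodingException e) {
            return Optional.empty();
        }
    }
}
```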
>>>>>>
>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>> chamik...@google.com> wrote:
>>>>>>
>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>> probably not something that can easily be supported. We might be able
>>>>>>> to support similar features when we have Kafka on top of Splittable
>>>>>>> DoFn, though.
>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> This is a great question. I've added the dev list to be sure it
>>>>>>>> gets noticed by whoever may know best.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Is there a way to get a dead-letter output from a pipeline that
>>>>>>>>> uses a KafkaIO connector for its input? As
>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>>> produce a dead-letter output from it?
>>>>>>>>>
>>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>>> input:
>>>>>>>>>
>>>>>>>>> pipeline.apply(
>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>     .withTopics(topics)
>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>         ImmutableMap.of(
>>>>>>>>>             InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>     .withReadCommitted()
>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> and I'd like to get a dead-letter output when my timestamp
>>>>>>>>> extraction fails.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tobi
>>>>>>>>>
>>>>>>>>>
