I see.

What I meant was to return min_timestamp for bad records in the timestamp
handler passed to KafkaIO itself, and the correct timestamp for parsable
records. That should work too, right?
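Roughly what I have in mind, as a plain-Java sketch (no Beam dependency; in a real pipeline this logic would live in the SerializableFunction passed to TimestampPolicyFactory.withTimestampFn(), and the "millis:payload" record format is just a made-up example):

```java
// Timestamp extractor that falls back to the min timestamp for unparsable
// records instead of throwing. A downstream ParDo can then treat any record
// stamped with MIN_TIMESTAMP_MILLIS as a dead-letter candidate.
class SafeTimestampExtractor {

    // Assumed value of Beam's BoundedWindow.TIMESTAMP_MIN_VALUE in epoch
    // millis (Long.MIN_VALUE microseconds truncated to millis).
    static final long MIN_TIMESTAMP_MILLIS = -9223372036854775L;

    // Returns the parsed epoch-millis timestamp from a "millis:payload"
    // record, or the min timestamp when the prefix cannot be parsed.
    static long extract(String payload) {
        try {
            return Long.parseLong(payload.split(":", 2)[0]);
        } catch (NumberFormatException e) {
            return MIN_TIMESTAMP_MILLIS;
        }
    }
}
```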

On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <lc...@google.com> wrote:

> Yes, that would be fine.
>
> The user could then use a ParDo which outputs to a DLQ for things it can't
> parse the timestamp for and use outputWithTimestamp[1] for everything else.
>
> 1:
> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
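[A sketch of that routing in plain Java, with two lists standing in for the TupleTag outputs of a real Beam DoFn; the "millis:payload" record format and the class name are made up for illustration:]

```java
import java.util.ArrayList;
import java.util.List;

// Routes records whose timestamp parses to the main output (in a Beam DoFn,
// c.outputWithTimestamp(mainTag, ...)) and everything else to a dead-letter
// output (in a Beam DoFn, c.output(dlqTag, ...)).
class DlqRouter {
    final List<String> main = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    void process(String record) {
        String[] parts = record.split(":", 2);
        if (parts.length != 2) {
            deadLetters.add(record); // no timestamp prefix at all
            return;
        }
        try {
            long timestampMillis = Long.parseLong(parts[0]);
            // In Beam: c.outputWithTimestamp(mainTag, parts[1], new Instant(timestampMillis));
            main.add(parts[1]);
        } catch (NumberFormatException e) {
            // In Beam: c.output(dlqTag, record);
            deadLetters.add(record);
        }
    }
}
```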
>
> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <rang...@google.com> wrote:
>
>> Thanks. So returning min timestamp is OK, right (assuming the application
>> is fine with what it means)?
>>
>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> All records in Apache Beam have a timestamp. The default timestamp is
>>> the min timestamp defined here:
>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
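[For reference, per the linked BoundedWindow source, that min timestamp is derived from Long.MIN_VALUE microseconds truncated to milliseconds; a quick stdlib-only check of the value:]

```java
import java.util.concurrent.TimeUnit;

// Computes the epoch-millis value Beam uses for TIMESTAMP_MIN_VALUE,
// assuming the definition in the linked source (Long.MIN_VALUE microseconds
// converted to millis): -9223372036854775 ms.
class MinTimestamp {
    static long minTimestampMillis() {
        return TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);
    }
}
```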
>>>
>>>
>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <rang...@google.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> You would have to return min timestamp for all records otherwise the
>>>>> watermark may have advanced and you would be outputting records that are
>>>>> droppably late.
>>>>>
>>>>
>>>> That would be fine I guess. What’s the timestamp for a record that
>>>> doesn’t have one?
>>>>
>>>>
>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <rang...@google.com>
>>>>> wrote:
>>>>>
>>>>>> To be clear, returning min_timestamp for unparsable records should
>>>>>> not affect the watermark.
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <rang...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> How about returning min_timestamp? Those records would be dropped or
>>>>>>> redirected by the ParDo after that.
>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API;
>>>>>>> is this pipeline defined under the kafkaio package?
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> In this case, the user is attempting to handle errors when parsing
>>>>>>>> the timestamp. The timestamp controls the watermark for the
>>>>>>>> UnboundedSource; how would they control the watermark in a
>>>>>>>> downstream ParDo?
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ah nice. Yeah, if the user can return the full bytes instead of
>>>>>>>>>> applying a function that could throw an exception, parsing can be
>>>>>>>>>> handled by a ParDo down the line.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>>>>>> there is a good reason not to.
>>>>>>>>> Given that, do we think Beam should provide a transform that makes
>>>>>>>>> it simpler to handle dead-letter output? I think there was a thread
>>>>>>>>> about it in the past.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>> jcgarc...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>
>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple; after
>>>>>>>>>>> that you can extract your success records (TupleTag) and your
>>>>>>>>>>> dead-letter records (TupleTag) and do whatever you want with them.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Raghu Angadi <rang...@google.com> schrieb am Mi., 24. Okt.
>>>>>>>>>>> 2018, 05:18:
>>>>>>>>>>>
>>>>>>>>>>>> Users can read serialized bytes from KafkaIO and deserialize
>>>>>>>>>>>> explicitly in a ParDo, which gives complete control over how to
>>>>>>>>>>>> handle record errors. This is what I would do if I needed to in
>>>>>>>>>>>> my pipeline.
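[A plain-Java sketch of that approach: read raw bytes from KafkaIO (e.g. with a ByteArrayDeserializer) and deserialize explicitly, so a failure becomes a routable value rather than a pipeline error. The trivial UTF-8 decode here stands in for a real deserializer (Avro, JSON, protobuf, ...); names are illustrative:]

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Explicit deserialization with a recoverable failure path: the ParDo would
// send Optional.empty() results to its dead-letter TupleTag instead of
// letting the source throw.
class ExplicitDeserializer {
    static Optional<String> tryDeserialize(byte[] raw) {
        if (raw == null || raw.length == 0) {
            return Optional.empty(); // would go to the DLQ output
        }
        return Optional.of(new String(raw, StandardCharsets.UTF_8));
    }
}
```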
>>>>>>>>>>>>
>>>>>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>>>>>> convenient for users in many such scenarios. This is simpler than 
>>>>>>>>>>>> each
>>>>>>>>>>>> source supporting it explicitly.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>>>>>>> probably not something that can easily be supported. We might be
>>>>>>>>>>>>> able to support similar features when we have Kafka on top of
>>>>>>>>>>>>> Splittable DoFn though.
>>>>>>>>>>>>>
>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is a great question. I've added the dev list to be sure
>>>>>>>>>>>>>> it gets noticed by whoever may know best.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a pipeline
>>>>>>>>>>>>>>> that uses a KafkaIO connector for its input? As
>>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>>>>>>>>> produce a dead-letter output from it?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have the following pipeline defined that reads from a
>>>>>>>>>>>>>>> KafkaIO input:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>>>>>             inputMessagesConfig))
>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and I'd like to get a dead-letter output when my timestamp
>>>>>>>>>>>>>>> extraction fails.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
