What I ended up doing, when I could not for any reason rely on Kafka
timestamps but needed to parse them from the message, is:

* have a custom Kafka deserializer which never throws, but instead returns
a message that is either a success with the parsed data structure plus
timestamp, or a failure with the original Kafka bytes payload
* a timestamp policy that extracts the timestamp from a successful
deserialization result; for a failure result it returns the timestamp of
the last successful message (in my case messages are not terribly out of
order and failures are rather rare)
* a following ParDo then side-outputs the failures to dead letters

Rough sketches of the three pieces follow.
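
For illustration, a minimal sketch of the never-throwing deserializer.
ParseResult and MyEvent are placeholder names for my own types, not
anything from Beam or Kafka; ParseResult needs a Beam Coder registered so
KafkaIO can handle it:

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class SafeEventDeserializer implements Deserializer<ParseResult> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public ParseResult deserialize(String topic, byte[] data) {
    try {
      // Hypothetical parsing logic that also pulls the event timestamp
      // out of the message body.
      MyEvent event = MyEvent.parse(data);
      return ParseResult.success(event, event.getTimestamp());
    } catch (Exception e) {
      // Never throw: carry the raw payload downstream as a failure.
      return ParseResult.failure(data);
    }
  }

  @Override
  public void close() {}
}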

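The timestamp policy is roughly the following sketch. TimestampPolicy and
its two methods are the actual KafkaIO API; LastGoodTimestampPolicy and
the ParseResult accessors are again my placeholder names:

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public class LastGoodTimestampPolicy extends TimestampPolicy<String, ParseResult> {

  // Seed from the previous watermark so a restart does not move backwards.
  private Instant lastTimestamp;

  public LastGoodTimestampPolicy(Optional<Instant> previousWatermark) {
    this.lastTimestamp = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }

  @Override
  public Instant getTimestampForRecord(
      PartitionContext ctx, KafkaRecord<String, ParseResult> record) {
    ParseResult result = record.getKV().getValue();
    if (result.isSuccess()) {
      lastTimestamp = result.getTimestamp();
    }
    // A failure reuses the timestamp of the last successful record.
    return lastTimestamp;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    return lastTimestamp;
  }
}

It gets wired in with .withTimestampPolicyFactory((tp, previousWatermark)
-> new LastGoodTimestampPolicy(previousWatermark)).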

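And the dead-letter split, assuming the Kafka values were already
extracted into a PCollection<ParseResult> called messages (tag names and
the ParseResult accessors are placeholders):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final TupleTag<MyEvent> parsedTag = new TupleTag<MyEvent>() {};
final TupleTag<byte[]> deadLetterTag = new TupleTag<byte[]>() {};

PCollectionTuple outputs = messages.apply(
    "SplitDeadLetters",
    ParDo.of(new DoFn<ParseResult, MyEvent>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            ParseResult result = c.element();
            if (result.isSuccess()) {
              c.output(result.getEvent());
            } else {
              // Side output: failed payloads go to the dead-letter tag.
              c.output(deadLetterTag, result.getRawBytes());
            }
          }
        })
        .withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));

PCollection<MyEvent> parsed = outputs.get(parsedTag);
PCollection<byte[]> deadLetters = outputs.get(deadLetterTag);  // write to a DLQ sink
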
On Thu, Oct 25, 2018 at 8:54 AM Reuven Lax <[email protected]> wrote:

>
>
> On Wed, Oct 24, 2018, 10:26 PM Raghu Angadi <[email protected]> wrote:
>
>> Well, if every input record's timestamp is X, watermark staying at X is
>> the right answer, no? But I am not sure where the disagreement is,
>> actually. I might be mistaken.
>>
>> KafkaIO has a few built-in policies for watermark and timestamp that
>> cover most use cases (including server time, which has the benefit of
>> providing a perfect watermark). It also gives users fairly complete
>> control over these if they choose. I think it is reasonable for a
>> policy to base its watermark only on parsable records, and ignore
>> unparsable records w.r.t. watermark calculation.
>>
>
> But then doesn't that force the user to set max allowed lateness to
> infinity, otherwise these records will be dropped?
>
>> It could even assign a timestamp that makes more logical sense in a
>> particular application.
>>
>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> Forgive me if this is naive or missing something, but here are my
>>> thoughts on these alternatives:
>>>
>>> (0) Timestamp has to be pulled out in the source to control the
>>> watermark. Luke's point is important.
>>>
>>> (1) If bad records get min_timestamp, and they occur infrequently
>>> enough, then the watermark will advance and they will all be dropped.
>>> That will not allow output to a dead-letter queue.
>>>
>>> (2) If you always have min_timestamp records, or if bad records are
>>> frequent, the watermark will never advance. So windows/aggregations would
>>> never be considered complete. Triggers could be used to get output anyhow,
>>> but it would never be a final answer. I think it is not in the spirit of
>>> Beam to work this way. Pragmatically, no state could ever be freed by a
>>> runner.
>>>
>>> In SQL there is an actual "dead letter" option when creating a table
>>> that parses from a bytes source. If, for example, a JSON record cannot be
>>> parsed to the expected schema - like maybe an avro record got in the
>>> stream, or the JSON doesn't match the expected schema - it is output as-is
>>> to a user-specified dead letter queue. I think this same level of support
>>> is also required for records that cannot have timestamps extracted in an
>>> unbounded source.
>>>
>>> In an SDF I think the function has enough control to do it all in
>>> "userland", so Cham is right on here.
>>>
>>> Kenn
>>>
>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> That depends on the user's pipeline and how watermark advancement of
>>>> the source may impact elements becoming droppably late if they are
>>>> emitted with the minimum timestamp.
>>>>
>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]>
>>>> wrote:
>>>>
>>>>> I see.
>>>>>
>>>>> What I meant was to return min_timestamp for bad records in the
>>>>> timestamp handler passed to KafkaIO itself, and correct timestamp for
>>>>> parsable records. That should work too, right?
>>>>>
>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Yes, that would be fine.
>>>>>>
>>>>>> The user could then use a ParDo which outputs to a DLQ for things it
>>>>>> can't parse the timestamp for and use outputWithTimestamp[1] for 
>>>>>> everything
>>>>>> else.
>>>>>>
>>>>>> 1:
>>>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks. So returning min timestamp is OK, right (assuming the
>>>>>>> application is fine with what it means)?
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> All records in Apache Beam have a timestamp. The default timestamp
>>>>>>>> is the min timestamp defined here:
>>>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> You would have to return min timestamp for all records otherwise
>>>>>>>>>> the watermark may have advanced and you would be outputting records 
>>>>>>>>>> that
>>>>>>>>>> are droppably late.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> That would be fine I guess. What’s the timestamp for a record that
>>>>>>>>> doesn’t have one?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>>>>> should not affect the watermark.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
>>>>>>>>>>>> API; is this pipeline defined under the kafkaio package?
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark for
>>>>>>>>>>>>> the UnboundedSource; how would they control the watermark in a
>>>>>>>>>>>>> downstream ParDo?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ah nice. Yeah, if the user can return the full bytes instead
>>>>>>>>>>>>>>> of applying a function that would result in an exception, the
>>>>>>>>>>>>>>> timestamp can be extracted by a ParDo down the line.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources should,
>>>>>>>>>>>>>> unless there is a good reason not to.
>>>>>>>>>>>>>> Given that, do we think Beam should provide a transform that
>>>>>>>>>>>>>> makes it simpler to handle dead-letter output? I think there
>>>>>>>>>>>>>> was a thread about it in the past.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple;
>>>>>>>>>>>>>>>> after that you can extract your success records (TupleTag)
>>>>>>>>>>>>>>>> and your dead-letter records (TupleTag) and do whatever you
>>>>>>>>>>>>>>>> want with them.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Raghu Angadi <[email protected]> wrote on Wed, Oct 24, 2018,
>>>>>>>>>>>>>>>> 05:18:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> A user can read serialized bytes from KafkaIO and
>>>>>>>>>>>>>>>>> deserialize explicitly in a ParDo, which gives complete
>>>>>>>>>>>>>>>>> control over how to handle record errors. This is what I
>>>>>>>>>>>>>>>>> would do if I needed to in my pipeline.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If there is a transform in Beam that does this, it could
>>>>>>>>>>>>>>>>> be convenient for users in many such scenarios. This is 
>>>>>>>>>>>>>>>>> simpler than each
>>>>>>>>>>>>>>>>> source supporting it explicitly.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework,
>>>>>>>>>>>>>>>>>> this is probably not something that can easily be
>>>>>>>>>>>>>>>>>> supported. We might be able to support similar features
>>>>>>>>>>>>>>>>>> when we have Kafka on top of Splittable DoFn though.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to be
>>>>>>>>>>>>>>>>>>> sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a
>>>>>>>>>>>>>>>>>>>> pipeline that uses a KafkaIO connector for its input? As
>>>>>>>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able
>>>>>>>>>>>>>>>>>>>> to produce a dead-letter output from it?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads from a
>>>>>>>>>>>>>>>>>>>> KafkaIO input:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of(
>>>>>>>>>>>>>>>>>>>>             InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>>>>>>>>>>             inputMessagesConfig))
>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> and I would like to get a dead-letter output when my
>>>>>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
