Yes, there is state in the timestamp policy, but it is simple, not like ParDo state.
A TimestampPolicy appears to be a long-lived instance. Take inspiration from
the already existing policies, e.g. the one here:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L135
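
For illustration, here is a minimal sketch of such a policy (untested; the
extractor function and its return-null-on-failure contract are my own
assumptions, not something KafkaIO defines). It remembers the timestamp of
the last successfully parsed record, assigns it to unparsable records, and
bases the watermark only on parsable records:

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class LastGoodTimestampPolicy<K, V> extends TimestampPolicy<K, V> {

  // Returns the record's event timestamp, or null when it cannot be
  // parsed (an assumed contract, not part of KafkaIO).
  private final SerializableFunction<KafkaRecord<K, V>, Instant> tryExtract;
  private Instant lastTimestamp;

  LastGoodTimestampPolicy(
      SerializableFunction<KafkaRecord<K, V>, Instant> tryExtract,
      Optional<Instant> previousWatermark) {
    this.tryExtract = tryExtract;
    this.lastTimestamp = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }

  @Override
  public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<K, V> record) {
    Instant parsed = tryExtract.apply(record);
    if (parsed != null) {
      lastTimestamp = parsed;  // remember the last successfully parsed timestamp
    }
    // Unparsable records inherit the last good timestamp, so they are not
    // dropped as late and a following ParDo can still dead-letter them.
    return lastTimestamp;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    // The watermark is based only on parsable records.
    return lastTimestamp;
  }
}

Since TimestampPolicyFactory has a single abstract method, a lambda should
work, something like:

  .withTimestampPolicyFactory(
      (tp, previousWatermark) ->
          new LastGoodTimestampPolicy<>(myExtractor, previousWatermark))

where myExtractor is whatever parsing function the application uses.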


On Mon, Oct 29, 2018 at 11:57 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
wrote:

> Sorry for not replying, I was sick with the flu.
>
> On Thu, Oct 25, 2018 at 9:56 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> What I ended up doing, when I could not for any reason rely on Kafka
>> timestamps but needed to parse them from the message, is:
>>
>> * have a custom Kafka deserializer which never throws but returns a
>> message which is either a success with the parsed data structure plus
>> timestamp, or a failure with the original Kafka bytes payload
>> * a timestamp policy that can then extract the timestamp from a successful
>> deserialize result; for a failure result, I return the timestamp of the
>> last successful message (in my case messages are not terribly out of
>> order and failures are rather rare)
>>
>
> It's possible that my stream contains data that is very old (when
> rewinding a topic, let's say it goes back to 2012). If I understand the
> logic here correctly, this means I need to remember the last successfully
> parsed timestamp. Do you solve this via stateful processing?
>
>
>> * a following ParDo then side-outputs failures to dead letters
>>
>>
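
A minimal sketch of that kind of never-throwing deserializer (ParsedMessage
and its parse method stand in for the application's own types, and the
ParseResult wrapper is likewise hypothetical):

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Instant;

// Either a success (parsed message plus its event timestamp) or a
// failure carrying the original Kafka bytes payload.
class ParseResult {
  final ParsedMessage message;  // null on failure
  final Instant timestamp;      // null on failure
  final byte[] rawBytes;        // original payload, kept for dead-lettering

  private ParseResult(ParsedMessage message, Instant timestamp, byte[] rawBytes) {
    this.message = message;
    this.timestamp = timestamp;
    this.rawBytes = rawBytes;
  }

  static ParseResult success(ParsedMessage m, Instant ts, byte[] raw) {
    return new ParseResult(m, ts, raw);
  }

  static ParseResult failure(byte[] raw) {
    return new ParseResult(null, null, raw);
  }

  boolean isSuccess() {
    return message != null;
  }
}

// A value deserializer that never throws: any parse error becomes a
// failure result instead of an exception.
class SafeDeserializer implements Deserializer<ParseResult> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public ParseResult deserialize(String topic, byte[] data) {
    try {
      ParsedMessage m = ParsedMessage.parse(data);  // hypothetical parser
      return ParseResult.success(m, m.getEventTimestamp(), data);
    } catch (Exception e) {
      return ParseResult.failure(data);
    }
  }

  @Override
  public void close() {}
}

Since ParseResult is a custom type, the read would presumably need
KafkaIO's withValueDeserializerAndCoder(...) so the pipeline knows a coder
for it.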
>
>
>> On Thu, Oct 25, 2018 at 8:54 AM Reuven Lax <re...@google.com> wrote:
>>
>>>
>>>
>>> On Wed, Oct 24, 2018, 10:26 PM Raghu Angadi <rang...@google.com> wrote:
>>>
>>>> Well, if every input record's timestamp is X, the watermark staying at X
>>>> is the right answer, no? But I am not sure where the disagreement is,
>>>> actually. I might be mistaken.
>>>>
>>>> KafkaIO has a few built-in policies for watermark and timestamp that
>>>> cover most use cases (including server time, which has the benefit of
>>>> providing a perfect watermark). It also gives users fairly complete
>>>> control over these if they choose to. I think it is reasonable for a
>>>> policy to base its watermark only on parsable records, and to ignore
>>>> unparsable records w.r.t. watermark calculation.
>>>>
>>>
>>> But then doesn't that force the user to set max allowed lateness to
>>> infinity, otherwise these records will be dropped?
>>>
>>>> It could even assign a timestamp that makes more logical sense in a
>>>> particular application.
>>>>
>>>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> Forgive me if this is naive or missing something, but here are my
>>>>> thoughts on these alternatives:
>>>>>
>>>>> (0) The timestamp has to be pulled out in the source to control the
>>>>> watermark. Luke's point is important.
>>>>>
>>>>> (1) If bad records get min_timestamp, and they occur infrequently
>>>>> enough, then the watermark will advance and they will all be dropped.
>>>>> That will not allow output to a dead-letter queue.
>>>>>
>>>>> (2) If you always have min_timestamp records, or if bad records are
>>>>> frequent, the watermark will never advance. So windows/aggregations would
>>>>> never be considered complete. Triggers could be used to get output anyhow,
>>>>> but it would never be a final answer. I think it is not in the spirit of
>>>>> Beam to work this way. Pragmatically, no state could ever be freed by a
>>>>> runner.
>>>>>
>>>>> In SQL there is an actual "dead letter" option when creating a table
>>>>> that parses from a bytes source. If, for example, a JSON record cannot be
>>>>> parsed to the expected schema - like maybe an Avro record got in the
>>>>> stream, or the JSON doesn't match the expected schema - it is output as-is
>>>>> to a user-specified dead letter queue. I think this same level of support
>>>>> is also required for records that cannot have timestamps extracted in an
>>>>> unbounded source.
>>>>>
>>>>> In an SDF I think the function has enough control to do it all in
>>>>> "userland", so Cham is right on here.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> That depends on the user's pipeline and how watermark advancement of
>>>>>> the source may impact elements becoming droppably late if they are
>>>>>> emitted with the minimum timestamp.
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <rang...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I see.
>>>>>>>
>>>>>>> What I meant was to return min_timestamp for bad records in the
>>>>>>> timestamp handler passed to KafkaIO itself, and the correct timestamp
>>>>>>> for parsable records. That should work too, right?
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, that would be fine.
>>>>>>>>
>>>>>>>> The user could then use a ParDo which outputs to a DLQ for things
>>>>>>>> it can't parse the timestamp for and use outputWithTimestamp[1] for
>>>>>>>> everything else.
>>>>>>>>
>>>>>>>> 1:
>>>>>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>>>>>>
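
A minimal sketch of that ParDo (the tag names are arbitrary, and
tryExtractTimestamp is an illustrative stand-in for real parsing logic;
this assumes the source assigned the minimum timestamp, so
outputWithTimestamp only ever moves timestamps forward):

import java.nio.ByteBuffer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;

// Routes records to the main output with a corrected timestamp, or to a
// dead-letter output when the timestamp cannot be parsed.
class TimestampSplitFn extends DoFn<byte[], byte[]> {
  static final TupleTag<byte[]> MAIN = new TupleTag<byte[]>() {};
  static final TupleTag<byte[]> DEAD_LETTER = new TupleTag<byte[]>() {};

  @ProcessElement
  public void processElement(ProcessContext c) {
    Instant ts = tryExtractTimestamp(c.element());
    if (ts != null) {
      c.outputWithTimestamp(MAIN, c.element(), ts);
    } else {
      c.output(DEAD_LETTER, c.element());
    }
  }

  private static Instant tryExtractTimestamp(byte[] payload) {
    try {
      // Illustrative only: pretend the first 8 bytes are epoch millis.
      return new Instant(ByteBuffer.wrap(payload).getLong());
    } catch (Exception e) {
      return null;  // unparsable; route to the dead-letter output
    }
  }
}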
>>>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <rang...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks. So returning the min timestamp is OK, right (assuming the
>>>>>>>>> application is fine with what it means)?
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> All records in Apache Beam have a timestamp. The default
>>>>>>>>>> timestamp is the min timestamp defined here:
>>>>>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <rang...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You would have to return the min timestamp for all records,
>>>>>>>>>>>> otherwise the watermark may have advanced and you would be
>>>>>>>>>>>> outputting records that are droppably late.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> That would be fine I guess. What’s the timestamp for a record
>>>>>>>>>>> that doesn’t have one?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <
>>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>>>>>>> should not affect the watermark.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <
>>>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
>>>>>>>>>>>>>> API; is this pipeline defined under the kafkaio package?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <
>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark
>>>>>>>>>>>>>>> for the UnboundedSource; how would they control the watermark
>>>>>>>>>>>>>>> in a downstream ParDo?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <
>>>>>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of
>>>>>>>>>>>>>>>>> applying a function that would result in an exception, this
>>>>>>>>>>>>>>>>> can be extracted by a ParDo down the line.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources should,
>>>>>>>>>>>>>>>> unless there is a good reason not to.
>>>>>>>>>>>>>>>> Given that, do we think Beam should provide a transform that
>>>>>>>>>>>>>>>> makes it simpler to handle dead-letter output? I think there
>>>>>>>>>>>>>>>> was a thread about it in the past.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>>>>>>> jcgarc...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple;
>>>>>>>>>>>>>>>>>> after that you can extract your success records (TupleTag)
>>>>>>>>>>>>>>>>>> and your dead-letter records (TupleTag) and do whatever you
>>>>>>>>>>>>>>>>>> want with them.
>>>>>>>>>>>>>>>>>>
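
Wiring that up could look roughly like this (reusing the TimestampSplitFn
sketched earlier in the thread, and assuming input is a PCollection<byte[]>
coming out of KafkaIO):

import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;

PCollectionTuple results =
    input.apply(
        ParDo.of(new TimestampSplitFn())
            .withOutputTags(
                TimestampSplitFn.MAIN,
                TupleTagList.of(TimestampSplitFn.DEAD_LETTER)));

// Success records continue down the pipeline; dead letters go to a sink
// of your choice.
PCollection<byte[]> parsed = results.get(TimestampSplitFn.MAIN);
PCollection<byte[]> deadLetters = results.get(TimestampSplitFn.DEAD_LETTER);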
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Raghu Angadi <rang...@google.com> wrote on Wed, Oct 24,
>>>>>>>>>>>>>>>>>> 2018, 05:18:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> A user can read serialized bytes from KafkaIO and
>>>>>>>>>>>>>>>>>>> deserialize explicitly in a ParDo, which gives complete
>>>>>>>>>>>>>>>>>>> control over how to handle record errors. This is what I
>>>>>>>>>>>>>>>>>>> would do if I needed to in my pipeline.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If there is a transform in Beam that does this, it could
>>>>>>>>>>>>>>>>>>> be convenient for users in many such scenarios. This is
>>>>>>>>>>>>>>>>>>> simpler than each source supporting it explicitly.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework,
>>>>>>>>>>>>>>>>>>>> this is probably not something that can easily be
>>>>>>>>>>>>>>>>>>>> supported. We might be able to support similar features
>>>>>>>>>>>>>>>>>>>> when we have Kafka on top of Splittable DoFn though.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to
>>>>>>>>>>>>>>>>>>>>> be sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a
>>>>>>>>>>>>>>>>>>>>>> pipeline that uses a KafkaIO connector for its input?
>>>>>>>>>>>>>>>>>>>>>> As `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>>>>>>>>>>>>>>>> only a SerializableFunction and not a ParDo, how would
>>>>>>>>>>>>>>>>>>>>>> I be able to produce a dead-letter output from it?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads from
>>>>>>>>>>>>>>>>>>>>>> a KafkaIO input:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> and I'd like to get dead-letter outputs when my
>>>>>>>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>
> --
> Tobias Kaymak
> Data Engineer
>
> tobias.kay...@ricardo.ch
> www.ricardo.ch
>
