On Thu, Oct 25, 2018 at 10:28 AM Chamikara Jayalath <chamik...@google.com>
wrote:

> Not sure if I understand why this would require Kafka to behave as two
> independent sources.
>

> Won't setting a non-negative-infinity timestamp (for example, processing
> time) for failed records be enough?
>

Well, that's what I suggested and what the user seems to have done. The
question is: what if that is not what we want, and we don't want to mix
these two together (at least from my reading of Luke's and Kenn's comments,
which could be off).


>
> Also (at least at some point) there were discussions on supporting SDFs to
> report different watermarks for different outputs. More details are
> available here:
> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit
>
> - Cham
>
>
>>
>> Raghu.
>>
>>>
>>> It could even assign a timestamp that makes more logical sense in a
>>>> particular application.
>>>>
>>>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> Forgive me if this is naive or missing something, but here are my
>>>>> thoughts on these alternatives:
>>>>>
>>>>> (0) The timestamp has to be pulled out in the source to control the
>>>>> watermark. Luke's point is important.
>>>>>
>>>>> (1) If bad records get min_timestamp, and they occur infrequently
>>>>> enough, then the watermark will advance and they will all be dropped.
>>>>> That will not allow output to a dead-letter queue.
>>>>>
>>>>> (2) If records always get min_timestamp, or if bad records are
>>>>> frequent, the watermark will never advance. So windows/aggregations
>>>>> would never be considered complete. Triggers could be used to get
>>>>> output anyhow, but it would never be a final answer. I think it is not
>>>>> in the spirit of Beam to work this way. Pragmatically, no state could
>>>>> ever be freed by a runner.
>>>>>
>>>>> In SQL there is an actual "dead letter" option when creating a table
>>>>> that parses from a bytes source. If, for example, a JSON record cannot be
>>>>> parsed to the expected schema - like maybe an avro record got in the
>>>>> stream, or the JSON doesn't match the expected schema - it is output as-is
>>>>> to a user-specified dead letter queue. I think this same level of support
>>>>> is also required for records that cannot have timestamps extracted in an
>>>>> unbounded source.
>>>>>
>>>>> In an SDF I think the function has enough control to do it all in
>>>>> "userland", so Cham is right on here.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> That depends on the user's pipeline and on how watermark advancement
>>>>>> of the source may impact elements becoming droppably late if they are
>>>>>> emitted with the minimum timestamp.
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <rang...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I see.
>>>>>>>
>>>>>>> What I meant was to return min_timestamp for bad records in the
>>>>>>> timestamp handler passed to KafkaIO itself, and the correct timestamp
>>>>>>> for parsable records. That should work too, right?
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, that would be fine.
>>>>>>>>
>>>>>>>> The user could then use a ParDo which outputs to a DLQ for things
>>>>>>>> it can't parse the timestamp for and use outputWithTimestamp[1] for
>>>>>>>> everything else.
>>>>>>>>
>>>>>>>> 1:
>>>>>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
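>>>>>>>>
>>>>>>>> A minimal sketch of such a ParDo (ExtractTimestampFn, the tag names,
>>>>>>>> and the parseTimestamp helper are hypothetical stand-ins for
>>>>>>>> application logic, not existing APIs):
>>>>>>>>
>>>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>> import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
>>>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>>>> import org.apache.beam.sdk.values.TupleTag;
>>>>>>>> import org.joda.time.Instant;
>>>>>>>>
>>>>>>>> class ExtractTimestampFn extends DoFn<KV<String, String>, KV<String, String>> {
>>>>>>>>   // One tag per output: records with a parsed timestamp, and dead letters.
>>>>>>>>   static final TupleTag<KV<String, String>> TIMESTAMPED =
>>>>>>>>       new TupleTag<KV<String, String>>() {};
>>>>>>>>   static final TupleTag<KV<String, String>> DEAD_LETTER =
>>>>>>>>       new TupleTag<KV<String, String>>() {};
>>>>>>>>
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(ProcessContext c) {
>>>>>>>>     try {
>>>>>>>>       Instant ts = parseTimestamp(c.element().getValue());
>>>>>>>>       // Moving a timestamp forward from the minimum is always allowed.
>>>>>>>>       c.outputWithTimestamp(TIMESTAMPED, c.element(), ts);
>>>>>>>>     } catch (Exception e) {
>>>>>>>>       // Dead letters keep the timestamp assigned by the source.
>>>>>>>>       c.output(DEAD_LETTER, c.element());
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private static Instant parseTimestamp(String value) {
>>>>>>>>     return Instant.parse(value); // placeholder for real extraction logic
>>>>>>>>   }
>>>>>>>> }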
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <rang...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks. So returning min timestamp is OK, right (assuming the
>>>>>>>>> application is fine with what it means)?
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> All records in Apache Beam have a timestamp. The default
>>>>>>>>>> timestamp is the min timestamp defined here:
>>>>>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <rang...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You would have to return min timestamp for all records;
>>>>>>>>>>>> otherwise the watermark may have advanced and you would be
>>>>>>>>>>>> outputting records that are droppably late.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> That would be fine I guess. What’s the timestamp for a record
>>>>>>>>>>> that doesn’t have one?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <
>>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>>>>>>> should not affect the watermark.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <
>>>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How about returning min_timestamp? Those records would then be
>>>>>>>>>>>>>> dropped or redirected by the ParDo after that.
>>>>>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
>>>>>>>>>>>>>> API; is this pipeline defined under the kafkaio package?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <
>>>>>>>>>>>>>> lc...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark
>>>>>>>>>>>>>>> for the UnboundedSource; how would they control the watermark
>>>>>>>>>>>>>>> in a downstream ParDo?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <
>>>>>>>>>>>>>>> rang...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ah, nice. Yeah, if the user can return the full bytes
>>>>>>>>>>>>>>>>> instead of applying a function that could result in an
>>>>>>>>>>>>>>>>> exception, the timestamp can be extracted by a ParDo down
>>>>>>>>>>>>>>>>> the line.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources should,
>>>>>>>>>>>>>>>> unless there is a good reason not to.
>>>>>>>>>>>>>>>> Given that, do we think Beam should provide a transform that
>>>>>>>>>>>>>>>> makes it simpler to handle deadletter output? I think there
>>>>>>>>>>>>>>>> was a thread about it in the past.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>>>>>>> jcgarc...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> just apply a regular ParDo and return a PCollectionTuple.
>>>>>>>>>>>>>>>>>> After that you can extract your success records (TupleTag)
>>>>>>>>>>>>>>>>>> and your dead-letter records (TupleTag) and do whatever you
>>>>>>>>>>>>>>>>>> want with them.
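>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Something like this rough sketch (MyEvent and parse() are
>>>>>>>>>>>>>>>>>> hypothetical application code; assumes the usual Beam
>>>>>>>>>>>>>>>>>> imports: ParDo, DoFn, KV, TupleTag, TupleTagList,
>>>>>>>>>>>>>>>>>> PCollection, PCollectionTuple):
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> final TupleTag<MyEvent> success = new TupleTag<MyEvent>() {};
>>>>>>>>>>>>>>>>>> final TupleTag<KV<String, String>> deadLetter =
>>>>>>>>>>>>>>>>>>     new TupleTag<KV<String, String>>() {};
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> PCollectionTuple out = records.apply(
>>>>>>>>>>>>>>>>>>     ParDo.of(new DoFn<KV<String, String>, MyEvent>() {
>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>       public void processElement(ProcessContext c) {
>>>>>>>>>>>>>>>>>>         try {
>>>>>>>>>>>>>>>>>>           c.output(parse(c.element()));      // success path
>>>>>>>>>>>>>>>>>>         } catch (Exception e) {
>>>>>>>>>>>>>>>>>>           c.output(deadLetter, c.element()); // dead-letter path
>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>     }).withOutputTags(success, TupleTagList.of(deadLetter)));
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> PCollection<MyEvent> parsed = out.get(success);
>>>>>>>>>>>>>>>>>> PCollection<KV<String, String>> failures = out.get(deadLetter);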
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Raghu Angadi <rang...@google.com> wrote on Wed., Oct 24,
>>>>>>>>>>>>>>>>>> 2018, 05:18:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Users can read serialized bytes from KafkaIO and
>>>>>>>>>>>>>>>>>>> deserialize explicitly in a ParDo, which gives complete
>>>>>>>>>>>>>>>>>>> control over how to handle record errors. This is what I
>>>>>>>>>>>>>>>>>>> would do if I needed to in my pipeline.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If there is a transform in Beam that does this, it could
>>>>>>>>>>>>>>>>>>> be convenient for users in many such scenarios. This is 
>>>>>>>>>>>>>>>>>>> simpler than each
>>>>>>>>>>>>>>>>>>> source supporting it explicitly.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework,
>>>>>>>>>>>>>>>>>>>> this is probably not something that can easily be
>>>>>>>>>>>>>>>>>>>> supported. We might be able to support similar features
>>>>>>>>>>>>>>>>>>>> when we have Kafka on top of Splittable DoFn though.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to
>>>>>>>>>>>>>>>>>>>>> be sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Is there a way to get a deadletter output from a
>>>>>>>>>>>>>>>>>>>>>> pipeline that uses a KafkaIO connector for its input? As
>>>>>>>>>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be
>>>>>>>>>>>>>>>>>>>>>> able to produce a deadletter output from it?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads from
>>>>>>>>>>>>>>>>>>>>>> a KafkaIO input:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>>>>>>>>>>>>             inputMessagesConfig))
>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> and I would like to get deadletter outputs when my
>>>>>>>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>