I don't know how well RequiresTimeSortedInput will handle late data.

I think you will want to include the Kafka offset in your records (unless
the records have their own sequence number) and then use state to buffer
and sort. There is a proposal (and work in progress) for a sorted state
API, which will make this easier and more efficient.
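
Here is a minimal sketch of that pattern in the Python SDK (untested, and
the key shape, coders, and payload type are assumptions): key by Kafka
partition, buffer (offset, payload) pairs in bag state, and flush them in
offset order from an event-time timer.

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder, TupleCoder, VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer


class SortByOffsetFn(beam.DoFn):
  """Buffers (offset, payload) pairs and emits payloads sorted by offset.

  Expects keyed input, e.g. (partition, (offset, payload)).
  """
  BUFFER = BagStateSpec('buffer', TupleCoder((VarIntCoder(), StrUtf8Coder())))
  FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

  def process(self,
              element,
              timestamp=beam.DoFn.TimestampParam,
              buffer=beam.DoFn.StateParam(BUFFER),
              flush=beam.DoFn.TimerParam(FLUSH)):
    _, offset_and_payload = element
    buffer.add(offset_and_payload)
    # Fire once the watermark passes this element's timestamp; note that
    # anything arriving later than that (late data) would miss this flush.
    flush.set(timestamp)

  @on_timer(FLUSH)
  def flush_buffer(self, buffer=beam.DoFn.StateParam(BUFFER)):
    # Tuples sort by their first field, i.e. the Kafka offset.
    for _offset, payload in sorted(buffer.read()):
      yield payload
    buffer.clear()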

Reuven

On Wed, Jun 10, 2020 at 1:25 PM Luke Cwik <lc...@google.com> wrote:

> For runners that support @RequiresTimeSortedInput, all your input will
> arrive time-sorted (as long as your elements' timestamps track the order
> that you want).
> For runners that don't support this, you need to build a stateful DoFn that
> buffers out-of-order events and reorders them into the order that you need.
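>
> To make the "timestamp tracks the order you want" part concrete, one
> (hedged) option is to stamp each element with its Kafka offset, since
> offsets are monotonic per partition; rec.offset below is an assumed field
> name, and note this trades away real event-time semantics for windows and
> watermarks:
>
> import apache_beam as beam
> from apache_beam.transforms.window import TimestampedValue
>
> # records: a PCollection of Kafka records (assumed), each exposing its
> # offset as rec.offset (hypothetical field name).
> stamped = records | beam.Map(lambda rec: TimestampedValue(rec, rec.offset))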
>
> @Pablo Estrada <pabl...@google.com> Any other suggestions for supporting
> CDC-type pipelines?
>
> On Tue, Jun 9, 2020 at 6:59 PM Catlyn Kong <catl...@yelp.com> wrote:
>
>> Thanks a lot for the response!
>>
>> We have several business use cases that rely strongly on ordering by
>> Kafka offset:
>> 1) streaming unwindowed inner join: say we want to join users with
>> reviews on user_id. Here are the schemas for two streams:
>>     user:
>>
>>    - user_id
>>    - name
>>    - timestamp
>>
>>     reviews:
>>
>>    - review_id
>>    - user_id
>>    - timestamp
>>
>> Here are the messages in each stream ordered by Kafka offset:
>>     user:
>>     (1, name_a, 60), (2, name_b, 120), (1, name_c, 240)
>>     reviews:
>>     (ABC, 1, 90), (DEF, 2, 360)
>> I would expect to receive the following output messages:
>>     (1, name_a, ABC) at timestamp 90
>>     (1, name_c, ABC) at timestamp 240
>>     (2, name_b, DEF) at timestamp 360
>> This can be done in native Flink, since the Flink Kafka consumer reads from
>> each partition sequentially. But without an ordering guarantee, we can end
>> up with arbitrary results. So how would we implement this in Beam?
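>>
>> (For reference, assuming we did have per-key ordered delivery, we imagine
>> the join itself as a stateful DoFn along these lines; the dict fields and
>> coders are our own sketch, not working code:)
>>
>> import apache_beam as beam
>> from apache_beam.coders import PickleCoder
>> from apache_beam.transforms.userstate import BagStateSpec
>>
>>
>> class JoinUsersAndReviewsFn(beam.DoFn):
>>   """Input: (user_id, record), record tagged with type 'user' or 'review'."""
>>   USER = BagStateSpec('user', PickleCoder())      # holds the latest user
>>   REVIEWS = BagStateSpec('reviews', PickleCoder())
>>
>>   def process(self,
>>               element,
>>               user_state=beam.DoFn.StateParam(USER),
>>               review_state=beam.DoFn.StateParam(REVIEWS)):
>>     user_id, record = element
>>     if record['type'] == 'user':
>>       # A newer user record replaces the previous one and re-emits joins.
>>       user_state.clear()
>>       user_state.add(record)
>>       for review in review_state.read():
>>         yield (user_id, record['name'], review['review_id'])
>>     else:
>>       review_state.add(record)
>>       for user in user_state.read():
>>         yield (user_id, user['name'], record['review_id'])
>>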
>> 2) unwindowed aggregation: aggregate all the employees for every
>> organization. Say we have a new employee stream with the following schema:
>>     new_employee:
>>
>>    - organization_id
>>    - employee_name
>>
>> And here are the messages ordered by Kafka offset:
>> (1, name_a), (2, name_b), (2, name_c), (1, name_d)
>> I would expect the output to be:
>> (1, [name_a]), (2, [name_b]), (2, [name_b, name_c]), (1, [name_a, name_d])
>> Again, without an ordering guarantee, the result is non-deterministic.
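>>
>> (Again assuming ordered delivery existed, the aggregation itself is a
>> small stateful DoFn; a sketch with assumed coders:)
>>
>> import apache_beam as beam
>> from apache_beam.coders import StrUtf8Coder
>> from apache_beam.transforms.userstate import BagStateSpec
>>
>>
>> class RunningEmployeeListFn(beam.DoFn):
>>   EMPLOYEES = BagStateSpec('employees', StrUtf8Coder())
>>
>>   def process(self,
>>               element,
>>               employees=beam.DoFn.StateParam(EMPLOYEES)):
>>     org_id, name = element  # (organization_id, employee_name)
>>     employees.add(name)
>>     # Emit the running list; this matches the expected output only if
>>     # elements actually arrive in Kafka-offset order.
>>     yield (org_id, list(employees.read()))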
>>
>> Change data capture (CDC) streams are a very common use case for our data
>> pipeline. As in the examples above, we rely on Kafka offsets to make sure we
>> process data mutations in the proper order. While in some cases we have
>> Flink-native solutions to these problems (Flink provides ordering
>> guarantees within the chosen key), we are now building some new Beam
>> applications that would require ordering guarantees. What is the
>> recommended approach in Beam for such use cases? If this isn’t currently
>> supported, is there any near-term plan to add native ordering support in Beam?
>>
>>
>> On 2020/06/09 20:37:22, Luke Cwik <l...@google.com> wrote:
>> > This will likely break due to:
>> > * workers can have more than one thread and hence process the source in
>> > parallel
>> > * splitting a source allows for the source to be broken up into multiple
>> > restrictions, and hence the runner can process those restrictions in any
>> > order they want (let's say your Kafka partition has an unconsumed commit
>> > offset range [20, 100); this could be split into [20, 60) and [60, 100),
>> > and the [60, 100) offset range could be processed first)
>> >
>> > You're right that you need to sort the output however you want within
>> > your DoFn before you make external calls to Kafka (this prevents you from
>> > using the KafkaIO sink implementation as a transform). There is an
>> > annotation, @RequiresTimeSortedInput, which is a special case for this
>> > sorting if you want input sorted by the elements' timestamps, but you'll
>> > still need to write to Kafka directly yourself from your DoFn.
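>> >
>> > As a rough sketch of that last point (a hedged example only; the
>> > kafka-python client, broker address, and topic name below are
>> > placeholders, not part of any Beam API):
>> >
>> > import apache_beam as beam
>> > from kafka import KafkaProducer  # kafka-python client, as an example
>> >
>> >
>> > class SortedKafkaWriteFn(beam.DoFn):
>> >   def setup(self):
>> >     # Placeholder broker address.
>> >     self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
>> >
>> >   def process(self, element):
>> >     _, buffered = element  # assumed shape: (key, [(offset, payload), ...])
>> >     # Sort by offset before producing, since elements may arrive in any
>> >     # order across restrictions/threads. payload is assumed to be bytes.
>> >     for _offset, payload in sorted(buffered):
>> >       self.producer.send('output-topic', payload)  # placeholder topic
>> >     self.producer.flush()
>> >
>> >   def teardown(self):
>> >     self.producer.close()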
>> >
>> > On Mon, Jun 8, 2020 at 4:24 PM Hadi Zhang <ha...@yelp.com> wrote:
>> >
>> > > We are using the Beam 2.20 Python SDK on a Flink 1.9 runner. Our
>> > > messages originate from a custom source that consumes messages from a
>> > > Kafka topic and emits them in the order of their Kafka offsets to a
>> > > DoFn. After this DoFn processes the messages, they are emitted to a
>> > > custom sink that sends messages to a Kafka topic.
>> > >
>> > > We want to process those messages in the order in which we receive
>> > > them from Kafka and then emit them to the Kafka sink in the same
>> > > order, but based on our understanding Beam does not provide an
>> > > in-order transport. However, in practice we noticed that with a Python
>> > > SDK worker on Flink, a parallelism setting of 1, and one sdk_worker
>> > > instance, messages seem to be both processed and emitted in order. Is
>> > > that implementation-specific in-order behavior something that we can
>> > > rely on, or is it very likely that this will break at some future
>> > > point?
>> > >
>> > > In case it's not recommended to depend on that behavior, what is the
>> > > best approach for in-order processing?
>> > >
>> > > https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam
>> > > recommends ordering events in a heap, but according to our
>> > > understanding this approach will only work when directly writing to an
>> > > external system.
>> >
>>
>
