I don't know how well RequiresTimeSortedInput will work for any late data. I think you will want to include the Kafka offset in your records (unless the records have their own sequence number) and then use state to buffer and sort. There is a proposal (and work in progress) for a sorted state API, which will make this easier and more efficient.
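The buffer-and-sort idea can be illustrated with a small runner-agnostic sketch in plain Python (this is not Beam's state API — the class and method names here are hypothetical, standing in for what a stateful DoFn would keep in bag state and flush from a timer):

```python
import heapq

class OffsetReorderBuffer:
    """Illustrative sketch: hold records in a min-heap keyed by Kafka
    offset and release them only once every earlier offset has arrived,
    the same idea a stateful DoFn would implement with state + timers."""

    def __init__(self, next_offset=0):
        self.next_offset = next_offset  # smallest offset not yet released
        self._heap = []

    def add(self, offset, record):
        """Buffer one record; return the records now releasable in order."""
        heapq.heappush(self._heap, (offset, record))
        released = []
        while self._heap and self._heap[0][0] == self.next_offset:
            _, rec = heapq.heappop(self._heap)
            released.append(rec)
            self.next_offset += 1
        return released
```

For example, a record at offset 1 stays buffered until offset 0 arrives, at which point both are released in offset order.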
Reuven

On Wed, Jun 10, 2020 at 1:25 PM Luke Cwik <lc...@google.com> wrote:

> For runners that support @RequiresTimeSortedInput, all your input will
> come time-sorted (as long as your element's timestamp tracks the order
> that you want). For runners that don't support this, you need to build a
> stateful DoFn that buffers out-of-order events and reorders them into
> the order that you need.
>
> @Pablo Estrada <pabl...@google.com> Any other suggestions for supporting
> CDC-type pipelines?
>
> On Tue, Jun 9, 2020 at 6:59 PM Catlyn Kong <catl...@yelp.com> wrote:
>
>> Thanks a lot for the response!
>>
>> We have several business use cases that rely strongly on ordering by
>> Kafka offset:
>>
>> 1) Streaming unwindowed inner join: say we want to join users with
>> reviews on user_id. Here are the schemas for the two streams:
>>
>> user:
>> - user_id
>> - name
>> - timestamp
>>
>> reviews:
>> - review_id
>> - user_id
>> - timestamp
>>
>> Here are the messages in each stream, ordered by Kafka offset:
>>
>> user: (1, name_a, 60), (2, name_b, 120), (1, name_c, 240)
>> reviews: (ABC, 1, 90), (DEF, 2, 360)
>>
>> I would expect to receive the following output messages:
>>
>> (1, name_a, ABC) at timestamp 90
>> (1, name_c, ABC) at timestamp 240
>> (2, name_b, DEF) at timestamp 360
>>
>> This can be done in native Flink, since the Flink Kafka consumer reads
>> from each partition sequentially. But without an ordering guarantee, we
>> can end up with arbitrary results. So how would we implement this in
>> Beam?
>>
>> 2) Unwindowed aggregation: aggregate all the employees for every
>> organization. Say we have a new-employee stream with the following
>> schema:
>>
>> new_employee:
>> - organization_id
>> - employee_name
>>
>> And here are the messages, ordered by Kafka offset:
>>
>> (1, name_a), (2, name_b), (2, name_c), (1, name_d)
>>
>> I would expect the output to be:
>>
>> (1, [name_a]), (2, [name_b]), (2, [name_b, name_c]), (1, [name_a, name_d])
>>
>> Again, without an ordering guarantee the result is non-deterministic.
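The semantics of the join in example 1 can be made concrete with a plain-Python sketch (a hypothetical class, not a Beam transform, assuming events are consumed in Kafka-offset order):

```python
class StreamingInnerJoin:
    """Sketch of the unwindowed inner join in example 1: keep the latest
    name per user and the reviews seen so far, and emit join results as
    events arrive. Correct output depends on offset-ordered input."""

    def __init__(self):
        self.users = {}    # user_id -> latest name
        self.reviews = {}  # user_id -> review_ids seen so far

    def on_user(self, user_id, name):
        # A user mutation re-emits that user's existing reviews
        # under the updated name.
        self.users[user_id] = name
        return [(user_id, name, r) for r in self.reviews.get(user_id, [])]

    def on_review(self, review_id, user_id):
        # A review joins against the user's latest name, if known.
        self.reviews.setdefault(user_id, []).append(review_id)
        if user_id in self.users:
            return [(user_id, self.users[user_id], review_id)]
        return []
```

Feeding the six events above in offset order reproduces exactly the three expected outputs; feeding them in any other interleaving can produce different results, which is the non-determinism being described.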
>>
>> Change data capture (CDC) streams are a very common use case for our
>> data pipeline. As in the examples above, we rely on Kafka offsets to
>> make sure we process data mutations in the proper order. While in some
>> cases we have Flink-native solutions to these problems (Flink provides
>> ordering guarantees within the chosen key), we are now building some
>> new Beam applications that would require ordering guarantees. What is
>> the recommended approach in Beam for such use cases? If this isn't
>> currently supported, do we have any near-term plan to add native
>> ordering support in Beam?
>>
>> On 2020/06/09 20:37:22, Luke Cwik <l...@google.com> wrote:
>> > This will likely break due to:
>> > * Workers can have more than one thread and hence process the source
>> > in parallel.
>> > * Splitting a source allows the source to be broken up into multiple
>> > restrictions, and the runner can process those restrictions in any
>> > order it wants. (Say your Kafka partition has an unconsumed commit
>> > offset range [20, 100); this could be split into [20, 60) and
>> > [60, 100), and the [60, 100) offset range could be processed first.)
>> >
>> > You're right that you need to sort the output however you want
>> > within your DoFn before you make external calls to Kafka (this
>> > prevents you from using the KafkaIO sink implementation as a
>> > transform). There is an annotation, @RequiresTimeSortedInput, which
>> > is a special case for this sorting if you want it to be sorted by
>> > the element's timestamp, but you'll still need to write to Kafka
>> > directly yourself from your DoFn.
>> >
>> > On Mon, Jun 8, 2020 at 4:24 PM Hadi Zhang <ha...@yelp.com> wrote:
>> >
>> > > We are using the Beam 2.20 Python SDK on a Flink 1.9 runner. Our
>> > > messages originate from a custom source that consumes messages
>> > > from a Kafka topic and emits them in the order of their Kafka
>> > > offsets to a DoFn.
>> > > After this DoFn processes the messages, they are emitted to a
>> > > custom sink that sends messages to a Kafka topic.
>> > >
>> > > We want to process those messages in the order in which we receive
>> > > them from Kafka and then emit them to the Kafka sink in the same
>> > > order, but based on our understanding Beam does not provide
>> > > in-order transport. However, in practice we noticed that with a
>> > > Python SDK worker on Flink, a parallelism setting of 1, and one
>> > > sdk_worker instance, messages seem to be both processed and
>> > > emitted in order. Is that implementation-specific in-order
>> > > behavior something that we can rely on, or is it very likely that
>> > > this will break at some future point?
>> > >
>> > > In case it's not recommended to depend on that behavior, what is
>> > > the best approach for in-order processing?
>> > >
>> > > https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam
>> > > recommends ordering events in a heap, but according to our
>> > > understanding this approach will only work when directly writing
>> > > to an external system.
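The heap approach mentioned in that Stack Overflow answer can be sketched in plain Python (an illustration of the idea only, with hypothetical names — not Beam code): events are buffered in a min-heap by timestamp, and everything at or below a watermark is flushed in sorted order.

```python
import heapq

class TimestampSortBuffer:
    """Sketch of heap-based reordering: buffer events in a min-heap
    keyed by timestamp, then flush all events at or below the current
    watermark in timestamp order."""

    def __init__(self):
        self._heap = []

    def add(self, timestamp, event):
        heapq.heappush(self._heap, (timestamp, event))

    def flush(self, watermark):
        """Release every buffered event with timestamp <= watermark."""
        out = []
        while self._heap and self._heap[0][0] <= watermark:
            out.append(heapq.heappop(self._heap)[1])
        return out
```

In a Beam pipeline this flush would typically be driven by an event-time timer in a stateful DoFn, which is why, as noted above, it fits best when the DoFn itself writes to the external system.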