Thanks a lot for the response!

We have several business use cases that rely heavily on ordering by Kafka 
offset:
1) streaming unwindowed inner join: say we want to join users with reviews on 
user_id. Here are the schemas of the two streams:
    user:
        user_id
        name
        timestamp
    reviews:
        review_id
        user_id
        timestamp
Here are the messages in each stream, ordered by Kafka offset:
    user:
    (1, name_a, 60), (2, name_b, 120), (1, name_c, 240)
    reviews:
    (ABC, 1, 90), (DEF, 2, 360)
I would expect to receive the following output messages:
    (1, name_a, ABC) at timestamp 90
    (1, name_c, ABC) at timestamp 240
    (2, name_b, DEF) at timestamp 360
This can be done in native Flink, since the Flink Kafka consumer reads each 
partition sequentially. But without an ordering guarantee we can end up with 
arbitrary results. So how would we implement this in Beam?
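To make the problem concrete, here is roughly the stateful DoFn we would write 
if elements reached it in offset order (a minimal sketch; keying the union of 
both streams by user_id and tagging elements as ('user', name) or 
('review', review_id) is our own encoding, not an established pattern):

    import apache_beam as beam
    from apache_beam.coders import StrUtf8Coder
    from apache_beam.transforms.userstate import (BagStateSpec,
                                                  ReadModifyWriteStateSpec)

    class JoinUsersAndReviewsFn(beam.DoFn):
        # Latest name seen for this user_id, and all review ids seen so far.
        LATEST_NAME = ReadModifyWriteStateSpec('latest_name', StrUtf8Coder())
        REVIEWS = BagStateSpec('reviews', StrUtf8Coder())

        def process(self, element,
                    latest_name=beam.DoFn.StateParam(LATEST_NAME),
                    reviews=beam.DoFn.StateParam(REVIEWS)):
            # element is (user_id, (source, value)), keyed by user_id.
            user_id, (source, value) = element
            if source == 'user':
                latest_name.write(value)
                # A name change re-emits the join for every review so far.
                for review_id in reviews.read():
                    yield user_id, value, review_id
            else:  # source == 'review'
                reviews.add(value)
                name = latest_name.read()
                if name is not None:
                    yield user_id, name, value

With in-order delivery this produces exactly the output above; with arbitrary 
ordering, review ABC can be processed before user 1's first record, and 
(1, name_a, ABC) is silently dropped.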
2) unwindowed aggregation: aggregate all employees for each organization. 
Say we have a new-employee stream with the following schema:
    new_employee:
        organization_id
        employee_name
And here are the messages, ordered by Kafka offset:
(1, name_a), (2, name_b), (2, name_c), (1, name_d)
I would expect the output to be:
(1, [name_a]), (2, [name_b]), (2, [name_b, name_c]), (1, [name_a, name_d])
Again, without an ordering guarantee the result is non-deterministic. 
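If ordering were guaranteed, the aggregation itself would reduce to a small 
stateful DoFn (again a minimal sketch; string ids and names, and a stream 
keyed by organization_id, are assumptions):

    import apache_beam as beam
    from apache_beam.coders import StrUtf8Coder
    from apache_beam.transforms.userstate import BagStateSpec

    class AccumulateEmployeesFn(beam.DoFn):
        # Running list of employee names per organization_id.
        EMPLOYEES = BagStateSpec('employees', StrUtf8Coder())

        def process(self, element,
                    employees=beam.DoFn.StateParam(EMPLOYEES)):
            org_id, name = element  # keyed by organization_id
            employees.add(name)
            yield org_id, list(employees.read())

If the runner hands (1, name_d) to process() before (1, name_a), the emitted 
lists come out in a different order, which is exactly the non-determinism we 
want to avoid.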

Change data capture (CDC) streams are a very common use case for our data 
pipelines. As in the examples above, we rely on Kafka offsets to make sure we 
process data mutations in the proper order. While in some cases we have 
Flink-native solutions to these problems (Flink provides ordering guarantees 
within the chosen key), we are now building new Beam applications that 
require ordering guarantees. What is the recommended approach in Beam for 
such use cases? And if this isn't currently supported, are there any 
near-term plans to add native ordering support to Beam?
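
For completeness, this is the kind of workaround we are experimenting with 
based on your earlier suggestion to sort inside the DoFn and write to Kafka 
directly (a minimal sketch using the kafka-python client; the topic, broker 
list, and the (offset, payload) element shape are assumptions, and it only 
restores order within a single bundle):

    import apache_beam as beam
    from kafka import KafkaProducer  # kafka-python client, not Beam's KafkaIO

    class SortedKafkaWriteFn(beam.DoFn):
        def __init__(self, topic, bootstrap_servers):
            self._topic = topic
            self._bootstrap_servers = bootstrap_servers
            self._producer = None
            self._buffer = None

        def setup(self):
            self._producer = KafkaProducer(
                bootstrap_servers=self._bootstrap_servers)

        def start_bundle(self):
            self._buffer = []

        def process(self, element):
            # element is assumed to be (kafka_offset, payload_bytes).
            self._buffer.append(element)

        def finish_bundle(self):
            # Restore offset order within the bundle before the external write.
            for _, payload in sorted(self._buffer, key=lambda kv: kv[0]):
                self._producer.send(self._topic, payload)
            self._producer.flush()

        def teardown(self):
            if self._producer is not None:
                self._producer.close()

That still leaves ordering across bundles (and across restrictions after a 
split) unsolved, which is why we are asking about native support.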


On 2020/06/09 20:37:22, Luke Cwik <l...@google.com> wrote: 
> This will likely break due to:
> * workers can have more than one thread and hence process the source in
> parallel
> * splitting a source allows for the source to be broken up into multiple
> restrictions, and hence the runner can process those restrictions in any
> order it wants. (Let's say your Kafka partition has an unconsumed commit
> offset range [20, 100); this could be split into [20, 60) and [60, 100),
> and the [60, 100) offset range could be processed first.)
> 
> You're right that you need to sort the output however you want within your
> DoFn before you make external calls to Kafka (this prevents you from using
> the KafkaIO sink implementation as a transform). There is an annotation,
> @RequiresTimeSortedInput, which is a special case for this sorting if you
> want the input sorted by element timestamp, but you'll still need to
> write to Kafka directly yourself from your DoFn.
> 
> On Mon, Jun 8, 2020 at 4:24 PM Hadi Zhang <ha...@yelp.com> wrote:
> 
> > We are using the Beam 2.20 Python SDK on a Flink 1.9 runner. Our
> > messages originate from a custom source that consumes messages from a
> > Kafka topic and emits them in the order of their Kafka offsets to a
> > DoFn. After this DoFn processes the messages, they are emitted to a
> > custom sink that sends messages to a Kafka topic.
> >
> > We want to process those messages in the order in which we receive
> > them from Kafka and then emit them to the Kafka sink in the same
> > order, but based on our understanding Beam does not provide an
> > in-order transport. However, in practice we noticed that with a Python
> > SDK worker on Flink, a parallelism setting of 1, and one sdk_worker
> > instance, messages seem to be both processed and emitted in order. Is
> > that implementation-specific in-order behavior something that we can
> > rely on, or is it very likely that this will break at some future
> > point?
> >
> > In case it's not recommended to depend on that behavior, what is the
> > best approach for in-order processing?
> >
> > https://stackoverflow.com/questions/45888719/processing-total-ordering-of-events-by-key-using-apache-beam
> > recommends ordering events in a heap, but according to our
> > understanding this approach will only work when directly writing to an
> > external system.
> >
> 
