[ https://issues.apache.org/jira/browse/BEAM-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17301926#comment-17301926 ]
Robert Bradshaw commented on BEAM-10382:
----------------------------------------

In Java, IIRC, we simply prohibit the second GBK. Perhaps we should do the same in Python. Python does not have a built-in IdentityWindows (though that might not even provide the right behaviour, as identical windows would still merge). I think the issue here is that the second GBK should be keyed by both user and product, if it is desirable that uses of the product by different users be kept distinct. It may be, however, that a second GBK is not even needed: after grouping by user, one can count the uses of each product locally rather than introducing a second GBK.

> The Python SDK is not handling sessions properly.
> -------------------------------------------------
>
>                 Key: BEAM-10382
>                 URL: https://issues.apache.org/jira/browse/BEAM-10382
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.20.0, 2.22.0
>         Environment: Direct Runner and Dataflow
>            Reporter: Oscar Rodriguez
>            Priority: P1
>              Labels: session, windowing
>
> I tried this with Apache Beam 2.20 and 2.22.
>
> 1) What I want to achieve:
>
> I have a pipeline that reads from Google Pub/Sub. The messages contain user and product information. In the end, I need to analyse the data so that I know, for each user's session, how many products of each type there are.
>
> 2) What I did:
>
> The first thing I do in my pipeline is a "Group by Key", using the user as the key and "beam.WindowInto(beam.window.Sessions(15))" as the windowing. Then, as I need to aggregate over products for each user/session, I do another "Group by Key", this time with the product as the key.
>
> 3) What I expect to happen:
>
> With the first "Group by Key", the pipeline creates a different window for each user/session combination. So, for the second "Group by Key", I expect that it doesn't mix elements that come from different windows.
>
> 4) What actually happens:
>
> If the messages are at least 1 second apart from each other, the pipeline works as I expect. However, if I publish all the messages at the same time, all the sessions and users get mixed.
>
> A complete working example is available at https://github.com/Oscar-Rod/apache-beam-testing.
>
> To publish the messages I do the following:
>
> {code:python}
> def generate_message(user, products):
>     return json.dumps({"user": user, "products": products}).encode("utf-8")
>
> messages = [
>     generate_message("user_1", [{"id": "prod_1", "quantity": 1}]),
>     generate_message("user_1", [{"id": "prod_1", "quantity": 1}]),
>     generate_message("user_1", [{"id": "prod_1", "quantity": 1}]),
>     generate_message("user_2", [{"id": "prod_1", "quantity": 2}]),
>     generate_message("user_2", [{"id": "prod_1", "quantity": 2}]),
>     generate_message("user_2", [{"id": "prod_1", "quantity": 2}]),
>     generate_message("user_3", [{"id": "prod_1", "quantity": 3}]),
>     generate_message("user_3", [{"id": "prod_1", "quantity": 3}]),
>     generate_message("user_3", [{"id": "prod_1", "quantity": 3}]),
> ]
>
> for message in messages:
>     publisher.publish(topic_path, data=message)
>     # time.sleep(1)
> {code}
>
> This publishes 9 messages. As the sessions are configured with a gap of 15 seconds, this should create one session per user. In the end, user_1 should have 3 "prod_1", user_2 should have 6 "prod_1", and user_3 should have 9 "prod_1".
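As the comment at the top suggests, these per-user totals can be computed locally after grouping by user, without a second GBK at all. A minimal sketch against the reporter's pipeline (the count_products helper is hypothetical; it assumes the Product class from the repository linked above):

{code:python}
import apache_beam as beam
from collections import Counter

def count_products(user, grouped_products):
    # grouped_products is the iterable of product lists produced by the
    # per-user GroupByKey; tally the quantities per product id locally.
    counts = Counter()
    for products in grouped_products:
        for product in products:
            counts[product.id] += int(product.quantity)
    return (user, dict(counts))

per_session_counts = (
    messages
    | "window" >> beam.WindowInto(beam.window.Sessions(15))
    | "add key" >> beam.Map(lambda element: (element.user, element.products))
    | "group by user" >> beam.GroupByKey()
    | "count locally" >> beam.MapTuple(count_products)
)
{code}

With the nine messages above this should yield ('user_1', {'prod_1': 3}), ('user_2', {'prod_1': 6}) and ('user_3', {'prod_1': 9}), one element per user session, with no later grouping that could mix users.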
> The first step in the pipeline is reading from Pub/Sub:
>
> {code:python}
> messages = (
>     pipeline
>     | "read messages" >> beam.io.ReadFromPubSub(topic=options.input_topic)
>     | "parse to messages" >> beam.ParDo(ParseMessage())
> )
> {code}
>
> It parses the messages into the following classes:
>
> {code:python}
> class Product(BaseModel):
>     id: str
>     quantity: str
>
> class Message(BaseModel):
>     user: str
>     products: List[Product]
>     timestamp: datetime
> {code}
>
> Then, I apply the sessions and the "Group by Key":
>
> {code:python}
> sessions = (
>     messages
>     | "window" >> beam.WindowInto(beam.window.Sessions(15))
>     | "add key" >> beam.Map(lambda element: (element.user, element.products))
>     | "group by user" >> beam.GroupByKey()
> )
> {code}
>
> After this, I get the following elements:
>
> {code:python}
> ('user_1', [[Product(id='prod_1', quantity='1')], [Product(id='prod_1', quantity='1')], [Product(id='prod_1', quantity='1')]])
> ('user_2', [[Product(id='prod_1', quantity='2')], [Product(id='prod_1', quantity='2')], [Product(id='prod_1', quantity='2')]])
> ('user_3', [[Product(id='prod_1', quantity='3')], [Product(id='prod_1', quantity='3')], [Product(id='prod_1', quantity='3')]])
> {code}
>
> To aggregate per product, I need the product as the key, so I modified the previous step to flatten the elements:
>
> {code:python}
> def flat_function(key, elements):
>     for element in elements:
>         yield (key, element)
>
> sessions = (
>     messages
>     | "window" >> beam.WindowInto(beam.window.Sessions(15))
>     | "add key" >> beam.Map(lambda element: (element.user, element.products))
>     | "group by user" >> beam.GroupByKey()
>     | "first flatten" >> beam.FlatMapTuple(flat_function)
>     | "second flatten" >> beam.FlatMapTuple(flat_function)
> )
> {code}
>
> And I get the following:
>
> {code:python}
> ('user_1', Product(id='prod_1', quantity='1'))
> ('user_1', Product(id='prod_1', quantity='1'))
> ('user_1', Product(id='prod_1', quantity='1'))
> ('user_2', Product(id='prod_1', quantity='2'))
> ('user_2', Product(id='prod_1', quantity='2'))
> ('user_2', Product(id='prod_1', quantity='2'))
> ('user_3', Product(id='prod_1', quantity='3'))
> ('user_3', Product(id='prod_1', quantity='3'))
> ('user_3', Product(id='prod_1', quantity='3'))
> {code}
>
> Now, the last step:
>
> {code:python}
> products = (
>     sessions
>     | "add new key" >> beam.Map(lambda session: (session[1].id, (session[1], session[0])))
>     | "group by product" >> beam.GroupByKey()
> )
> {code}
>
> And here is where the issue happens. If the messages are published at least 1 second apart, this is what I get:
>
> {code:python}
> ('prod_1', [(Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1')])
> ('prod_1', [(Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2')])
> ('prod_1', [(Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3')])
> {code}
>
> The result is what I expect: 3 elements, one per user session. Looking at the "quantity" field we can confirm that the result is correct: all elements with "quantity=3" are in the same group, as they come from the same user/session, and the same applies to the elements with "quantity=2" and "quantity=1".
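If the second GroupByKey is kept, the comment at the top suggests keying it by both user and product, so that uses of the same product by different users stay distinct even when their session windows coincide. A minimal sketch of that change to the "add new key" step quoted above (the step labels are hypothetical):

{code:python}
products = (
    sessions
    | "add composite key" >> beam.Map(
        lambda session: ((session[0], session[1].id), session[1]))
    | "group by user and product" >> beam.GroupByKey()
)
{code}

Even if overlapping session windows from different users merge during the second grouping, the (user, product) composite key keeps each user's products in a separate group.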
> However, if I publish all the messages at the same time, this is what I get:
>
> {code:python}
> ('prod_1', [(Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3')])
> {code}
>
> Only 1 element, with all the messages in it. So clearly, when the timestamps of the messages are too close together, Apache Beam cannot put them in different sessions.
>
> The fact that the behaviour of the pipeline changes when the timestamps of the messages change makes me think that this is a bug in Apache Beam. What do you think? Is it possible? Does anyone have an explanation of why this happens? Could this somehow be expected behaviour?
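One way to see what is going on is to log the window each element occupies before and after the second GroupByKey. When all messages are published at once, every user's merged session window covers essentially the same interval, and since Sessions is a merging window fn, the second grouping can merge those overlapping windows under the product key, which is consistent with the merging behaviour described in the comment at the top. A minimal diagnostic sketch (the LogWindow DoFn and step labels are hypothetical):

{code:python}
import apache_beam as beam

class LogWindow(beam.DoFn):
    """Passes elements through unchanged, printing the window each one is in."""
    def process(self, element, window=beam.DoFn.WindowParam):
        print(f"{window}: {element}")
        yield element

debug = (
    sessions
    | "log windows before regroup" >> beam.ParDo(LogWindow())
    | "add new key" >> beam.Map(
        lambda session: (session[1].id, (session[1], session[0])))
    | "group by product" >> beam.GroupByKey()
    | "log windows after regroup" >> beam.ParDo(LogWindow())
)
{code}

With simultaneous publishes, the first log should show three near-identical session windows (one per user), and the second a single merged window containing all nine products, matching the mixed output reported above.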