Oscar Rodriguez created BEAM-10382:
--------------------------------------

             Summary: The Python SDK is not handling the sessions properly.
                 Key: BEAM-10382
                 URL: https://issues.apache.org/jira/browse/BEAM-10382
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
    Affects Versions: 2.22.0, 2.20.0
         Environment: Direct Runner and Dataflow
            Reporter: Oscar Rodriguez


I tried this with Apache Beam 2.20 and 2.22.

1) What I want to achieve:

I have a pipeline that reads from Google Pub/Sub. The messages contain user 
and product information. Ultimately, I need to know, for each user session, 
how many products of each type it contains.

2) What I did:

The first thing I do in my pipeline is a "Group by Key", using the user as a 
key and using "beam.WindowInto(beam.window.Sessions(15))" as windows. Then, as 
I need to aggregate over products for each user/session, I do another "Group by 
Key", this time with the product as key.

3) What I expect to happen:

With the first "Group by Key", the pipeline creates a different window for each 
user/session combination. So I expect the second "Group by Key" not to mix 
elements that come from different windows.
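This expectation rests on the fact that a Beam GroupByKey groups elements by key and window, so elements that share a key but sit in different windows come out as separate groups. The following is a simplified, stdlib-only model of that rule, not Beam's implementation: `group_by_key_and_window` is a hypothetical helper, the windows are hypothetical non-overlapping `(start, end)` tuples, and the model deliberately leaves out the extra step that merging WindowFns such as `Sessions` perform on overlapping windows of the same key.

{code:python}
from collections import defaultdict


def group_by_key_and_window(elements):
    """Group (key, value, window) triples by (key, window), one group per window."""
    groups = defaultdict(list)
    for key, value, window in elements:
        groups[(key, window)].append(value)
    return dict(groups)


# Hypothetical input: every element is keyed by product, but each user's
# elements carry that user's own (non-overlapping) session window.
elements = [
    ("prod_1", (user, quantity), window)
    for user, quantity, window in [
        ("user_1", "1", (0, 15)),
        ("user_2", "2", (20, 35)),
        ("user_3", "3", (40, 55)),
    ]
    for _ in range(3)
]

groups = group_by_key_and_window(elements)
for (key, window), values in groups.items():
    print(key, window, values)
# prod_1 (0, 15) [('user_1', '1'), ('user_1', '1'), ('user_1', '1')]
# prod_1 (20, 35) [('user_2', '2'), ('user_2', '2'), ('user_2', '2')]
# prod_1 (40, 55) [('user_3', '3'), ('user_3', '3'), ('user_3', '3')]
{code}

Three groups share the key 'prod_1', one per window. If all nine elements carried the same window instead, the same function would return a single group of nine values.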

4) What actually happens:

If the messages are at least 1 second apart from each other, the pipeline works 
as I expect.
However, if I publish all the messages at the same time, all the sessions and 
users get mixed.

A complete working example is available at 
https://github.com/Oscar-Rod/apache-beam-testing.

To publish the messages I do the following:

 
{code:python}
import json
import time

from google.cloud import pubsub_v1

# Assumed setup, not shown in the original snippet; "my-project" and
# "my-topic" are placeholder names.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")


def generate_message(user, products):
    return json.dumps({"user": user, "products": products}).encode("utf-8")


messages = [
    generate_message("user_1", [{"id": "prod_1", "quantity": 1}]),
    generate_message("user_1", [{"id": "prod_1", "quantity": 1}]),
    generate_message("user_1", [{"id": "prod_1", "quantity": 1}]),
    generate_message("user_2", [{"id": "prod_1", "quantity": 2}]),
    generate_message("user_2", [{"id": "prod_1", "quantity": 2}]),
    generate_message("user_2", [{"id": "prod_1", "quantity": 2}]),
    generate_message("user_3", [{"id": "prod_1", "quantity": 3}]),
    generate_message("user_3", [{"id": "prod_1", "quantity": 3}]),
    generate_message("user_3", [{"id": "prod_1", "quantity": 3}]),
]
for message in messages:
    publisher.publish(topic_path, data=message)
    # time.sleep(1)  # uncomment to publish the messages 1 second apart
{code}
 

This publishes 9 messages. As the sessions are configured with a 15-second gap 
and all of a user's messages arrive within it, each user should end up with 
exactly one session. In the end, user_1 should have 3 "prod_1", user_2 should 
have 6 "prod_1" and user_3 should have 9 "prod_1" (the sum of the quantities).
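The expected totals are simply the summed quantities per user and product. As a minimal, stdlib-only check (it re-creates the nine payloads as plain JSON and does not involve Beam or Pub/Sub), they can be computed like this:

{code:python}
import json
from collections import Counter


def generate_message(user, products):
    return json.dumps({"user": user, "products": products}).encode("utf-8")


# Same nine payloads as published above: three per user.
messages = [
    generate_message(user, [{"id": "prod_1", "quantity": qty}])
    for user, qty in [("user_1", 1), ("user_2", 2), ("user_3", 3)]
    for _ in range(3)
]

# Sum the quantity of each (user, product) pair across all messages.
totals = Counter()
for raw in messages:
    payload = json.loads(raw.decode("utf-8"))
    for product in payload["products"]:
        totals[(payload["user"], product["id"])] += product["quantity"]

print(dict(totals))
# {('user_1', 'prod_1'): 3, ('user_2', 'prod_1'): 6, ('user_3', 'prod_1'): 9}
{code}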

The first step in the pipeline is reading from Pub/Sub:

 
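{code:python}
import apache_beam as beam

# `pipeline`, `options` and `ParseMessage` are defined elsewhere (not shown here).
messages = (
    pipeline
    | "read messages" >> beam.io.ReadFromPubSub(topic=options.input_topic)
    | "parse to messages" >> beam.ParDo(ParseMessage())
)
{code}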
 

It parses the messages into the following classes:

 
{code:python}
from datetime import datetime
from typing import List

from pydantic import BaseModel


class Product(BaseModel):
    id: str
    quantity: str


class Message(BaseModel):
    user: str
    products: List[Product]
    timestamp: datetime
{code}
 

Then I apply the session windows and the first "Group by Key":

 
{code:python}
sessions = (
    messages
    | "window" >> beam.WindowInto(beam.window.Sessions(15))
    | "add key" >> beam.Map(lambda element: (element.user, element.products))
    | "group by user" >> beam.GroupByKey()
)
{code}
After this step, I get the following elements:

 
{code:python}
('user_1', [[Product(id='prod_1', quantity='1')], [Product(id='prod_1', quantity='1')], [Product(id='prod_1', quantity='1')]])
('user_2', [[Product(id='prod_1', quantity='2')], [Product(id='prod_1', quantity='2')], [Product(id='prod_1', quantity='2')]])
('user_3', [[Product(id='prod_1', quantity='3')], [Product(id='prod_1', quantity='3')], [Product(id='prod_1', quantity='3')]])
{code}
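For reference, `beam.window.Sessions(15)` takes a gap duration in seconds: each element is first assigned its own window [t, t + 15), and overlapping windows that share the same key are merged when the GroupByKey runs. Below is a simplified, stdlib-only model of that per-key merging. It is not Beam's implementation; `session_windows` is a hypothetical helper, windows are plain `(start, end)` tuples, and timestamps are in seconds.

{code:python}
from collections import defaultdict


def session_windows(keyed_timestamps, gap):
    """Merge (key, timestamp) pairs into session windows, separately per key."""
    by_key = defaultdict(list)
    for key, ts in keyed_timestamps:
        # Each element starts in its own proto-window [ts, ts + gap).
        by_key[key].append((ts, ts + gap))

    merged = {}
    for key, windows in by_key.items():
        windows.sort()
        result = [list(windows[0])]
        for start, end in windows[1:]:
            if start < result[-1][1]:
                # Overlaps the current session: extend it.
                result[-1][1] = max(result[-1][1], end)
            else:
                # No overlap: start a new session for this key.
                result.append([start, end])
        merged[key] = [tuple(w) for w in result]
    return merged


# All nine messages published "at the same time" (t = 0).
events = [(user, 0) for user in ("user_1", "user_2", "user_3") for _ in range(3)]
print(session_windows(events, gap=15))
# {'user_1': [(0, 15)], 'user_2': [(0, 15)], 'user_3': [(0, 15)]}
{code}

In this model all three users get the identical window (0, 15), yet they remain separate groups because merging is done per key, which matches the three "group by user" elements shown above.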
 

To aggregate for each product, I need the product as a key, so I modified the 
previous step to flatten the elements:

 
{code:python}
def flat_function(key, elements):
    for element in elements:
        yield (key, element)


sessions = (
    messages
    | "window" >> beam.WindowInto(beam.window.Sessions(15))
    | "add key" >> beam.Map(lambda element: (element.user, element.products))
    | "group by user" >> beam.GroupByKey()
    # (user, [[Product], ...]) -> one (user, [Product]) per message
    | "first flatten" >> beam.FlatMapTuple(flat_function)
    # (user, [Product]) -> one (user, Product) per product
    | "second flatten" >> beam.FlatMapTuple(flat_function)
)
{code}
 

And I am getting the following:

 
{code:java}
('user_1', Product(id='prod_1', quantity='1'))
('user_1', Product(id='prod_1', quantity='1'))
('user_1', Product(id='prod_1', quantity='1'))
('user_2', Product(id='prod_1', quantity='2'))
('user_2', Product(id='prod_1', quantity='2'))
('user_2', Product(id='prod_1', quantity='2'))
('user_3', Product(id='prod_1', quantity='3'))
('user_3', Product(id='prod_1', quantity='3'))
('user_3', Product(id='prod_1', quantity='3'))
{code}
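Each `flat_function` pass removes one level of nesting: the first turns (user, [[Product], [Product], [Product]]) into one (user, [Product]) pair per message, and the second into one (user, Product) pair per product. The sketch below replays both passes in plain Python. The `Product` dataclass is a hypothetical stand-in for the pydantic model, and list comprehensions stand in for `beam.FlatMapTuple`, which likewise unpacks each tuple into the function's arguments.

{code:python}
from dataclasses import dataclass


@dataclass
class Product:
    # Hypothetical stand-in for the pydantic Product model.
    id: str
    quantity: str


def flat_function(key, elements):
    for element in elements:
        yield (key, element)


# Shape of one element emitted by "group by user".
grouped = [
    ("user_1", [[Product("prod_1", "1")], [Product("prod_1", "1")], [Product("prod_1", "1")]]),
]

# "first flatten": one (user, [Product]) pair per original message.
first = [pair for kv in grouped for pair in flat_function(*kv)]
# "second flatten": one (user, Product) pair per product.
second = [pair for kv in first for pair in flat_function(*kv)]

for pair in second:
    print(pair)
# ('user_1', Product(id='prod_1', quantity='1'))
# ('user_1', Product(id='prod_1', quantity='1'))
# ('user_1', Product(id='prod_1', quantity='1'))
{code}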
 

Now, the last step:

 
{code:python}
products = (
    sessions
    | "add new key" >> beam.Map(lambda session: (session[1].id, (session[1], session[0])))
    | "group by product" >> beam.GroupByKey()
)
{code}
And here is where the issue happens. If the messages are published at least 1 
second apart, this is what I get:

 
{code:python}
('prod_1', [(Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1')])
('prod_1', [(Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2')])
('prod_1', [(Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3')])
{code}
 

The result is what I expect: 3 elements, one per user session. The "quantity" 
values confirm that the result is correct: all elements with "quantity=3" are 
grouped together, as they come from the same user/session. The same applies to 
the elements with "quantity=2" and "quantity=1".

However, if I publish the messages all at the same time, this is what I get:

 
{code:python}
('prod_1', [(Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3')])
{code}
Only 1 element, with all the messages in it. So it seems that when the 
messages' timestamps are too close together, Apache Beam does not keep the 
different users' sessions apart.

The fact that the behaviour of the pipeline changes when the timestamps of the 
messages change makes me think that this is a bug in Apache Beam. What do you 
think? Is it possible? Does anyone have an explanation for why this happens? 
Could this somehow be expected behaviour?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
