[ https://issues.apache.org/jira/browse/BEAM-12552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490480#comment-17490480 ]

Robert Bradshaw commented on BEAM-12552:
----------------------------------------

More recent SDKs reject this pipeline with:

```
ValueError: MapCoder[StrUtf8Coder, MapCoder[StrUtf8Coder, VarIntCoder]] cannot be made deterministic for 'Group By Key'.
```
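Rejecting the pipeline is reasonable here. Beam runners generally compare keys by their encoded bytes, so a key coder has to be deterministic: equal values must always encode to identical bytes. A Python dict is serialized in insertion order, so two equal dicts can encode differently. The minimal sketch below shows this effect. It uses the standard-library `json` module as a stand-in coder, which is an assumption for illustration only, since Beam's `MapCoder` has its own binary format.

```python
import json

# Two dictionaries that compare equal but were built with different insertion orders.
a = {'image_id': 1, 'context_id': 2}
b = {'context_id': 2, 'image_id': 1}

# Stand-in coder that encodes entries in iteration (insertion) order:
# the values are equal, but the bytes are not.
naive_a = json.dumps(a).encode('utf-8')
naive_b = json.dumps(b).encode('utf-8')

# Sorting the keys before encoding makes equal values produce identical bytes.
canonical_a = json.dumps(a, sort_keys=True).encode('utf-8')
canonical_b = json.dumps(b, sort_keys=True).encode('utf-8')

print(a == b)                      # True
print(naive_a == naive_b)          # False
print(canonical_a == canonical_b)  # True
```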

> GroupBy and GroupByKey don't group composite keys
> -------------------------------------------------
>
>                 Key: BEAM-12552
>                 URL: https://issues.apache.org/jira/browse/BEAM-12552
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model, sdk-py-core
>    Affects Versions: 2.24.0
>         Environment: 1. macOS Big Sur 11.4, Python 3.9
> 2. Google Dataflow, Python 3.9
>            Reporter: Praneeth Peiris
>            Priority: P3
>              Labels: newbie, starter
>
> *Background*
> Even though GroupByKey and GroupBy work for simple key fields, they don't
> work for composite keys, where the key is itself a dictionary.
>  
> *Example:*
> {code:python}
> import apache_beam as beam
>
>
> def run():
>     with beam.Pipeline() as pipeline:
>         (pipeline
>          | 'Create Dataset' >> beam.Create([
>              ({'key': {'image_id': 1, 'context_id': 2}}, 1),
>              ({'key': {'image_id': 2, 'context_id': 2}}, 2),
>              ({'key': {'image_id': 3, 'context_id': 2}}, 3),
>              ({'key': {'image_id': 4, 'context_id': 2}}, 4),
>              ({'key': {'image_id': 5, 'context_id': 2}}, 5),
>              # Same keys as above, with the fields in a different order.
>              ({'key': {'context_id': 2, 'image_id': 1}}, 6),
>              ({'key': {'context_id': 2, 'image_id': 2}}, 7),
>              ({'key': {'context_id': 2, 'image_id': 3}}, 8),
>              ({'key': {'context_id': 2, 'image_id': 4}}, 9),
>              ({'key': {'context_id': 2, 'image_id': 5}}, 10),
>          ])
>          | 'Group By Key' >> beam.GroupByKey()
>          | 'Print Stuff' >> beam.Map(print))
>         # The pipeline runs when the `with` block exits, so an explicit
>         # pipeline.run() call is unnecessary.
>
>
> if __name__ == '__main__':
>     run()
> {code}
> Note that several records have semantically identical keys whose fields
> appear in a different order.
> The same issue occurs with {{GroupBy}} when a mapper function extracts the key:
> {code:python}
> | 'Group By' >> beam.GroupBy(lambda record: record[0]['key'])
> {code}
>  
> *Expected output:*
> {code}
> ({'key': {'image_id': 1, 'context_id': 2}}, [1, 6])
> ({'key': {'image_id': 2, 'context_id': 2}}, [2, 7])
> ({'key': {'image_id': 3, 'context_id': 2}}, [3, 8])
> ({'key': {'image_id': 4, 'context_id': 2}}, [4, 9])
> ({'key': {'image_id': 5, 'context_id': 2}}, [5, 10])
> {code}
>  
> *Actual output:*
> {code}
> ({'key': {'image_id': 1, 'context_id': 2}}, [1])
> ({'key': {'image_id': 2, 'context_id': 2}}, [2])
> ({'key': {'image_id': 3, 'context_id': 2}}, [3])
> ({'key': {'image_id': 4, 'context_id': 2}}, [4])
> ({'key': {'image_id': 5, 'context_id': 2}}, [5])
> ({'key': {'context_id': 2, 'image_id': 1}}, [6])
> ({'key': {'context_id': 2, 'image_id': 2}}, [7])
> ({'key': {'context_id': 2, 'image_id': 3}}, [8])
> ({'key': {'context_id': 2, 'image_id': 4}}, [9])
> ({'key': {'context_id': 2, 'image_id': 5}}, [10])
> {code}
> *Workaround:*
> {code:python}
> def _generate_key(key_object):
>     # Build an ordered string key (used only for grouping) by sorting the
>     # field names, so the dictionary's field order no longer matters.
>     return ",".join(f"{field}:{key_object[field]}" for field in sorted(key_object))
> ...
> ...| "Group" >> beam.GroupBy(lambda record: _generate_key(record[0]['key']))
> ...
> {code}
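The gap between the expected and actual output, and the reason a canonical key closes it, can be modelled without Beam. The sketch below is a simplified model, not Beam's implementation. It assumes that grouping compares encoded key bytes and uses `json.dumps` as a stand-in coder. `canonical_key` and `group_values` are hypothetical helpers. `canonical_key` is a variant of the workaround above: it turns each dictionary into a sorted tuple of `(field, value)` pairs, which, unlike the joined string, keeps `1` and `'1'` distinct.

```python
import json
from collections import defaultdict

# The (key, value) pairs from the report: values 6-10 reuse the keys of
# values 1-5, with the dictionary fields inserted in the opposite order.
records = (
    [({'key': {'image_id': i, 'context_id': 2}}, i) for i in range(1, 6)]
    + [({'key': {'context_id': 2, 'image_id': i}}, i + 5) for i in range(1, 6)]
)


def canonical_key(value):
    """Hypothetical helper: recursively turn dicts into sorted (field, value) tuples.

    Assumes field names are mutually comparable (for example, all strings).
    """
    if isinstance(value, dict):
        return tuple(sorted((field, canonical_key(v)) for field, v in value.items()))
    return value


def group_values(pairs, key_fn):
    """Group values by the JSON bytes of key_fn(key), mimicking byte-wise key comparison."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[json.dumps(key_fn(key)).encode('utf-8')].append(value)
    return groups


raw = group_values(records, lambda key: key)
canonical = group_values(records, canonical_key)

print(len(raw))                    # 10, as in the "actual output"
print(sorted(canonical.values()))  # [[1, 6], [2, 7], [3, 8], [4, 9], [5, 10]]

# Unlike the "field:value" string key, the tuple key keeps int and str apart.
print(canonical_key({'id': 1}) == canonical_key({'id': '1'}))  # False
```

In a pipeline, the same helper could be plugged into the mapper, for example `beam.GroupBy(lambda record: canonical_key(record[0]['key']))`. Whether the resulting tuple key is given a deterministic coder depends on the SDK's coder inference, so this is worth checking for the SDK version in use.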



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to