[ https://issues.apache.org/jira/browse/BEAM-12552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490480#comment-17490480 ]
Robert Bradshaw commented on BEAM-12552:
----------------------------------------

More recent SDKs reject this pipeline with:
```
ValueError: MapCoder[StrUtf8Coder, MapCoder[StrUtf8Coder, VarIntCoder]] cannot be made deterministic for 'Group By Key'.
```

> GroupBy and GroupByKey doesn't group composite keys
> ---------------------------------------------------
>
>                 Key: BEAM-12552
>                 URL: https://issues.apache.org/jira/browse/BEAM-12552
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model, sdk-py-core
>    Affects Versions: 2.24.0
>         Environment: 1. macOS Big Sur 11.4, Python 3.9
> 2. Google Cloud Dataflow, Python 3.9
>            Reporter: Praneeth Peiris
>            Priority: P3
>              Labels: newbie, starter
>
> *Background*
> Even though GroupByKey and GroupBy work for simple key fields, they don't
> work for composite keys where the key is another dictionary.
>
> *Example:*
> {code:java}
> import apache_beam as beam
>
> def run(consumer_args, pipeline_args):
>     # The pipeline runs when the `with` block exits, so an explicit
>     # pipeline.run() call here would execute it a second time.
>     with beam.Pipeline() as pipeline:
>         (pipeline
>          | 'Create Dataset' >> beam.Create([
>              ({'key': {'image_id': 1, 'context_id': 2}}, 1),
>              ({'key': {'image_id': 2, 'context_id': 2}}, 2),
>              ({'key': {'image_id': 3, 'context_id': 2}}, 3),
>              ({'key': {'image_id': 4, 'context_id': 2}}, 4),
>              ({'key': {'image_id': 5, 'context_id': 2}}, 5),
>              ({'key': {'context_id': 2, 'image_id': 1}}, 6),
>              ({'key': {'context_id': 2, 'image_id': 2}}, 7),
>              ({'key': {'context_id': 2, 'image_id': 3}}, 8),
>              ({'key': {'context_id': 2, 'image_id': 4}}, 9),
>              ({'key': {'context_id': 2, 'image_id': 5}}, 10),
>          ])
>          | 'Group By Key' >> beam.GroupByKey()
>          | 'Print Stuff' >> beam.Map(print))
> {code}
> Note that some records have semantically identical keys whose fields appear
> in a different order.
> The same issue occurs for {{GroupBy}} when a mapper function is used.
> {code:java}
> | 'Group By Key' >> beam.GroupBy(lambda record: record[0]['key'])
> {code}
>
> *Expected output:*
> {code:java}
> ({'key': {'image_id': 1, 'context_id': 2}}, [1, 6])
> ({'key': {'image_id': 2, 'context_id': 2}}, [2, 7])
> ({'key': {'image_id': 3, 'context_id': 2}}, [3, 8])
> ({'key': {'image_id': 4, 'context_id': 2}}, [4, 9])
> ({'key': {'image_id': 5, 'context_id': 2}}, [5, 10])
> {code}
>
> *Actual output:*
> {code:java}
> ({'key': {'image_id': 1, 'context_id': 2}}, [1])
> ({'key': {'image_id': 2, 'context_id': 2}}, [2])
> ({'key': {'image_id': 3, 'context_id': 2}}, [3])
> ({'key': {'image_id': 4, 'context_id': 2}}, [4])
> ({'key': {'image_id': 5, 'context_id': 2}}, [5])
> ({'key': {'context_id': 2, 'image_id': 1}}, [6])
> ({'key': {'context_id': 2, 'image_id': 2}}, [7])
> ({'key': {'context_id': 2, 'image_id': 3}}, [8])
> ({'key': {'context_id': 2, 'image_id': 4}}, [9])
> ({'key': {'context_id': 2, 'image_id': 5}}, [10])
> {code}
>
> *Workaround:*
> {code:java}
> def _generate_key(key_object):
>     # Build an order-independent string key, used only for the GroupBy
>     fields = sorted(key_object.keys())
>     return ",".join(f"{field}:{key_object[field]}" for field in fields)
> ...
> ... | "Group" >> beam.GroupBy(lambda record: _generate_key(record[0]['key']))
> ...
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
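The `ValueError` in the comment arises because grouping requires a deterministic key encoding, and a `MapCoder` over dicts cannot provide one. As an alternative to the string-joining workaround, the nested dict key can be canonicalized into nested tuples of sorted `(field, value)` pairs. This is a minimal sketch under stated assumptions: `canonical_key` and `group_locally` are hypothetical helpers, the dict keys are assumed to be mutually comparable (for example, all strings), and `group_locally` is a plain-Python stand-in for `GroupByKey` rather than Beam itself. It only demonstrates that the reordered keys from the report collapse into the expected groups.

```python
from collections import defaultdict
from collections.abc import Hashable
from typing import Any, Dict, List, Tuple


def canonical_key(value: Any) -> Hashable:
    """Hypothetical helper: recursively turn dicts into sorted tuples of
    (key, value) pairs and lists into tuples, so that semantically equal
    keys hash and compare equal regardless of dict insertion order."""
    if isinstance(value, dict):
        # Dict keys are unique, so sorting only ever compares the keys.
        return tuple(sorted((k, canonical_key(v)) for k, v in value.items()))
    if isinstance(value, (list, tuple)):
        return tuple(canonical_key(v) for v in value)
    return value


def group_locally(records: List[Tuple[Dict[str, Any], int]]) -> Dict[Hashable, List[int]]:
    """Plain-Python stand-in for GroupByKey (not Beam), used only to show
    that canonical keys merge the reordered dicts into one group."""
    groups: Dict[Hashable, List[int]] = defaultdict(list)
    for key, value in records:
        groups[canonical_key(key)].append(value)
    return dict(groups)


# The same ten records as in the report: five keys, each seen in two field orders.
records = [({'key': {'image_id': i, 'context_id': 2}}, i) for i in range(1, 6)] + [
    ({'key': {'context_id': 2, 'image_id': i}}, i + 5) for i in range(1, 6)
]

grouped = group_locally(records)
for key, values in grouped.items():
    print(key, values)
```

In a pipeline, the same helper could be applied as `beam.GroupBy(lambda record: canonical_key(record[0]))`. The expectation that Beam can encode a tuple of strings and integers deterministically is an assumption worth confirming on the SDK version in use.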