Colin Le Nost created BEAM-9245:
-----------------------------------
Summary: Unable to pull Datastore entities that contain dict properties
Key: BEAM-9245
URL: https://issues.apache.org/jira/browse/BEAM-9245
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Affects Versions: 2.18.0
Reporter: Colin Le Nost
Hello,

We are hitting a bug when reading Datastore entities with the ReadFromDatastore transform (Python SDK, 2.17 and 2.18): entities that contain a dictionary property cannot be read.

We think the client library implicitly converts these dict properties into embedded Datastore entities, and those embedded entities have no key. As the stacktrace shows, `Entity.from_client_entity` recurses into the embedded entity and calls `Key.from_client_key` on its key. That key is `None`, so the read fails with an `AttributeError`.

h2. Stacktrace
{code:python}
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py", line 269, in process
    yield types.Entity.from_client_entity(client_entity)
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 225, in from_client_entity
    value = Entity.from_client_entity(value)
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 219, in from_client_entity
    Key.from_client_key(client_entity.key),
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 156, in from_client_key
    return Key(client_key.flat_path, project=client_key.project,
AttributeError: 'NoneType' object has no attribute 'flat_path' [while running 'Read from datastore/Read']
{code}
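The following is a minimal, stdlib-only model of the failure path. `ClientKey`, `ClientEntity`, `key_from_client_key` and `naive_from_client_entity` are hypothetical stand-ins, not the real google-cloud-datastore or Beam classes. The model assumes only what the traceback shows: a client entity is a dict carrying a `key` attribute, a dict-valued property comes back as an embedded entity whose key is `None`, and the conversion recurses into it and dereferences that key.

{code:python}
class ClientKey:
    """Hypothetical stand-in for google.cloud.datastore.key.Key."""

    def __init__(self, *flat_path, project=None):
        self.flat_path = flat_path
        self.project = project


class ClientEntity(dict):
    """Hypothetical stand-in for google.cloud.datastore.entity.Entity.

    A dict subclass with a `key` attribute; embedded entities have key=None.
    """

    def __init__(self, key=None):
        super().__init__()
        self.key = key


def key_from_client_key(client_key):
    # Like Key.from_client_key in the traceback: reads flat_path unconditionally.
    return (client_key.flat_path, client_key.project)


def naive_from_client_entity(client_entity):
    # Like Entity.from_client_entity: embedded entities are converted the
    # same way as the top-level entity, including their (missing) key.
    converted = {"__key__": key_from_client_key(client_entity.key)}
    for name, value in client_entity.items():
        if isinstance(value, ClientEntity):
            value = naive_from_client_entity(value)
        converted[name] = value
    return converted


top = ClientEntity(key=ClientKey("my_entity_kind", "my_task", project="my_project_id"))
nested = ClientEntity()  # embedded entity built from a dict: no key
nested["field1"] = "my_field1"
top.update({"regular_field": "test", "nested_field": nested})

try:
    naive_from_client_entity(top)
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'flat_path'
{code}

The top-level entity converts fine. The error comes only from the recursive call on the keyless `nested_field` value, which matches the last two frames of the stacktrace.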
Here is some code to reproduce the issue:
# Insert a Datastore entity using the `create_datastore_entity` function.
# Run the pipeline with the DirectRunner.

{code:python}
import apache_beam as beam
from google.cloud import datastore
from apache_beam.io.gcp.datastore.v1new.types import Query
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.options.pipeline_options import StandardOptions, PipelineOptions

DATASTORE_KIND = "my_entity_kind"
PROJECT_ID = "my_project_id"


def create_datastore_entity():
    client = datastore.Client(PROJECT_ID)
    key = client.key(DATASTORE_KIND, "my_task")
    entity = client.get(key=key)
    if entity is not None:
        raise Exception("Existing entity")
    # "nested_field" is a dict, which is stored as an embedded entity without a key.
    entity_dict = {"regular_field": "test", "nested_field": {"field1": "my_field1"}}
    entity = datastore.Entity(key=key)
    entity.update(entity_dict)
    client.put(entity)


def my_func(element):
    print(element)
    return element


def run():
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(StandardOptions).runner = "DirectRunner"
    p = beam.Pipeline(options=pipeline_options)
    my_ds_query = Query(kind=DATASTORE_KIND, project=PROJECT_ID)
    (
        p
        | "Read from datastore" >> ReadFromDatastore(query=my_ds_query)
        | "Print entity" >> beam.Map(my_func)
    )
    p.run().wait_until_finish()


if __name__ == "__main__":
    create_datastore_entity()
    run()
{code}

h2. Workaround
For now, we patched the library with the following code, which modifies the `Entity` class in `sdks/python/apache_beam/io/gcp/datastore/v1new/types.py` (see this [line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1new/types.py#L231]):
{code:python}
# Replacement for Entity.from_client_entity in v1new/types.py.
# `key` and `entity` are the google.cloud.datastore modules (key.Key, entity.Entity).
@staticmethod
def from_client_entity(client_entity):
    res = Entity(
        Key.from_client_key(client_entity.key),
        exclude_from_indexes=set(client_entity.exclude_from_indexes))
    for name, value in client_entity.items():
        if isinstance(value, key.Key):
            value = Key.from_client_key(value)
        if isinstance(value, entity.Entity):
            if value.key:
                # Entity with a key: convert it recursively, as before.
                value = Entity.from_client_entity(value)
            else:
                # Embedded entity without a key (a dict property): keep it as a plain dict.
                value = {k: v for k, v in value.items()}
        res.properties[name] = value
    return res
{code}
If this workaround looks acceptable, I can open the PR.

Thanks,
Colin
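One limitation of the workaround is that it copies a keyless embedded entity shallowly. Any key or further embedded entity nested inside that dict is therefore passed through unconverted. The sketch below models a recursive variant using hypothetical stand-in classes. `ClientKey`, `ClientEntity`, `convert_value` and `from_client_entity` are illustrative names, not the real Beam or google-cloud-datastore API, and keys are represented as `(flat_path, project)` tuples instead of Beam `Key` objects.

{code:python}
class ClientKey:
    """Hypothetical stand-in for google.cloud.datastore.key.Key."""

    def __init__(self, *flat_path, project=None):
        self.flat_path = flat_path
        self.project = project


class ClientEntity(dict):
    """Hypothetical stand-in for google.cloud.datastore.entity.Entity."""

    def __init__(self, key=None):
        super().__init__()
        self.key = key


def convert_value(value):
    """Recursively convert one property value read from the client."""
    if isinstance(value, ClientKey):
        # Stand-in for Key.from_client_key.
        return (value.flat_path, value.project)
    if isinstance(value, ClientEntity):
        if value.key is None:
            # Keyless embedded entity: build a plain dict and convert its
            # values too, instead of copying them shallowly.
            return {k: convert_value(v) for k, v in value.items()}
        return from_client_entity(value)
    return value


def from_client_entity(client_entity):
    """Stand-in for Entity.from_client_entity, returning plain data."""
    return {
        "key": convert_value(client_entity.key),
        "properties": {
            name: convert_value(value) for name, value in client_entity.items()
        },
    }


# Usage: a dict property that itself holds a dict containing a key.
inner = ClientEntity()
inner["ref"] = ClientKey("other_kind", 42, project="my_project_id")
outer = ClientEntity()
outer["inner"] = inner
top = ClientEntity(key=ClientKey("my_entity_kind", "my_task", project="my_project_id"))
top.update({"regular_field": "test", "nested_field": outer})

result = from_client_entity(top)
print(result["properties"]["nested_field"])
# {'inner': {'ref': (('other_kind', 42), 'my_project_id')}}
{code}

Routing every value through one `convert_value` function keeps the keyed and keyless cases in a single place, so any depth of nesting is handled the same way.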