Colin Le Nost created BEAM-9245:
-----------------------------------

             Summary: Unable to pull Datastore entities that contain dict properties
                 Key: BEAM-9245
                 URL: https://issues.apache.org/jira/browse/BEAM-9245
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
    Affects Versions: 2.18.0
            Reporter: Colin Le Nost


Hello, we are facing a small bug when reading Datastore entities with the 
ReadFromDatastore transform (Python SDK, 2.17 and 2.18).

We are unable to retrieve entities that contain a dictionary property. We think 
the client library implicitly converts such properties into embedded Datastore 
entities. When Beam then converts the retrieved entity, it recurses into the 
embedded entity and tries to build a key from it, which fails because an 
embedded entity has no key.
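For illustration, the failure can be modelled without Beam or Datastore. This is a minimal sketch under stated assumptions: {{ClientKey}}, {{ClientEntity}} and {{convert}} are hypothetical stand-ins, not the real google-cloud-datastore or Beam classes. They mimic only what the stack trace shows: the embedded entity built from a dict has {{key}} set to {{None}}, and the conversion recurses into it and reads {{key.flat_path}} unconditionally.

{code:python}
class ClientKey:
    """Hypothetical stand-in for a client-library key."""

    def __init__(self, *flat_path):
        self.flat_path = flat_path


class ClientEntity(dict):
    """Hypothetical stand-in for a client-library entity.

    An embedded entity (stored from a Python dict) has no key, so key is None.
    """

    def __init__(self, key=None, **props):
        super().__init__(**props)
        self.key = key


def convert(client_entity):
    """Mimics the unconditional recursion seen in the stack trace."""
    # Fails with AttributeError when key is None (embedded entity).
    path = client_entity.key.flat_path
    props = {}
    for name, value in client_entity.items():
        if isinstance(value, ClientEntity):
            # Recurses into the keyless embedded entity.
            value = convert(value)
        props[name] = value
    return path, props


stored = ClientEntity(
    key=ClientKey("my_entity_kind", "my_task"),
    regular_field="test",
    nested_field=ClientEntity(field1="my_field1"),  # key=None
)

try:
    convert(stored)
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'flat_path'
{code}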
h2. Stack trace
{code:python}
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py", line 269, in process
    yield types.Entity.from_client_entity(client_entity)
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 225, in from_client_entity
    value = Entity.from_client_entity(value)
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 219, in from_client_entity
    Key.from_client_key(client_entity.key),
  File ".../venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 156, in from_client_key
    return Key(client_key.flat_path, project=client_key.project,
AttributeError: 'NoneType' object has no attribute 'flat_path' [while running 'Read from datastore/Read']
{code}
 
h2. Steps to reproduce
# Insert a Datastore entity using the {{create_datastore_entity}} function below.
# Run the pipeline with the DirectRunner.

 
{code:python}
import apache_beam as beam
from google.cloud import datastore
from apache_beam.io.gcp.datastore.v1new.types import Query
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

DATASTORE_KIND = "my_entity_kind"
PROJECT_ID = "my_project_id"


def create_datastore_entity():
    client = datastore.Client(PROJECT_ID)
    key = client.key(DATASTORE_KIND, "my_task")

    # Refuse to overwrite an entity that already exists.
    if client.get(key=key) is not None:
        raise Exception("Existing entity")

    entity = datastore.Entity(key=key)
    # The dict value is stored as an embedded entity, which has no key.
    entity.update({
        "regular_field": "test",
        "nested_field": {"field1": "my_field1"},
    })
    client.put(entity)


def my_func(element):
    print(element)
    return element


def run():
    pipeline_options = PipelineOptions()

    pipeline_options.view_as(StandardOptions).runner = "DirectRunner"
    p = beam.Pipeline(options=pipeline_options)
    my_ds_query = Query(kind=DATASTORE_KIND, project=PROJECT_ID,)
    p | "Read from datastore" >> ReadFromDatastore(
        query=my_ds_query
    ) | "Print entity" >> beam.Map(my_func)
    p.run().wait_until_finish()


if __name__ == "__main__":
    create_datastore_entity()
    run()
{code}
h2. Workaround

Currently, we work around the issue by patching the library: we modified 
{{Entity.from_client_entity}} in 
{{sdks/python/apache_beam/io/gcp/datastore/v1new/types.py}} (this 
[line|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1new/types.py#L231]) 
so that embedded entities without a key are converted to plain dicts:
 
{code:python}
  @staticmethod
  def from_client_entity(client_entity):
    res = Entity(
        Key.from_client_key(client_entity.key),
        exclude_from_indexes=set(client_entity.exclude_from_indexes))
    for name, value in client_entity.items():
      if isinstance(value, key.Key):
        value = Key.from_client_key(value)
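      if isinstance(value, entity.Entity):
        if value.key:
          # Keyed entity: convert recursively as before.
          value = Entity.from_client_entity(value)
        else:
          # Embedded entity (stored from a dict) has no key: keep its
          # properties as a plain dict instead of reading value.key.
          value = {k: v for k, v in value.items()}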
      res.properties[name] = value
    return res
{code}
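The branch logic of the workaround can be checked in a similar stand-in model. This is a hypothetical sketch, not the patch above: {{ClientKey}}, {{ClientEntity}}, {{convert_value}} and {{convert_fixed}} are illustrative stand-ins, and only the entity branch is modelled. One deliberate difference: where the workaround copies a keyless entity with a single-level dict comprehension, this sketch recurses, so an embedded entity nested inside another embedded entity also becomes a plain dict.

{code:python}
class ClientKey:
    """Hypothetical stand-in for a client-library key."""

    def __init__(self, *flat_path):
        self.flat_path = flat_path


class ClientEntity(dict):
    """Hypothetical stand-in for a client-library entity (key may be None)."""

    def __init__(self, key=None, **props):
        super().__init__(**props)
        self.key = key


def convert_value(value):
    """Convert one property value, mirroring the workaround's entity branch."""
    if isinstance(value, ClientEntity):
        if value.key is not None:
            # Keyed entity: convert as a full entity.
            return convert_fixed(value)
        # Keyless embedded entity: fall back to a plain dict, recursing so
        # deeper embedded entities are converted too.
        return {k: convert_value(v) for k, v in value.items()}
    return value


def convert_fixed(client_entity):
    """Return (key path, properties) for a keyed entity."""
    path = client_entity.key.flat_path
    props = {name: convert_value(v) for name, v in client_entity.items()}
    return path, props


stored = ClientEntity(
    key=ClientKey("my_entity_kind", "my_task"),
    regular_field="test",
    nested_field=ClientEntity(field1="my_field1"),  # key=None
)
print(convert_fixed(stored))
# (('my_entity_kind', 'my_task'), {'regular_field': 'test', 'nested_field': {'field1': 'my_field1'}})
{code}

In both the workaround and this sketch, a keyless embedded entity ends up as a plain dict, so per-entity metadata such as its {{exclude_from_indexes}} is not carried over.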
If this workaround looks acceptable to you, I can open the PR.

 

Thanks, Colin

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
