[ https://issues.apache.org/jira/browse/BEAM-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yifan zou resolved BEAM-7608.
-----------------------------
    Resolution: Fixed

> v1new ReadFromDatastore skips entities
> --------------------------------------
>
>                 Key: BEAM-7608
>                 URL: https://issues.apache.org/jira/browse/BEAM-7608
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>    Affects Versions: 2.13.0
>         Environment: MacOS 10.14.5, Python 2.7
>            Reporter: Jacob Gur
>            Assignee: Udi Meiri
>            Priority: Blocker
>             Fix For: 2.15.0
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> A simple map over a Datastore kind in the local emulator using the new
> v1new.datastoreio.ReadFromDatastore skips entities.
> The kind has 1516 entities. When I map over it using the old
> ReadFromDatastore transform, it maps all of them, i.e., I can map each
> entity to its id and write the ids to a text file.
> But the new transform maps only 365 entities, and there is no error. The
> tail of the standard output is:
> {code}
> INFO:root:Latest stats timestamp for kind face_apilog is 2019-06-18 08:15:21+00:00
> INFO:root:Estimated size bytes for query: 116188
> INFO:root:Splitting the query into 12 splits
> INFO:root:Running (((GetEntities/Reshuffle/ReshufflePerKey/GroupByKey/Read)+(ref_AppliedPTransform_GetEntities/Reshuffle/ReshufflePerKey/FlatMap(restore_timestamps)_14))+((ref_AppliedPTransform_GetEntities/Reshuffle/RemoveRandomKeys_15)+(ref_AppliedPTransform_GetEntities/Read_16)))+((ref_AppliedPTransform_MapToId_17)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WriteBundles_24)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Pair_25)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/WindowInto(WindowIntoFn)_26)+(WriteToFile/Write/WriteImpl/GroupByKey/Write)))))
> INFO:root:Running (WriteToFile/Write/WriteImpl/GroupByKey/Read)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/Extract_31)+(ref_PCollection_PCollection_20/Write))
> INFO:root:Running (ref_PCollection_PCollection_12/Read)+((ref_AppliedPTransform_WriteToFile/Write/WriteImpl/PreFinalize_32)+(ref_PCollection_PCollection_21/Write))
> INFO:root:Running (ref_PCollection_PCollection_12/Read)+(ref_AppliedPTransform_WriteToFile/Write/WriteImpl/FinalizeWrite_33)
> INFO:root:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
> INFO:root:Renamed 1 shards in 0.12 seconds.
> {code}
>
> The code for the job using the new transform is:
>
> {code:python}
> from __future__ import absolute_import
>
> import logging
> import os
> import sys
>
> import apache_beam as beam
> from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
> from apache_beam.io.gcp.datastore.v1new.types import Query
>
> # TODO: should be set outside of python process
> os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'
>
>
> def map_to_id(element):
>     face_log_id = element.to_client_entity().id
>     return face_log_id
>
>
> def run(argv=None):
>     p = beam.Pipeline(argv=argv)
>     project = 'dev'
>     (p
>      | 'GetEntities' >> ReadFromDatastore(Query(kind='face_apilog', project=project))
>      | 'MapToId' >> beam.Map(map_to_id)
>      | 'WriteToFile' >> beam.io.WriteToText('result')
>      )
>     p.run().wait_until_finish()
>
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     run(sys.argv)
> {code}
>
> For comparison, the code for the job using the old transform is:
>
> {code:python}
> from __future__ import absolute_import
>
> import logging
> import os
> import sys
>
> import apache_beam as beam
> from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
> from google.cloud.proto.datastore.v1 import query_pb2
>
> # TODO: should be set outside of python process
> os.environ['DATASTORE_EMULATOR_HOST'] = 'localhost:8085'
>
>
> def map_to_id(element):
>     face_log_id = element.key.path[-1].id
>     return face_log_id
>
>
> def run(argv=None):
>     p = beam.Pipeline(argv=argv)
>     project = 'dev'
>     query = query_pb2.Query()
>     query.kind.add().name = 'face_apilog'
>     (p
>      | 'GetEntities' >> ReadFromDatastore(project=project, query=query)
>      # TODO: ParDo???
>      | 'MapToId' >> beam.Map(map_to_id)
>      | 'WriteToFile' >> beam.io.WriteToText('result')
>      )
>     p.run().wait_until_finish()
>
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     run(sys.argv)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
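Since both jobs write one id per line via WriteToText, the missing entities can be identified by diffing the two output files. A minimal sketch, independent of Beam (the shard file names below are assumptions; with a single shard, WriteToText('result') produces a file such as result-00000-of-00001):

```python
def missing_ids(old_path, new_path):
    """Return the set of ids present in the old transform's output
    but absent from the new (v1new) transform's output."""
    def read_ids(path):
        # One id per line, as produced by WriteToText.
        with open(path) as f:
            return {line.strip() for line in f if line.strip()}
    return read_ids(old_path) - read_ids(new_path)


if __name__ == '__main__':
    # Hypothetical shard names: assumes the two jobs were rerun with
    # WriteToText('result_old') and WriteToText('result_new').
    dropped = missing_ids('result_old-00000-of-00001',
                          'result_new-00000-of-00001')
    print('%d entities missing from v1new output' % len(dropped))
```

With the counts reported above (1516 vs. 365), this diff would list the 1151 dropped ids, whose key ranges could then be compared against the 12 query splits the log reports.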