vchunikhin commented on code in PR #23800:
URL: https://github.com/apache/beam/pull/23800#discussion_r1015325709
##########
playground/infrastructure/datastore_client.py:
##########
@@ -83,54 +87,67 @@ def save_to_cloud_datastore(self, examples_from_rep:
List[Example], sdk: Sdk, or
examples_ids_before_updating = self._get_all_examples(sdk, origin)
# loop through every example to save them to the Cloud Datastore
- with self._datastore_client.transaction():
- for example in tqdm(examples_from_rep):
- sdk_key = self._get_key(DatastoreProps.SDK_KIND,
Sdk.Name(example.sdk))
- example_id = self._make_example_id(origin, sdk, example.name)
- updated_example_ids.append(example_id)
- examples.append(
- self._to_example_entity(example, example_id, sdk_key,
actual_schema_version_key, origin)
- )
- snippets.append(
- self._to_snippet_entity(example, example_id, sdk_key, now,
actual_schema_version_key, origin)
- )
- pc_objects.extend(self._pc_object_entities(example,
example_id))
- files.append(self._to_file_entity(example, example_id))
-
- self._datastore_client.put_multi(examples)
- self._datastore_client.put_multi(snippets)
- self._datastore_client.put_multi(pc_objects)
- self._datastore_client.put_multi(files)
-
- # delete examples from the Cloud Datastore that are not in the
repository
- examples_ids_for_removing = list(filter(lambda key: key not in
updated_example_ids, examples_ids_before_updating))
- if len(examples_ids_for_removing) != 0:
- logging.info("Start of deleting extra playground examples ...")
- examples_keys_for_removing = list(
- map(lambda ex_id:
self._get_key(DatastoreProps.EXAMPLE_KIND, ex_id), examples_ids_for_removing)
- )
- snippets_keys_for_removing = list(
- map(lambda ex_id:
self._get_key(DatastoreProps.SNIPPET_KIND, ex_id), examples_ids_for_removing)
- )
- file_keys_for_removing = list(
- map(self._get_files_key, examples_ids_for_removing)
+
+ for example in tqdm(examples_from_rep):
+ sdk_key = self._get_key(DatastoreProps.SDK_KIND,
Sdk.Name(example.sdk))
+ example_id = self._make_example_id(origin, sdk, example.name)
+ updated_example_ids.append(example_id)
+ examples.append(
+ self._to_example_entity(example, example_id, sdk_key,
actual_schema_version_key, origin)
+ )
+ snippet = self._to_snippet_entity(example, example_id, sdk_key,
now, actual_schema_version_key, origin)
+ pc_objects.extend(self._pc_object_entities(example, example_id))
+ files.append(self._to_file_entity(example, example_id))
+ if example.datasets and example.emulators:
+ dataset = example.datasets[0]
+ emulator = example.emulators[0]
+ file_name = f"{dataset.name}.{dataset.format}"
+ link = self._upload_dataset_to_bucket(file_name)
+ dataset = self._to_dataset_entity(file_name, link)
+ datasets.append(dataset)
+ dataset_nested_entity =
self._to_dataset_nested_entity(file_name, example_id, emulator)
+ snippet_datasets = [dataset_nested_entity]
+ snippet.update(
+ {
+ "datasets": snippet_datasets
+ }
)
- pc_objs_keys_for_removing = []
- for example_id_item in examples_ids_for_removing:
- for example_type in [
- PrecompiledExample.GRAPH_EXTENSION.upper(),
- PrecompiledExample.OUTPUT_EXTENSION.upper(),
- PrecompiledExample.LOG_EXTENSION.upper()
- ]:
- pc_objs_keys_for_removing.append(
-
self._get_key(DatastoreProps.PRECOMPILED_OBJECT_KIND,
-
f"{example_id_item}{config.DatastoreProps.KEY_NAME_DELIMITER}{example_type}")
- )
- self._datastore_client.delete_multi(examples_keys_for_removing)
- self._datastore_client.delete_multi(snippets_keys_for_removing)
- self._datastore_client.delete_multi(file_keys_for_removing)
- self._datastore_client.delete_multi(pc_objs_keys_for_removing)
- logging.info("Finish of deleting extra playground examples
...")
+ snippets.append(snippet)
+
+ if datasets:
+ self._datastore_client.put_multi(datasets)
+ self._datastore_client.put_multi(examples)
+ self._datastore_client.put_multi(snippets)
+ self._datastore_client.put_multi(pc_objects)
+ self._datastore_client.put_multi(files)
+
+ # delete examples from the Cloud Datastore that are not in the
repository
+ examples_ids_for_removing = list(filter(lambda key: key not in
updated_example_ids, examples_ids_before_updating))
+ if len(examples_ids_for_removing) != 0:
+ logging.info("Start of deleting extra playground examples ...")
+ examples_keys_for_removing = list(
+ map(lambda ex_id: self._get_key(DatastoreProps.EXAMPLE_KIND,
ex_id), examples_ids_for_removing)
+ )
+ snippets_keys_for_removing = list(
+ map(lambda ex_id: self._get_key(DatastoreProps.SNIPPET_KIND,
ex_id), examples_ids_for_removing)
+ )
+ file_keys_for_removing = list(
+ map(self._get_files_key, examples_ids_for_removing)
+ )
+ pc_objs_keys_for_removing = []
+ for example_id_item in examples_ids_for_removing:
+ for example_type in [
+ PrecompiledExample.GRAPH_EXTENSION.upper(),
+ PrecompiledExample.OUTPUT_EXTENSION.upper(),
+ PrecompiledExample.LOG_EXTENSION.upper()
+ ]:
+ pc_objs_keys_for_removing.append(
+ self._get_key(DatastoreProps.PRECOMPILED_OBJECT_KIND,
f"{example_id_item}{config.DatastoreProps.KEY_NAME_DELIMITER}{example_type}"))
+ self._datastore_client.delete_multi(examples_keys_for_removing)
+ self._datastore_client.delete_multi(snippets_keys_for_removing)
+ self._datastore_client.delete_multi(file_keys_for_removing)
+ self._datastore_client.delete_multi(pc_objs_keys_for_removing)
+ logging.info("Finish of deleting extra playground examples ...")
Review Comment:
I'd create a separate task for that. It looks like we are not able to delete
unused datasets here, because the Python scripts are executed for every SDK
separately, but a dataset can relate to an example from another SDK. Thus, we
can compare the links that are stored in GCS with the links in the
_pg_datasets_ kind in a scheduled task written in Go and remove the unused
datasets there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]