y1chi commented on code in PR #17323:
URL: https://github.com/apache/beam/pull/17323#discussion_r848966712
##########
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py:
##########
@@ -393,6 +393,71 @@ def _adjust_pipeline_for_dataflow_v2(self, pipeline):
def _check_for_unsupported_features_on_non_portable_worker(self, pipeline):
pipeline.visit(self.combinefn_visitor())
+ def _enable_sdk_prebuild_if_applicable(self, options):
+ setup_options = options.view_as(SetupOptions)
+ # SDK container image pre-build explicitly enabled or disabled.
+ if (setup_options.prebuild_sdk_container_engine or
+ setup_options.disable_sdk_container_prebuild):
+ return
+ from apache_beam.runners.dataflow.internal import apiclient
+ # prebuild_sdk_container_engine only works with fnapi jobs.
+ if not apiclient._use_fnapi(options):
+ return
+ # if no additional dependencies, pre-building is not that useful.
+ if not any([setup_options.requirements_file,
+ setup_options.setup_file,
+ setup_options.sdk_location,
+ setup_options.extra_packages]):
+ return
+
+ google_cloud_options = options.view_as(GoogleCloudOptions)
+ from apache_beam.runners.dataflow.internal.clients import cloudbuild
+ from apache_beam.internal.gcp.auth import get_service_credentials
+ from apache_beam.internal.http_client import get_new_http
+ from apitools.base.py import exceptions
+
+ if google_cloud_options.no_auth:
+ credentials = None
+ else:
+ credentials = get_service_credentials()
+ cloudbuild_client = cloudbuild.CloudbuildV1(
+ credentials=credentials,
+ get_credentials=(not google_cloud_options.no_auth),
+ http=get_new_http(),
+ response_encoding='utf8')
+
+ # Check if the cloud build api is enabled on the project.
+ request = cloudbuild.CloudbuildProjectsBuildsListRequest(
+ projectId=google_cloud_options.project, pageSize=1)
+ try:
+ cloudbuild_client.projects_builds.List(request)
+ except exceptions.HttpForbiddenError as e:
+ if 'SERVICE_DISABLED' in str(e):
+ _LOGGER.info(
+ "Google Cloud Build API not enabled, consider enabling "
+ "it to utilize SDK container image pre-building workflow, "
+ "learn more Learn more about the workflow on "
+ "https://cloud.google.com/dataflow/docs/guides/"
+ "using-custom-containers#prebuild")
+ else:
+ _LOGGER.info(
Review Comment:
Changed this log call to warning level.
##########
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py:
##########
@@ -458,20 +525,46 @@ def run_pipeline(self, pipeline, options,
pipeline_proto=None):
pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
from apache_beam.transforms import environments
- if options.view_as(SetupOptions).prebuild_sdk_container_engine:
- # if prebuild_sdk_container_engine is specified we will build a new sdk
- # container image with dependencies pre-installed and use that image,
- # instead of using the inferred default container image.
- self._default_environment = (
- environments.DockerEnvironment.from_options(options))
- options.view_as(WorkerOptions).sdk_container_image = (
- self._default_environment.container_image)
- else:
- self._default_environment = (
- environments.DockerEnvironment.from_container_image(
- apiclient.get_container_image_from_options(options),
- artifacts=environments.python_sdk_dependencies(options),
- resource_hints=environments.resource_hints_from_options(options)))
+ original_options_contain_prebuild = options.view_as(
+ SetupOptions).prebuild_sdk_container_engine is not None
+ self._enable_sdk_prebuild_if_applicable(options)
+ should_submit_job_without_prebuilding = True
+ try:
+ if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+ # if prebuild_sdk_container_engine is specified we will build a new sdk
+ # container image with dependencies pre-installed and use that image,
+ # instead of using the inferred default container image.
+ _LOGGER.info(
+ "SDK container image pre-build workflow started. Check dataflow "
+ "website https://cloud.google.com/dataflow/docs/guides/"
+ "using-custom-containers#prebuild for more info.")
+ self._default_environment = (
+ environments.DockerEnvironment.from_options(options))
+ options.view_as(WorkerOptions).sdk_container_image = (
+ self._default_environment.container_image)
+ should_submit_job_without_prebuilding = False
+ _LOGGER.warning(
+ "Please delete prebuilt SDK container image %s after the "
+ "job is done. Failing to do so may incur additional "
+ "storage costs." % self._default_environment.container_image)
+ except RuntimeError as e:
+ _LOGGER.error("SDK image build failed: %s" % e, exc_info=True)
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]