That certainly makes sense.  Would the flow look like:
job submission stages artifacts to GCS and rewrites the artifacts in the
proto to point at GCS -> the job requests the artifact from the service
(via the same gs:// url) -> the service downloads it from GCS and streams
the artifact back?
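(For my own understanding, I'm imagining the "rewrite" step as roughly the
sketch below.  The protos are from the Beam portability API, but
stage_to_gcs is just a placeholder for whatever actually uploads the bytes;
this isn't actual Beam or Dataflow code.)

  from apache_beam.portability import common_urns
  from apache_beam.portability.api import beam_runner_api_pb2

  def rewrite_artifact(artifact, stage_to_gcs):
    # stage_to_gcs(artifact) is assumed to upload the artifact's bytes and
    # return the resulting gs:// URL.
    gcs_url = stage_to_gcs(artifact)
    # Re-emit the artifact as a URL-typed artifact pointing at GCS, keeping
    # the original role so the runner still knows what it's for.
    return beam_runner_api_pb2.ArtifactInformation(
        type_urn=common_urns.artifact_types.URL.urn,
        type_payload=beam_runner_api_pb2.ArtifactUrlPayload(
            url=gcs_url).SerializeToString(),
        role_urn=artifact.role_urn,
        role_payload=artifact.role_payload)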

One thing I find a little confusing with the artifact process in general is
that (in what you're describing) it seems like there are actually two
artifact services involved.  One is used when the job is being staged, and
one when the job is running, but (at least in our case) these are two
different services running in different places, with different access
restrictions.

For example, our internal expansion service returns artifacts that point at
our internal blobstore (URL resources).  The way things work now in the
staging process is that the internal artifact service is never even used:
the stager downloads the artifacts directly from the URL and uploads them
to GCS.  This is (luckily) how things work by default, but it's also
required because the job running in Dataflow can't access the same
resources on the internal blobstore.  Is the assumption that the Dataflow
runner would always do something like this (convert all resources to URL
resources pointing to GCS)?
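(Concretely, the conversion I'm describing is basically the below: pull the
bytes from the internal URL and re-upload them under the GCS staging root.
download_from_url and staging_root are placeholders here, not real Beam
APIs.)

  from apache_beam.io.filesystems import FileSystems

  def restage_url_artifact(url, staging_root, name):
    # Write the downloaded bytes to <staging_root>/<name> on GCS so the job
    # running in Dataflow only ever needs GCS access.
    gcs_path = FileSystems.join(staging_root, name)
    with FileSystems.create(gcs_path) as dest:
      dest.write(download_from_url(url))  # fetch from the internal blobstore
    return gcs_path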

Is there a design doc I can read about how this whole process should work
in the ideal world?

On Fri, Jan 14, 2022 at 6:58 PM Luke Cwik <lc...@google.com> wrote:

> Google has been trying to migrate Python staging away from the
> Dataflow-specific implementation (which isn't easy to change) and onto the
> artifact service and provisioning service with Dataflow Prime.  With those,
> the container can choose what to do with the resources and should have
> control over how certain artifacts are treated specially, so it might work
> out of the box with Dataflow Prime for you, and if not, you can control
> what the Python container is doing.
>
> On Mon, Dec 13, 2021 at 12:28 PM Steve Niemitz <sniem...@apache.org>
> wrote:
>
>> I spent a bunch of time on this over the last week and arrived at
>> something that works.  I had to try a few different ways to make this work
>> though.
>>
>> 1) Simply removing the unique job prefix and appending the sha256 to the
>> uploaded artifact fails because some paths are specifically hard-coded
>> (dataflow_python_sdk.tar, pipeline.pb, etc.) in the runtime stager (which
>> seems to be closed-source in Dataflow from what I can tell).
>>
>> 2) If instead I prefix the artifacts with their sha256 as a path (e.g.
>> gs://my-bucket/<sha256>/dataflow_python_sdk.tar), the job again fails at
>> runtime, with the stager complaining that the directory structure isn't
>> "flat"; it seems like it doesn't support artifacts from multiple paths.
>>
>> I arrived at a solution that does work, but isn't optimal.  I first stage
>> the artifacts to the same bucket, but in a different path (e.g.
>> gs://my-bucket/cache/<sha256>); these are only staged if they don't already
>> exist.  Then I can use a GCS copy operation to copy them from the cache
>> directory to the "unique" staging directory.  The copy operation is
>> ~instant and doesn't require the user to download and re-upload the
>> artifact again.
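>>
>> (Very roughly, the cache-then-copy flow looks like the sketch below, using
>> the google-cloud-storage client; the bucket name and staging path are just
>> examples, and the real code would go through the Beam stager.)
>>
>>   from google.cloud import storage
>>
>>   def stage_with_cache(local_path, sha256, filename, staging_dir):
>>     client = storage.Client()
>>     bucket = client.bucket('my-bucket')
>>     cached = bucket.blob('cache/%s' % sha256)
>>     if not cached.exists():
>>       # Only upload on a cache miss.
>>       cached.upload_from_filename(local_path)
>>     # Server-side copy into the job's "unique" staging directory; no
>>     # download/re-upload from the client.
>>     bucket.copy_blob(cached, bucket, '%s/%s' % (staging_dir, filename))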
>>
>> I think ideally we'd fix #1 to work; it seems like the worker bootstrap
>> just needs to be updated to be more robust (although maybe there are more
>> issues there?).  In any case, what kind of appetite would there be for a PR
>> for this?  It could additionally be gated behind a pipeline option if we
>> don't want it to be the default.
>>
>> On Tue, Dec 7, 2021 at 12:49 PM Steve Niemitz <sniem...@apache.org>
>> wrote:
>>
>>> I noticed that the Python Dataflow runner appends some "uniqueness" (the
>>> timestamp) [1] to the staging directory when staging artifacts for a
>>> Dataflow job.  This is very suboptimal because it makes caching artifacts
>>> between job runs useless.
>>>
>>> The JVM runner doesn't do this; is there a good reason the Python one
>>> does?  Or is this just an oversight that hasn't been fixed yet?
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py#L467
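>>>
>>> (Simplified illustration, not the exact apiclient.py code: the timestamp
>>> ends up in the staging path, so every run stages into a fresh directory
>>> that can never be reused.)
>>>
>>>   import time
>>>   staging_root = 'gs://my-bucket/staging'
>>>   unique_dir = '%s/beamapp-%s' % (staging_root, time.strftime('%m%d%H%M%S'))
>>>   # e.g. gs://my-bucket/staging/beamapp-1207120000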
>>>
>>
