Dmitry,

You can use the --beam_plugin flag [1] to import your plugin before any
other code executes. If the import fails, you will get the warning Robert
mentioned ("Failed to import beam plugin ..."), which you can look for in
the worker logs.
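
To make the behaviour concrete, here is a rough sketch of what "import the
plugin first, warn on failure" amounts to. It is not Beam's actual
implementation: the function name import_plugins is made up for
illustration, and treating the flag as repeatable with one module path per
occurrence is an assumption.

```python
import argparse
import importlib
import logging


def import_plugins(argv):
    """Import each module named by --beam_plugin; warn instead of failing.

    Illustrative only: a stand-in for the bootstrap step, not Beam code.
    """
    parser = argparse.ArgumentParser()
    # Assumption: the flag may be repeated, one importable module path each.
    parser.add_argument('--beam_plugin', dest='beam_plugins',
                        action='append', default=[])
    # Ignore every other pipeline option on the command line.
    known_args, _ = parser.parse_known_args(argv)

    imported = []
    for plugin in known_args.beam_plugins:
        try:
            importlib.import_module(plugin)
            imported.append(plugin)
        except ImportError:
            # Same wording as the warning quoted above.
            logging.warning('Failed to import beam plugin %s', plugin)
    return imported
```

In your case the value would be the module that defines your S3FileSystem,
for example something like --beam_plugin my_package.s3filesystem (a
hypothetical module path).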

This flag is not available in version 2.0.0; it will be available in the
next release. Since you are building the SDK from head, it should already
be available to you.
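
As background on why the early import matters: the sub-class check Cham
links to further down the thread can only see classes whose modules have
already been imported. A minimal, self-contained sketch of that effect
follows. The FileSystem base class and get_filesystem function here are
simplified stand-ins, not Beam's real code, and S3FileSystem is
hypothetical.

```python
class FileSystem(object):
    """Simplified stand-in for a filesystem base class (not Beam's)."""

    @classmethod
    def scheme(cls):
        raise NotImplementedError


def get_filesystem(path):
    """Return the single FileSystem subclass whose scheme matches path."""
    scheme = path.split('://', 1)[0] if '://' in path else None
    # Only classes that have already been defined (imported) show up here.
    matches = [fs for fs in FileSystem.__subclasses__()
               if fs.scheme() == scheme]
    if len(matches) != 1:
        raise ValueError('Unable to get the Filesystem for path %s' % path)
    return matches[0]


def lookup_fails(path):
    """Return True if no filesystem can be found for path."""
    try:
        get_filesystem(path)
    except ValueError:
        return True
    return False


# Before the subclass exists, the lookup fails, as it does on the worker.
failed_before = lookup_fails('s3://pm-dataflow/test')


class S3FileSystem(FileSystem):
    """Hypothetical custom filesystem; defining it makes it discoverable."""

    @classmethod
    def scheme(cls):
        return 's3'


# Once the class is defined, the same lookup succeeds.
found_after = get_filesystem('s3://pm-dataflow/test')
```

Defining the class inline here plays the role of importing the module that
contains it, which is what the plugin flag is meant to force on the worker.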

Ahmet

[1]
https://github.com/apache/beam/blob/56e4251deeb080ceff331fc4adb5d68609b04c71/sdks/python/apache_beam/options/pipeline_options.py#L554

On Fri, Jul 7, 2017 at 5:52 PM, Robert Bradshaw <[email protected]> wrote:

> Do you see any warnings "Failed to import beam plugin ..." in your
> logs? If so, that may indicate some kind of packaging issue.
>
> On Fri, Jul 7, 2017 at 5:44 PM, Chamikara Jayalath <[email protected]>
> wrote:
> > Discovery of file-system implementations should only occur after your
> > package is installed. But your class won't be discoverable until it is
> > imported somewhere.
> >
> > I agree that we need a better mechanism for registering and discovering
> > new FileSystems. Would you mind filing a JIRA ticket for that?
> >
> > - Cham
> >
> >
> > On Fri, Jul 7, 2017 at 5:25 PM Dmitry Demeshchuk <[email protected]>
> > wrote:
> >>
> >> Thanks for the clarification, Cham!
> >>
> >> Does it mean that for now I'm pretty much stuck with a modified Beam
> >> distribution, unless/until that filesystem is a part of Beam, or until
> >> there's a better mechanism to discover new filesystem classes?
> >>
> >> Alternatively, maybe it's still possible to change the order of imports
> >> somehow, so that we initialize the FileSystems class after all the side
> >> packages are imported? That would be an even better solution, although it
> >> may not be possible due to circular dependency issues or whatnot.
> >>
> >> On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath <[email protected]>
> >> wrote:
> >>>
> >>> You should be able to use a locally modified version of Beam as
> >>> follows. First build a source distribution:
> >>>
> >>> python setup.py sdist
> >>>
> >>> Then pass the option '--sdk_location dist/<src dist>' when running your
> >>> program.
> >>>
> >>> Beam currently looks for FileSystem implementations using the sub-class
> >>> check below.
> >>>
> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58
> >>>
> >>> For this to work, you need to force an import of your FileSystem, similar
> >>> to the following.
> >>>
> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> >>>
> >>> Thanks,
> >>> Cham
> >>>
> >>> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]>
> >>> wrote:
> >>>>
> >>>> Hey folks,
> >>>>
> >>>> So, I've been trying to run this custom S3 filesystem of mine on the
> >>>> Dataflow runner. Works fine on the direct runner, even produces the
> >>>> correct results. :)
> >>>>
> >>>> However, when I run it on Dataflow (it's basically a wordcount example
> >>>> that has the gs:// paths replaced with s3:// paths), I see the following
> >>>> errors:
> >>>>
> >>>>
> >>>> (b84963e5767747e2): Traceback (most recent call last):
> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
> >>>>     work_executor.execute()
> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
> >>>>     self._split_task)
> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
> >>>>     desired_bundle_size)
> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
> >>>>     for split in source.split(desired_bundle_size):
> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
> >>>>     return self._get_concat_source().split(
> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
> >>>>     return fnc(self, *args, **kwargs)
> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
> >>>>     match_result = FileSystems.match([pattern])[0]
> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
> >>>>     filesystem = FileSystems.get_filesystem(patterns[0])
> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
> >>>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
> >>>> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
> >>>>
> >>>>
> >>>> To my understanding, this means that the module that contains my
> >>>> S3FileSystem doesn't get imported (it currently resides in a separate
> >>>> package that I ship to the pipeline). And, I guess, the order in which
> >>>> modules are imported may differ depending on the runner; that's the only
> >>>> explanation I have for why exactly the same pipeline works on the direct
> >>>> runner and produces the correct results.
> >>>>
> >>>> Any ideas on the best way to troubleshoot this? Can I somehow make Beam
> >>>> upload some locally-modified version of the apache_beam package to
> >>>> Dataflow? I'd be happy to provide more details if needed. Or should I
> >>>> instead be reaching out to the Dataflow team?
> >>>>
> >>>> Thank you.
> >>>>
> >>>>
> >>>> --
> >>>> Best regards,
> >>>> Dmitry Demeshchuk.
> >>
> >>
> >>
> >>
> >> --
> >> Best regards,
> >> Dmitry Demeshchuk.
>
