Just to get closure on this and give a reference to people who stumble upon
this problem and read the archives: the --beam_packages flag did work
indeed, and now I'll just wait for 2.1.0 to come out.

Thanks for everyone's help!

On Fri, Jul 7, 2017 at 7:25 PM, Robert Bradshaw <[email protected]> wrote:

> On Fri, Jul 7, 2017 at 6:33 PM, Ahmet Altay <[email protected]> wrote:
> >
> >
> > On Fri, Jul 7, 2017 at 6:26 PM, Dmitry Demeshchuk <[email protected]>
> > wrote:
> >>
> >> @Cham: Good call, filed https://issues.apache.org/jira/browse/BEAM-2573
> >> with some initial context.
> >>
> >> @Robert: I'm not seeing any errors like that in the Stackdriver logs,
> but
> >> I'll keep an eye on them, just in case. In fact, up until now I've been
> able
> >> to do a lot of things with packaging just fine: install custom apt
> packages
> >> on the Dataflow nodes, even get our internal pip packages shipped there
> >> properly and being properly available at the Dataflow nodes.
> >>
> >> @Ahmet: Thanks for the tip! I currently use the 2.0.0 tag, actually, and
> >> all of my code is being run from a separate package, that gets installed
> >> using a custom --setup_file argument. But I'll try to just use the head
> and
> >> see if it helps. I don't fully understand the notion of plugins though.
> Is
> >> that just an arbitrary Python package, or is it supposed to be an
> addition
> >> to the apache_beam package, akin to "apache_beam[gcp]" ?
> >
> >
> > Arbitrary package. It is a mechanism to import modules before executing
> > things. This flag is likely the existing solution for BEAM-2573.
>
> Note that at beam head, any subclass of FileSystem that was imported
> in your main module should be automatically added to this flag, i.e.
> things should "just work" like they did in the direct runner.
> Hopefully they do--let us know either way.
>
> >> On Fri, Jul 7, 2017 at 6:18 PM, Ahmet Altay <[email protected]> wrote:
> >>>
> >>> Dmitry,
> >>>
> >>> You can use the --beam_plugin flag [1] to import your plugin before any
> >>> other code execution. A failure in this step will result in the warning
> >>> Robert mentioned ("Failed to import beam plugin ..."), and you can
> look for
> >>> those in worker logs.
> >>>
> >>> This flag is not available in the 2.0.0 version and will be available
> in
> >>> the next release. Since you are building the SDK from head, it should
> be
> >>> already available to you.
> >>>
> >>> Ahmet
> >>>
> >>> [1]
> >>> https://github.com/apache/beam/blob/56e4251deeb080ceff331fc4adb5d6
> 8609b04c71/sdks/python/apache_beam/options/pipeline_options.py#L554
> >>>
> >>> On Fri, Jul 7, 2017 at 5:52 PM, Robert Bradshaw <[email protected]>
> >>> wrote:
> >>>>
> >>>> Do you see any warnings "Failed to import beam plugin ..." in your
> >>>> logs? If so, that may indicate some kind of packaging issue.
> >>>>
> >>>> On Fri, Jul 7, 2017 at 5:44 PM, Chamikara Jayalath
> >>>> <[email protected]> wrote:
> >>>> > Discovery of file-system implementations should only occur after
> your
> >>>> > package is installed. But your class won't be discoverable until it
> is
> >>>> > imported somewhere.
> >>>> >
> >>>> > I agree that we need a better mechanism for registering and
> >>>> > discovering new
> >>>> > FileSystems. Would you mind filing a JIRA ticket for that ?
> >>>> >
> >>>> > - Cham
> >>>> >
> >>>> >
> >>>> > On Fri, Jul 7, 2017 at 5:25 PM Dmitry Demeshchuk
> >>>> > <[email protected]>
> >>>> > wrote:
> >>>> >>
> >>>> >> Thanks for the clarification, Cham!
> >>>> >>
> >>>> >> Does it mean that for now I'm pretty much stuck with a modified
> Beam
> >>>> >> distribution, unless/until that filesystem is a part of Beam, or
> >>>> >> until
> >>>> >> there's a better mechanism to discover new filesystem classes?
> >>>> >>
> >>>> >> Alternatively, maybe it's still possible to change the order of
> >>>> >> imports
> >>>> >> somehow, so that we initialize the FileSystems class after all the
> >>>> >> side
> >>>> >> packages are imported? That would be an even better solution,
> >>>> >> although it
> >>>> >> may not be possible due to circular dependency issues or whatnot.
> >>>> >>
> >>>> >> On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath
> >>>> >> <[email protected]>
> >>>> >> wrote:
> >>>> >>>
> >>>> >>> You should be able to use a locally modified version of Beam using
> >>>> >>> following.
> >>>> >>>
> >>>> >>> python setup.py sdist
> >>>> >>> When running your program use option '--sdk_location dist/<src
> >>>> >>> dist>'
> >>>> >>> when running your program.
> >>>> >>>
> >>>> >>> Beam currently look for FileSystem implementations using the
> >>>> >>> sub-class
> >>>> >>> check below.
> >>>> >>>
> >>>> >>>
> >>>> >>> https://github.com/apache/beam/blob/master/sdks/python/
> apache_beam/io/filesystems.py#L58
> >>>> >>>
> >>>> >>> For this to work you need to force import your FileSystem similar
> to
> >>>> >>> following.
> >>>> >>>
> >>>> >>>
> >>>> >>> https://github.com/apache/beam/blob/master/sdks/python/
> apache_beam/io/filesystems.py#L30
> >>>> >>>
> >>>> >>> Thanks,
> >>>> >>> Cham
> >>>> >>>
> >>>> >>> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk
> >>>> >>> <[email protected]>
> >>>> >>> wrote:
> >>>> >>>>
> >>>> >>>> Hey folks,
> >>>> >>>>
> >>>> >>>> So, I've been trying to run this custom S3 filesystem of mine on
> >>>> >>>> the
> >>>> >>>> Dataflow runner. Works fine on the direct runner, even produces
> the
> >>>> >>>> correct
> >>>> >>>> results. :)
> >>>> >>>>
> >>>> >>>> However, when I run it on Dataflow (it's basically a wordcount
> >>>> >>>> example
> >>>> >>>> that has the gs:// paths replaced with s3:// paths), I see the
> >>>> >>>> following
> >>>> >>>> errors:
> >>>> >>>>
> >>>> >>>>
> >>>> >>>> (b84963e5767747e2): Traceback (most recent call last): File
> >>>> >>>>
> >>>> >>>> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/
> batchworker.py",
> >>>> >>>> line 581, in do_work work_executor.execute() File
> >>>> >>>>
> >>>> >>>> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
> line
> >>>> >>>> 209, in execute self._split_task) File
> >>>> >>>>
> >>>> >>>> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py",
> line
> >>>> >>>> 217, in _perform_source_split_considering_api_limits
> >>>> >>>> desired_bundle_size)
> >>>> >>>> File
> >>>> >>>> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/
> executor.py",
> >>>> >>>> line 254, in _perform_source_split for split in
> >>>> >>>> source.split(desired_bundle_size): File
> >>>> >>>>
> >>>> >>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/
> filebasedsource.py",
> >>>> >>>> line 172, in split return self._get_concat_source().split( File
> >>>> >>>>
> >>>> >>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/
> options/value_provider.py",
> >>>> >>>> line 109, in _f return fnc(self, *args, **kwargs) File
> >>>> >>>>
> >>>> >>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/
> filebasedsource.py",
> >>>> >>>> line 119, in _get_concat_source match_result =
> >>>> >>>> FileSystems.match([pattern])[0] File
> >>>> >>>>
> >>>> >>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py",
> line
> >>>> >>>> 130, in match filesystem = FileSystems.get_filesystem(
> patterns[0])
> >>>> >>>> File
> >>>> >>>>
> >>>> >>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py",
> line
> >>>> >>>> 61, in get_filesystem raise ValueError('Unable to get the
> >>>> >>>> Filesystem for
> >>>> >>>> path %s' % path) ValueError: Unable to get the Filesystem for
> path
> >>>> >>>> s3://pm-dataflow/test
> >>>> >>>>
> >>>> >>>>
> >>>> >>>> To my understanding, this means that the module that contains my
> >>>> >>>> S3FileSystem doesn't get imported (it currently resides in a
> >>>> >>>> separate
> >>>> >>>> package that I ship to the pipeline). And, I guess, the order of
> >>>> >>>> modules
> >>>> >>>> importing may differ depending on the runner, that's the only
> >>>> >>>> explanation of
> >>>> >>>> why exactly the same pipeline works on the direct runner and
> >>>> >>>> produces the
> >>>> >>>> correct results.
> >>>> >>>>
> >>>> >>>> Any ideas of the best way to troubleshoot this? Can I somehow
> make
> >>>> >>>> Beam
> >>>> >>>> upload some locally-modified version of the apache_beam package
> to
> >>>> >>>> Dataflow?
> >>>> >>>> I'd be happy to provide more details if needed. Or should I
> instead
> >>>> >>>> be
> >>>> >>>> reaching out to the Dataflow team?
> >>>> >>>>
> >>>> >>>> Thank you.
> >>>> >>>>
> >>>> >>>>
> >>>> >>>> --
> >>>> >>>> Best regards,
> >>>> >>>> Dmitry Demeshchuk.
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> --
> >>>> >> Best regards,
> >>>> >> Dmitry Demeshchuk.
> >>>
> >>>
> >>
> >>
> >>
> >> --
> >> Best regards,
> >> Dmitry Demeshchuk.
> >
> >
>



-- 
Best regards,
Dmitry Demeshchuk.

Reply via email to