Just to get closure on this and give a reference to people who stumble upon this problem and read the archives: the --beam_plugin flag did indeed work, and now I'll just wait for 2.1.0 to come out.

Thanks for everyone's help!
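For the archives, here is a rough sketch of what passing that flag can look like when building the pipeline options. All concrete names below (project, bucket, package and class paths) are invented placeholders, not taken from this thread, and the exact form of the plugin value is my best guess from the pipeline_options.py link further down:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # All project/bucket/package names here are hypothetical placeholders.
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-gcp-project',
        '--temp_location=gs://my-bucket/tmp',
        # Ship the package that defines the custom FileSystem to the workers.
        '--setup_file=./setup.py',
        # Ask the workers to import the plugin before executing any other code.
        # I believe the value is the fully qualified class path of the
        # FileSystem subclass; see the pipeline_options.py link in the thread.
        '--beam_plugin=my_s3_package.s3filesystem.S3FileSystem',
    ])

    p = beam.Pipeline(options=options)
    lines = p | 'Read' >> beam.io.ReadFromText('s3://my-bucket/input.txt')
    lines | 'Write' >> beam.io.WriteToText('s3://my-bucket/output')
    p.run().wait_until_finish()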
On Fri, Jul 7, 2017 at 7:25 PM, Robert Bradshaw <[email protected]> wrote:

> On Fri, Jul 7, 2017 at 6:33 PM, Ahmet Altay <[email protected]> wrote:
> >
> > On Fri, Jul 7, 2017 at 6:26 PM, Dmitry Demeshchuk <[email protected]> wrote:
> >>
> >> @Cham: Good call, filed https://issues.apache.org/jira/browse/BEAM-2573 with some initial context.
> >>
> >> @Robert: I'm not seeing any errors like that in the Stackdriver logs, but I'll keep an eye on them, just in case. In fact, up until now I've been able to do a lot of things with packaging just fine: install custom apt packages on the Dataflow nodes, and even get our internal pip packages shipped there and properly available on the Dataflow nodes.
> >>
> >> @Ahmet: Thanks for the tip! I currently use the 2.0.0 tag, actually, and all of my code is run from a separate package that gets installed using a custom --setup_file argument. But I'll try to just use head and see if it helps. I don't fully understand the notion of plugins, though. Is that just an arbitrary Python package, or is it supposed to be an addition to the apache_beam package, akin to "apache_beam[gcp]"?
> >
> > Arbitrary package. It is a mechanism to import modules before executing things. This flag is likely the existing solution for BEAM-2573.
>
> Note that at Beam head, any subclass of FileSystem that was imported in your main module should be automatically added to this flag, i.e. things should "just work" like they did in the direct runner. Hopefully they do--let us know either way.
>
> >> On Fri, Jul 7, 2017 at 6:18 PM, Ahmet Altay <[email protected]> wrote:
> >>>
> >>> Dmitry,
> >>>
> >>> You can use the --beam_plugin flag [1] to import your plugin before any other code execution. A failure in this step will result in the warning Robert mentioned ("Failed to import beam plugin ..."), and you can look for those in the worker logs.
> >>>
> >>> This flag is not available in the 2.0.0 release and will be available in the next release. Since you are building the SDK from head, it should already be available to you.
> >>>
> >>> Ahmet
> >>>
> >>> [1] https://github.com/apache/beam/blob/56e4251deeb080ceff331fc4adb5d68609b04c71/sdks/python/apache_beam/options/pipeline_options.py#L554
> >>>
> >>> On Fri, Jul 7, 2017 at 5:52 PM, Robert Bradshaw <[email protected]> wrote:
> >>>>
> >>>> Do you see any warnings "Failed to import beam plugin ..." in your logs? If so, that may indicate some kind of packaging issue.
> >>>>
> >>>> On Fri, Jul 7, 2017 at 5:44 PM, Chamikara Jayalath <[email protected]> wrote:
> >>>> > Discovery of file-system implementations should only occur after your package is installed. But your class won't be discoverable until it is imported somewhere.
> >>>> >
> >>>> > I agree that we need a better mechanism for registering and discovering new FileSystems. Would you mind filing a JIRA ticket for that?
> >>>> >
> >>>> > - Cham
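As an illustration of the point Robert and Cham make above, the force-import can be a single no-op import in the main pipeline module. The module name here is a made-up placeholder:

    # In the main module that builds and runs the pipeline.
    # The import itself is enough: it defines the FileSystem subclass, which
    # is what Beam's subclass-based discovery (and, at head, the automatic
    # plugin detection) keys on. The package/module names are hypothetical.
    import my_s3_package.s3filesystem  # noqa: F401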
> >>>> > On Fri, Jul 7, 2017 at 5:25 PM Dmitry Demeshchuk <[email protected]> wrote:
> >>>> >>
> >>>> >> Thanks for the clarification, Cham!
> >>>> >>
> >>>> >> Does it mean that for now I'm pretty much stuck with a modified Beam distribution, unless/until that filesystem becomes a part of Beam, or until there's a better mechanism to discover new filesystem classes?
> >>>> >>
> >>>> >> Alternatively, maybe it's still possible to change the order of imports somehow, so that we initialize the FileSystems class after all the side packages are imported? That would be an even better solution, although it may not be possible due to circular dependency issues or whatnot.
> >>>> >>
> >>>> >> On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath <[email protected]> wrote:
> >>>> >>>
> >>>> >>> You should be able to use a locally modified version of Beam as follows:
> >>>> >>>
> >>>> >>> python setup.py sdist
> >>>> >>>
> >>>> >>> Then use the option '--sdk_location dist/<src dist>' when running your program.
> >>>> >>>
> >>>> >>> Beam currently looks for FileSystem implementations using the sub-class check below:
> >>>> >>>
> >>>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58
> >>>> >>>
> >>>> >>> For this to work you need to force-import your FileSystem, similar to the following:
> >>>> >>>
> >>>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> >>>> >>>
> >>>> >>> Thanks,
> >>>> >>> Cham
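A hedged sketch of the --sdk_location route Cham describes above; the tarball name depends on the version string of your checkout, so treat it purely as an example:

    # Step 1 (shell): build a source distribution of the locally modified SDK
    # from the sdks/python directory of your Beam checkout:
    #
    #     python setup.py sdist
    #
    # Step 2: point the pipeline at that tarball instead of the released SDK.
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=DataflowRunner',
        # The exact file name under dist/ depends on the version of your
        # checkout; this one is only an example.
        '--sdk_location=dist/apache-beam-2.1.0.dev0.tar.gz',
    ])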
> >>>> >>> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]> wrote:
> >>>> >>>>
> >>>> >>>> Hey folks,
> >>>> >>>>
> >>>> >>>> So, I've been trying to run this custom S3 filesystem of mine on the Dataflow runner. It works fine on the direct runner and even produces the correct results. :)
> >>>> >>>>
> >>>> >>>> However, when I run it on Dataflow (it's basically a wordcount example that has the gs:// paths replaced with s3:// paths), I see the following errors:
> >>>> >>>>
> >>>> >>>> (b84963e5767747e2): Traceback (most recent call last):
> >>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
> >>>> >>>>     work_executor.execute()
> >>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
> >>>> >>>>     self._split_task)
> >>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
> >>>> >>>>     desired_bundle_size)
> >>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
> >>>> >>>>     for split in source.split(desired_bundle_size):
> >>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
> >>>> >>>>     return self._get_concat_source().split(
> >>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
> >>>> >>>>     return fnc(self, *args, **kwargs)
> >>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
> >>>> >>>>     match_result = FileSystems.match([pattern])[0]
> >>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
> >>>> >>>>     filesystem = FileSystems.get_filesystem(patterns[0])
> >>>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
> >>>> >>>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
> >>>> >>>> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
> >>>> >>>>
> >>>> >>>> To my understanding, this means that the module that contains my S3FileSystem doesn't get imported (it currently resides in a separate package that I ship to the pipeline). And, I guess, the order of module imports may differ depending on the runner; that's the only explanation of why exactly the same pipeline works on the direct runner and produces the correct results.
> >>>> >>>>
> >>>> >>>> Any ideas of the best way to troubleshoot this? Can I somehow make Beam upload a locally modified version of the apache_beam package to Dataflow? I'd be happy to provide more details if needed. Or should I instead be reaching out to the Dataflow team?
> >>>> >>>>
> >>>> >>>> Thank you.
> >>>> >>>>
> >>>> >>>> --
> >>>> >>>> Best regards,
> >>>> >>>> Dmitry Demeshchuk.
> >>>> >>
> >>>> >> --
> >>>> >> Best regards,
> >>>> >> Dmitry Demeshchuk.
> >>
> >> --
> >> Best regards,
> >> Dmitry Demeshchuk.

--
Best regards,
Dmitry Demeshchuk.
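For completeness, a rough sketch of the kind of FileSystem subclass discussed above, showing only the scheme() hook that the subclass check in filesystems.py matches against. The class and module names are invented, and a real implementation still has to provide the abstract file operations (create, open, match, delete, and so on):

    from apache_beam.io.filesystem import FileSystem


    class S3FileSystem(FileSystem):
        """Sketch of a custom FileSystem handling s3:// paths.

        Only the scheme() hook is shown. FileSystems.get_filesystem() picks a
        FileSystem subclass whose scheme() matches the path's scheme, which is
        why the class must be imported on the workers before any s3:// path is
        resolved; otherwise the lookup fails with the ValueError seen in the
        traceback above. All abstract I/O methods are omitted here.
        """

        @classmethod
        def scheme(cls):
            return 's3'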
