@Cham: Good call, filed https://issues.apache.org/jira/browse/BEAM-2573 with some initial context.
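As a bit of extra context for the ticket: my reading of the filesystems.py code linked further down in this thread is that discovery is just a scan over FileSystem subclasses at match time, so a filesystem class stays invisible until the module that defines it has been imported. Roughly (this is a paraphrase for illustration, not the actual Beam source; find_filesystem is a made-up name):

# Paraphrase of the subclass-based lookup in apache_beam/io/filesystems.py;
# not the exact Beam source.
from apache_beam.io.filesystem import FileSystem

def find_filesystem(scheme):
    # Only subclasses whose defining modules have already been imported
    # show up in __subclasses__(), which is why import order matters.
    candidates = [fs for fs in FileSystem.__subclasses__()
                  if fs.scheme() == scheme]
    if not candidates:
        raise ValueError('Unable to get the Filesystem for scheme %r' % scheme)
    return candidates[0]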
@Robert: I'm not seeing any errors like that in the Stackdriver logs, but I'll keep an eye on them, just in case. In fact, up until now packaging has been working fine for me: I can install custom apt packages on the Dataflow nodes, and even get our internal pip packages shipped there and properly available on the workers.

@Ahmet: Thanks for the tip! I'm currently on the 2.0.0 tag, actually, and all of my code runs from a separate package that gets installed using a custom --setup_file argument. But I'll try just using head and see if that helps. I don't fully understand the notion of plugins, though. Is a plugin just an arbitrary Python package, or is it supposed to be an addition to the apache_beam package itself, akin to "apache_beam[gcp]"?
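To make the question a bit more concrete: the filesystem itself is nothing special, just a FileSystem subclass sitting in that separate package, along the lines of this stripped-down sketch (the real class implements the full interface; method bodies are omitted here):

# Stripped-down sketch of the S3 filesystem shipped via --setup_file;
# the real class implements match(), open(), create(), copy(), delete(), etc.
from apache_beam.io.filesystem import FileSystem

class S3FileSystem(FileSystem):
    """Resolves s3:// paths."""

    @classmethod
    def scheme(cls):
        return 's3'

So the open question is really just how to guarantee that the module defining it gets imported on the Dataflow workers before FileSystems.match() runs.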
On Fri, Jul 7, 2017 at 6:18 PM, Ahmet Altay <[email protected]> wrote:
> Dmitry,
>
> You can use the --beam_plugin flag [1] to import your plugin before any other code execution. A failure in this step will result in the warning Robert mentioned ("Failed to import beam plugin ..."), and you can look for those in the worker logs.
>
> This flag is not available in the 2.0.0 version and will be available in the next release. Since you are building the SDK from head, it should already be available to you.
>
> Ahmet
>
> [1] https://github.com/apache/beam/blob/56e4251deeb080ceff331fc4adb5d68609b04c71/sdks/python/apache_beam/options/pipeline_options.py#L554
>
> On Fri, Jul 7, 2017 at 5:52 PM, Robert Bradshaw <[email protected]> wrote:
>> Do you see any warnings "Failed to import beam plugin ..." in your logs? If so, that may indicate some kind of packaging issue.
>>
>> On Fri, Jul 7, 2017 at 5:44 PM, Chamikara Jayalath <[email protected]> wrote:
>> > Discovery of file-system implementations should only occur after your package is installed. But your class won't be discoverable until it is imported somewhere.
>> >
>> > I agree that we need a better mechanism for registering and discovering new FileSystems. Would you mind filing a JIRA ticket for that?
>> >
>> > - Cham
>> >
>> > On Fri, Jul 7, 2017 at 5:25 PM Dmitry Demeshchuk <[email protected]> wrote:
>> >> Thanks for the clarification, Cham!
>> >>
>> >> Does it mean that for now I'm pretty much stuck with a modified Beam distribution, unless/until that filesystem is a part of Beam, or until there's a better mechanism to discover new filesystem classes?
>> >>
>> >> Alternatively, maybe it's still possible to change the order of imports somehow, so that we initialize the FileSystems class after all the side packages are imported? That would be an even better solution, although it may not be possible due to circular dependency issues or whatnot.
>> >>
>> >> On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath <[email protected]> wrote:
>> >>> You should be able to use a locally modified version of Beam as follows:
>> >>>
>> >>> python setup.py sdist
>> >>>
>> >>> Then pass the option '--sdk_location dist/<src dist>' when running your program.
>> >>>
>> >>> Beam currently looks for FileSystem implementations using the sub-class check below:
>> >>>
>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58
>> >>>
>> >>> For this to work you need to force-import your FileSystem, similar to the following:
>> >>>
>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
>> >>>
>> >>> Thanks,
>> >>> Cham
>> >>>
>> >>> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]> wrote:
>> >>>> Hey folks,
>> >>>>
>> >>>> So, I've been trying to run this custom S3 filesystem of mine on the Dataflow runner. It works fine on the direct runner, and even produces the correct results. :)
>> >>>>
>> >>>> However, when I run it on Dataflow (it's basically a wordcount example that has the gs:// paths replaced with s3:// paths), I see the following errors:
>> >>>>
>> >>>> (b84963e5767747e2): Traceback (most recent call last):
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>> >>>>     work_executor.execute()
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
>> >>>>     self._split_task)
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
>> >>>>     desired_bundle_size)
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
>> >>>>     for split in source.split(desired_bundle_size):
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
>> >>>>     return self._get_concat_source().split(
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>> >>>>     return fnc(self, *args, **kwargs)
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
>> >>>>     match_result = FileSystems.match([pattern])[0]
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
>> >>>>     filesystem = FileSystems.get_filesystem(patterns[0])
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>> >>>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>> >>>> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
>> >>>>
>> >>>> To my understanding, this means that the module that contains my S3FileSystem doesn't get imported (it currently resides in a separate package that I ship to the pipeline). And, I guess, the order of module imports may differ depending on the runner; that's the only explanation of why exactly the same pipeline works on the direct runner and produces the correct results.
>> >>>>
>> >>>> Any ideas of the best way to troubleshoot this? Can I somehow make Beam upload a locally modified version of the apache_beam package to Dataflow? I'd be happy to provide more details if needed. Or should I instead be reaching out to the Dataflow team?
>> >>>>
>> >>>> Thank you.
>> >>>>
>> >>>> --
>> >>>> Best regards,
>> >>>> Dmitry Demeshchuk.
>> >>
>> >> --
>> >> Best regards,
>> >> Dmitry Demeshchuk.

--
Best regards,
Dmitry Demeshchuk.
