Thanks for the clarification, Cham! Does that mean that, for now, I'm pretty much stuck with a modified Beam distribution, unless/until that filesystem becomes part of Beam, or until there's a better mechanism for discovering new filesystem classes?
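
For reference, my mental model of the current discovery mechanism is roughly the following. This is only a simplified sketch of the sub-class check you linked (not the actual Beam code), so please correct me if I'm off:

from apache_beam.io.filesystem import FileSystem

def lookup_filesystem(path):
    # Pull the scheme out of the path ('s3', 'gs', ...); no scheme means local.
    scheme = path.split('://', 1)[0] if '://' in path else 'file'
    # Only subclasses whose defining modules have already been imported show
    # up in __subclasses__(), which is why the import order matters at all.
    for fs_class in FileSystem.__subclasses__():
        if fs_class.scheme() == scheme:
            return fs_class
    raise ValueError('Unable to get the Filesystem for path %s' % path)

If that's accurate, then a FileSystem subclass that lives in a side package stays invisible until something actually imports that package.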
Alternatively, maybe it's still possible to change the order of imports somehow, so that we initialize the FileSystems class after all the side packages are imported? That would be an even better solution, although it may not be possible due to circular dependency issues or whatnot. (I've sketched what I have in mind at the bottom of this message.)

On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath <[email protected]> wrote:

> You should be able to use a locally modified version of Beam using the
> following.
>
> python setup.py sdist
>
> Then pass the option '--sdk_location dist/<src dist>' when running your
> program.
>
> Beam currently looks for FileSystem implementations using the sub-class
> check below:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58
>
> For this to work, you need to force-import your FileSystem, similar to the
> following:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
>
> Thanks,
> Cham
>
> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]>
> wrote:
>
>> Hey folks,
>>
>> So, I've been trying to run this custom S3 filesystem of mine on the
>> Dataflow runner. It works fine on the direct runner and even produces the
>> correct results. :)
>>
>> However, when I run it on Dataflow (it's basically a wordcount example
>> with the gs:// paths replaced by s3:// paths), I see the following errors:
>>
>> (b84963e5767747e2): Traceback (most recent call last):
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>>     work_executor.execute()
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
>>     self._split_task)
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
>>     desired_bundle_size)
>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
>>     for split in source.split(desired_bundle_size):
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
>>     return self._get_concat_source().split(
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>>     return fnc(self, *args, **kwargs)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
>>     match_result = FileSystems.match([pattern])[0]
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
>>     filesystem = FileSystems.get_filesystem(patterns[0])
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
>>
>> To my understanding, this means that the module that contains my
>> S3FileSystem doesn't get imported (it currently resides in a separate
>> package that I ship to the pipeline). And, I guess, the order in which
>> modules get imported may differ depending on the runner; that's the only
>> explanation for why exactly the same pipeline works on the direct runner
>> and produces the correct results.
>>
>> Any ideas of the best way to troubleshoot this? Can I somehow make Beam
>> upload a locally modified version of the apache_beam package to Dataflow?
>> I'd be happy to provide more details if needed.
>> Or should I instead be reaching out to the Dataflow team?
>>
>> Thank you.
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>

--
Best regards,
Dmitry Demeshchuk.
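
P.S. To make the import-ordering question more concrete, here's roughly the kind of setup I have in mind, with made-up package and module names (my_s3_package, s3filesystem); treat it as a sketch rather than exactly what I'm running:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# This import is the whole point: the module defines the FileSystem subclass,
# so importing it is what makes Beam's sub-class check see it, at least in
# this local process. Whether the Dataflow worker performs an equivalent
# import before calling FileSystems.match() is exactly what I'm unsure about.
import my_s3_package.s3filesystem  # made-up name

options = PipelineOptions([
    '--runner=DataflowRunner',
    # Ship the side package that contains the FileSystem subclass to workers.
    '--extra_package=dist/my_s3_package-0.1.0.tar.gz',
    # ...plus the usual --project, --staging_location, --temp_location, etc.
])

p = beam.Pipeline(options=options)
(p
 | 'Read' >> beam.io.ReadFromText('s3://pm-dataflow/test')
 # ...the actual wordcount transforms go here...
 | 'Write' >> beam.io.WriteToText('s3://pm-dataflow/output'))
p.run()

And if I do end up needing a locally modified Beam as well, I'd build it with 'python setup.py sdist' and pass '--sdk_location dist/<src dist>' as you suggested.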
