@Cham: Good call, filed https://issues.apache.org/jira/browse/BEAM-2573
with some initial context.

@Robert: I'm not seeing any errors like that in the Stackdriver logs, but
I'll keep an eye out for them, just in case. In fact, up until now packaging
has worked fine for me: I can install custom apt packages on the Dataflow
nodes, and even our internal pip packages get shipped there and are
properly available on the workers.

@Ahmet: Thanks for the tip! I currently use the 2.0.0 tag, actually, and
all of my code runs from a separate package that gets installed via a
custom --setup_file argument. But I'll try using head and see if it helps.
I don't fully understand the notion of plugins, though. Is a plugin just an
arbitrary Python package, or is it supposed to be an addition to the
apache_beam package, akin to "apache_beam[gcp]"?
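
For anyone following along, here is a minimal sketch of the discovery problem
as I understand it from Cham's explanation. It does not use Beam at all:
FileSystem, LocalFileSystem, S3FileSystem, get_filesystem and simulate_import
are hypothetical stand-ins, and the real lookup in
apache_beam/io/filesystems.py may differ in detail. The only point is that a
sub-class check can find a class only after the code defining it has run.

```python
class FileSystem(object):
    """Hypothetical stand-in for a base class discovered via subclasses."""

    @classmethod
    def scheme(cls):
        raise NotImplementedError


class LocalFileSystem(FileSystem):
    """Always defined, like a filesystem that ships with the SDK."""

    @classmethod
    def scheme(cls):
        return None  # no scheme means a plain local path


def get_filesystem(path):
    """Return the FileSystem subclass whose scheme matches the path."""
    scheme = path.split('://', 1)[0] if '://' in path else None
    # Only classes whose definitions have already executed show up here.
    matches = [fs for fs in FileSystem.__subclasses__()
               if fs.scheme() == scheme]
    if not matches:
        raise ValueError('Unable to get the Filesystem for path %s' % path)
    return matches[0]


# Keeps a strong reference to classes defined by simulate_import().
_loaded = {}


def simulate_import():
    """Assumption: plays the role of importing the module that defines
    the custom filesystem. Until this runs, the class does not exist."""
    if 's3' in _loaded:
        return _loaded['s3']

    class S3FileSystem(FileSystem):
        @classmethod
        def scheme(cls):
            return 's3'

    _loaded['s3'] = S3FileSystem
    return S3FileSystem
```

Calling get_filesystem('s3://pm-dataflow/test') before simulate_import()
raises the same kind of ValueError as in my traceback; after it, the lookup
succeeds. That would explain why the pipeline works wherever my module
happens to be imported before the lookup, and fails where it is not.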

On Fri, Jul 7, 2017 at 6:18 PM, Ahmet Altay <[email protected]> wrote:

> Dmitry,
>
> You can use the --beam_plugin flag [1] to import your plugin before any
> other code execution. A failure in this step will result in the warning
> Robert mentioned ("Failed to import beam plugin ..."), and you can look for
> those in worker logs.
>
> This flag is not available in the 2.0.0 version and will be available in
> the next release. Since you are building the SDK from head, it should be
> already available to you.
>
> Ahmet
>
> [1] https://github.com/apache/beam/blob/56e4251deeb080ceff331fc4adb5d68609b04c71/sdks/python/apache_beam/options/pipeline_options.py#L554
>
> On Fri, Jul 7, 2017 at 5:52 PM, Robert Bradshaw <[email protected]>
> wrote:
>
>> Do you see any warnings "Failed to import beam plugin ..." in your
>> logs? If so, that may indicate some kind of packaging issue.
>>
>> On Fri, Jul 7, 2017 at 5:44 PM, Chamikara Jayalath <[email protected]>
>> wrote:
>> > Discovery of file-system implementations should only occur after your
>> > package is installed. But your class won't be discoverable until it is
>> > imported somewhere.
>> >
>> > I agree that we need a better mechanism for registering and discovering
>> > new FileSystems. Would you mind filing a JIRA ticket for that?
>> >
>> > - Cham
>> >
>> >
>> > On Fri, Jul 7, 2017 at 5:25 PM Dmitry Demeshchuk <[email protected]>
>> > wrote:
>> >>
>> >> Thanks for the clarification, Cham!
>> >>
>> >> Does it mean that for now I'm pretty much stuck with a modified Beam
>> >> distribution, unless/until that filesystem is a part of Beam, or until
>> >> there's a better mechanism to discover new filesystem classes?
>> >>
>> >> Alternatively, maybe it's still possible to change the order of imports
>> >> somehow, so that we initialize the FileSystems class after all the side
>> >> packages are imported? That would be an even better solution, although
>> >> it may not be possible due to circular dependency issues or whatnot.
>> >>
>> >> On Fri, Jul 7, 2017 at 5:14 PM, Chamikara Jayalath <[email protected]>
>> >> wrote:
>> >>>
>> >>> You should be able to use a locally modified version of Beam as
>> >>> follows.
>> >>>
>> >>> python setup.py sdist
>> >>>
>> >>> Then pass the option '--sdk_location dist/<src dist>' when running your
>> >>> program.
>> >>>
>> >>> Beam currently looks for FileSystem implementations using the sub-class
>> >>> check below.
>> >>>
>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L58
>> >>>
>> >>> For this to work you need to force an import of your FileSystem, similar
>> >>> to the following.
>> >>>
>> >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
>> >>>
>> >>> Thanks,
>> >>> Cham
>> >>>
>> >>> On Fri, Jul 7, 2017 at 5:01 PM Dmitry Demeshchuk <[email protected]>
>> >>> wrote:
>> >>>>
>> >>>> Hey folks,
>> >>>>
>> >>>> So, I've been trying to run this custom S3 filesystem of mine on the
>> >>>> Dataflow runner. Works fine on the direct runner, even produces the
>> >>>> correct results. :)
>> >>>>
>> >>>> However, when I run it on Dataflow (it's basically a wordcount example
>> >>>> that has the gs:// paths replaced with s3:// paths), I see the
>> >>>> following errors:
>> >>>>
>> >>>>
>> >>>> (b84963e5767747e2): Traceback (most recent call last):
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>> >>>>     work_executor.execute()
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 209, in execute
>> >>>>     self._split_task)
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 217, in _perform_source_split_considering_api_limits
>> >>>>     desired_bundle_size)
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 254, in _perform_source_split
>> >>>>     for split in source.split(desired_bundle_size):
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 172, in split
>> >>>>     return self._get_concat_source().split(
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>> >>>>     return fnc(self, *args, **kwargs)
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 119, in _get_concat_source
>> >>>>     match_result = FileSystems.match([pattern])[0]
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 130, in match
>> >>>>     filesystem = FileSystems.get_filesystem(patterns[0])
>> >>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>> >>>>     raise ValueError('Unable to get the Filesystem for path %s' % path)
>> >>>> ValueError: Unable to get the Filesystem for path s3://pm-dataflow/test
>> >>>>
>> >>>>
>> >>>> To my understanding, this means that the module that contains my
>> >>>> S3FileSystem doesn't get imported (it currently resides in a separate
>> >>>> package that I ship to the pipeline). And, I guess, the order of
>> >>>> modules importing may differ depending on the runner, that's the only
>> >>>> explanation of why exactly the same pipeline works on the direct
>> >>>> runner and produces the correct results.
>> >>>>
>> >>>> Any ideas of the best way to troubleshoot this? Can I somehow make
>> >>>> Beam upload some locally-modified version of the apache_beam package
>> >>>> to Dataflow? I'd be happy to provide more details if needed. Or should
>> >>>> I instead be reaching out to the Dataflow team?
>> >>>>
>> >>>> Thank you.
>> >>>>
>> >>>>
>> >>>> --
>> >>>> Best regards,
>> >>>> Dmitry Demeshchuk.
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Best regards,
>> >> Dmitry Demeshchuk.
>>
>
>


-- 
Best regards,
Dmitry Demeshchuk.
