[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079848#comment-16079848
 ] 

Sourabh Bajaj commented on BEAM-2573:
-------------------------------------

[~chamikara]
The issue with the register approach is that the registration needs to run in 
the Python process of the worker, so something from pipeline construction needs 
to tell the worker process to import these files. We use beam plugins as the 
transport mechanism for that. 
If you call register during pipeline construction, you have to import the class 
anyway, which makes the registration pointless: the import itself already makes 
the FS available to the subclass lookup.
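
To illustrate the point about imports, here is a minimal sketch of 
subclass-based discovery. It is not the actual apache_beam code: FileSystem, 
get_filesystem, try_lookup and S3FileSystem are stand-in names, and defining 
the class inline stands in for importing the module that contains it.

```python
class FileSystem(object):
    """Stand-in base class for this sketch, not the real Beam class."""

    @classmethod
    def scheme(cls):
        raise NotImplementedError


def get_filesystem(path):
    """Return the FileSystem subclass whose scheme matches the path prefix."""
    scheme = path.split('://', 1)[0] if '://' in path else None
    for cls in FileSystem.__subclasses__():
        if cls.scheme() == scheme:
            return cls
    raise ValueError('Unable to get the Filesystem for path %s' % path)


def try_lookup(path):
    """Return the matching class, or None if no subclass is known yet."""
    try:
        return get_filesystem(path)
    except ValueError:
        return None


# Before the class definition has executed, the lookup finds nothing.
found_before = try_lookup('s3://my-test-bucket/test_output')


class S3FileSystem(FileSystem):
    """Hypothetical custom filesystem; defining it stands in for the import."""

    @classmethod
    def scheme(cls):
        return 's3'


# Once the definition has executed in this process, the same lookup succeeds.
found_after = try_lookup('s3://my-test-bucket/test_output')
```

The lookup only sees classes whose definitions have run in the same process, 
which is why the worker has to perform the import itself.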

[~demeshchuk] Can you try importing dataflow in your pipeline submission 
process and see if that makes the plugin available? You can look at the 
pipeline options to see whether it worked: it should try to pass the full S3FS 
path in the --beam-plugins option.
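
As a rough sketch of the transport idea (not Beam's implementation), the 
submission side can collect the full dotted path of each loaded subclass, and 
the worker side can import the module part of each path. The names 
plugin_paths and import_plugins are hypothetical, and a standard-library class 
path stands in for a real plugin so the snippet runs on its own.

```python
import importlib


def plugin_paths(base_class):
    """Submission side: full dotted paths of the currently loaded subclasses."""
    return ['%s.%s' % (cls.__module__, cls.__name__)
            for cls in base_class.__subclasses__()]


def import_plugins(paths):
    """Worker side: import the module part of each dotted class path."""
    imported = []
    for path in paths:
        module_name, _, _class_name = path.rpartition('.')
        importlib.import_module(module_name)
        imported.append(module_name)
    return imported


class FileSystem(object):
    """Stand-in base class, not the real Beam class."""


class S3FileSystem(FileSystem):
    """Hypothetical custom filesystem."""


# What the submission process would pass along as plugin paths.
paths = plugin_paths(FileSystem)

# A standard-library class path stands in for a real plugin on the worker.
imported = import_plugins(['json.decoder.JSONDecoder'])
```

If the custom class is never imported in the submission process, its path 
never appears in the list, so the worker has nothing to import.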

Ideally, I think the best way to do this would have been to make the Path FS 
specific, so that ReadFromText would take a specialized S3Path object, but that 
is a really large breaking change, so we have to live with this.
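
For comparison, a path type that carries its own filesystem could look 
roughly like the sketch below. This is purely illustrative of the alternative 
design and does not exist in Beam: InMemoryFileSystem, FSPath and 
read_from_text are hypothetical names, and the in-memory filesystem is only 
there to keep the example self-contained.

```python
class InMemoryFileSystem(object):
    """Hypothetical filesystem backed by a dict, used only for this sketch."""

    def __init__(self, files):
        self._files = files

    def read(self, location):
        return self._files[location]


class FSPath(object):
    """Hypothetical path type that carries its own filesystem."""

    def __init__(self, filesystem, location):
        self.filesystem = filesystem
        self.location = location


def read_from_text(path):
    """Read lines through the filesystem attached to the path."""
    return path.filesystem.read(path.location).splitlines()


fs = InMemoryFileSystem({'my-test-bucket/input.txt': 'a\nb\n'})
lines = read_from_text(FSPath(fs, 'my-test-bucket/input.txt'))
```

With this shape no global discovery step is needed, because the path object 
itself references the filesystem that can read it.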

> Better filesystem discovery mechanism in Python SDK
> ---------------------------------------------------
>
>                 Key: BEAM-2573
>                 URL: https://issues.apache.org/jira/browse/BEAM-2573
>             Project: Beam
>          Issue Type: Task
>          Components: runner-dataflow, sdk-py
>    Affects Versions: 2.0.0
>            Reporter: Dmitry Demeshchuk
>            Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported 
> explicitly: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> Seems like the current implementation doesn't allow discovering filesystems 
> that come from side packages, not from apache_beam itself. Even if I put a 
> custom FileSystem-inheriting class into a package and explicitly import it in 
> the root __init__.py of that package, it still doesn't make the class 
> discoverable.
> The problems I'm experiencing happen on the Dataflow runner, while the Direct 
> runner works just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
>     op.start()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py", line 54, in start
>     self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in dataflow_worker.operations.Operation.output (dataflow_worker/operations.c:5768)
>     def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in dataflow_worker.operations.Operation.output (dataflow_worker/operations.c:5654)
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in dataflow_worker.operations.ConsumerSet.receive (dataflow_worker/operations.c:3506)
>     cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in dataflow_worker.operations.DoOperation.process (dataflow_worker/operations.c:11162)
>     with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in dataflow_worker.operations.DoOperation.process (dataflow_worker/operations.c:11116)
>     self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:10156)
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10458)
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented (apache_beam/runners/common.c:11673)
>     raise new_exn, None, original_traceback
>   File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10371)
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 281, in apache_beam.runners.common.PerWindowInvoker.invoke_process (apache_beam/runners/common.c:8626)
>     self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/common.py", line 307, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window (apache_beam/runners/common.c:9302)
>     windowed_value, self.process_method(*args_for_process))
>   File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py", line 749, in <lambda>
>   File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 891, in <lambda>
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 146, in initialize_write
>     tmp_dir = self._create_temp_dir(file_path_prefix)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 151, in _create_temp_dir
>     base_path, last_component = FileSystems.split(file_path_prefix)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 99, in split
>     filesystem = FileSystems.get_filesystem(path)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
>     raise ValueError('Unable to get the Filesystem for path %s' % path)
> ValueError: Unable to get the Filesystem for path s3://my-test-bucket/test_output [while running 'write/Write/WriteImpl/InitializeWrite']
> {code}
> I apologize for not providing full context or codebase, because a lot of the 
> code we are running is internal, and some of it is tightly coupled to our 
> infrastructure. If I run out of experimenting options, I'll try to narrow my 
> use case down to the simplest case possible (like, override a gcs filesystem 
> with a different path prefix or something).
> I think there are several possibilities here:
> 1. I'm doing something wrong, and it should be trivial to achieve something 
> like that. This probably implies figuring out the right approach and writing 
> some guideline for the sources/sinks page in the docs.
> 2. The current order of imports is not optimal, and we could possibly import 
> the side packages before initializing the filesystem classes. I currently 
> know too little about the way things get executed in Dataflow, so it's hard 
> for me to tell how much it's worth diving into that rabbit hole.
> 3. There just needs to be a better way of referring to additional filesystem 
> classes. One way of doing that is to specify a class name explicitly inside 
> the ReadFromText and WriteToText functions (or something along these lines). 
> PipelineOptions seems like overkill for this, but may still be an option. 
> Finally, maybe there could be a function that gets called in the main script 
> and tells Beam to discover a specific class?



