[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078949#comment-16078949 ]
Chamikara Jayalath commented on BEAM-2573: ------------------------------------------ Right. Usually you should not have to import FileSystem implementations during pipeline construction and importing just for invoking the side affect of being included in beam_plugin has downsides (lint errors in user program, non-obvious api, relying on runner specific code). It might be better to have a more direct API for registering FileSystems. > Better filesystem discovery mechanism in Python SDK > --------------------------------------------------- > > Key: BEAM-2573 > URL: https://issues.apache.org/jira/browse/BEAM-2573 > Project: Beam > Issue Type: Task > Components: runner-dataflow, sdk-py > Affects Versions: 2.0.0 > Reporter: Dmitry Demeshchuk > Priority: Minor > > It looks like right now custom filesystem classes have to be imported > explicitly: > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30 > Seems like the current implementation doesn't allow discovering filesystems > that come from side packages, not from apache_beam itself. Even if I put a > custom FileSystem-inheriting class into a package and explicitly import it in > the root __init__.py of that package, it still doesn't make the class > discoverable. > The problems I'm experiencing happen on Dataflow runner, while Direct runner > works just fine. Here's an example of Dataflow output: > {code} > (320418708fe777d7): Traceback (most recent call last): > File > "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line > 581, in do_work > work_executor.execute() > File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", > line 166, in execute > op.start() > File > "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py", > line 54, in start > self.output(windowed_value) > File "dataflow_worker/operations.py", line 138, in > dataflow_worker.operations.Operation.output > (dataflow_worker/operations.c:5768) > def output(self, windowed_value, output_index=0): > File "dataflow_worker/operations.py", line 139, in > dataflow_worker.operations.Operation.output > (dataflow_worker/operations.c:5654) > cython.cast(Receiver, > self.receivers[output_index]).receive(windowed_value) > File "dataflow_worker/operations.py", line 72, in > dataflow_worker.operations.ConsumerSet.receive > (dataflow_worker/operations.c:3506) > cython.cast(Operation, consumer).process(windowed_value) > File "dataflow_worker/operations.py", line 328, in > dataflow_worker.operations.DoOperation.process > (dataflow_worker/operations.c:11162) > with self.scoped_process_state: > File "dataflow_worker/operations.py", line 329, in > dataflow_worker.operations.DoOperation.process > (dataflow_worker/operations.c:11116) > self.dofn_receiver.receive(o) > File "apache_beam/runners/common.py", line 382, in > apache_beam.runners.common.DoFnRunner.receive > (apache_beam/runners/common.c:10156) > self.process(windowed_value) > File "apache_beam/runners/common.py", line 390, in > apache_beam.runners.common.DoFnRunner.process > (apache_beam/runners/common.c:10458) > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 431, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > (apache_beam/runners/common.c:11673) > raise new_exn, None, original_traceback > File "apache_beam/runners/common.py", line 388, in > apache_beam.runners.common.DoFnRunner.process > (apache_beam/runners/common.c:10371) > self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 281, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > (apache_beam/runners/common.c:8626) > self._invoke_per_window(windowed_value) > File "apache_beam/runners/common.py", line 307, in > apache_beam.runners.common.PerWindowInvoker._invoke_per_window > (apache_beam/runners/common.c:9302) > windowed_value, self.process_method(*args_for_process)) > File > "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py", > line 749, in <lambda> > File > "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py", > line 891, in <lambda> > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", > line 109, in _f > return fnc(self, *args, **kwargs) > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", > line 146, in initialize_write > tmp_dir = self._create_temp_dir(file_path_prefix) > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", > line 151, in _create_temp_dir > base_path, last_component = FileSystems.split(file_path_prefix) > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line > 99, in split > filesystem = FileSystems.get_filesystem(path) > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line > 61, in get_filesystem > raise ValueError('Unable to get the Filesystem for path %s' % path) > ValueError: Unable to get the Filesystem for path > s3://my-test-bucket/test_output [while running > 'write/Write/WriteImpl/InitializeWrite'] > {code} > I apologize for not providing full context or codebase, because a lot of the > code we are running is internal, and some of it is tightly coupled to our > infrastructure. If I run out of experimenting options, I'll try to narrow my > use case down to the simplest case possible (like, override a gcs filesystem > with a different path prefix or something). > I think there are several possibilities here: > 1. I'm doing something wrong, and it should be trivial to achieve something > like that. This probably implies figuring out the right approach and writing > some guideline for the sources/sinks page in the docs. > 2. The current order of imports is not optimal, and we could possibly import > the side packages before initializing the filesystem classes. I currently > possess too little knowledge about the way things get executed in Dataflow, > so it's hard for me to tell how much it's worth diving that rabbithole. > 3. There just needs to be a better way of referring to additional filesystem > classes. One way of doing that is to just specify a class name explicitly > inside the ReadFromText and WriteToText functions (or something along these > lines). PipelineOptions seems like an overkill for this, but may still be an > option. Finally, maybe there could be just a function that gets called in the > main script that somehow tells Beam to discover a specific class? -- This message was sent by Atlassian JIRA (v6.4.14#64029)