[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082872#comment-16082872 ] Ahmet Altay commented on BEAM-2573:
---
Thank you [~demeshchuk]. I agree with documenting this feature, although it should be documented post-release in order not to confuse 2.0.0 users. cc: [~melap]

> Better filesystem discovery mechanism in Python SDK
> ---------------------------------------------------
>
>                 Key: BEAM-2573
>                 URL: https://issues.apache.org/jira/browse/BEAM-2573
>             Project: Beam
>          Issue Type: Task
>          Components: runner-dataflow, sdk-py
>    Affects Versions: 2.0.0
>            Reporter: Dmitry Demeshchuk
>            Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported explicitly:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> The current implementation doesn't seem to allow discovering filesystems that come from side packages rather than from apache_beam itself. Even if I put a custom FileSystem-inheriting class into a package and explicitly import it in the root __init__.py of that package, the class still isn't discoverable.
> The problem occurs on the Dataflow runner, while the Direct runner works just fine. Here's an example of Dataflow output:
> {code}
> (320418708fe777d7): Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 581, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 166, in execute
>     op.start()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py", line 54, in start
>     self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in dataflow_worker.operations.Operation.output (dataflow_worker/operations.c:5768)
>     def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in dataflow_worker.operations.Operation.output (dataflow_worker/operations.c:5654)
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in dataflow_worker.operations.ConsumerSet.receive (dataflow_worker/operations.c:3506)
>     cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in dataflow_worker.operations.DoOperation.process (dataflow_worker/operations.c:11162)
>     with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in dataflow_worker.operations.DoOperation.process (dataflow_worker/operations.c:6)
>     self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:10156)
>     self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10458)
>     self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 431, in apache_beam.runners.common.DoFnRunner._reraise_augmented (apache_beam/runners/common.c:11673)
>     raise new_exn, None, original_traceback
>   File "apache_beam/runners/common.py", line 388, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:10371)
>     self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 281, in apache_beam.runners.common.PerWindowInvoker.invoke_process (apache_beam/runners/common.c:8626)
>     self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/common.py", line 307, in apache_beam.runners.common.PerWindowInvoker._invoke_per_window (apache_beam/runners/common.c:9302)
>     windowed_value, self.process_method(*args_for_process))
>   File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py", line 749, in <lambda>
>   File "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 891, in <lambda>
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 109, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 146, in initialize_write
>     tmp_dir = self._create_temp_dir(file_path_prefix)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", line 151, in _create_temp_dir
>     base_path, last_component = FileSystems.split(file_path_prefix)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 99, in split
>     filesystem = FileSystems.get_filesystem(path)
>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 61, in get_filesystem
> {code}
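(For context, a minimal sketch of how subclass-based discovery behaves; the names below are illustrative, not Beam's actual implementation. The point is that a lookup walking __subclasses__() can only see classes whose defining module has already been imported in the current process.)

{code}
class FileSystem(object):
    """Base class; concrete filesystems declare a URI scheme."""
    scheme = None

    @classmethod
    def get_filesystem(cls, path):
        # Only subclasses whose defining module was already imported
        # show up in __subclasses__(), which is the crux of this issue.
        prefix = path.split('://', 1)[0] if '://' in path else 'file'
        for subclass in cls.__subclasses__():
            if subclass.scheme == prefix:
                return subclass()
        raise ValueError('Unable to get the filesystem for path %s' % path)


class LocalFileSystem(FileSystem):
    scheme = 'file'


# Works, because LocalFileSystem's module is imported. An S3 filesystem
# defined in a side package would only be found after that package is
# imported somewhere in this process.
print(type(FileSystem.get_filesystem('/tmp/foo')).__name__)
{code}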
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082850#comment-16082850 ] Dmitry Demeshchuk commented on BEAM-2573:
---
Finally, after fighting some dependency issues, I made it work and confirmed that it works on master as expected! Thanks a lot for all the help, folks!
I guess, since {{beam_plugins}} is the only sane solution to this problem overall, we can just make sure to reflect that in https://beam.apache.org/documentation/io/authoring-overview/ and call it a day?
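(For the documentation follow-up, a hedged sketch of what {{beam_plugins}} usage could look like from Python. The option shape and the S3 class path are taken from this thread; project and bucket values are placeholders.)

{code}
from apache_beam.options.pipeline_options import PipelineOptions

# beam_plugins carries fully qualified class paths for workers to
# import at startup; 'dataflow.aws.s3.S3FileSystem' is the custom
# filesystem discussed in this thread.
options = PipelineOptions.from_dictionary({
    'runner': 'dataflow',
    'project': 'test-gc-project-168517',
    'temp_location': 'gs://dataflow-test-gc-project-168517/tmp',
    'beam_plugins': 'dataflow.aws.s3.S3FileSystem',
})
{code}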
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081179#comment-16081179 ] Dmitry Demeshchuk commented on BEAM-2573:
---
[~altay]: I was doing exactly option 2, yes. My guess is that there are some cached dependency issues; I've stumbled upon cached dependencies before. I'll try to clean the virtualenv, install everything from scratch, and see how it goes.
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081160#comment-16081160 ] Ahmet Altay commented on BEAM-2573:
---
[~demeshchuk], I believe you are mixing two SDK versions. The SDK version installed in your virtual environment should exactly match the SDK version you use with the --sdk_location flag. You have two options:
1. Use the released version: pip install the latest version and do not set the --sdk_location flag. You will not have the new features that will be available in the next version.
2. Use the head version: build an SDK from head, install it in a fresh virtual environment, and also point the --sdk_location flag at that SDK. You will have all the new features available at head.
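(To make option 2's invariant concrete, a hedged sketch of a sanity check before submitting; the tarball name is the one quoted elsewhere in this thread, and the check itself is illustrative, not part of the SDK.)

{code}
import apache_beam

# The installed SDK and the tarball passed via --sdk_location must be
# the same version, or the worker and submission sides diverge.
sdk_location = 'dist/apache-beam-2.2.0.dev0.tar.gz'
assert apache_beam.__version__ in sdk_location, (
    'virtualenv SDK %s does not match sdk_location %s'
    % (apache_beam.__version__, sdk_location))
{code}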
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081145#comment-16081145 ] Sourabh Bajaj commented on BEAM-2573:
---
I think the worker is specified in https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/dependency.py#L81, so it is tied to the version of the SDK being used.
I might be wrong about needing the release if you're on master: I can run the following command and see logs about failing to import the dataflow package on head, and success for the built-in plugins.
{code:shell}
python -m apache_beam.examples.wordcount \
  --output $BUCKET/wc/output \
  --project $PROJECT \
  --staging_location $BUCKET/wc/staging \
  --temp_location $BUCKET/wc/tmp \
  --job_name "sourabhbajaj-wc-4" \
  --runner DataflowRunner \
  --sdk_location dist/apache-beam-2.1.0.dev0.tar.gz \
  --beam_plugin dataflow.s3.aws.S3FS
{code}
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080997#comment-16080997 ] Dmitry Demeshchuk commented on BEAM-2573:
---
[~sb2nov] Thanks for the explanation! I'll wait for the 2.1.0 release, then (or maybe release candidates would be good enough, too).
By the way, am I right to assume that full Dataflow compatibility is only guaranteed for tagged Beam versions, then?
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080974#comment-16080974 ] Sourabh Bajaj commented on BEAM-2573:
---
I think the plugins are working correctly if they are passing a list of class names to be imported at startup. You might need to wait for the next release, as this required a change to the Dataflow workers: they need to start importing the paths specified in the beam_plugins list. There is a release going on right now, so that might happen in the next few days.
I am not sure about the crash loop in windmillio; [~charleschen] might know more.
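(A hedged sketch of the import mechanism described above; function and list contents are illustrative, not the Dataflow worker's actual API. The idea is simply that importing each plugin's defining module runs its module-level side effects, which is what registers the FileSystem subclass.)

{code}
import importlib

def import_beam_plugins(plugin_paths):
    """Import each fully qualified class path so its module-level
    side effects (subclass registration) run on the worker."""
    for path in plugin_paths:
        module_name, _, class_name = path.rpartition('.')
        module = importlib.import_module(module_name)
        getattr(module, class_name)  # fail fast if the class is missing

# Example with a class path that ships with the SDK:
import_beam_plugins(['apache_beam.io.localfilesystem.LocalFileSystem'])
{code}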
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080933#comment-16080933 ] Dmitry Demeshchuk commented on BEAM-2573:
---
Hmm, looks like it was indeed a dependency issue that setuptools was hiding. I did {{pip install .}} from the Beam repo, and the plugins made it in there.
Interestingly, Beam seems to have automatically converted the plugins list into a list of classes: in the Dataflow job's options, {{beam_plugins}} equals {{['apache_beam.io.filesystem.FileSystem', 'apache_beam.io.localfilesystem.LocalFileSystem', 'apache_beam.io.gcp.gcsfilesystem.GCSFileSystem', 'dataflow.aws.s3.S3FileSystem']}}.
However, the crash loop described in my previous comment still happens. I'll continue looking at the sources to see where I screwed up.
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080889#comment-16080889 ] Dmitry Demeshchuk commented on BEAM-2573:
---
Hmm, now it gets a bit past that, but my pipeline enters an infinite crash loop backoff. This is what Stackdriver prints out:
{code}
I  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 26, in <module>
I    from dataflow_worker import windmillio
I  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/windmillio.py", line 41, in <module>
I    class PubSubWindmillSource(pubsub.PubSubSource):
I  AttributeError: 'module' object has no attribute 'PubSubSource'
I  checking backoff for container "python" in pod "dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8"
E  Error syncing pod e6169e8537b1bd83321865dafc047ba4, skipping: failed to "StartContainer" for "python" with CrashLoopBackOff: "Back-off 5m0s restarting failed container=python pod=dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8_default(e6169e8537b1bd83321865dafc047ba4)"
I  Setting node annotation to enable volume controller attach/detach
I  checking backoff for container "python" in pod "dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8"
I  Back-off 5m0s restarting failed container=python pod=dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8_default(e6169e8537b1bd83321865dafc047ba4)
{code}
I'll try to clean up the local dependencies and see if that helps.
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080867#comment-16080867 ] Dmitry Demeshchuk commented on BEAM-2573:
---
So, I tried to switch to the head version of the SDK and use the {{beam_plugins}} option. Here are the steps I performed:
1. Ran {{python setup.py sdist}} in the master branch of Beam. Also ran {{python setup.py install}}.
2. Created the following dictionary in Python (note the {{beam_plugins}} option present):
{code}
{'job_name': 's3-wordcount-example-2',
 'staging_location': 'gs://dataflow-test-gc-project-168517/s3-wordcount-example-2/staging_location',
 'runner': 'dataflow',
 'streaming': False,
 'runtime_type_check': False,
 'temp_location': 'gs://dataflow-test-gc-project-168517/s3-wordcount-example-2/temporary_location',
 'setup_file': '/tmp/tmpEdRIo2/setup.py',
 'dataflow_endpoint': 'https://dataflow.googleapis.com',
 'sdk_location': '/Users/dmitrydemeshchuk/beam/sdks/python/dist/apache-beam-2.2.0.dev0.tar.gz',
 'save_main_session': True,
 'zone': 'us-west1-a',
 'region': 'us-west1',
 'profile_cpu': False,
 'bucket': 'gs://dataflow-test-gc-project-168517',
 'profile_memory': False,
 'pipeline_type_check': True,
 'project': 'test-gc-project-168517',
 'direct_runner_use_stacked_bundle': True,
 'type_check_strictness': 'DEFAULT_TO_ANY',
 'beam_plugins': ' dataflow',
 'no_auth': False}
{code}
3. Created a {{PipelineOptions}} object and used it inside a pipeline: {{options = PipelineOptions.from_dictionary(options_dict)}}
4. Ran the pipeline on Dataflow.
Now, in the Dataflow UI I'm seeing some of the pipeline options (for example, {{sdk_location}} is correct), but I'm not seeing {{beam_plugins}} anywhere. FWIW, the job's "Dataflow SDK version" equals "Google Cloud Dataflow SDK for Python 2.0.0", but {{sdk_location}} equals {{/Users/dmitrydemeshchuk/beam/sdks/python/dist/apache-beam-2.2.0.dev0.tar.gz}} (note the 2.2.0 version). Needless to say, the {{beam_plugins}} option doesn't seem to get applied to my pipeline; at least, it fails as if the plugin wasn't imported.
I'm almost sure this has something to do with the Dataflow SDK version, but so far I cannot find a way to make it right. Any suggestions?
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079848#comment-16079848 ] Sourabh Bajaj commented on BEAM-2573:
---
[~chamikara] The issue with the register approach is that it needs to run in the Python process of the worker, so something from pipeline construction needs to tell the worker process to import these files, and we use beam plugins as the transport mechanism for that. If you call register during pipeline construction, you have to import the class anyway, which makes the registration somewhat pointless: the import alone already makes the FS visible through the subclass lookup.
[~demeshchuk] Can you try importing dataflow in your pipeline submission process and see if that makes the plugin available? You can just look at the pipeline options to check whether it worked, as the runner should pass the full S3FS path in the --beam_plugins option.
Ideally, I think the best way to do this would have been to make the Path FS-specific, so that ReadFromText would take a specialized S3Path object, but that is a really large breaking change, so we have to live with this.
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078995#comment-16078995 ] Ahmet Altay commented on BEAM-2573:
---
cc: [~robertwb]
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078949#comment-16078949 ] Chamikara Jayalath commented on BEAM-2573: --- Right. Usually you should not have to import FileSystem implementations during pipeline construction, and importing them just to invoke the side effect of being included in beam_plugin has downsides (lint errors in the user program, a non-obvious API, reliance on runner-specific code). It might be better to have a more direct API for registering FileSystems.
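To make the suggestion concrete, a direct registration API might look like the minimal sketch below. Everything here is hypothetical: the {{register()}} classmethod, the scheme-to-class map, and the lookup logic illustrate the idea and are not existing SDK code (only {{FileSystem.scheme()}} is part of the current interface).
{code}
class FileSystems(object):
  """Sketch of a hypothetical explicit-registration API (not in the SDK)."""

  _registry = {}  # maps a URI scheme such as 's3' to a FileSystem subclass

  @classmethod
  def register(cls, filesystem_cls):
    # Explicit call made once by the user's package: no reliance on import
    # side effects or runner-specific plugin handling.
    cls._registry[filesystem_cls.scheme()] = filesystem_cls

  @classmethod
  def get_filesystem(cls, path):
    scheme = path.split('://', 1)[0] if '://' in path else 'file'
    if scheme not in cls._registry:
      raise ValueError('Unable to get FileSystem from specified path: %s' % path)
    return cls._registry[scheme]()
{code}
A user package would then call {{FileSystems.register(S3FileSystem)}} explicitly, instead of relying on the class merely having been imported somewhere.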
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078929#comment-16078929 ] Ahmet Altay commented on BEAM-2573: --- There is already a mechanism to register all plugins that were imported (https://github.com/apache/beam/blob/dba5140792969182b85e449eb2f5630bd45710ca/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L242). Would {{FileSystems.register()}} be for plugins that are not imported?
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078916#comment-16078916 ] Chamikara Jayalath commented on BEAM-2573: --- Would it make sense to add a FileSystems.register() method that adds an entry to --beam_plugin?
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078890#comment-16078890 ] Ahmet Altay commented on BEAM-2573: --- The {{--beam_plugin}} flag allows importing arbitrary modules beforehand, and it could be used for other file systems. Before choosing this approach, [~sb2nov] investigated other methods for auto-discovery, and all of them had shortcomings.
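As a usage illustration (the project and bucket names are placeholders; {{dataflow.aws.s3}} is the module from the reporter's example below), the flag could be passed alongside the usual pipeline options:
{code}
from apache_beam.options.pipeline_options import PipelineOptions

# Ask the runner to import the module defining the custom FileSystem
# before the pipeline starts on the workers.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-gcp-project',
    '--temp_location=gs://my-bucket/tmp',
    '--beam_plugin=dataflow.aws.s3',
])
{code}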
[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078889#comment-16078889 ] Dmitry Demeshchuk commented on BEAM-2573: --- Here's a quick example of the filesystem class being picked up locally:
{code}
In [1]: import dataflow
No handlers could be found for logger "oauth2client.contrib.multistore_file"

In [2]: from apache_beam.io.filesystem import FileSystem

In [3]: FileSystem.get_all_subclasses()
Out[3]:
[apache_beam.io.localfilesystem.LocalFileSystem,
 apache_beam.io.gcp.gcsfilesystem.GCSFileSystem,
 dataflow.aws.s3.S3FileSystem]
{code}
dataflow is our internal Python package that we use for more robust interaction with Dataflow and custom sources/sinks/transforms. The package definitely gets loaded, because other pipelines that use it work just fine; it's only the filesystem discovery that fails to pick up its classes.