[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-11 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082872#comment-16082872
 ] 

Ahmet Altay commented on BEAM-2573:
---

Thank you [~demeshchuk]. I agree with documenting this feature, although it 
should be documented post-release in order not to confuse 2.0.0 users.

cc: [~melap]

> Better filesystem discovery mechanism in Python SDK
> ---
>
> Key: BEAM-2573
> URL: https://issues.apache.org/jira/browse/BEAM-2573
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow, sdk-py
>Affects Versions: 2.0.0
>Reporter: Dmitry Demeshchuk
>Priority: Minor
>
> It looks like right now custom filesystem classes have to be imported 
> explicitly: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L30
> It seems the current implementation doesn't allow discovering filesystems 
> that come from third-party packages rather than from apache_beam itself. Even 
> if I put a custom FileSystem-inheriting class into a package and explicitly 
> import it in the root __init__.py of that package, the class still isn't 
> discoverable.
> The problems I'm experiencing happen on the Dataflow runner, while the Direct 
> runner works just fine. Here's an example of Dataflow output:
> {code}
>   (320418708fe777d7): Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 
> 581, in do_work
> work_executor.execute()
>   File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
> line 166, in execute
> op.start()
>   File 
> "/usr/local/lib/python2.7/dist-packages/dataflow_worker/native_operations.py",
>  line 54, in start
> self.output(windowed_value)
>   File "dataflow_worker/operations.py", line 138, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5768)
> def output(self, windowed_value, output_index=0):
>   File "dataflow_worker/operations.py", line 139, in 
> dataflow_worker.operations.Operation.output 
> (dataflow_worker/operations.c:5654)
> cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "dataflow_worker/operations.py", line 72, in 
> dataflow_worker.operations.ConsumerSet.receive 
> (dataflow_worker/operations.c:3506)
> cython.cast(Operation, consumer).process(windowed_value)
>   File "dataflow_worker/operations.py", line 328, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:11162)
> with self.scoped_process_state:
>   File "dataflow_worker/operations.py", line 329, in 
> dataflow_worker.operations.DoOperation.process 
> (dataflow_worker/operations.c:6)
> self.dofn_receiver.receive(o)
>   File "apache_beam/runners/common.py", line 382, in 
> apache_beam.runners.common.DoFnRunner.receive 
> (apache_beam/runners/common.c:10156)
> self.process(windowed_value)
>   File "apache_beam/runners/common.py", line 390, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10458)
> self._reraise_augmented(exn)
>   File "apache_beam/runners/common.py", line 431, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented 
> (apache_beam/runners/common.c:11673)
> raise new_exn, None, original_traceback
>   File "apache_beam/runners/common.py", line 388, in 
> apache_beam.runners.common.DoFnRunner.process 
> (apache_beam/runners/common.c:10371)
> self.do_fn_invoker.invoke_process(windowed_value)
>   File "apache_beam/runners/common.py", line 281, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process 
> (apache_beam/runners/common.c:8626)
> self._invoke_per_window(windowed_value)
>   File "apache_beam/runners/common.py", line 307, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_per_window 
> (apache_beam/runners/common.c:9302)
> windowed_value, self.process_method(*args_for_process))
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/transforms/core.py",
>  line 749, in 
>   File 
> "/Users/dmitrydemeshchuk/miniconda2/lib/python2.7/site-packages/apache_beam/io/iobase.py",
>  line 891, in 
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py",
>  line 109, in _f
> return fnc(self, *args, **kwargs)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
> line 146, in initialize_write
> tmp_dir = self._create_temp_dir(file_path_prefix)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsink.py", 
> line 151, in _create_temp_dir
> base_path, last_component = FileSystems.split(file_path_prefix)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 
> 99, in split
> filesystem = FileSystems.get_filesystem(path)
>   File 
> "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 
> 61, in get_filesystem
> {code}

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-11 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082850#comment-16082850
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

Finally, after fighting some dependency issues, I made it work and confirmed 
that it works on master as expected! Thanks a lot for all the help, folks!

I guess, since {{beam_plugins}} are the only sane solution to this problem 
overall, we can just make sure to reflect that in 
https://beam.apache.org/documentation/io/authoring-overview/ and call it a day?
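
For that doc page, here's a minimal sketch of the flow (the package path 
{{myplugin.s3fs}} and the class name {{S3FileSystem}} are hypothetical; a real 
implementation must fill in all of {{FileSystem}}'s abstract methods, which 
are elided here):

{code}
# myplugin/s3fs.py -- hypothetical third-party filesystem package.
from apache_beam.io.filesystem import FileSystem

class S3FileSystem(FileSystem):

  @classmethod
  def scheme(cls):
    # Claims paths of the form s3://bucket/key.
    return 's3'

  # The remaining abstract methods (create, open, delete, ...) are elided.
{code}

Submitting with {{--beam_plugins myplugin.s3fs.S3FileSystem}} should then get 
the module imported on the workers, which is all the discovery mechanism 
needs.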

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081179#comment-16081179
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

[~altay]: I was doing exactly option 2, yes.

My guess is that there are some cached dependency issues; I've stumbled upon 
cached dependencies before. I'll try to clean the virtualenv, install 
everything from scratch, and see how it goes.

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081160#comment-16081160
 ] 

Ahmet Altay commented on BEAM-2573:
---

[~demeshchuk], I believe you are mixing two SDK versions. The SDK version 
installed in your virtual environment should exactly match the SDK version you 
use with the --sdk_location flag.

You have two options:
1. Use the released version: pip install the latest version and do not set the 
--sdk_location flag. You will not have the new features that will be available 
in the next version.
2. Use the head version: build an SDK from head, install it in your new 
virtual environment, and also use the --sdk_location flag to point to that 
SDK. You will have all the new features available at head.

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Sourabh Bajaj (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081145#comment-16081145
 ] 

Sourabh Bajaj commented on BEAM-2573:
-

I think the worker is specified in 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dataflow/internal/dependency.py#L81
 so it is tied to the version of the SDK being used.

I might be wrong about needing the release if you're on master: I can run the 
following command and see logs about the dataflow package failing to import on 
head, and the builtin plugins importing successfully.

{code:shell}
python -m apache_beam.examples.wordcount --output $BUCKET/wc/output --project 
$PROJECT --staging_location $BUCKET/wc/staging --temp_location $BUCKET/wc/tmp  
--job_name "sourabhbajaj-wc-4" --runner DataflowRunner --sdk_location 
dist/apache-beam-2.1.0.dev0.tar.gz  --beam_plugin dataflow.s3.aws.S3FS
{code}


[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080997#comment-16080997
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

[~sb2nov] Thanks for the explanation! I'll wait for the 2.1.0 release, then 
(or maybe a release candidate would be good enough too).

By the way, am I right to assume that full Dataflow compatibility is only 
guaranteed for tagged Beam versions, then?

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Sourabh Bajaj (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080974#comment-16080974
 ] 

Sourabh Bajaj commented on BEAM-2573:
-

I think the plugins are working correctly if they pass a list of class names 
to be imported at startup (see the sketch below). You might need to wait for 
the next release, though, since this required a change to the Dataflow 
workers: they need to start importing the paths specified in the beam_plugins 
list. There is a release going on right now, so that might happen in the next 
few days.
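
To illustrate the worker-side mechanism described above (a sketch only, not 
the worker's actual code): given the class paths carried by {{beam_plugins}}, 
the worker only has to import each defining module so that the FileSystem 
subclasses become visible.

{code}
import importlib

def import_plugins(class_paths):
  # Import the module that defines each plugin class; merely defining the
  # class is what makes a FileSystem subclass discoverable on the worker.
  for path in class_paths:
    module_name, _, _ = path.rpartition('.')
    importlib.import_module(module_name)

import_plugins(['apache_beam.io.localfilesystem.LocalFileSystem'])
{code}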

I am not sure about the crash loop in windmillio; [~charleschen] might know 
more.



[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080933#comment-16080933
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

Hmm, looks like it was indeed a dependency issue that setuptools was hiding.

I did {{pip install .}} from the beam repo, and the plugins made it in there.

Interestingly, Beam seems to have automatically converted the plugins list 
into a list of classes. In the Dataflow job's options, {{beam_plugins}} equals 
{{['apache_beam.io.filesystem.FileSystem', 
'apache_beam.io.localfilesystem.LocalFileSystem', 
'apache_beam.io.gcp.gcsfilesystem.GCSFileSystem', 
'dataflow.aws.s3.S3FileSystem']}}.

However, the crash loop described in my previous comment still happens. I'll 
continue looking at the sources to see where I screwed up.
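
A quick way to see that conversion before submitting (a sketch; it assumes an 
SDK at head where SetupOptions already defines the {{beam_plugins}} option):

{code}
from apache_beam.options.pipeline_options import PipelineOptions

# Construct options with one extra plugin and print what the runner will see.
options = PipelineOptions(['--beam_plugins', 'dataflow.aws.s3.S3FileSystem'])
print(options.get_all_options()['beam_plugins'])
{code}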

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080889#comment-16080889
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

Hmm, now it gets a bit past that, but my pipeline enters an infinite 
crash-loop backoff. This is what Stackdriver prints out:
{code}
I  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", 
line 26, in <module>
I    from dataflow_worker import windmillio 
I  File 
"/usr/local/lib/python2.7/dist-packages/dataflow_worker/windmillio.py", line 
41, in <module>
I    class PubSubWindmillSource(pubsub.PubSubSource): 
I  AttributeError: 'module' object has no attribute 'PubSubSource' 
I  checking backoff for container "python" in pod 
"dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8" 
E  Error syncing pod e6169e8537b1bd83321865dafc047ba4, skipping: failed to 
"StartContainer" for "python" with CrashLoopBackOff: "Back-off 5m0s restarting 
failed container=python 
pod=dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8_default(e6169e8537b1bd83321865dafc047ba4)"
I  Setting node annotation to enable volume controller attach/detach 
I  checking backoff for container "python" in pod 
"dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8" 
I  Back-off 5m0s restarting failed container=python 
pod=dataflow-s3-wordcount-example-2-07101139-0c31-harness-3tf8_default(e6169e8537b1bd83321865dafc047ba4)
{code}

I'll try to clean up the local dependencies and see if that helps.

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-10 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080867#comment-16080867
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

So, I tried to switch to the head version of the SDK and use the 
{{beam_plugins}} option.

Here are the steps I performed:

1. Ran {{python setup.py sdist}} on the master branch of Beam, and also ran 
{{python setup.py install}}.
2. Created the following dictionary in Python (note the {{beam_plugins}} 
option present):
{code}
{'job_name': 's3-wordcount-example-2',
 'staging_location': 'gs://dataflow-test-gc-project-168517/s3-wordcount-example-2/staging_location',
 'runner': 'dataflow',
 'streaming': False,
 'runtime_type_check': False,
 'temp_location': 'gs://dataflow-test-gc-project-168517/s3-wordcount-example-2/temporary_location',
 'setup_file': '/tmp/tmpEdRIo2/setup.py',
 'dataflow_endpoint': 'https://dataflow.googleapis.com',
 'sdk_location': '/Users/dmitrydemeshchuk/beam/sdks/python/dist/apache-beam-2.2.0.dev0.tar.gz',
 'save_main_session': True,
 'zone': 'us-west1-a',
 'region': 'us-west1',
 'profile_cpu': False,
 'bucket': 'gs://dataflow-test-gc-project-168517',
 'profile_memory': False,
 'pipeline_type_check': True,
 'project': 'test-gc-project-168517',
 'direct_runner_use_stacked_bundle': True,
 'type_check_strictness': 'DEFAULT_TO_ANY',
 'beam_plugins': ' dataflow',
 'no_auth': False}
{code}
3. Created a {{PipelineOptions}} object and used it inside a pipeline: 
{{options = PipelineOptions.from_dictionary(options_dict)}}
4. Ran the pipeline in Dataflow.

Now, in the Dataflow UI I'm seeing some of the pipeline options (for example, 
{{sdk_location}} is correct), but I'm not seeing {{beam_plugins}} anywhere. 
FWIW, the job's "Dataflow SDK version" is "Google Cloud Dataflow SDK for 
Python 2.0.0", but {{sdk_location}} is 
{{/Users/dmitrydemeshchuk/beam/sdks/python/dist/apache-beam-2.2.0.dev0.tar.gz}} 
(note the 2.2.0 version).

Needless to say, the {{beam_plugins}} option doesn't seem to get applied to my 
pipeline; at least, it fails as if the plugin weren't imported.

I'm almost sure this has something to do with the Dataflow SDK version, but so 
far I cannot find a way to make it right. Any suggestions?
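
For comparison, here's a sketch of the same submission expressed with flag 
syntax instead of {{from_dictionary}} (paths and project are copied from the 
dictionary above; whether {{beam_plugins}} wants a bare module path like 
{{dataflow}} or a full class path is exactly what is unclear here):

{code}
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=test-gc-project-168517',
    '--job_name=s3-wordcount-example-2',
    '--staging_location=gs://dataflow-test-gc-project-168517/s3-wordcount-example-2/staging_location',
    '--temp_location=gs://dataflow-test-gc-project-168517/s3-wordcount-example-2/temporary_location',
    '--sdk_location=/Users/dmitrydemeshchuk/beam/sdks/python/dist/apache-beam-2.2.0.dev0.tar.gz',
    # Possibly needs the full class path, e.g. dataflow.aws.s3.S3FileSystem.
    '--beam_plugins=dataflow',
])
{code}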

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-09 Thread Sourabh Bajaj (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079848#comment-16079848
 ] 

Sourabh Bajaj commented on BEAM-2573:
-

[~chamikara]
The issue with the register approach is that registration needs to run in the 
Python process of the worker, so something at pipeline construction time needs 
to tell the worker process to import these files; beam plugins are the 
transport mechanism for that.
If you call register during pipeline construction, you have to import the 
class anyway, which makes the registration largely pointless: the import 
itself already makes the filesystem visible through the subclass mechanism 
(see the sketch below).

[~demeshchuk] Can you try importing dataflow in your pipeline submission 
process and see if that makes the plugin available? You can just look at the 
pipeline options to see if it worked correctly, as it should pass the full 
S3FS path in the --beam_plugins option.

Ideally, I think the best way to do this would have been making paths 
FS-specific, so that ReadFromText would take a specialized S3Path object, but 
that is a really large breaking change, so we have to live with this.
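
To illustrate why the import alone is enough (a sketch of the discovery idea, 
not the SDK's exact internals):

{code}
from apache_beam.io.filesystem import FileSystem

def find_filesystem(scheme):
  # Any FileSystem subclass whose defining module has been imported shows up
  # in the subclass list -- no separate registration call is needed.
  for cls in FileSystem.__subclasses__():
    if cls.scheme() == scheme:
      return cls
  raise ValueError('No filesystem found for scheme %r' % scheme)
{code}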

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-07 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078995#comment-16078995
 ] 

Ahmet Altay commented on BEAM-2573:
---

cc: [~robertwb]

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-07 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078949#comment-16078949
 ] 

Chamikara Jayalath commented on BEAM-2573:
--

Right. Usually you should not have to import FileSystem implementations during 
pipeline construction, and importing them just to invoke the side effect of 
being included in {{--beam_plugin}} has downsides (lint errors in the user 
program, a non-obvious API, reliance on runner-specific code). It might be 
better to have a more direct API for registering FileSystems.
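
For illustration, a minimal sketch of what such a direct registration API 
could look like. The {{register()}} classmethod and the {{_registry}} dict are 
hypothetical, not part of the current apache_beam API; only the {{scheme()}} 
hook on FileSystem subclasses is real:

{code}
from apache_beam.io.filesystem import FileSystem


class FileSystems(object):
    # Hypothetical registry mapping a URI scheme to a FileSystem subclass.
    _registry = {}

    @classmethod
    def register(cls, filesystem_class):
        """Explicitly registers a FileSystem subclass under its scheme."""
        if not issubclass(filesystem_class, FileSystem):
            raise TypeError(
                'Expected a FileSystem subclass, got %r' % filesystem_class)
        cls._registry[filesystem_class.scheme()] = filesystem_class

    @classmethod
    def get_filesystem(cls, path):
        """Looks up the registered FileSystem for a path by its scheme."""
        scheme = path.split('://', 1)[0] if '://' in path else 'file'
        try:
            return cls._registry[scheme]()
        except KeyError:
            raise ValueError(
                'Unable to get the filesystem for path: %s' % path)
{code}

A user package could then call {{FileSystems.register(S3FileSystem)}} once, 
with no reliance on import side effects.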

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-07 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078929#comment-16078929
 ] 

Ahmet Altay commented on BEAM-2573:
---

There is already a mechanism to register all plugins that were imported 
(https://github.com/apache/beam/blob/dba5140792969182b85e449eb2f5630bd45710ca/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L242).

Would {{FileSystems.register()}} be for plugins that are not imported?
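
A rough sketch of the idea behind that mechanism, with helper names that are 
illustrative rather than the actual Beam internals: the submitting process 
collects the class paths of every imported FileSystem subclass, and the 
workers re-import the corresponding modules before any user code runs.

{code}
# Illustrative only; these helper names are not the real Beam internals.
import importlib

from apache_beam.io.filesystem import FileSystem


def collect_plugin_paths():
    # Anything imported in the submitting process shows up here, via the
    # same get_all_subclasses() hook shown later in this thread.
    return ['%s.%s' % (cls.__module__, cls.__name__)
            for cls in FileSystem.get_all_subclasses()]


def reimport_plugins(plugin_paths):
    # On the worker, importing the defining module is enough: once the
    # subclass is defined, get_all_subclasses() can see it again.
    for path in plugin_paths:
        module, _, _ = path.rpartition('.')
        importlib.import_module(module)
{code}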

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-07 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078916#comment-16078916
 ] 

Chamikara Jayalath commented on BEAM-2573:
--

Would it make sense to add a {{FileSystems.register()}} method that adds an 
entry to {{--beam_plugin}}?

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-07 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078890#comment-16078890
 ] 

Ahmet Altay commented on BEAM-2573:
---

The {{--beam_plugin}} flag allows importing arbitrary modules beforehand, and 
it could be used to enable other file systems. Before choosing this approach, 
[~sb2nov] investigated other methods for auto-discovery, and all of them had 
shortcomings.
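
For example, a pipeline could pass the flag through its options; the module 
and class names below are placeholders, not a real package:

{code}
from apache_beam.options.pipeline_options import PipelineOptions

# '--beam_plugin' takes the full path of a plugin class; the value below
# is a placeholder for a user-supplied FileSystem subclass.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--beam_plugin=my_package.filesystems.S3FileSystem',
])
{code}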

[jira] [Commented] (BEAM-2573) Better filesystem discovery mechanism in Python SDK

2017-07-07 Thread Dmitry Demeshchuk (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078889#comment-16078889
 ] 

Dmitry Demeshchuk commented on BEAM-2573:
-

Here's a quick example of the filesystem class being picked up locally:

{code}
In [1]: import dataflow
No handlers could be found for logger "oauth2client.contrib.multistore_file"

In [2]: from apache_beam.io.filesystem import FileSystem

In [3]: FileSystem.get_all_subclasses()
Out[3]:
[apache_beam.io.localfilesystem.LocalFileSystem,
 apache_beam.io.gcp.gcsfilesystem.GCSFileSystem,
 dataflow.aws.s3.S3FileSystem]
{code}

{{dataflow}} is our internal Python package that we use for more robust 
interaction with Dataflow and custom sources/sinks/transforms. It definitely 
gets loaded in the end, because other pipelines that use it work just fine; 
it's only the filesystem discovery that can't pick up its classes.
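
For context, a minimal sketch of why the class shows up in that list: 
subclassing {{FileSystem}} is all it takes for {{get_all_subclasses()}} to 
find it, once the defining module has been imported. The {{S3FileSystem}} body 
below is a placeholder, not our real implementation:

{code}
from apache_beam.io.filesystem import FileSystem


class S3FileSystem(FileSystem):
    """Placeholder S3 filesystem; only the scheme hook is sketched."""

    @classmethod
    def scheme(cls):
        # FileSystems matches paths like 's3://bucket/key' on this value.
        return 's3'
{code}

The catch this issue describes is that the import has to happen inside the 
worker process too, which is exactly what does not occur on the Dataflow 
runner.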
