I believe FileSystems are initialized for the Python SDK here:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L98

For this to work, the runner should set the pipeline options object at
initialization, for example by invoking the following function:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py#L68
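
A minimal sketch of what that call might look like (assuming the worker
harness can see the same hdfs_* options the user passed on the command
line; the values here are taken from the submission script later in this
thread):

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.io.filesystems import FileSystems

    # Make the pipeline options visible to every FileSystems lookup, so a
    # later FileSystems.get_filesystem('hdfs://...') can construct the
    # Hadoop filesystem with hdfs_host/hdfs_port/hdfs_user populated.
    options = PipelineOptions(
        hdfs_host='10.53.48.6', hdfs_port=4008, hdfs_user='data')
    FileSystems.set_options(options)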

Is it possible that this is not being set when using the Python SDK on
portable Flink?

On Wed, May 29, 2019 at 5:11 AM Robert Bradshaw <rober...@google.com> wrote:

> I was wondering if there was an error earlier in the logs, e.g. at
> startup, about this missing parameter (given that it was linked in, I'd
> assume it at least tried to load).
>
> As for the other question, if the Python worker is doing the read, then
> yes, it needs access to HDFS as well.
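>
> As a rough sketch of why (assuming the Python "hdfs" package is installed
> in the worker environment, and reusing the hdfs_* values from your
> submission script), the worker-side read effectively does something like:
>
>     from apache_beam.options.pipeline_options import PipelineOptions
>     from apache_beam.io.hadoopfilesystem import HadoopFileSystem
>
>     # The worker needs the hdfs_* options *and* network access to the
>     # namenode; when the options are missing, this constructor is what
>     # raises "pipeline_options is not set".
>     opts = PipelineOptions(
>         hdfs_host='10.53.48.6', hdfs_port=4008, hdfs_user='data')
>     fs = HadoopFileSystem(pipeline_options=opts)
>     print(fs.match(['hdfs://algo-emr/k8s_flink/LICENSE.txt'])[0])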
>
> On Wed, May 29, 2019 at 1:54 PM 青雉(祁明良) <m...@xiaohongshu.com> wrote:
>
>> "Was there any indication in the logs that the hadoop file system
>> attempted to load but failed?"
>>
>> Nope, same message "No filesystem found for scheme hdfs" when
>> HADOOP_CONF_DIR is not set.
>>
>> I think I've hit the last problem. When I load input data from HDFS, the
>> Python SDK worker fails. It complains that pipeline_options of
>> hadoopfilesystem.py is empty. I thought that HDFS was only accessed by
>> Flink and the data was then serialized from the JVM to the Python SDK
>> worker; does the Python SDK worker also need to access HDFS?
>>
>> Submission script
>> --------
>> python word_count.py --input hdfs://algo-emr/k8s_flink/LICENSE.txt \
>>     --output out --runner=PortableRunner --job_endpoint=localhost:8099 \
>>     --environment_type PROCESS \
>>     --environment_config "{\"command\":\"/opt/apache/beam/boot\"}" \
>>     --hdfs_host 10.53.48.6 --hdfs_port 4008 --hdfs_user data
>>
>> Error log
>> ---------
>> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 157, in _execute
>>     response = task()
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 190, in <lambda>
>>     self._execute(lambda: worker.do_instruction(work), work)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 312, in do_instruction
>>     request.instruction_id)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 331, in process_bundle
>>     bundle_processor.process_bundle(instruction_id))
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 554, in process_bundle
>>     ].process_encoded(data.data)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 140, in process_encoded
>>     self.output(decoded_value)
>>   File "apache_beam/runners/worker/operations.py", line 245, in apache_beam.runners.worker.operations.Operation.output
>>     def output(self, windowed_value, output_index=0):
>>   File "apache_beam/runners/worker/operations.py", line 246, in apache_beam.runners.worker.operations.Operation.output
>>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>>   File "apache_beam/runners/worker/operations.py", line 142, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>>     self.consumer.process(windowed_value)
>>   File "apache_beam/runners/worker/operations.py", line 560, in apache_beam.runners.worker.operations.DoOperation.process
>>     with self.scoped_process_state:
>>   File "apache_beam/runners/worker/operations.py", line 561, in apache_beam.runners.worker.operations.DoOperation.process
>>     delayed_application = self.dofn_receiver.receive(o)
>>   File "apache_beam/runners/common.py", line 740, in apache_beam.runners.common.DoFnRunner.receive
>>     self.process(windowed_value)
>>   File "apache_beam/runners/common.py", line 746, in apache_beam.runners.common.DoFnRunner.process
>>     self._reraise_augmented(exn)
>>   File "apache_beam/runners/common.py", line 800, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>>     raise_with_traceback(new_exn)
>>   File "apache_beam/runners/common.py", line 744, in apache_beam.runners.common.DoFnRunner.process
>>     return self.do_fn_invoker.invoke_process(windowed_value)
>>   File "apache_beam/runners/common.py", line 423, in apache_beam.runners.common.SimpleInvoker.invoke_process
>>     windowed_value, self.process_method(windowed_value.value))
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 860, in split_source
>>     total_size = source.estimate_size()
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", line 137, in _f
>>     return fnc(self, *args, **kwargs)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", line 193, in estimate_size
>>     match_result = FileSystems.match([pattern])[0]
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 186, in match
>>     filesystem = FileSystems.get_filesystem(patterns[0])
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", line 98, in get_filesystem
>>     return systems[0](pipeline_options=options)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/hadoopfilesystem.py", line 110, in __init__
>>     raise ValueError('pipeline_options is not set')
>> ValueError: pipeline_options is not set [while running 'read/Read/Split']
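>>
>> A minimal reproduction of that failure outside the worker, as a sketch
>> (assuming the Python "hdfs" package is installed so Beam's Hadoop
>> filesystem is registered):
>>
>>     from apache_beam.io.filesystems import FileSystems
>>
>>     # Nothing has called FileSystems.set_options(...) in this process,
>>     # so the cached pipeline options are still None, and constructing
>>     # the HDFS filesystem raises: ValueError: pipeline_options is not set
>>     FileSystems.get_filesystem('hdfs://algo-emr/k8s_flink/LICENSE.txt')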
>>
>>
>> On 29 May 2019, at 3:44 PM, Robert Bradshaw <rober...@google.com> wrote:
>>
>> Glad you were able to figure it out!
>>
>> Agree the error message was suboptimal. Was there any indication in
>> the logs that the hadoop file system attempted to load but failed?
>>
>> On Wed, May 29, 2019 at 4:41 AM 青雉(祁明良) <m...@xiaohongshu.com> wrote:
>>
>>
>> Thanks guys, I got it. It was because the Flink taskmanager Docker image
>> was missing the HADOOP_CONF_DIR environment variable.
>> Maybe we could improve the error message in the future :)
>>
>> Best,
>> Mingliang
>>
>> On 29 May 2019, at 3:12 AM, Lukasz Cwik <lc...@google.com> wrote:
>>
>> Are you losing the META-INF/services ServiceLoader entries related to
>> binding the FileSystem via the FileSystemRegistrar when building the uber
>> jar [1]?
>> It does look like the Flink JobServer driver is registering the file
>> systems [2].
>>
>> 1:
>> https://github.com/apache/beam/blob/95297dd82bd2fd3986900093cc1797c806c859e6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java#L33
>> 2:
>> https://github.com/apache/beam/blob/ee96f66e14866f9642e9c67bf2ef231be7e7d55b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63
>>
>> On Tue, May 28, 2019 at 11:39 AM 青雉(祁明良) <m...@xiaohongshu.com> wrote:
>>
>>
>> Yes, I did (2). Since the job server successfully created the artifact
>> directory, I think I did it correctly, and yet somehow this dependency is
>> not submitted to the task manager.
>> Maybe I can also try (1), but adding an extra jar to the Flink classpath
>> doesn't sound like a perfect solution.
>>
>>
>>
>>
>> On Wed, May 29, 2019 at 1:01 AM +0800, "Maximilian Michels" <
>> m...@apache.org> wrote:
>>
>> Hi Mingliang,
>>
>> Oh I see. You will also have to add the Jars to the TaskManager then.
>>
>> You have these options:
>>
>> 1. Include them directly in the TaskManager classpath
>> 2. Include them as dependencies to the JobServer, which will cause them
>> to be attached to Flink's JobGraph.
>>
>> Do I understand correctly that you already did (2)?
>>
>> Cheers,
>> Max
>>
>> On 28.05.19 18:33, 青雉(祁明良) wrote:
>>
>> Yes Max, I did add these Hadoop jars. The error
>> message from task manager was about  missing HDFS file system class from
>> beam-sdks-java-io-hadoop-file-system module, which I also shadowed into
>> job server.
>> I see the artifact directory is successfully created at HDFS by job
>> server, but fails at task manager when reading.
>>
>> Best,
>> Mingliang
>>
>>
>>
>>
>> On Tue, May 28, 2019 at 11:47 PM +0800, "Maximilian Michels" wrote:
>>
>>
>>    Recent versions of Flink do not bundle Hadoop anymore, but they are
>>    still "Hadoop compatible". You just need to include the Hadoop jars in
>>    the classpath.
>>
>>    Beam's Hadoop file system module does not bundle Hadoop either; it
>>    just provides Beam file system abstractions, similar to Flink's
>>    "Hadoop compatibility".
>>
>>    You probably want to add this to the job server:
>>        shadow library.java.hadoop_client
>>        shadow library.java.hadoop_common
>>
>>    Cheers,
>>    Max
>>
>>    On 28.05.19 15:41, 青雉(祁明良) wrote:
>>
>> Thanks Robert, I had one, “qmlmoon”
>>
>> Looks like I have the job server working now. I just added a shadow
>> dependency on beam-sdks-java-io-hadoop-file-system to
>> beam-runners-flink_2.11-job-server and rebuilt the job server, but the
>> Flink taskmanager also complains about the same issue while the job runs.
>> So how does the Flink taskmanager find this HDFS filesystem dependency?
>> -------
>> 2019-05-28 13:15:57,695 INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - GetManifest for hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
>> 2019-05-28 13:15:57,696 INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - Loading manifest for retrieval token hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
>> 2019-05-28 13:15:57,698 INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - GetManifest for hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST failed
>> org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
>>   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
>>   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
>>   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
>>   at org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
>>   at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.getManifest(BeamFileSystemArtifactRetrievalService.java:80)
>>   at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:298)
>>   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
>>   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>>   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>>   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>>   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>>   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
>>   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
>>   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>   at org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>>
>> On 28 May 2019, at 9:31 PM, Robert Bradshaw <rober...@google.com> wrote:
>>
>> The easiest would probably be to create a project that depends on both
>> the job server and the hadoop filesystem and then build that as a fat
>> jar.