Was there any indication in
the logs that the hadoop file system attempted to load but failed?
Nope, same message “No filesystem found for scheme hdfs” when HADOOP_CONF_DIR 
not set.

I guess I met the last problem. When I load input data from HDFS, the python 
sdk worker fails. It complains about  pipeline_options of hadoopfilesystem.py 
is empty. I thought that HDFS is only accessed by Flink and data is then 
serialized from JVM to python sdk worker, does the python sdk worker also needs 
to access HDFS?

Submission script
--------
python word_count.py --input hdfs://algo-emr/k8s_flink/LICENSE.txt --output out 
--runner=PortableRunner --job_endpoint=localhost:8099 --environment_type 
PROCESS --environment_config "{\"command\":\"/opt/apache/beam/boot\"}" 
--hdfs_host 10.53.48.6 --hdfs_port 4008 --hdfs_user data

Error log
---------
Caused by: java.lang.RuntimeException: Error received from SDK harness for 
instruction 3: Traceback (most recent call last):
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 157, in _execute
    response = task()
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 190, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 312, in do_instruction
    request.instruction_id)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 331, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
 line 554, in process_bundle
    ].process_encoded(data.data)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
 line 140, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 245, in 
apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 246, in 
apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 142, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 560, in 
apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 561, in 
apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 740, in 
apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 746, in 
apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 800, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 744, in 
apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 423, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 
860, in split_source
    total_size = source.estimate_size()
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", 
line 137, in _f
    return fnc(self, *args, **kwargs)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", 
line 193, in estimate_size
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", 
line 186, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", 
line 98, in get_filesystem
    return systems[0](pipeline_options=options)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/io/hadoopfilesystem.py", 
line 110, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'read/Read/Split']


On 29 May 2019, at 3:44 PM, Robert Bradshaw 
<[email protected]<mailto:[email protected]>> wrote:

Glad you were able to figure it out!

Agree the error message was suboptimal. Was there any indication in
the logs that the hadoop file system attempted to load but failed?

On Wed, May 29, 2019 at 4:41 AM 青雉(祁明良) 
<[email protected]<mailto:[email protected]>> wrote:

Thanks guys, I got it. It was because Flink taskmanager docker missing 
HADOOP_CONF_DIR environment.
Maybe we could improve the error message in the future:)

Best,
Mingliang

On 29 May 2019, at 3:12 AM, Lukasz Cwik 
<[email protected]<mailto:[email protected]>> wrote:

Are you losing the META-INF/ ServiceLoader entries related to binding the 
FileSystem via the FileSystemRegistrar when building the uber jar[1]?
It does look like the Flink JobServer driver is registering the file systems[2].

1: 
https://github.com/apache/beam/blob/95297dd82bd2fd3986900093cc1797c806c859e6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java#L33
2: 
https://github.com/apache/beam/blob/ee96f66e14866f9642e9c67bf2ef231be7e7d55b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63

On Tue, May 28, 2019 at 11:39 AM 青雉(祁明良) 
<[email protected]<mailto:[email protected]>> wrote:

Yes, I did (2). Since the job server successfully
created the artifact directory, I think I did it correctly. And somehow this 
dependency is not submitted to task manager.
Maybe I can also try out (1), but to add additional jar to flink classpath 
sounds not a perfect solution.

获取 Outlook for iOS



On Wed, May 29, 2019 at 1:01 AM +0800, "Maximilian Michels" 
<[email protected]<mailto:[email protected]>> wrote:

Hi Mingliang,

Oh I see. You will also have to add the Jars to the TaskManager then.

You have these options:

1. Include them directly in the TaskManager classpath
2. Include them as dependencies to the JobServer, which will cause them
to be attached to Flink's JobGraph.

Do I understand correctly that you already did (2)?

Cheers,
Max

On 28.05.19 18:33, 青雉(祁明良) wrote:
Yes Max, I did add these Hadoop jars. The error
message from task manager was about  missing HDFS file system class from
beam-sdks-java-io-hadoop-file-system module, which I also shadowed into
job server.
I see the artifact directory is successfully created at HDFS by job
server, but fails at task manager when reading.

Best,
Mingliang

获取 Outlook for iOS



On Tue, May 28, 2019 at 11:47 PM +0800, "Maximilian Michels"
wrote:

   Recent versions of Flink do not bundle Hadoop anymore, but they are
   still "Hadoop compatible". You just need to include the Hadoop jars in
   the classpath.

   Beams's Hadoop does not bundle Hadoop either, it just provides Beam file
   system abstractions which are similar to Flink "Hadoop compatibility".

   You probably want to add this to the job server:
       shadow library.java.hadoop_client
       shadow library.java.hadoop_common

   Cheers,
   Max

   On 28.05.19 15:41, 青雉(祁明良) wrote:
Thanks Robert, I had one, “qmlmoon”

Looks like I had the jobserver working now, I just add a shadow
dependency of /beam-sdks-java-io-hadoop-file-system/ to
/beam-runners-flink_2.11-job-server/ and rebuild the job server, but
Flink taskmanger also complains about the same issue during job running.

So how is Flink taskmanager finding this HDFS filesystem dependency?
-------
2019-05-28 13:15:57,695 INFO
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
- GetManifest for
hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
2019-05-28 13:15:57,696 INFO
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
- Loading manifest for retrieval token
hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
2019-05-28 13:15:57,698 INFO
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
- GetManifest for
hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
failed
org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
at
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
at
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
at
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
at
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
at
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.getManifest(BeamFileSystemArtifactRetrievalService.java:80)
at
org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:298)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


On 28 May 2019, at 9:31 PM, Robert Bradshaw  > > wrote: >> >> The easiest would 
probably be to create a project
   that depends on both >> the job server and the hadoop filesystem and
   then build that as a fat >> jar. > > > 本邮件及其附件含有小红书公司
   的保密信息,仅限于发送给以上收件人或群组。禁 > 止任何其他人以任何形
   式使用(包括但不限于全部或部分地泄露、复制、或散发) > 本邮件中的信
   息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本 > 邮
   件! > This communication may contain privileged or other
   confidential > information of Red. If you have received it in error,
   please advise the > sender by reply e-mail and immediately delete
   the message and any > attachments without copying or disclosing the
   contents. Thank you. >


本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁
止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)
本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本
邮件!
This communication may contain privileged or other confidential
information of Red. If you have received it in error, please advise the
sender by reply e-mail and immediately delete the message and any
attachments without copying or disclosing the contents. Thank you.



本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件!
This communication may contain privileged or other confidential information of 
Red. If you have received it in error, please advise the sender by reply e-mail 
and immediately delete the message and any attachments without copying or 
disclosing the contents. Thank you.



本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件!
This communication may contain privileged or other confidential information of 
Red. If you have received it in error, please advise the sender by reply e-mail 
and immediately delete the message and any attachments without copying or 
disclosing the contents. Thank you.


本邮件及其附件含有小红书公司的保密信息,仅限于发送给以上收件人或群组。禁止任何其他人以任何形式使用(包括但不限于全部或部分地泄露、复制、或散发)本邮件中的信息。如果您错收了本邮件,请您立即电话或邮件通知发件人并删除本邮件!
This communication may contain privileged or other confidential information of 
Red. If you have received it in error, please advise the sender by reply e-mail 
and immediately delete the message and any attachments without copying or 
disclosing the contents. Thank you.

Reply via email to