grpc cancelled without enough information

2019-06-12 Thread
Hi All,

I’m running a simple job that decodes TFRecord data with the Python SDK on Flink. It works with small input data from HDFS, but when I switch to large input data it fails with a gRPC CANCELLED error. The error message makes it difficult to debug further. Any suggestions for next steps?

Best,
Mingliang
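
For context, a minimal sketch of the kind of job being described, using the same portable-runner flags that appear in the submission scripts later in this digest; the input glob and the decode step are illustrative assumptions, not the actual job:

import apache_beam as beam
from apache_beam.io.tfrecordio import ReadFromTFRecord
from apache_beam.options.pipeline_options import PipelineOptions

def decode(record_bytes):
    # Stand-in for the real tf.train.Example parsing logic.
    return len(record_bytes)

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=PROCESS',
    '--environment_config={"command": "/opt/apache/beam/boot"}',
])

with beam.Pipeline(options=options) as p:
    (p
     | 'read' >> ReadFromTFRecord('hdfs://algo-emr/k8s_flink/records*')  # path assumed
     | 'decode' >> beam.Map(decode)
     | 'write' >> beam.io.WriteToText('out'))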

---
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: CANCELLED: 
cancelled before receiving half close
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asRuntimeException(Status.java:517)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:272)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:293)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:738)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-06-13 03:47:13,430 WARN  
org.apache.beam.runners.fnexecution.logging.GrpcLoggingService  - Beam Fn 
Logging client failed to be complete.
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: CANCELLED: 
call already cancelled
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asRuntimeException(Status.java:517)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:356)
at 
org.apache.beam.runners.fnexecution.logging.GrpcLoggingService.completeIfNotNull(GrpcLoggingService.java:78)
at 
org.apache.beam.runners.fnexecution.logging.GrpcLoggingService.access$400(GrpcLoggingService.java:33)
at 
org.apache.beam.runners.fnexecution.logging.GrpcLoggingService$InboundObserver.onError(GrpcLoggingService.java:105)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:269)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:293)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:738)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-06-13 03:47:13,430 ERROR 
org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer - Failed to 
handle for unknown endpoint
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: CANCELLED: 
cancelled before receiving half close
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asRuntimeException(Status.java:517)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:272)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListene

Python sdk performance

2019-06-08 Thread
Hi all,

I’m currently tuning the performance of the Python SDK with the Flink runner. I found that the multithreading in the Python SDK worker caps CPU usage at roughly one core. To my understanding, all the task slots on one TaskManager share one SDK process, which means the low CPU usage of the Python SDK will probably become the bottleneck. Is it possible to use multiprocessing to raise CPU usage?

Best,
Mingliang
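
The GIL effect described above can be reproduced outside Beam: in CPython, CPU-bound threads are serialized onto roughly one core, while separate processes scale with cores. A standalone sketch:

import multiprocessing
import threading
import time

def burn(n=10 ** 7):
    # Pure-Python CPU-bound loop; a thread holds the GIL while running it.
    total = 0
    for i in range(n):
        total += i
    return total

def run_all(workers):
    start = time.time()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time.time() - start

if __name__ == '__main__':
    # Four threads contend for one GIL: wall time is roughly 4x a single run.
    print('threads:   %.2fs' % run_all(
        [threading.Thread(target=burn) for _ in range(4)]))
    # Four processes get their own interpreters and can use four cores.
    print('processes: %.2fs' % run_all(
        [multiprocessing.Process(target=burn) for _ in range(4)]))

If your Beam version supports it, the portable --sdk_worker_parallelism pipeline option is one knob for running more than one SDK worker per TaskManager; verify its availability against your release.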



Re: Question about --environment_type argument

2019-05-29 Thread
> Was there any indication in the logs that the hadoop file system attempted to load but failed?

Nope, the same "No filesystem found for scheme hdfs" message appears when HADOOP_CONF_DIR is not set.

I guess I’ve hit the last problem. When I load input data from HDFS, the Python SDK worker fails. It complains that pipeline_options in hadoopfilesystem.py is empty. I thought HDFS was only accessed by Flink and the data was then serialized from the JVM to the Python SDK worker; does the Python SDK worker also need to access HDFS?
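
For what it’s worth, the traceback below shows the Python SDK worker itself constructing a HadoopFileSystem (hadoopfilesystem.py raises because pipeline_options is empty), so the worker does need the HDFS settings, not just Flink. A sketch of the options involved, reusing the values from the submission script below and assuming the HadoopFileSystemOptions class that defines these flags:

from apache_beam.options.pipeline_options import (
    HadoopFileSystemOptions, PipelineOptions)

# The same hdfs_* flags as in the submission script; the SDK worker needs
# them too, because it matches and splits hdfs:// sources itself.
options = PipelineOptions([
    '--hdfs_host=10.53.48.6',
    '--hdfs_port=4008',
    '--hdfs_user=data',
])
hdfs_options = options.view_as(HadoopFileSystemOptions)
print('%s:%s as %s' % (hdfs_options.hdfs_host,
                       hdfs_options.hdfs_port,
                       hdfs_options.hdfs_user))
# HadoopFileSystem(pipeline_options=None) is what raises
# ValueError('pipeline_options is not set') in the log below.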

Submission script

python word_count.py --input hdfs://algo-emr/k8s_flink/LICENSE.txt --output out 
--runner=PortableRunner --job_endpoint=localhost:8099 --environment_type 
PROCESS --environment_config "{\"command\":\"/opt/apache/beam/boot\"}" 
--hdfs_host 10.53.48.6 --hdfs_port 4008 --hdfs_user data

Error log
-
Caused by: java.lang.RuntimeException: Error received from SDK harness for 
instruction 3: Traceback (most recent call last):
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 157, in _execute
response = task()
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 190, in 
self._execute(lambda: worker.do_instruction(work), work)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 312, in do_instruction
request.instruction_id)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py",
 line 331, in process_bundle
bundle_processor.process_bundle(instruction_id))
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
 line 554, in process_bundle
].process_encoded(data.data)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py",
 line 140, in process_encoded
self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 245, in 
apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 246, in 
apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 142, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 560, in 
apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 561, in 
apache_beam.runners.worker.operations.DoOperation.process
delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 740, in 
apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
  File "apache_beam/runners/common.py", line 746, in 
apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 800, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 744, in 
apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 423, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
windowed_value, self.process_method(windowed_value.value))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 
860, in split_source
total_size = source.estimate_size()
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/options/value_provider.py", 
line 137, in _f
return fnc(self, *args, **kwargs)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", 
line 193, in estimate_size
match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", 
line 186, in match
filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", 
line 98, in get_filesystem
return systems[0](pipeline_options=options)
  File 
"/usr/local/lib/python2.7/dist-packages/apache_beam/io/hadoopfilesystem.py", 
line 110, in __init__
raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'read/Read/Split']


On 29 May 2019, at 3:44 PM, Robert Bradshaw <rober...@google.com> wrote:

Glad you were able to figure it out!

Agree the error message was suboptimal. Was there any indicati

Re: Question about --environment_type argument

2019-05-28 Thread
Thanks guys, I got it. It was because the Flink TaskManager Docker image was missing the HADOOP_CONF_DIR environment variable.
Maybe we could improve the error message in the future :)

Best,
Mingliang
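
Since the root cause was an environment variable missing inside the TaskManager container, a tiny pre-flight check can fail fast before jobs are submitted. This is an illustrative helper, not part of Beam; it relies on the point (noted later in this digest) that Beam's HadoopFileSystemOptions picks up HDFS configuration from the environment by default:

import os

def check_hadoop_conf():
    # An unset HADOOP_CONF_DIR surfaces much later as
    # "No filesystem found for scheme hdfs".
    conf_dir = os.environ.get('HADOOP_CONF_DIR')
    if not conf_dir or not os.path.isdir(conf_dir):
        raise RuntimeError('HADOOP_CONF_DIR is not set or does not exist')
    return conf_dir

if __name__ == '__main__':
    print('Hadoop config at %s' % check_hadoop_conf())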

On 29 May 2019, at 3:12 AM, Lukasz Cwik <lc...@google.com> wrote:

Are you losing the META-INF/ ServiceLoader entries related to binding the 
FileSystem via the FileSystemRegistrar when building the uber jar[1]?
It does look like the Flink JobServer driver is registering the file systems[2].

1: 
https://github.com/apache/beam/blob/95297dd82bd2fd3986900093cc1797c806c859e6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java#L33
2: 
https://github.com/apache/beam/blob/ee96f66e14866f9642e9c67bf2ef231be7e7d55b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63

On Tue, May 28, 2019 at 11:39 AM 青雉(祁明良) <m...@xiaohongshu.com> wrote:
Yes, I did (2). Since the job server successfully created the artifact directory, I think I did it correctly, yet somehow this dependency is not submitted to the task manager.
Maybe I can also try (1), but adding an extra jar to the Flink classpath doesn’t sound like a perfect solution.




On Wed, May 29, 2019 at 1:01 AM +0800, "Maximilian Michels" <m...@apache.org> wrote:


Hi Mingliang,

Oh I see. You will also have to add the Jars to the TaskManager then.

You have these options:

1. Include them directly in the TaskManager classpath
2. Include them as dependencies to the JobServer, which will cause them
to be attached to Flink's JobGraph.

Do I understand correctly that you already did (2)?

Cheers,
Max

On 28.05.19 18:33, 青雉(祁明良) wrote:
> Yes Max, I did add these Hadoop jars. The error
> message from the task manager was about the missing HDFS file system class
> from the beam-sdks-java-io-hadoop-file-system module, which I also shadowed
> into the job server.
> I see the artifact directory is successfully created on HDFS by the job
> server, but reading it fails on the task manager.
>
> Best,
> Mingliang
>
>
>
>
> On Tue, May 28, 2019 at 11:47 PM +0800, "Maximilian Michels" <m...@apache.org> wrote:
>
> Recent versions of Flink do not bundle Hadoop anymore, but they are
> still "Hadoop compatible". You just need to include the Hadoop jars in
> the classpath.
>
> Beam's Hadoop module does not bundle Hadoop either; it just provides Beam file
> system abstractions similar to Flink's "Hadoop compatibility".
>
> You probably want to add this to the job server:
> shadow library.java.hadoop_client
> shadow library.java.hadoop_common
>
> Cheers,
> Max
>
> On 28.05.19 15:41, 青雉(祁明良) wrote:
> > Thanks Robert, I had one, “qmlmoon”
> >
> > Looks like I have the job server working now: I just added a shadow
> > dependency of /beam-sdks-java-io-hadoop-file-system/ to
> > /beam-runners-flink_2.11-job-server/ and rebuilt the job server, but the
> > Flink TaskManager also complains about the same issue while the job runs.
> >
> > So how is Flink taskmanager finding this HDFS filesystem dependency?
> > ---
> > 2019-05-28 13:15:57,695 INFO
> > 
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
> > - GetManifest for
> > 
> hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> > 2019-05-28 13:15:57,696 INFO
> > 
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
> > - Loading manifest for retrieval token
> > 
> hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> > 2019-05-28 13:15:57,698 INFO
> > 
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
> > - GetManifest for
> > 
> hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> > failed
> > 
> org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
> > java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
> > at
> > 
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
> > at
> > 
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
> > at
> > 
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
> > at
> > 
> org.apache.beam.vendor.guava.v20_0.com.goog

Re: Question about --environment_type argument

2019-05-28 Thread
Yes, I did (2). Since the job server successfully created the artifact directory, I think I did it correctly, yet somehow this dependency is not submitted to the task manager.
Maybe I can also try (1), but adding an extra jar to the Flink classpath doesn’t sound like a perfect solution.




On Wed, May 29, 2019 at 1:01 AM +0800, "Maximilian Michels" <m...@apache.org> wrote:


Hi Mingliang,

Oh I see. You will also have to add the Jars to the TaskManager then.

You have these options:

1. Include them directly in the TaskManager classpath
2. Include them as dependencies to the JobServer, which will cause them
to be attached to Flink's JobGraph.

Do I understand correctly that you already did (2)?

Cheers,
Max

On 28.05.19 18:33, 青雉(祁明良) wrote:
> Yes Max, I did add these Hadoop jars. The error
> message from the task manager was about the missing HDFS file system class
> from the beam-sdks-java-io-hadoop-file-system module, which I also shadowed
> into the job server.
> I see the artifact directory is successfully created on HDFS by the job
> server, but reading it fails on the task manager.
>
> Best,
> Mingliang
>
>
>
>
> On Tue, May 28, 2019 at 11:47 PM +0800, "Maximilian Michels" <m...@apache.org> wrote:
>
> Recent versions of Flink do not bundle Hadoop anymore, but they are
> still "Hadoop compatible". You just need to include the Hadoop jars in
> the classpath.
>
> Beam's Hadoop module does not bundle Hadoop either; it just provides Beam file
> system abstractions similar to Flink's "Hadoop compatibility".
>
> You probably want to add this to the job server:
>     shadow library.java.hadoop_client
> shadow library.java.hadoop_common
>
> Cheers,
> Max
>
> On 28.05.19 15:41, 青雉(祁明良) wrote:
> > Thanks Robert, I had one, “qmlmoon”
> >
> > Looks like I have the job server working now: I just added a shadow
> > dependency of /beam-sdks-java-io-hadoop-file-system/ to
> > /beam-runners-flink_2.11-job-server/ and rebuilt the job server, but the
> > Flink TaskManager also complains about the same issue while the job runs.
> >
> > So how is Flink taskmanager finding this HDFS filesystem dependency?
> > ---
> > 2019-05-28 13:15:57,695 INFO
> > 
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
> > - GetManifest for
> > 
> hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> > 2019-05-28 13:15:57,696 INFO
> > 
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
> > - Loading manifest for retrieval token
> > 
> hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> > 2019-05-28 13:15:57,698 INFO
> > 
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
> > - GetManifest for
> > 
> hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> > failed
> > 
> org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
> > java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
> > at
> > 
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
> > at
> > 
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
> > at
> > 
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
> > at
> > 
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
> > at
> > 
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.getManifest(BeamFileSystemArtifactRetrievalService.java:80)
> > at
> > 
> org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:298)
> > at
> > 
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
> > at
> > 
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
> > at
> > 
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>  

Re: Question about --environment_type argument

2019-05-28 Thread
Yes Max, I did add these Hadoop jars. The error message from the task manager was about the missing HDFS file system class from the beam-sdks-java-io-hadoop-file-system module, which I also shadowed into the job server.
I see the artifact directory is successfully created on HDFS by the job server, but reading it fails on the task manager.

Best,
Mingliang




On Tue, May 28, 2019 at 11:47 PM +0800, "Maximilian Michels" <m...@apache.org> wrote:


Recent versions of Flink do not bundle Hadoop anymore, but they are
still "Hadoop compatible". You just need to include the Hadoop jars in
the classpath.

Beam's Hadoop module does not bundle Hadoop either; it just provides Beam file
system abstractions similar to Flink's "Hadoop compatibility".

You probably want to add this to the job server:
   shadow library.java.hadoop_client
   shadow library.java.hadoop_common

Cheers,
Max

On 28.05.19 15:41, 青雉(祁明良) wrote:
> Thanks Robert, I had one, “qmlmoon”
>
> Looks like I have the job server working now: I just added a shadow
> dependency of /beam-sdks-java-io-hadoop-file-system/ to
> /beam-runners-flink_2.11-job-server/ and rebuilt the job server, but the
> Flink TaskManager also complains about the same issue while the job runs.
>
> So how is Flink taskmanager finding this HDFS filesystem dependency?
> ---
> 2019-05-28 13:15:57,695 INFO
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
> - GetManifest for
> hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> 2019-05-28 13:15:57,696 INFO
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
> - Loading manifest for retrieval token
> hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> 2019-05-28 13:15:57,698 INFO
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
> - GetManifest for
> hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
> failed
> org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
> java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
> at
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.getManifest(BeamFileSystemArtifactRetrievalService.java:80)
> at
> org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:298)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
>> On 28 May 2019, at 9:31 PM, Robert Bradshaw <rober...@google.com> wrote:
>>
>> The easiest would probably be to create a project that depends on both
>> the job server and the hadoop filesystem and then build that as a fat
>> jar.
>
>

Re: Question about --environment_type argument

2019-05-28 Thread
Thanks Robert, I had one, “qmlmoon”

Looks like I have the job server working now: I just added a shadow dependency of beam-sdks-java-io-hadoop-file-system to beam-runners-flink_2.11-job-server and rebuilt the job server, but the Flink TaskManager also complains about the same issue while the job runs.

So how is Flink taskmanager finding this HDFS filesystem dependency?
---
2019-05-28 13:15:57,695 INFO  
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
  - GetManifest for 
hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
2019-05-28 13:15:57,696 INFO  
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
  - Loading manifest for retrieval token 
hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
2019-05-28 13:15:57,698 INFO  
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
  - GetManifest for 
hdfs://myhdfs/algo-emr/k8s_flink/beam/job_87fa794e-9cd7-4c20-b95c-086f11abfaa4/MANIFEST
 failed
org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
at 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.getManifest(BeamFileSystemArtifactRetrievalService.java:80)
at 
org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:298)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


On 28 May 2019, at 9:31 PM, Robert Bradshaw <rober...@google.com> wrote:

The easiest would probably be to create a project that depends on both
the job server and the hadoop filesystem and then build that as a fat
jar.




Re: Question about --environment_type argument

2019-05-28 Thread
https://github.com/apache/beam/pull/8700
Please help to create a JIRA and format the PR message.


> Filesystems are registered using the java service provider interfaces.
> Here the HDFS filesystem needs to be built into the job server (or at
> least on the classpath when it's invoked).

I tried to put the jar file on the classpath, but some basic Hadoop dependency is missing.
Is there a simple way to build all dependencies into the beam-runners-flink_2.11-job-server distribution?

On 28 May 2019, at 7:45 PM, Robert Bradshaw <rober...@google.com> wrote:

Filesystems are registered using the java service provider interfaces.
Here the HDFS filesystem needs to be built into the job server (or at
least on the classpath when it's invoked).




Re: Question about --environment_type argument

2019-05-28 Thread
I added some logging to the Beam code and found this. The error message is definitely much clearer, but it is swallowed here:
https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java#L229

So is it actually not supported, or did I just miss some config?

---
java.lang.IllegalArgumentException: No filesystem found for scheme hdfs
at 
org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
at 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.getJobDirResourceId(BeamFileSystemArtifactStagingService.java:180)
at 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.getArtifactDirResourceId(BeamFileSystemArtifactStagingService.java:192)
at 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.access$400(BeamFileSystemArtifactStagingService.java:69)
at 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.onNext(BeamFileSystemArtifactStagingService.java:219)
at 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.onNext(BeamFileSystemArtifactStagingService.java:196)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


On 28 May 2019, at 5:01 PM, Robert Bradshaw <rober...@google.com> wrote:

IIRC, the default artifact directory is local, not HDFS, which would
of course not be readable on the workers.

Good point about missing hdfs parameters on the job server. Looks like
by default, it gets these from the environment?
https://github.com/apache/beam/blob/release-2.12.0/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L48
I'm actually not that familiar with HDFS, so maybe someone else can
chime in here. (But we should be throwing a better error than
NullPointer.)

On Tue, May 28, 2019 at 10:19 AM 青雉(祁明良) <m...@xiaohongshu.com> wrote:

Yes, it is built from the release-2.12.0 branch. There was an NPE message at BeamFileSystemArtifactStagingService.java:239, but it shows up only on the first submission.

Plus, I wonder why there are --hdfs_host / --hdfs_port / --hdfs_user arguments for the Python submission script, but not for the job server. If I leave the artifacts-dir at its default, the following submission script works fine (only for the load-data phase; the next phase fails because the artifact directory is not found), which means HDFS can be accessed.

Submit script
-
python word_count.py --input hdfs://algo-emr/k8s_flink/LICENSE.txt --output out 
--runner=PortableRunner --job_endpoint=localhost:8099 --environment_type 
PROCESS --environment_config "{\"command\":\"/opt/apache/beam/boot\"}" 
--hdfs_host 10.53.48.6 --hdfs_port 4008 --hdfs_user data


Error Log:
---
./lib/beam-runners-flink_2.11-job-server-shadow-2.12.0-SNAPSHOT/bin/beam-runners-flink_2.11-job-server
 --flink-master-url test-mqi-job1-hl:8081 --artifacts-dir 
hdfs://10.53.48.6:4007/algo-emr/k8s_flink/beam/
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
ArtifactStagingService started on localhost:8098
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
Java ExpansionService started on localhost:8097
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
JobService started on localhost:8099
May 28, 2019 8:08:10 AM 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable@1

Re: Question about --environment_type argument

2019-05-28 Thread
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

On 28 May 2019, at 3:55 PM, Robert Bradshaw <rober...@google.com> wrote:

Thanks for the report. Is this with 2.12.0? If so,
https://github.com/apache/beam/blob/release-2.12.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java#L293
seems a strange place to get a NullPointerException. Is there perhaps
an exception earlier in the code (which could be the root cause)?

On Tue, May 28, 2019 at 4:52 AM 青雉(祁明良) <m...@xiaohongshu.com> wrote:

Hi Robert,

When I set the --artifacts-dir to an HDFS location, I got an NPE. The URL is accessible via the Hadoop client.

---
./beam-runners-flink_2.11-job-server-shadow-2.12.0-SNAPSHOT/bin/beam-runners-flink_2.11-job-server
 --flink-master-url test-mqi-job1-hl:8081 --artifacts-dir 
hdfs://10.53.48.6:4007/algo-emr/k8s_flink/beam/
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
ArtifactStagingService started on localhost:8098
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
Java ExpansionService started on localhost:8097
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
JobService started on localhost:8099
May 28, 2019 2:43:56 AM 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@44065193
java.lang.NullPointerException
at 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.onCompleted(BeamFileSystemArtifactStagingService.java:293)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onHalfClose(ServerCalls.java:259)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

On 27 May 2019, at 9:49 PM, Robert Bradshaw <rober...@google.com> wrote:

On Mon, May 27, 2019 at 2:24 PM 青雉(祁明良) <m...@xiaohongshu.com> wrote:


Just now I tried the PROCESS environment type; the Flink taskmanager complains that "/tmp/beam-artifact-staging/job_xxx" is not found, and I found that this directory is only created on the machine running the Beam job endpoint. I guess maybe I should set the artifacts-dir to an HDFS location, but no luck for me :(


Yes, you need to set your artifact staging directory (the
--artifacts-dir flag) to something visible to both the job server and
the workers. Did you try that?

I don’t know if the following error message from the job endpoint, seen when submitting the job, is related.

Error from job endpoint:
---
[grpc-default-executor-0] ERROR 
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - 
Encountered Unexpected Exception for Invocation 
job_09aa2abd-0bc0-4994-a8b7-130156e4c13c
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)
at 
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)
at 
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)
at 
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at 
org.apache.beam.vendor.grpc.v1p13p1.io

Re: Question about --environment_type argument

2019-05-27 Thread
Hi Robert,

When I set the --artifacts-dir to an HDFS location, I got an NPE. The URL is accessible via the Hadoop client.

---
./beam-runners-flink_2.11-job-server-shadow-2.12.0-SNAPSHOT/bin/beam-runners-flink_2.11-job-server
 --flink-master-url test-mqi-job1-hl:8081 --artifacts-dir 
hdfs://10.53.48.6:4007/algo-emr/k8s_flink/beam/
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
ArtifactStagingService started on localhost:8098
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
Java ExpansionService started on localhost:8097
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - 
JobService started on localhost:8099
May 28, 2019 2:43:56 AM 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@44065193
java.lang.NullPointerException
at 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.onCompleted(BeamFileSystemArtifactStagingService.java:293)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onHalfClose(ServerCalls.java:259)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

On 27 May 2019, at 9:49 PM, Robert Bradshaw <rober...@google.com> wrote:

On Mon, May 27, 2019 at 2:24 PM 青雉(祁明良) <m...@xiaohongshu.com> wrote:

Just now I tried the PROCESS environment type; the Flink taskmanager complains that "/tmp/beam-artifact-staging/job_xxx" is not found, and I found that this directory is only created on the machine running the Beam job endpoint. I guess maybe I should set the artifacts-dir to an HDFS location, but no luck for me :(

Yes, you need to set your artifact staging directory (the
--artifacts-dir flag) to something visible to both the job server and
the workers. Did you try that?

I don’t know if the following error message from the job endpoint, seen when submitting the job, is related.

Error from job endpoint:
---
[grpc-default-executor-0] ERROR 
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - 
Encountered Unexpected Exception for Invocation 
job_09aa2abd-0bc0-4994-a8b7-130156e4c13c
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)
at 
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)
at 
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)
at 
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.gr

Re: Question about --environment_type argument

2019-05-27 Thread
Just now I tried the PROCESS environment type; the Flink taskmanager complains that "/tmp/beam-artifact-staging/job_xxx" is not found, and I found that this directory is only created on the machine running the Beam job endpoint. I guess maybe I should set the artifacts-dir to an HDFS location, but no luck for me :(

I don’t know if the following error message from the job endpoint, seen when submitting the job, is related.

Error from job endpoint:
---
[grpc-default-executor-0] ERROR 
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - 
Encountered Unexpected Exception for Invocation 
job_09aa2abd-0bc0-4994-a8b7-130156e4c13c
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusException: NOT_FOUND
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asException(Status.java:534)
at 
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getInvocation(InMemoryJobService.java:341)
at 
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.getStateStream(InMemoryJobService.java:262)
at 
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc$MethodHandlers.invoke(JobServiceGrpc.java:770)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:707)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

On 27 May 2019, at 6:53 PM, Maximilian Michels <m...@apache.org> wrote:

Hi Mingliang,

The environment is created for each TaskManager.

   For docker, will it create one Docker container per Flink taskmanager?

Yes.

   For process, does it mean starting a Python process to run the user code? And it seems "command" should be set in the environment config, but what should it be?

You will have to start the same Python SDK Harness which would run inside a 
Docker container if you had chosen Docker. This is a more manual approach which 
should only be chosen if you cannot use Docker.

   For external(loopback), does it mean the Flink operator calls an external service, by default at the place where I submit the Beam job? This looks like all the data will be shifted to a single machine and processed there.

This is intended for a long-running SDK Harness which is already running when you run your pipeline. Thus, you only provide the address of the already running SDK Harness.

Cheers,
Max

On 26.05.19 13:51, 青雉(祁明良) wrote:
Hi All,
I'm currently trying the Python portable runner with Flink. I see there are three kinds of environment_type available, "docker/process/external(loopback)", when submitting a job, but I didn't find any material explaining them further.
1. For docker, will it create one Docker container per Flink taskmanager?
2. For process, does it mean starting a Python process to run the user
   code? And it seems "command" should be set in the environment
   config, but what should it be?
3. For external(loopback), does it mean the Flink operator calls an
   external service, by default at the place where I submit the
   Beam job? This looks like all the data will be shifted to a single
   machine and processed there.
Thanks,
Mingliang



Re: Question about --environment_type argument

2019-05-27 Thread
Thanks Max, it is clear to me now.

BTW, I would like to ask about the performance of the Python runner on Flink. As I remember, when Flink first introduced Python support (maybe around 2015), it was 5-10x slower than Scala. Nowadays, what is the performance difference between Scala and Python with Beam on Flink?
Since we would like to use TensorFlow Transform with Beam, Python will probably be the better choice over a JVM-based language.

Cheers,
Mingliang

> On 27 May 2019, at 6:53 PM, Maximilian Michels <m...@apache.org> wrote:
>
> Hi Mingliang,
>
> The environment is created for each TaskManager.
>
>> For docker, will it create one Docker container per Flink taskmanager?
>
> Yes.
>
>> For process, does it mean starting a Python process to run the user code? And it seems "command" should be set in the environment config, but what should it be?
>
> You will have to start the same Python SDK Harness which would run inside a 
> Docker container if you had chosen Docker. This is a more manual approach 
> which should only be chosen if you cannot use Docker.
>
>> For external(loopback), does it mean the Flink operator calls an external service, by default at the place where I submit the Beam job? This looks like all the data will be shifted to a single machine and processed there.
>
> This is intended for a long-running SDK Harness which is already running when
> you run your pipeline. Thus, you only provide the address of the already
> running SDK Harness.
>
> Cheers,
> Max
>
> On 26.05.19 13:51, 青雉(祁明良) wrote:
>> Hi All,
>> I'm currently trying the Python portable runner with Flink. I see there are
>> three kinds of environment_type available, "docker/process/external(loopback)",
>> when submitting a job, but I didn't find any material explaining them further.
>> 1. For docker, will it create one Docker container per Flink taskmanager?
>> 2. For process, does it mean starting a Python process to run the user
>>    code? And it seems "command" should be set in the environment
>>    config, but what should it be?
>> 3. For external(loopback), does it mean the Flink operator calls an
>>    external service, by default at the place where I submit the
>>    Beam job? This looks like all the data will be shifted to a single
>>    machine and processed there.
>> Thanks,
>> Mingliang




Question about --environment_type argument

2019-05-26 Thread
Hi All,


I'm currently trying the Python portable runner with Flink. I see there are three kinds of environment_type available, "docker/process/external(loopback)", when submitting a job, but I didn't find any material explaining them further.

  1.  For docker, will it create one Docker container per Flink taskmanager?
  2.  For process, does it mean starting a Python process to run the user code? And it seems "command" should be set in the environment config, but what should it be? (See the sketch at the end of this message.)
  3.  For external(loopback), does it mean the Flink operator calls an external service, by default at the place where I submit the Beam job? This looks like all the data will be shifted to a single machine and processed there.


Thanks,

Mingliang
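
A sketch of how each of the three choices is expressed at submission time, using the option names that appear in this digest; the job endpoint and the boot binary path are illustrative assumptions:

from apache_beam.options.pipeline_options import PipelineOptions

common = ['--runner=PortableRunner', '--job_endpoint=localhost:8099']

# 1. DOCKER (the default): the runner launches an SDK harness container
#    per TaskManager.
docker = PipelineOptions(common + ['--environment_type=DOCKER'])

# 2. PROCESS: the runner starts the given command on each worker; the
#    command is the SDK harness boot binary (path assumed).
process = PipelineOptions(common + [
    '--environment_type=PROCESS',
    '--environment_config={"command": "/opt/apache/beam/boot"}',
])

# 3. LOOPBACK: the runner connects back to an SDK harness hosted by the
#    submitting process instead of starting one on the workers.
loopback = PipelineOptions(common + ['--environment_type=LOOPBACK'])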



How to setup portable flink runner with remote flink cluster

2019-05-24 Thread
Hi all,

This is Mingliang. I followed the documentation to set up a Beam pipeline on a local Flink cluster, but when I switch to a remote Flink cluster it doesn’t work directly, and I can’t find many materials covering this.

Firstly, I don’t know if I have to install anything on the Flink jobmanager / taskmanager machines; so far I have only set up the Flink cluster itself, say the JobManager on machine A and two TaskManagers on machines B and C.

Next I installed the apache_beam Python package on machine D and started the Beam job service endpoint on machine D with: ./gradlew beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=A:8081

Then I submitted the word_count example downloaded from Beam to the job service endpoint on machine D with environment_type = LOOPBACK.

The error message I received is attached below.

Any help would be appreciated,
Thanks,
Mingliang
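
For what it’s worth, with LOOPBACK the SDK harness is hosted by the submitting Python process on machine D, so the TaskManagers on machines B and C must be able to connect back to it; the "Connection refused: localhost/127.0.0.1:45955" at the bottom of the log is consistent with the workers trying to reach that callback address as localhost. A minimal stand-in for the word_count submission (hostname illustrative); the later threads switch to PROCESS, which starts the harness on the workers instead:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=machine-d:8099',  # job service started on machine D
    '--environment_type=LOOPBACK',    # harness runs in this submitting process
])

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])
     | beam.combiners.Count.PerElement()
     | beam.Map(lambda kv: '%s: %d' % kv))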

- Versions
Beam: release-2.12.0
Flink: 1.5.6

- Log

Caused by: java.lang.Exception: The user defined 'open()' method caused an 
exception: org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: 
UNAVAILABLE: io exception
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
... 1 more
Caused by: 
org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: 
UNAVAILABLE: io exception
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4992)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:186)
at 
org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49)
at 
org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203)
at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:143)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
... 3 more
Caused by: org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: 
UNAVAILABLE: io exception
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:222)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:203)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:132)
at 
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.notifyRunnerAvailable(BeamFnExternalWorkerPoolGrpc.java:152)
at 
org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:109)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:161)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
... 13 more
Caused by: 
org.apache.beam.vendor.grpc.v1p13p1.io.netty.channel.AbstractChannel$AnnotatedConnectException:
 Connection refused: localhost/127.0.0.1:45955
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoo