Hi Kyle,
 
These are the pipeline options I'm using. I replaced localhost with the job server's actual IP address and still get the same error. Do the workers need to talk to the job server independently of the Spark executors?
 
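from apache_beam.options.pipeline_options import PipelineOptions

# ip_address holds the job server's address (set earlier in the script).
options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=%s:8099" % ip_address,
        "--environment_type=PROCESS",
        "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
    ])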
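
A minimal sketch of building the same flag list with `json.dumps`, so the quotes inside `--environment_config` don't have to be escaped by hand. The helper name `build_portable_flags` is hypothetical; the flags are the ones quoted above, and the address in the example is a placeholder.

```python
import json


def build_portable_flags(ip_address, boot_command, port=8099):
    """Hypothetical helper: return the PortableRunner flags quoted above.

    json.dumps produces the {"command": ...} JSON for --environment_config,
    so the inner quotes need no manual escaping.
    """
    return [
        "--runner=PortableRunner",
        "--job_endpoint=%s:%d" % (ip_address, port),
        "--environment_type=PROCESS",
        "--environment_config=" + json.dumps({"command": boot_command}),
    ]


if __name__ == "__main__":
    boot = ("/opt/spark/beam/sdks/python/container/build/target/"
            "launcher/linux_amd64/boot")
    # 10.0.0.5 is a placeholder address, not the real job server.
    for flag in build_portable_flags("10.0.0.5", boot):
        print(flag)
```

The resulting list would be passed to `PipelineOptions(...)` exactly as before; `boot` here is just a local variable holding the path from the original config.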
 
 
 
Sent: Thursday, December 12, 2019 at 5:30 PM
From: "Kyle Weaver" <kcwea...@google.com>
To: dev <dev@beam.apache.org>
Subject: Re: Beam's job crashes on cluster
Can you share the pipeline options you are using? Particularly environment_type and environment_config.
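
With `--environment_type=PROCESS`, the SDK harness is started by running the configured command on the worker machine itself, so that path has to exist and be executable on every Spark worker node, not only on the node that submits the job. A minimal sketch of a per-node check, assuming the config is the same JSON string passed to `--environment_config`; `check_process_environment` is a hypothetical name.

```python
import json
import os


def check_process_environment(environment_config):
    """Hypothetical check: does the PROCESS-environment command exist here?

    environment_config is the JSON string given to --environment_config,
    e.g. '{"command": "/path/to/boot"}'. Returns (ok, message).
    """
    command = json.loads(environment_config)["command"]
    if not os.path.isfile(command):
        return False, "missing: %s" % command
    if not os.access(command, os.X_OK):
        return False, "not executable: %s" % command
    return True, "ok: %s" % command


if __name__ == "__main__":
    config = json.dumps({"command": "/opt/spark/beam/sdks/python/container/"
                                    "build/target/launcher/linux_amd64/boot"})
    # Run this on each Spark worker node, not only on the driver.
    print(check_process_environment(config))
```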
 
On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <softm...@gmx.com> wrote:
When I run Beam on a Spark cluster, the job crashes with the following error (the workers are on separate nodes; it works fine when they are on the same node as the runner):
 
> Task :runners:spark:job-server:runShadow FAILED
Exception in thread wait_until_finish_read:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages
    for message in self._message_stream:
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
    return self._next()
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
    raise self
_Rendezvous: <_Rendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Socket closed"
        debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>
Traceback (most recent call last):
  File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
    stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv
    statistics_pb2.DatasetFeatureStatisticsList)))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__
    self.run().wait_until_finish()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish
    for state_response in self._state_stream:
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
    return self._next()
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Socket closed"
        debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
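
Both tracebacks report `Error received from peer ipv4:127.0.0.1:8099`, i.e. the connection the client had open to a job endpoint on localhost was closed. A small sketch that pulls the peer address out of a `debug_error_string` like the ones above, which may help confirm which address the client actually used after changing `--job_endpoint`. The regex is an assumption based only on the IPv4 format shown in this log, and `extract_peer` is a hypothetical name.

```python
import re

# Matches the "peer ipv4:HOST:PORT" fragment seen in the gRPC errors above.
# Assumption: only IPv4 peers, formatted exactly as in this log.
_PEER_RE = re.compile(r"peer ipv4:(\d{1,3}(?:\.\d{1,3}){3}):(\d+)")


def extract_peer(debug_error_string):
    """Return (host, port) from a gRPC debug_error_string, or None."""
    match = _PEER_RE.search(debug_error_string)
    if match is None:
        return None
    return match.group(1), int(match.group(2))


if __name__ == "__main__":
    line = ('{"created":"@1576190515.361053677","description":"Error received '
            'from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc",'
            '"file_line":1055,"grpc_message":"Socket closed","grpc_status":14}')
    print(extract_peer(line))  # ('127.0.0.1', 8099)
```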
