Hi Kyle,
This is the pipeline options config (I replaced localhost with the job server's actual IP address and still receive the same error. Do the workers need to talk to the job server independently of the Spark executors?):
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=%s:8099" % ip_address,
    "--environment_type=PROCESS",
    "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
])
"--runner=PortableRunner",
"--job_endpoint=%s:8099" % ip_address,
"--environment_type=PROCESS",
"--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
""
])
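To test that directly, here is a minimal sketch (assuming the same ip_address variable as in the options above) that can be run from a worker node to check whether the job server's gRPC port is reachable independently of the Spark executors:

import grpc

# Job server endpoint, taken from the pipeline options above (an assumption:
# adjust ip_address and port to match your deployment).
endpoint = "%s:8099" % ip_address
channel = grpc.insecure_channel(endpoint)
try:
    # Block until the channel is ready or the deadline expires.
    grpc.channel_ready_future(channel).result(timeout=10)
    print("Job server reachable at %s" % endpoint)
except grpc.FutureTimeoutError:
    print("Could not reach job server at %s" % endpoint)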
Sent: Thursday, December 12, 2019 at 5:30 PM
From: "Kyle Weaver" <[email protected]>
To: dev <[email protected]>
Subject: Re: Beam's job crashes on cluster
From: "Kyle Weaver" <[email protected]>
To: dev <[email protected]>
Subject: Re: Beam's job crashes on cluster
Can you share the pipeline options you are using? Particularly environment_type and environment_config.
On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <[email protected]> wrote:
Running Beam on a Spark cluster, it crashes and I get the following error (the workers are on separate nodes; it works fine when the workers are on the same node as the runner):

> Task :runners:spark:job-server:runShadow FAILED
Exception in thread wait_until_finish_read:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages
    for message in self._message_stream:
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
    return self._next()
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
    raise self
_Rendezvous: <_Rendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "Socket closed"
    debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>
Traceback (most recent call last):
  File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
    stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options)
  File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv
    statistics_pb2.DatasetFeatureStatisticsList)))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__
    self.run().wait_until_finish()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish
    for state_response in self._state_stream:
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
    return self._next()
  File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "Socket closed"
    debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>
