> Do workers need to talk to job server independent from spark executors?

No, they don't.

From the timestamps in your logs, it looks like the SIGBUS happened after
the executor was lost.
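
For reference, here is a minimal sketch of that timestamp comparison, using
two lines copied from the logs quoted below. The helper name
parse_spark_timestamp is hypothetical, and since the SIGBUS banner itself
carries no timestamp, the last timestamped line printed before it is used
as an approximation of the crash time.

```python
from datetime import datetime

# Timestamp layout at the start of the Spark log lines quoted in this thread.
SPARK_TS_FORMAT = "%y/%m/%d %H:%M:%S"
TS_LENGTH = len("19/12/13 15:07:30")


def parse_spark_timestamp(line):
    """Return the leading timestamp of a Spark log line, or None if absent.

    Hypothetical helper for illustration; not part of Spark or Beam.
    """
    try:
        return datetime.strptime(line[:TS_LENGTH], SPARK_TS_FORMAT)
    except ValueError:
        return None


# Line reporting the lost executor (from the driver log).
executor_lost = parse_spark_timestamp(
    "19/12/13 15:07:30 INFO DAGScheduler: Executor lost: 2 (epoch 4)")

# Last timestamped line before the SIGBUS banner (assumed proxy for the crash).
last_line_before_crash = parse_spark_timestamp(
    "19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37")

gap = last_line_before_crash - executor_lost
print("executor lost first:", executor_lost < last_line_before_crash)
print("gap between the two events:", gap)
```

This only establishes ordering, not causation, which is why the extra
details below would help.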

Some additional info that might help us establish a chain of causation:
- the arguments you used to start the job server
- the Spark cluster deployment setup

On Fri, Dec 13, 2019 at 8:00 AM Matthew K. <[email protected]> wrote:

> Actually, the reason for that error is that the Job Server/JRE crashes in
> the final stages and the service becomes unavailable (note: the job runs
> on a very small dataset that, in the absence of a cluster, finishes in a
> couple of seconds):
>
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4
> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on
> sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB,
> free: 967.8 MB)
> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on
> 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB)
> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on
> 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB)
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37
> <============-> 98% EXECUTING [2m 26s]
> > IDLE
> > IDLE
> > IDLE
> > :runners:spark:job-server:runShadow
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGBUS (0x7) at pc=0x00007f5ad7cd0d5e, pid=825, tid=0x00007f5abb886700
> #
> # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build 1.8.0_232-b09)
> # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.so+0x8f8d5e]  PerfLongVariant::sample()+0x1e
> #
> # Core dump written. Default location: /opt/spark/beam/core or core.825
> #
> # An error report file with more information is saved as:
> # /opt/spark/beam/hs_err_pid825.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> #
> Aborted (core dumped)
>
>
> From /opt/spark/beam/hs_err_pid825.log:
>
> Internal exceptions (10 events):
>
> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d72040) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>
> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d73e60) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>
> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d885d0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>
> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d8c6d8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>
> Event: 0.673 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df7b70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>
> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df8f38) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>
> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfa5b8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>
> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfb6f0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>
> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfedf0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>
> Event: 0.695 Thread 0x00007f5ad000a800 Exception <a 'java/lang/NoClassDefFoundError': org/slf4j/impl/StaticMarkerBinder> (0x0000000794f69e70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/classfile/systemDictionary.cpp, line 199]
>
>
> Looking at the logs when running the script, I can see executors become
> lost, but I am not sure whether that is related to the crash of the job
> server:
>
> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 13, 192.168.118.75, executor 1, partition 0, PROCESS_LOCAL, 8055 bytes)
> 19/12/13 15:07:29 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on 192.168.118.75:37327 (size: 47.3 KB, free: 3.3 GB)
> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 3.0 in stage 9.0 (TID 14, 192.168.118.75, executor 1, partition 3, PROCESS_LOCAL, 7779 bytes)
> 19/12/13 15:07:29 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 13) in 37 ms on 192.168.118.75 (executor 1) (1/4)
> 19/12/13 15:07:29 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 8 to 192.168.118.75:49158
> 19/12/13 15:07:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Disabling executor 2.
> 19/12/13 15:07:30 INFO DAGScheduler: Executor lost: 2 (epoch 4)
>
> This results in lost shuffle files and the following exception:
>
> 19/12/13 15:07:30 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 4)
> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1, partition 1, ANY, 7670 bytes)
> 19/12/13 15:07:33 INFO TaskSetManager: Finished task 3.0 in stage 9.0 (TID 14) in 3436 ms on 192.168.118.75 (executor 1) (2/4)
> 19/12/13 15:07:33 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 192.168.118.75:37327 (size: 17.3 KB, free: 3.3 GB)
> 19/12/13 15:07:33 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 7 to 192.168.118.75:49158
> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 16, 192.168.118.75, executor 1, partition 0, ANY, 7670 bytes)
> 19/12/13 15:07:33 WARN TaskSetManager: Lost task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1): FetchFailed(null, shuffleId=7, mapId=-1, reduceId=1, message=
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
>         at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:882)
>         at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:878)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:878)
>         at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
>         at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>         at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>         at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>         at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>         at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>         at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>         at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>         at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>         at org.apache.spark.scheduler.Task.run(Task.scala:123)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>
>
>
> *Sent:* Friday, December 13, 2019 at 6:58 AM
> *From:* "Matthew K." <[email protected]>
> *To:* [email protected]
> *Cc:* dev <[email protected]>
>
> *Subject:* Re: Beam's job crashes on cluster
> Hi Kyle,
>
> This is the pipeline options config (I replaced localhost with the actual
> job server's IP address, and still receive the same error. Do workers need
> to talk to the job server independently of the Spark executors?):
>
> options = PipelineOptions([
>         "--runner=PortableRunner",
>         "--job_endpoint=%s:8099" % ip_address,
>         "--environment_type=PROCESS",
>         "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
>         ""
>     ])
>
>
>
> *Sent:* Thursday, December 12, 2019 at 5:30 PM
> *From:* "Kyle Weaver" <[email protected]>
> *To:* dev <[email protected]>
> *Subject:* Re: Beam's job crashes on cluster
> Can you share the pipeline options you are using?
> Particularly environment_type and environment_config.
>
> On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <[email protected]> wrote:
>
>> Running Beam on a Spark cluster, it crashes and I get the following error
>> (workers are on separate nodes; it works fine when workers are on the same
>> node as the runner):
>>
>> > Task :runners:spark:job-server:runShadow FAILED
>> Exception in thread wait_until_finish_read:
>> Traceback (most recent call last):
>>   File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
>>     self.run()
>>   File "/usr/lib/python2.7/threading.py", line 754, in run
>>     self.__target(*self.__args, **self.__kwargs)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages
>>     for message in self._message_stream:
>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>     return self._next()
>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>     raise self
>> _Rendezvous: <_Rendezvous of RPC that terminated with:
>>         status = StatusCode.UNAVAILABLE
>>         details = "Socket closed"
>>         debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>> >
>> Traceback (most recent call last):
>>   File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
>>     stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options)
>>   File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv
>>     statistics_pb2.DatasetFeatureStatisticsList)))
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__
>>     self.run().wait_until_finish()
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish
>>     for state_response in self._state_stream:
>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>     return self._next()
>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>     raise self
>> grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
>>         status = StatusCode.UNAVAILABLE
>>         details = "Socket closed"
>>         debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>>
>
