> Do workers need to talk to the job server independently of the Spark executors?
No, they don't. From the timestamps in your logs, it looks like the SIGBUS happened after the executor was lost. Some additional info that might help us establish a chain of causation:
- the arguments you used to start the job server?
- the Spark cluster deployment setup?

On Fri, Dec 13, 2019 at 8:00 AM Matthew K. <[email protected]> wrote:

> Actually, the reason for that error is that the Job Server/JRE crashes at the
> final stages and the service becomes unavailable (note: the job is on a very
> small dataset that, in the absence of a cluster, would be done in a couple of
> seconds):
>
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4
> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB, free: 967.8 MB)
> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB)
> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB)
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294
> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37
> <============-> 98% EXECUTING [2m 26s]
> > IDLE
> > IDLE
> > IDLE
> > :runners:spark:job-server:runShadow
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGBUS (0x7) at pc=0x00007f5ad7cd0d5e, pid=825, tid=0x00007f5abb886700
> #
> # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build 1.8.0_232-b09)
> # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.so+0x8f8d5e]  PerfLongVariant::sample()+0x1e
> #
> # Core dump written. Default location: /opt/spark/beam/core or core.825
> #
> # An error report file with more information is saved as:
> # /opt/spark/beam/hs_err_pid825.log
> #
> # If you would like to submit a bug report, please visit:
> # http://bugreport.java.com/bugreport/crash.jsp
> #
> Aborted (core dumped)
>
>
> From /opt/spark/beam/hs_err_pid825.log:
>
> Internal exceptions (10 events):
>
> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d72040) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d73e60) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d885d0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d8c6d8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
> Event: 0.673 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df7b70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df8f38) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfa5b8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfb6f0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfedf0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
> Event: 0.695 Thread 0x00007f5ad000a800 Exception <a 'java/lang/NoClassDefFoundError': org/slf4j/impl/StaticMarkerBinder> (0x0000000794f69e70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/classfile/systemDictionary.cpp, line 199]
>
>
> Looking at the logs when running the script, I can see executors becoming
> lost, but I'm not sure if that might be related to the crash of the job server:
>
> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 13, 192.168.118.75, executor 1, partition 0, PROCESS_LOCAL, 8055 bytes)
> 19/12/13 15:07:29 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on 192.168.118.75:37327 (size: 47.3 KB, free: 3.3 GB)
> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 3.0 in stage 9.0 (TID 14, 192.168.118.75, executor 1, partition 3, PROCESS_LOCAL, 7779 bytes)
> 19/12/13 15:07:29 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 13) in 37 ms on 192.168.118.75 (executor 1) (1/4)
> 19/12/13 15:07:29 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 8 to 192.168.118.75:49158
> 19/12/13 15:07:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Disabling executor 2.
> 19/12/13 15:07:30 INFO DAGScheduler: Executor lost: 2 (epoch 4)
>
> Which results in losing shuffle files, and the following exception:
>
> 19/12/13 15:07:30 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 4)
> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1, partition 1, ANY, 7670 bytes)
> 19/12/13 15:07:33 INFO TaskSetManager: Finished task 3.0 in stage 9.0 (TID 14) in 3436 ms on 192.168.118.75 (executor 1) (2/4)
> 19/12/13 15:07:33 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 192.168.118.75:37327 (size: 17.3 KB, free: 3.3 GB)
> 19/12/13 15:07:33 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 7 to 192.168.118.75:49158
> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 16, 192.168.118.75, executor 1, partition 0, ANY, 7670 bytes)
> 19/12/13 15:07:33 WARN TaskSetManager: Lost task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1): FetchFailed(null, shuffleId=7, mapId=-1, reduceId=1, message=
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
>   at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:882)
>   at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:878)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:878)
>   at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
>   at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> *Sent:* Friday, December 13, 2019 at 6:58 AM
> *From:* "Matthew K." <[email protected]>
> *To:* [email protected]
> *Cc:* dev <[email protected]>
>
> *Subject:* Re: Beam's job crashes on cluster
> Hi Kyle,
>
> This is the pipeline options config (I replaced localhost with the actual job
> server's IP address and still receive the same error. Do workers need to talk
> to the job server independently of the Spark executors?):
>
> options = PipelineOptions([
>     "--runner=PortableRunner",
>     "--job_endpoint=%s:8099" % ip_address,
>     "--environment_type=PROCESS",
>     "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
>     ""
> ])
>
>
> *Sent:* Thursday, December 12, 2019 at 5:30 PM
> *From:* "Kyle Weaver" <[email protected]>
> *To:* dev <[email protected]>
> *Subject:* Re: Beam's job crashes on cluster
> Can you share the pipeline options you are using?
> Particularly environment_type and environment_config.
>
> On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <[email protected]> wrote:
>
>> Running Beam on a Spark cluster, it crashes and I get the following error
>> (workers are on separate nodes; it works fine when workers are on the same
>> node as the runner):
>>
>> > Task :runners:spark:job-server:runShadow FAILED
>> Exception in thread wait_until_finish_read:
>> Traceback (most recent call last):
>>   File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
>>     self.run()
>>   File "/usr/lib/python2.7/threading.py", line 754, in run
>>     self.__target(*self.__args, **self.__kwargs)
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages
>>     for message in self._message_stream:
>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>     return self._next()
>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>     raise self
>> _Rendezvous: <_Rendezvous of RPC that terminated with:
>>   status = StatusCode.UNAVAILABLE
>>   details = "Socket closed"
>>   debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>>
>> Traceback (most recent call last):
>>   File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
>>     stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options)
>>   File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv
>>     statistics_pb2.DatasetFeatureStatisticsList)))
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__
>>     self.run().wait_until_finish()
>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish
>>     for state_response in self._state_stream:
>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>     return self._next()
>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>     raise self
>> grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
>>   status = StatusCode.UNAVAILABLE
>>   details = "Socket closed"
>>   debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
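For reference, here is a minimal standalone connectivity check (a sketch, not the TFDV pipeline from this thread) that builds the same portable options quoted above and runs a trivial pipeline against the job server. The ip_address placeholder and the boot binary path are taken from the earlier message; everything else is illustrative. If this also fails with StatusCode.UNAVAILABLE / "Socket closed", or triggers the same SIGBUS in the job server JVM, the failure is in the job server itself rather than in the TFDV job.

from __future__ import print_function

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder: the job server's address, as in the options quoted above.
ip_address = "JOB_SERVER_IP"
# Boot binary path quoted from the earlier message.
boot = "/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot"

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=%s:8099" % ip_address,
    "--environment_type=PROCESS",
    '--environment_config={"command": "%s"}' % boot,
])

# A trivial pipeline: if even this dies with "Socket closed", the job
# server (or its JVM) is going down independently of the TFDV job.
with beam.Pipeline(options=options) as p:
    (p
     | "Create" >> beam.Create([1, 2, 3])
     | "Square" >> beam.Map(lambda x: x * x)
     | "Print" >> beam.Map(print))

Running it from the same Python environment as beam_script.py keeps the SDK and grpc versions identical, so the two cases stay comparable.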
