Correction: it should be formatted `spark://host:port`, following the master URL rules here: https://spark.apache.org/docs/latest/submitting-applications.html#master-urls
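For example, against a standalone master the invocation would look something like this (spark-master-host is a placeholder and 7077 is only the usual standalone default port; substitute whatever host:port your master actually listens on):

./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://spark-master-host:7077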
On Fri, Dec 13, 2019 at 12:36 PM Kyle Weaver <[email protected]> wrote:

> You probably will want to add the argument `-PsparkMasterUrl=localhost:8080` (or whatever host:port your Spark master is on) to the job-server:runShadow command.
>
> Without specifying the master URL, the default is to start an embedded Spark master within the same JVM as the job server, rather than using your standalone master.
>
> On Fri, Dec 13, 2019 at 12:15 PM Matthew K. <[email protected]> wrote:
>
>> The job server is running on the master node with:
>>
>> ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd`
>>
>> Spark workers (executors) run on separate nodes, sharing /tmp (1GB size) in order to be able to access the Beam job's MANIFEST. I'm running Python 2.7.
>>
>> There are no other shared resources between them. A pure Spark job works fine on the cluster (as far as I tested a simple one). If I'm not wrong, the Beam job executes with no problem when the master and workers all run on the same node (but in separate containers).
>>
>> *Sent:* Friday, December 13, 2019 at 1:49 PM
>> *From:* "Kyle Weaver" <[email protected]>
>> *To:* [email protected]
>> *Subject:* Re: Beam's job crashes on cluster
>>
>> > Do workers need to talk to the job server independently of the Spark executors?
>>
>> No, they don't.
>>
>> From the time stamps in your logs, it looks like the SIGBUS happened after the executor was lost.
>>
>> Some additional info that might help us establish a chain of causation:
>> - the arguments you used to start the job server?
>> - the spark cluster deployment setup?
>>
>> On Fri, Dec 13, 2019 at 8:00 AM Matthew K. <[email protected]> wrote:
>>
>>> Actually, the reason for that error is that the Job Server/JRE crashes at the final stages and the service becomes unavailable (note: the job is on a very small dataset that, in the absence of a cluster, would be done in a couple of seconds):
>>>
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4
>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB, free: 967.8 MB)
>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB)
>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB)
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37
>>> <============-> 98% EXECUTING [2m 26s]
>>> > IDLE
>>> > IDLE
>>> > IDLE
>>> > :runners:spark:job-server:runShadow
>>> #
>>> # A fatal error has been detected by the Java Runtime Environment:
>>> #
>>> # SIGBUS (0x7) at pc=0x00007f5ad7cd0d5e, pid=825, tid=0x00007f5abb886700
>>> #
>>> # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build 1.8.0_232-b09)
>>> # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64 compressed oops)
>>> # Problematic frame:
>>> # V [libjvm.so+0x8f8d5e] PerfLongVariant::sample()+0x1e
>>> #
>>> # Core dump written. Default location: /opt/spark/beam/core or core.825
>>> #
>>> # An error report file with more information is saved as:
>>> # /opt/spark/beam/hs_err_pid825.log
>>> #
>>> # If you would like to submit a bug report, please visit:
>>> # http://bugreport.java.com/bugreport/crash.jsp
>>> #
>>> Aborted (core dumped)
>>>
>>> From /opt/spark/beam/hs_err_pid825.log:
>>>
>>> Internal exceptions (10 events):
>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d72040) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d73e60) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d885d0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d8c6d8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>> Event: 0.673 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df7b70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df8f38) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfa5b8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfb6f0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfedf0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>> Event: 0.695 Thread 0x00007f5ad000a800 Exception <a 'java/lang/NoClassDefFoundError': org/slf4j/impl/StaticMarkerBinder> (0x0000000794f69e70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/classfile/systemDictionary.cpp, line 199]
>>>
>>> Looking at the logs when running the script, I can see executors become lost, but I'm not sure whether that is related to the crash of the job server:
>>>
>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 13, 192.168.118.75, executor 1, partition 0, PROCESS_LOCAL, 8055 bytes)
>>> 19/12/13 15:07:29 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on 192.168.118.75:37327 (size: 47.3 KB, free: 3.3 GB)
>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 3.0 in stage 9.0 (TID 14, 192.168.118.75, executor 1, partition 3, PROCESS_LOCAL, 7779 bytes)
>>> 19/12/13 15:07:29 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 13) in 37 ms on 192.168.118.75 (executor 1) (1/4)
>>> 19/12/13 15:07:29 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 8 to 192.168.118.75:49158
>>> 19/12/13 15:07:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Disabling executor 2.
>>> 19/12/13 15:07:30 INFO DAGScheduler: Executor lost: 2 (epoch 4)
>>>
>>> Which results in losing shuffle files, and the following exception:
>>>
>>> 19/12/13 15:07:30 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 4)
>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1, partition 1, ANY, 7670 bytes)
>>> 19/12/13 15:07:33 INFO TaskSetManager: Finished task 3.0 in stage 9.0 (TID 14) in 3436 ms on 192.168.118.75 (executor 1) (2/4)
>>> 19/12/13 15:07:33 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 192.168.118.75:37327 (size: 17.3 KB, free: 3.3 GB)
>>> 19/12/13 15:07:33 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 7 to 192.168.118.75:49158
>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 16, 192.168.118.75, executor 1, partition 0, ANY, 7670 bytes)
>>> 19/12/13 15:07:33 WARN TaskSetManager: Lost task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1): FetchFailed(null, shuffleId=7, mapId=-1, reduceId=1, message=
>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
>>>   at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:882)
>>>   at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:878)
>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>   at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:878)
>>>   at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
>>>   at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
>>>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>>>   at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>>>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>   at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>   at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>   at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>   at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>>>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>   at java.lang.Thread.run(Thread.java:748)
>>>
>>> *Sent:* Friday, December 13, 2019 at 6:58 AM
>>> *From:* "Matthew K." <[email protected]>
>>> *To:* [email protected]
>>> *Cc:* dev <[email protected]>
>>> *Subject:* Re: Beam's job crashes on cluster
>>>
>>> Hi Kyle,
>>>
>>> This is the pipeline options config (I replaced localhost with the actual job server's IP address and still receive the same error. Do workers need to talk to the job server independently of the Spark executors?):
>>>
>>> options = PipelineOptions([
>>>     "--runner=PortableRunner",
>>>     "--job_endpoint=%s:8099" % ip_address,
>>>     "--environment_type=PROCESS",
>>>     "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
>>>     ""
>>> ])
>>>
>>> *Sent:* Thursday, December 12, 2019 at 5:30 PM
>>> *From:* "Kyle Weaver" <[email protected]>
>>> *To:* dev <[email protected]>
>>> *Subject:* Re: Beam's job crashes on cluster
>>>
>>> Can you share the pipeline options you are using? Particularly environment_type and environment_config.
>>> On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <[email protected]> wrote:
>>>
>>>> Running Beam on a Spark cluster, it crashes and I get the following error (workers are on separate nodes; it works fine when workers are on the same node as the runner):
>>>>
>>>> > Task :runners:spark:job-server:runShadow FAILED
>>>> Exception in thread wait_until_finish_read:
>>>> Traceback (most recent call last):
>>>>   File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
>>>>     self.run()
>>>>   File "/usr/lib/python2.7/threading.py", line 754, in run
>>>>     self.__target(*self.__args, **self.__kwargs)
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages
>>>>     for message in self._message_stream:
>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>>>     return self._next()
>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>>>     raise self
>>>> _Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>   status = StatusCode.UNAVAILABLE
>>>>   details = "Socket closed"
>>>>   debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>>>> >
>>>> Traceback (most recent call last):
>>>>   File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
>>>>     stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options)
>>>>   File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv
>>>>     statistics_pb2.DatasetFeatureStatisticsList)))
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__
>>>>     self.run().wait_until_finish()
>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish
>>>>     for state_response in self._state_stream:
>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>>>     return self._next()
>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>>>     raise self
>>>> grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>   status = StatusCode.UNAVAILABLE
>>>>   details = "Socket closed"
>>>>   debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
