> I applied some modifications to the code to run Beam tasks on a k8s cluster using spark-submit.
Interesting, how does that work?

On Fri, Dec 13, 2019 at 12:49 PM Matthew K. <softm...@gmx.com> wrote:

> I'm not sure if that could be a problem. I'm *not* running standalone
> Spark. I applied some modifications to the code to run Beam tasks on a k8s
> cluster using spark-submit. Therefore, worker nodes are spawned when
> spark-submit is called and connect to the master, and are supposed to be
> destroyed when the job is finished.
>
> The crash should therefore have some other reason.
>
> Sent: Friday, December 13, 2019 at 2:37 PM
> From: "Kyle Weaver" <kcwea...@google.com>
> To: dev <dev@beam.apache.org>
> Subject: Re: Beam's job crashes on cluster
>
> Correction: it should be formatted `spark://host:port`, following the
> rules here:
> https://spark.apache.org/docs/latest/submitting-applications.html#master-urls
>
> On Fri, Dec 13, 2019 at 12:36 PM Kyle Weaver <kcwea...@google.com> wrote:
>
>> You will probably want to add the argument `-PsparkMasterUrl=localhost:8080`
>> (or whatever host:port your Spark master is on) to the job-server:runShadow
>> command.
>>
>> Without specifying the master URL, the default is to start an embedded
>> Spark master within the same JVM as the job server, rather than using your
>> standalone master.
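>>
>> For instance, combining that with the `spark://host:port` format, a
>> sketch (assuming a standalone master at spark://spark-master:7077;
>> substitute your actual host and port) would look something like:
>>
>>   # start the Beam job server against an existing Spark master
>>   ./gradlew :runners:spark:job-server:runShadow \
>>       -PsparkMasterUrl=spark://spark-master:7077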
>> On Fri, Dec 13, 2019 at 12:15 PM Matthew K. <softm...@gmx.com> wrote:
>>
>>> The job server is running on the master node via:
>>>
>>> ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd`
>>>
>>> Spark workers (executors) run on separate nodes, sharing /tmp (1GB in
>>> size) in order to be able to access the Beam job's MANIFEST. I'm running
>>> Python 2.7.
>>>
>>> There are no other shared resources between them. A pure Spark job works
>>> fine on the cluster (as far as I have tested with a simple one). If I'm
>>> not mistaken, the Beam job executes with no problem when the master and
>>> all workers run on the same node (but in separate containers).
>>>
>>> Sent: Friday, December 13, 2019 at 1:49 PM
>>> From: "Kyle Weaver" <kcwea...@google.com>
>>> To: dev@beam.apache.org
>>> Subject: Re: Beam's job crashes on cluster
>>>
>>> > Do workers need to talk to the job server independently of the Spark
>>> executors?
>>>
>>> No, they don't.
>>>
>>> From the timestamps in your logs, it looks like the SIGBUS happened
>>> after the executor was lost.
>>>
>>> Some additional info that might help us establish a chain of causation:
>>> - the arguments you used to start the job server?
>>> - the Spark cluster deployment setup?
>>>
>>> On Fri, Dec 13, 2019 at 8:00 AM Matthew K. <softm...@gmx.com> wrote:
>>>
>>>> Actually, the reason for that error is that the job server's JRE
>>>> crashes at the final stages and the service becomes unavailable (note:
>>>> the job runs on a very small dataset that, in the absence of a cluster,
>>>> finishes in a couple of seconds):
>>>>
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4
>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB, free: 967.8 MB)
>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB)
>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB)
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37
>>>> <============-> 98% EXECUTING [2m 26s]
>>>> > IDLE
>>>> > IDLE
>>>> > IDLE
>>>> > :runners:spark:job-server:runShadow
>>>> #
>>>> # A fatal error has been detected by the Java Runtime Environment:
>>>> #
>>>> #  SIGBUS (0x7) at pc=0x00007f5ad7cd0d5e, pid=825, tid=0x00007f5abb886700
>>>> #
>>>> # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build 1.8.0_232-b09)
>>>> # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64 compressed oops)
>>>> # Problematic frame:
>>>> # V  [libjvm.so+0x8f8d5e]  PerfLongVariant::sample()+0x1e
>>>> #
>>>> # Core dump written. Default location: /opt/spark/beam/core or core.825
>>>> #
>>>> # An error report file with more information is saved as:
>>>> # /opt/spark/beam/hs_err_pid825.log
>>>> #
>>>> # If you would like to submit a bug report, please visit:
>>>> #   http://bugreport.java.com/bugreport/crash.jsp
>>>> #
>>>> Aborted (core dumped)
>>>>
>>>> From /opt/spark/beam/hs_err_pid825.log:
>>>>
>>>> Internal exceptions (10 events):
>>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d72040) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d73e60) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d885d0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d8c6d8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.673 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df7b70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df8f38) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfa5b8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfb6f0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfedf0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.695 Thread 0x00007f5ad000a800 Exception <a 'java/lang/NoClassDefFoundError': org/slf4j/impl/StaticMarkerBinder> (0x0000000794f69e70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/classfile/systemDictionary.cpp, line 199]
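>>>>
>>>> (Side note on the problematic frame: the JVM's performance counters are
>>>> memory-mapped files under /tmp, which here is the small shared 1GB
>>>> volume, and a SIGBUS on a mapped file is the classic symptom of the
>>>> backing filesystem filling up. A possible experiment, sketched under
>>>> that assumption, is to rerun with perf data disabled:
>>>>
>>>>   # _JAVA_OPTIONS is read by every HotSpot JVM launched from this
>>>>   # shell, including the forked job-server process
>>>>   _JAVA_OPTIONS="-XX:-UsePerfData" \
>>>>       ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd`
>>>>
>>>> If the crash disappears, the shared /tmp is the likely culprit.)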
>>>> Looking at the logs when running the script, I can see executors become
>>>> lost, but I'm not sure whether that is related to the crash of the job
>>>> server:
>>>>
>>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 13, 192.168.118.75, executor 1, partition 0, PROCESS_LOCAL, 8055 bytes)
>>>> 19/12/13 15:07:29 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on 192.168.118.75:37327 (size: 47.3 KB, free: 3.3 GB)
>>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 3.0 in stage 9.0 (TID 14, 192.168.118.75, executor 1, partition 3, PROCESS_LOCAL, 7779 bytes)
>>>> 19/12/13 15:07:29 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 13) in 37 ms on 192.168.118.75 (executor 1) (1/4)
>>>> 19/12/13 15:07:29 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 8 to 192.168.118.75:49158
>>>> 19/12/13 15:07:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Disabling executor 2.
>>>> 19/12/13 15:07:30 INFO DAGScheduler: Executor lost: 2 (epoch 4)
>>>>
>>>> which results in lost shuffle files and the following exception:
>>>>
>>>> 19/12/13 15:07:30 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 4)
>>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1, partition 1, ANY, 7670 bytes)
>>>> 19/12/13 15:07:33 INFO TaskSetManager: Finished task 3.0 in stage 9.0 (TID 14) in 3436 ms on 192.168.118.75 (executor 1) (2/4)
>>>> 19/12/13 15:07:33 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 192.168.118.75:37327 (size: 17.3 KB, free: 3.3 GB)
>>>> 19/12/13 15:07:33 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 7 to 192.168.118.75:49158
>>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 16, 192.168.118.75, executor 1, partition 0, ANY, 7670 bytes)
>>>> 19/12/13 15:07:33 WARN TaskSetManager: Lost task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1): FetchFailed(null, shuffleId=7, mapId=-1, reduceId=1, message=
>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
>>>>     at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:882)
>>>>     at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:878)
>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>     at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:878)
>>>>     at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
>>>>     at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
>>>>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>>>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>>>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> Sent: Friday, December 13, 2019 at 6:58 AM
>>>> From: "Matthew K." <softm...@gmx.com>
>>>> To: dev@beam.apache.org
>>>> Cc: dev <dev@beam.apache.org>
>>>> Subject: Re: Beam's job crashes on cluster
>>>>
>>>> Hi Kyle,
>>>>
>>>> This is the pipeline options config (I replaced localhost with the
>>>> actual job server IP address and still receive the same error. Do
>>>> workers need to talk to the job server independently of the Spark
>>>> executors?):
>>>>
>>>> options = PipelineOptions([
>>>>     "--runner=PortableRunner",
>>>>     "--job_endpoint=%s:8099" % ip_address,
>>>>     "--environment_type=PROCESS",
>>>>     "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
>>>> ])
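>>>>
>>>> For illustration, a minimal end-to-end sketch using these options (a
>>>> hypothetical stand-in for the TFDV script; ip_address is a placeholder
>>>> for the job server host):
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>> ip_address = "192.0.2.10"  # placeholder: the job server's address
>>>>
>>>> options = PipelineOptions([
>>>>     "--runner=PortableRunner",
>>>>     "--job_endpoint=%s:8099" % ip_address,
>>>>     "--environment_type=PROCESS",
>>>>     "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
>>>> ])
>>>>
>>>> # exiting the `with` block calls run().wait_until_finish(), which is
>>>> # where the "Socket closed" error surfaces if the job server dies
>>>> with beam.Pipeline(options=options) as p:
>>>>     _ = (
>>>>         p
>>>>         | "Create" >> beam.Create(["a", "b", "a"])
>>>>         | "Pair" >> beam.Map(lambda x: (x, 1))
>>>>         | "Count" >> beam.CombinePerKey(sum)
>>>>     )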
>>>> Sent: Thursday, December 12, 2019 at 5:30 PM
>>>> From: "Kyle Weaver" <kcwea...@google.com>
>>>> To: dev <dev@beam.apache.org>
>>>> Subject: Re: Beam's job crashes on cluster
>>>>
>>>> Can you share the pipeline options you are using? Particularly
>>>> environment_type and environment_config.
>>>>
>>>> On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <softm...@gmx.com> wrote:
>>>>
>>>>> Running Beam on a Spark cluster, it crashes and I get the following
>>>>> error (workers are on separate nodes; it works fine when the workers
>>>>> are on the same node as the runner):
>>>>>
>>>>> > Task :runners:spark:job-server:runShadow FAILED
>>>>> Exception in thread wait_until_finish_read:
>>>>> Traceback (most recent call last):
>>>>>   File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
>>>>>     self.run()
>>>>>   File "/usr/lib/python2.7/threading.py", line 754, in run
>>>>>     self.__target(*self.__args, **self.__kwargs)
>>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages
>>>>>     for message in self._message_stream:
>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>>>>     return self._next()
>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>>>>     raise self
>>>>> _Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>>     status = StatusCode.UNAVAILABLE
>>>>>     details = "Socket closed"
>>>>>     debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>>>>> >
>>>>> Traceback (most recent call last):
>>>>>   File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
>>>>>     stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options)
>>>>>   File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv
>>>>>     statistics_pb2.DatasetFeatureStatisticsList)))
>>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__
>>>>>     self.run().wait_until_finish()
>>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish
>>>>>     for state_response in self._state_stream:
>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>>>>     return self._next()
>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>>>>     raise self
>>>>> grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>>     status = StatusCode.UNAVAILABLE
>>>>>     details = "Socket closed"
>>>>>     debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"