> native job submition for Python

We're working on it!

> Any idea where I should look at?

Does the core dump contain the stack trace directly leading to the SIGBUS?

On Fri, Dec 13, 2019 at 1:12 PM Matthew K. <[email protected]> wrote:

> This is not that difficult to implement, but would be better to be done
> when you guys integrated native job submition for Python.
>
> However, I need t fix this last issue, which is the crash. Any idea where
> I should look at?
>
>
> *Sent:* Friday, December 13, 2019 at 2:52 PM
> *From:* "Kyle Weaver" <[email protected]>
> *To:* dev <[email protected]>
> *Subject:* Re: Beam's job crashes on cluster
> > I applied some modifications to the code to run Beam tasks on k8s
> cluster using spark-submit.
>
> Interesting, how does that work?
>
> On Fri, Dec 13, 2019 at 12:49 PM Matthew K. <[email protected]> wrote:
>
>>
>> I'm not sure if that could be a problem. I'm *not* running snadalone
>> Spark. I applied some modifications to the code to run Beam tasks on k8s
>> cluster using spark-submit. Therefore, worker nodes are spawned when
>> spark-submit is called and connect to the master, and are supposed to be
>> destroyed when job is finished.
>>
>> Therefore, the crash should have some other reason.
>>
>> *Sent:* Friday, December 13, 2019 at 2:37 PM
>> *From:* "Kyle Weaver" <[email protected]>
>> *To:* dev <[email protected]>
>> *Subject:* Re: Beam's job crashes on cluster
>> Correction: should be formatted `spark://host:port`. Should follow the
>> rules here:
>> https://spark.apache.org/docs/latest/submitting-applications.html#master-urls
>>
>> On Fri, Dec 13, 2019 at 12:36 PM Kyle Weaver <[email protected]> wrote:
>>
>>> You probably will want to add argument `-PsparkMasterUrl=localhost:8080`
>>> (or whatever host:port your Spark master is on) to the job-server:runShadow
>>> command.
>>>
>>> Without specifying the master URL, the default is to start an embedded
>>> Spark master within the same JVM as the job server, rather than using your
>>> standalone master.
>>>
>>> On Fri, Dec 13, 2019 at 12:15 PM Matthew K. <[email protected]> wrote:
>>>
>>>> Job server is running on master node by this:
>>>>
>>>> ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd`
>>>>
>>>> Spark workers (executors) run on separate nodes, sharing /tmp (1GB
>>>> size) in order to be able to access Beam job's MANIFEST. I'm running Python
>>>> 2.7.
>>>>
>>>> There is no other shared resources between them. A pure Spark job works
>>>> fine on the cluster (as far as I tested a simple one). If I'm not wrong,
>>>> beam job executes with no problem when all master and workers run on the
>>>> same node (but separate containers).
>>>>
>>>> *Sent:* Friday, December 13, 2019 at 1:49 PM
>>>> *From:* "Kyle Weaver" <[email protected]>
>>>> *To:* [email protected]
>>>> *Subject:* Re: Beam's job crashes on cluster
>>>> > Do workers need to talk to job server independent from spark
>>>> executors?
>>>>
>>>> No, they don't.
>>>>
>>>> From the time stamps in your logs, it looks like the sigbus happened
>>>> after the executor was lost.
>>>>
>>>> Some additional info that might help us establish a chain of causation:
>>>> - the arguments you used to start the job server?
>>>> - the spark cluster deployment setup?
>>>>
>>>> On Fri, Dec 13, 2019 at 8:00 AM Matthew K. <[email protected]> wrote:
>>>>
>>>>> Actually the reason for that error is Job Server/JRE crashes at final
>>>>> stages and service becomes unavailable (note: job is on a very small
>>>>> dataset that is the absence of cluster, will be done in a couple of
>>>>> seconds):
>>>>>
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4
>>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0
>>>>> on sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB,
>>>>> free: 967.8 MB)
>>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0
>>>>> on 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB)
>>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0
>>>>> on 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB)
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294
>>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37
>>>>> <============-> 98% EXECUTING [2m 26s]
>>>>> > IDLE
>>>>> > IDLE
>>>>> > IDLE
>>>>> > :runners:spark:job-server:runShadow
>>>>> #
>>>>> # A fatal error has been detected by the Java Runtime Environment:
>>>>> #
>>>>> #  SIGBUS (0x7) at pc=0x00007f5ad7cd0d5e, pid=825,
>>>>> tid=0x00007f5abb886700
>>>>> #
>>>>> # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build
>>>>> 1.8.0_232-b09)
>>>>> # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64
>>>>> compressed oops)
>>>>> # Problematic frame:
>>>>> # V  [libjvm.so+0x8f8d5e]  PerfLongVariant::sample()+0x1e
>>>>> #
>>>>> # Core dump written. Default location: /opt/spark/beam/core or core.825
>>>>> #
>>>>> # An error report file with more information is saved as:
>>>>> # /opt/spark/beam/hs_err_pid825.log
>>>>> #
>>>>> # If you would like to submit a bug report, please visit:
>>>>> #   http://bugreport.java.com/bugreport/crash.jsp
>>>>> #
>>>>> Aborted (core dumped)
>>>>>
>>>>>
>>>>> From /opt/spark/beam/hs_err_pid825.log:
>>>>>
>>>>> Internal exceptions (10
>>>>> events):
>>>>>
>>>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a
>>>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d72040) thrown at
>>>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>>>> 605]
>>>>>
>>>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a
>>>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d73e60) thrown at
>>>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>>>> 605]
>>>>>
>>>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a
>>>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d885d0) thrown at
>>>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>>>> 605]
>>>>>
>>>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a
>>>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d8c6d8) thrown at
>>>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>>>> 605]
>>>>>
>>>>> Event: 0.673 Thread 0x00007f5ad000a800 Exception <a
>>>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df7b70) thrown at
>>>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>>>> 605]
>>>>>
>>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a
>>>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df8f38) thrown at
>>>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>>>> 605]
>>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a
>>>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfa5b8) thrown at
>>>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>>>> 605]
>>>>>
>>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a
>>>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfb6f0) thrown at
>>>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>>>> 605]
>>>>>
>>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a
>>>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfedf0) thrown at
>>>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>>>> 605]
>>>>>
>>>>> Event: 0.695 Thread 0x00007f5ad000a800 Exception <a
>>>>> 'java/lang/NoClassDefFoundError': org/slf4j/impl/StaticMarkerBinder>
>>>>> (0x0000000794f69e70) thrown at
>>>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/classfile/systemDictionary.cpp,
>>>>> line 199]
>>>>>
>>>>>
>>>>> Looking at the logs when running the script, I can see exectors become
>>>>> lost, but not sure if that might be related to the crash of the job 
>>>>> server:
>>>>>
>>>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 0.0 in stage 9.0
>>>>> (TID 13, 192.168.118.75, executor 1, partition 0, PROCESS_LOCAL, 8055
>>>>> bytes)
>>>>> 19/12/13 15:07:29 INFO BlockManagerInfo: Added broadcast_10_piece0 in
>>>>> memory on 192.168.118.75:37327 (size: 47.3 KB, free: 3.3
>>>>> GB)
>>>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 3.0 in stage 9.0
>>>>> (TID 14, 192.168.118.75, executor 1, partition 3, PROCESS_LOCAL, 7779
>>>>> bytes)
>>>>> 19/12/13 15:07:29 INFO TaskSetManager: Finished task 0.0 in stage 9.0
>>>>> (TID 13) in 37 ms on 192.168.118.75 (executor 1)
>>>>> (1/4)
>>>>>
>>>>> 19/12/13 15:07:29 INFO MapOutputTrackerMasterEndpoint: Asked to send
>>>>> map output locations for shuffle 8 to 192.168.118.75:49158
>>>>>
>>>>> 19/12/13 15:07:30 INFO
>>>>> KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Disabling
>>>>> executor
>>>>> 2.
>>>>>
>>>>> 19/12/13 15:07:30 INFO DAGScheduler: Executor lost: 2 (epoch 4)
>>>>>
>>>>> Which result in losing shuffle files, and the following exception:
>>>>>
>>>>> 19/12/13 15:07:30 INFO DAGScheduler: Shuffle files lost for executor:
>>>>> 2 (epoch
>>>>> 4)
>>>>>
>>>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 1.0 in stage 7.0
>>>>> (TID 15, 192.168.118.75, executor 1, partition 1, ANY, 7670
>>>>> bytes)
>>>>>
>>>>> 19/12/13 15:07:33 INFO TaskSetManager: Finished task 3.0 in stage 9.0
>>>>> (TID 14) in 3436 ms on 192.168.118.75 (executor 1)
>>>>> (2/4)
>>>>>
>>>>> 19/12/13 15:07:33 INFO BlockManagerInfo: Added broadcast_8_piece0 in
>>>>> memory on 192.168.118.75:37327 (size: 17.3 KB, free: 3.3
>>>>> GB)
>>>>>
>>>>> 19/12/13 15:07:33 INFO MapOutputTrackerMasterEndpoint: Asked to send
>>>>> map output locations for shuffle 7 to 192.168.118.75:49158
>>>>>
>>>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 0.0 in stage 8.0
>>>>> (TID 16, 192.168.118.75, executor 1, partition 0, ANY, 7670
>>>>> bytes)
>>>>>
>>>>> 19/12/13 15:07:33 WARN TaskSetManager: Lost task 1.0 in stage 7.0 (TID
>>>>> 15, 192.168.118.75, executor 1): FetchFailed(null, shuffleId=7, mapId=-1,
>>>>> reduceId=1,
>>>>> message=
>>>>>
>>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
>>>>> output location for shuffle
>>>>> 7
>>>>>
>>>>>         at
>>>>> org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:882)
>>>>>
>>>>>         at
>>>>> org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:878)
>>>>>
>>>>>         at
>>>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>>
>>>>>         at
>>>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>>
>>>>>         at
>>>>> org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:878)
>>>>>
>>>>>         at
>>>>> org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
>>>>>
>>>>>         at
>>>>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>>>>>
>>>>>         at
>>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>>>
>>>>>         at
>>>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>>>
>>>>>         at
>>>>> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>>>
>>>>>         at
>>>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>>>
>>>>>         at
>>>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>>
>>>>>         at
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>>
>>>>>         at
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>>>>
>>>>>         at
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>>>>>
>>>>>         at
>>>>> org.apache.spark.scheduler.Task.run(Task.scala:123)
>>>>>
>>>>>         at
>>>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>>>>
>>>>>         at
>>>>> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>>>>
>>>>>         at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>>>>
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>
>>>>>         at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>
>>>>>         at
>>>>> java.lang.Thread.run(Thread.java:748)
>>>>>
>>>>>
>>>>>
>>>>> *Sent:* Friday, December 13, 2019 at 6:58 AM
>>>>> *From:* "Matthew K." <[email protected]>
>>>>> *To:* [email protected]
>>>>> *Cc:* dev <[email protected]>
>>>>>
>>>>> *Subject:* Re: Beam's job crashes on cluster
>>>>> Hi Kyle,
>>>>>
>>>>> This is the pipeleine options config (I replaced localhost with actual
>>>>> job server's IP address, and still receive the same error. Do workers need
>>>>> to talk to job server independent from spark executors?):
>>>>>
>>>>> options = PipelineOptions([
>>>>>         "--runner=PortableRunner",
>>>>>         "--job_endpoint=%s:8099" % ip_address,
>>>>>         "--environment_type=PROCESS",
>>>>>
>>>>> "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
>>>>>         ""
>>>>>     ])
>>>>>
>>>>>
>>>>>
>>>>> *Sent:* Thursday, December 12, 2019 at 5:30 PM
>>>>> *From:* "Kyle Weaver" <[email protected]>
>>>>> *To:* dev <[email protected]>
>>>>> *Subject:* Re: Beam's job crashes on cluster
>>>>> Can you share the pipeline options you are using?
>>>>> Particularly environment_type and environment_config.
>>>>>
>>>>> On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <[email protected]> wrote:
>>>>>
>>>>>> Running Beam on Spark cluster, it crashhes and I get the following
>>>>>> error (workers are on separate nodes, it works fine when workers are on 
>>>>>> the
>>>>>> same node as runner):
>>>>>>
>>>>>> > Task :runners:spark:job-server:runShadow FAILED
>>>>>> Exception in thread wait_until_finish_read:
>>>>>> Traceback (most recent call last):
>>>>>>   File "/usr/lib/python2.7/threading.py", line 801, in
>>>>>> __bootstrap_inner
>>>>>>     self.run()
>>>>>>   File "/usr/lib/python2.7/threading.py", line 754, in run
>>>>>>     self.__target(*self.__args, **self.__kwargs)
>>>>>>   File
>>>>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py",
>>>>>> line 411, in read_messages
>>>>>>     for message in self._message_stream:
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py",
>>>>>> line 395, in next
>>>>>>     return self._next()
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py",
>>>>>> line 561, in _next
>>>>>>     raise self
>>>>>> _Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>>>         status = StatusCode.UNAVAILABLE
>>>>>>         details = "Socket closed"
>>>>>>         debug_error_string =
>>>>>> "{"created":"@1576190515.361076583","description":"Error received from 
>>>>>> peer
>>>>>> ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket
>>>>>> closed","grpc_status":14}"
>>>>>> >
>>>>>> Traceback (most recent call last):
>>>>>>   File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
>>>>>>     stats =
>>>>>> tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION,
>>>>>> pipeline_options=options)
>>>>>>   File
>>>>>> "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py",
>>>>>> line 197, in generate_statistics_from_csv
>>>>>>     statistics_pb2.DatasetFeatureStatisticsList)))
>>>>>>   File
>>>>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 
>>>>>> 427,
>>>>>> in __exit__
>>>>>>     self.run().wait_until_finish()
>>>>>>   File
>>>>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py",
>>>>>> line 429, in wait_until_finish
>>>>>>     for state_response in self._state_stream:
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py",
>>>>>> line 395, in next
>>>>>>     return self._next()
>>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py",
>>>>>> line 561, in _next
>>>>>>     raise self
>>>>>> grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>>>         status = StatusCode.UNAVAILABLE
>>>>>>         details = "Socket closed"
>>>>>>         debug_error_string =
>>>>>> "{"created":"@1576190515.361053677","description":"Error received from 
>>>>>> peer
>>>>>> ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket
>>>>>> closed","grpc_status":14}"
>>>>>>
>>>>>

Reply via email to