> I applied some modifications to the code to run Beam tasks on a k8s cluster using spark-submit.

Interesting, how does that work?

On Fri, Dec 13, 2019 at 12:49 PM Matthew K. <softm...@gmx.com> wrote:

>
> I'm not sure if that could be a problem. I'm *not* running standalone
> Spark. I applied some modifications to the code to run Beam tasks on a k8s
> cluster using spark-submit. Therefore, worker nodes are spawned when
> spark-submit is called, connect to the master, and are supposed to be
> destroyed when the job is finished.
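>
> For illustration, a generic spark-submit against a Kubernetes master
> typically looks like the sketch below; the API server address, container
> image, and script path are hypothetical placeholders, not the exact
> modified Beam invocation used here:
>
> spark-submit \
>   --master k8s://https://kube-apiserver:6443 \
>   --deploy-mode cluster \
>   --conf spark.kubernetes.container.image=my-spark-image \
>   --conf spark.executor.instances=2 \
>   local:///path/to/beam_script.py
>
> In this mode, executor pods are created when spark-submit runs and are
> deleted when the application finishes.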
>
> The crash should therefore have some other cause.
>
> *Sent:* Friday, December 13, 2019 at 2:37 PM
> *From:* "Kyle Weaver" <kcwea...@google.com>
> *To:* dev <dev@beam.apache.org>
> *Subject:* Re: Beam's job crashes on cluster
> Correction: it should be formatted `spark://host:port`, following the
> master URL rules here:
> https://spark.apache.org/docs/latest/submitting-applications.html#master-urls
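>
> For example, the full command might look like the following sketch, where
> spark://spark-master:7077 is a hypothetical standalone master URL to be
> replaced with your own:
>
> ./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://spark-master:7077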
>
> On Fri, Dec 13, 2019 at 12:36 PM Kyle Weaver <kcwea...@google.com> wrote:
>
>> You will probably want to add the argument `-PsparkMasterUrl=localhost:8080`
>> (or whatever host:port your Spark master is on) to the job-server:runShadow
>> command.
>>
>> Without specifying the master URL, the default is to start an embedded
>> Spark master within the same JVM as the job server, rather than using your
>> standalone master.
>>
>> On Fri, Dec 13, 2019 at 12:15 PM Matthew K. <softm...@gmx.com> wrote:
>>
>>> The job server is running on the master node via:
>>>
>>> ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd`
>>>
>>> Spark workers (executors) run on separate nodes, sharing /tmp (1 GB in size)
>>> so that they can access the Beam job's MANIFEST. I'm running Python 2.7.
>>>
>>> There are no other shared resources between them. A pure Spark job works
>>> fine on the cluster (at least the simple one I tested). If I'm not mistaken,
>>> the Beam job executes with no problem when the master and all workers run on
>>> the same node (but in separate containers).
>>>
>>> *Sent:* Friday, December 13, 2019 at 1:49 PM
>>> *From:* "Kyle Weaver" <kcwea...@google.com>
>>> *To:* dev@beam.apache.org
>>> *Subject:* Re: Beam's job crashes on cluster
>>> > Do workers need to talk to the job server independently of the Spark executors?
>>>
>>> No, they don't.
>>>
>>> From the timestamps in your logs, it looks like the SIGBUS happened
>>> after the executor was lost.
>>>
>>> Some additional info that might help us establish a chain of causation:
>>> - the arguments you used to start the job server?
>>> - the spark cluster deployment setup?
>>>
>>> On Fri, Dec 13, 2019 at 8:00 AM Matthew K. <softm...@gmx.com> wrote:
>>>
>>>> Actually, the reason for that error is that the Job Server/JRE crashes in
>>>> the final stages and the service becomes unavailable (note: the job is on a
>>>> very small dataset that, in the absence of a cluster, would finish in a
>>>> couple of seconds):
>>>>
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4
>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB, free: 967.8 MB)
>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB)
>>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB)
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294
>>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37
>>>> <============-> 98% EXECUTING [2m 26s]
>>>> > IDLE
>>>> > IDLE
>>>> > IDLE
>>>> > :runners:spark:job-server:runShadow
>>>> #
>>>> # A fatal error has been detected by the Java Runtime Environment:
>>>> #
>>>> #  SIGBUS (0x7) at pc=0x00007f5ad7cd0d5e, pid=825, tid=0x00007f5abb886700
>>>> #
>>>> # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build 1.8.0_232-b09)
>>>> # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64 compressed oops)
>>>> # Problematic frame:
>>>> # V  [libjvm.so+0x8f8d5e]  PerfLongVariant::sample()+0x1e
>>>> #
>>>> # Core dump written. Default location: /opt/spark/beam/core or core.825
>>>> #
>>>> # An error report file with more information is saved as:
>>>> # /opt/spark/beam/hs_err_pid825.log
>>>> #
>>>> # If you would like to submit a bug report, please visit:
>>>> #   http://bugreport.java.com/bugreport/crash.jsp
>>>> #
>>>> Aborted (core dumped)
>>>>
>>>>
>>>> From /opt/spark/beam/hs_err_pid825.log:
>>>>
>>>> Internal exceptions (10 events):
>>>>
>>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d72040) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d73e60) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d885d0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d8c6d8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.673 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df7b70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df8f38) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfa5b8) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfb6f0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfedf0) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line 605]
>>>> Event: 0.695 Thread 0x00007f5ad000a800 Exception <a 'java/lang/NoClassDefFoundError': org/slf4j/impl/StaticMarkerBinder> (0x0000000794f69e70) thrown at [/home/openjdk/jdk8u/hotspot/src/share/vm/classfile/systemDictionary.cpp, line 199]
>>>>
>>>>
>>>> Looking at the logs while running the script, I can see executors becoming
>>>> lost, but I'm not sure whether that is related to the crash of the job server:
>>>>
>>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 13, 192.168.118.75, executor 1, partition 0, PROCESS_LOCAL, 8055 bytes)
>>>> 19/12/13 15:07:29 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on 192.168.118.75:37327 (size: 47.3 KB, free: 3.3 GB)
>>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 3.0 in stage 9.0 (TID 14, 192.168.118.75, executor 1, partition 3, PROCESS_LOCAL, 7779 bytes)
>>>> 19/12/13 15:07:29 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 13) in 37 ms on 192.168.118.75 (executor 1) (1/4)
>>>> 19/12/13 15:07:29 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 8 to 192.168.118.75:49158
>>>> 19/12/13 15:07:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Disabling executor 2.
>>>> 19/12/13 15:07:30 INFO DAGScheduler: Executor lost: 2 (epoch 4)
>>>>
>>>> This results in lost shuffle files and the following exception:
>>>>
>>>> 19/12/13 15:07:30 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 4)
>>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1, partition 1, ANY, 7670 bytes)
>>>> 19/12/13 15:07:33 INFO TaskSetManager: Finished task 3.0 in stage 9.0 (TID 14) in 3436 ms on 192.168.118.75 (executor 1) (2/4)
>>>> 19/12/13 15:07:33 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 192.168.118.75:37327 (size: 17.3 KB, free: 3.3 GB)
>>>> 19/12/13 15:07:33 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 7 to 192.168.118.75:49158
>>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 16, 192.168.118.75, executor 1, partition 0, ANY, 7670 bytes)
>>>> 19/12/13 15:07:33 WARN TaskSetManager: Lost task 1.0 in stage 7.0 (TID 15, 192.168.118.75, executor 1): FetchFailed(null, shuffleId=7, mapId=-1, reduceId=1, message=
>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 7
>>>>
>>>>         at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:882)
>>>>         at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:878)
>>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>         at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:878)
>>>>         at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
>>>>         at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
>>>>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>         at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>>>>         at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>>>>         at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>>         at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>>         at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>>         at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>>         at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>>         at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>>>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>
>>>>
>>>>
>>>> *Sent:* Friday, December 13, 2019 at 6:58 AM
>>>> *From:* "Matthew K." <softm...@gmx.com>
>>>> *To:* dev@beam.apache.org
>>>> *Cc:* dev <dev@beam.apache.org>
>>>>
>>>> *Subject:* Re: Beam's job crashes on cluster
>>>> Hi Kyle,
>>>>
>>>> This is the pipeline options config (I replaced localhost with the actual
>>>> job server's IP address and still receive the same error. Do workers need
>>>> to talk to the job server independently of the Spark executors?):
>>>>
>>>> options = PipelineOptions([
>>>>         "--runner=PortableRunner",
>>>>         "--job_endpoint=%s:8099" % ip_address,
>>>>         "--environment_type=PROCESS",
>>>>         "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
>>>>     ])
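>>>>
>>>> For reference, here is a minimal self-contained sketch of how these options
>>>> get used (the ip_address value and the trivial pipeline are hypothetical
>>>> stand-ins for the real job server host and the TFDV job):
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>> ip_address = "10.0.0.1"  # hypothetical; host running job-server:runShadow
>>>>
>>>> options = PipelineOptions([
>>>>     "--runner=PortableRunner",
>>>>     "--job_endpoint=%s:8099" % ip_address,
>>>>     "--environment_type=PROCESS",
>>>>     # Boot executable path copied from the config above.
>>>>     "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
>>>> ])
>>>>
>>>> # Leaving the 'with' block calls run().wait_until_finish(); that is the
>>>> # call whose gRPC channel reports "Socket closed" when the job server dies.
>>>> with beam.Pipeline(options=options) as p:
>>>>     _ = (p
>>>>          | beam.Create(["a", "b", "c"])
>>>>          | beam.Map(lambda word: word.upper()))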
>>>>
>>>>
>>>>
>>>> *Sent:* Thursday, December 12, 2019 at 5:30 PM
>>>> *From:* "Kyle Weaver" <kcwea...@google.com>
>>>> *To:* dev <dev@beam.apache.org>
>>>> *Subject:* Re: Beam's job crashes on cluster
>>>> Can you share the pipeline options you are using?
>>>> Particularly environment_type and environment_config.
>>>>
>>>> On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <softm...@gmx.com> wrote:
>>>>
>>>>> Running Beam on a Spark cluster, it crashes and I get the following
>>>>> error (workers are on separate nodes; it works fine when the workers are on
>>>>> the same node as the runner):
>>>>>
>>>>> > Task :runners:spark:job-server:runShadow FAILED
>>>>> Exception in thread wait_until_finish_read:
>>>>> Traceback (most recent call last):
>>>>>   File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
>>>>>     self.run()
>>>>>   File "/usr/lib/python2.7/threading.py", line 754, in run
>>>>>     self.__target(*self.__args, **self.__kwargs)
>>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 411, in read_messages
>>>>>     for message in self._message_stream:
>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>>>>     return self._next()
>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>>>>     raise self
>>>>> _Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>>         status = StatusCode.UNAVAILABLE
>>>>>         details = "Socket closed"
>>>>>         debug_error_string = "{"created":"@1576190515.361076583","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>>>>> >
>>>>> Traceback (most recent call last):
>>>>>   File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
>>>>>     stats = tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION, pipeline_options=options)
>>>>>   File "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py", line 197, in generate_statistics_from_csv
>>>>>     statistics_pb2.DatasetFeatureStatisticsList)))
>>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427, in __exit__
>>>>>     self.run().wait_until_finish()
>>>>>   File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 429, in wait_until_finish
>>>>>     for state_response in self._state_stream:
>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 395, in next
>>>>>     return self._next()
>>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line 561, in _next
>>>>>     raise self
>>>>> grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>>         status = StatusCode.UNAVAILABLE
>>>>>         details = "Socket closed"
>>>>>         debug_error_string = "{"created":"@1576190515.361053677","description":"Error received from peer ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket closed","grpc_status":14}"
>>>>>
>>>>
