Correction: should be formatted `spark://host:port`. Should follow the
rules here:
https://spark.apache.org/docs/latest/submitting-applications.html#master-urls

On Fri, Dec 13, 2019 at 12:36 PM Kyle Weaver <[email protected]> wrote:

> You probably will want to add argument `-PsparkMasterUrl=localhost:8080`
> (or whatever host:port your Spark master is on) to the job-server:runShadow
> command.
>
> Without specifying the master URL, the default is to start an embedded
> Spark master within the same JVM as the job server, rather than using your
> standalone master.
>
> On Fri, Dec 13, 2019 at 12:15 PM Matthew K. <[email protected]> wrote:
>
>> Job server is running on master node by this:
>>
>> ./gradlew :runners:spark:job-server:runShadow --gradle-user-home `pwd`
>>
>> Spark workers (executors) run on separate nodes, sharing /tmp (1GB size)
>> in order to be able to access Beam job's MANIFEST. I'm running Python 2.7.
>>
>> There is no other shared resources between them. A pure Spark job works
>> fine on the cluster (as far as I tested a simple one). If I'm not wrong,
>> beam job executes with no problem when all master and workers run on the
>> same node (but separate containers).
>>
>> *Sent:* Friday, December 13, 2019 at 1:49 PM
>> *From:* "Kyle Weaver" <[email protected]>
>> *To:* [email protected]
>> *Subject:* Re: Beam's job crashes on cluster
>> > Do workers need to talk to job server independent from spark executors?
>>
>> No, they don't.
>>
>> From the time stamps in your logs, it looks like the sigbus happened
>> after the executor was lost.
>>
>> Some additional info that might help us establish a chain of causation:
>> - the arguments you used to start the job server?
>> - the spark cluster deployment setup?
>>
>> On Fri, Dec 13, 2019 at 8:00 AM Matthew K. <[email protected]> wrote:
>>
>>> Actually the reason for that error is Job Server/JRE crashes at final
>>> stages and service becomes unavailable (note: job is on a very small
>>> dataset that is the absence of cluster, will be done in a couple of
>>> seconds):
>>>
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 43
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 295
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 4
>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on
>>> sparkpi-1576249172021-driver-svc.xyz.svc:7079 in memory (size: 14.4 KB,
>>> free: 967.8 MB)
>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on
>>> 192.168.102.238:46463 in memory (size: 14.4 KB, free: 3.3 GB)
>>> 19/12/13 15:22:11 INFO BlockManagerInfo: Removed broadcast_13_piece0 on
>>> 192.168.78.233:35881 in memory (size: 14.4 KB, free: 3.3 GB)
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 222
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 294
>>> 19/12/13 15:22:11 INFO ContextCleaner: Cleaned accumulator 37
>>> <============-> 98% EXECUTING [2m 26s]
>>> > IDLE
>>> > IDLE
>>> > IDLE
>>> > :runners:spark:job-server:runShadow
>>> #
>>> # A fatal error has been detected by the Java Runtime Environment:
>>> #
>>> #  SIGBUS (0x7) at pc=0x00007f5ad7cd0d5e, pid=825, tid=0x00007f5abb886700
>>> #
>>> # JRE version: OpenJDK Runtime Environment (8.0_232-b09) (build
>>> 1.8.0_232-b09)
>>> # Java VM: OpenJDK 64-Bit Server VM (25.232-b09 mixed mode linux-amd64
>>> compressed oops)
>>> # Problematic frame:
>>> # V  [libjvm.so+0x8f8d5e]  PerfLongVariant::sample()+0x1e
>>> #
>>> # Core dump written. Default location: /opt/spark/beam/core or core.825
>>> #
>>> # An error report file with more information is saved as:
>>> # /opt/spark/beam/hs_err_pid825.log
>>> #
>>> # If you would like to submit a bug report, please visit:
>>> #   http://bugreport.java.com/bugreport/crash.jsp
>>> #
>>> Aborted (core dumped)
>>>
>>>
>>> From /opt/spark/beam/hs_err_pid825.log:
>>>
>>> Internal exceptions (10
>>> events):
>>>
>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a
>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d72040) thrown at
>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>> 605]
>>>
>>> Event: 0.664 Thread 0x00007f5ad000a800 Exception <a
>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d73e60) thrown at
>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>> 605]
>>>
>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a
>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d885d0) thrown at
>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>> 605]
>>>
>>> Event: 0.665 Thread 0x00007f5ad000a800 Exception <a
>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794d8c6d8) thrown at
>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>> 605]
>>>
>>> Event: 0.673 Thread 0x00007f5ad000a800 Exception <a
>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df7b70) thrown at
>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>> 605]
>>>
>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a
>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794df8f38) thrown at
>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>> 605]
>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a
>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfa5b8) thrown at
>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>> 605]
>>>
>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a
>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfb6f0) thrown at
>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>> 605]
>>>
>>> Event: 0.674 Thread 0x00007f5ad000a800 Exception <a
>>> 'java/lang/ArrayIndexOutOfBoundsException'> (0x0000000794dfedf0) thrown at
>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/runtime/sharedRuntime.cpp, line
>>> 605]
>>>
>>> Event: 0.695 Thread 0x00007f5ad000a800 Exception <a
>>> 'java/lang/NoClassDefFoundError': org/slf4j/impl/StaticMarkerBinder>
>>> (0x0000000794f69e70) thrown at
>>> [/home/openjdk/jdk8u/hotspot/src/share/vm/classfile/systemDictionary.cpp,
>>> line 199]
>>>
>>>
>>> Looking at the logs when running the script, I can see exectors become
>>> lost, but not sure if that might be related to the crash of the job server:
>>>
>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 0.0 in stage 9.0
>>> (TID 13, 192.168.118.75, executor 1, partition 0, PROCESS_LOCAL, 8055
>>> bytes)
>>> 19/12/13 15:07:29 INFO BlockManagerInfo: Added broadcast_10_piece0 in
>>> memory on 192.168.118.75:37327 (size: 47.3 KB, free: 3.3
>>> GB)
>>> 19/12/13 15:07:29 INFO TaskSetManager: Starting task 3.0 in stage 9.0
>>> (TID 14, 192.168.118.75, executor 1, partition 3, PROCESS_LOCAL, 7779
>>> bytes)
>>> 19/12/13 15:07:29 INFO TaskSetManager: Finished task 0.0 in stage 9.0
>>> (TID 13) in 37 ms on 192.168.118.75 (executor 1)
>>> (1/4)
>>>
>>> 19/12/13 15:07:29 INFO MapOutputTrackerMasterEndpoint: Asked to send map
>>> output locations for shuffle 8 to 192.168.118.75:49158
>>>
>>> 19/12/13 15:07:30 INFO
>>> KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Disabling
>>> executor
>>> 2.
>>>
>>> 19/12/13 15:07:30 INFO DAGScheduler: Executor lost: 2 (epoch 4)
>>>
>>> Which result in losing shuffle files, and the following exception:
>>>
>>> 19/12/13 15:07:30 INFO DAGScheduler: Shuffle files lost for executor: 2
>>> (epoch
>>> 4)
>>>
>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 1.0 in stage 7.0
>>> (TID 15, 192.168.118.75, executor 1, partition 1, ANY, 7670
>>> bytes)
>>>
>>> 19/12/13 15:07:33 INFO TaskSetManager: Finished task 3.0 in stage 9.0
>>> (TID 14) in 3436 ms on 192.168.118.75 (executor 1)
>>> (2/4)
>>>
>>> 19/12/13 15:07:33 INFO BlockManagerInfo: Added broadcast_8_piece0 in
>>> memory on 192.168.118.75:37327 (size: 17.3 KB, free: 3.3
>>> GB)
>>>
>>> 19/12/13 15:07:33 INFO MapOutputTrackerMasterEndpoint: Asked to send map
>>> output locations for shuffle 7 to 192.168.118.75:49158
>>>
>>> 19/12/13 15:07:33 INFO TaskSetManager: Starting task 0.0 in stage 8.0
>>> (TID 16, 192.168.118.75, executor 1, partition 0, ANY, 7670
>>> bytes)
>>>
>>> 19/12/13 15:07:33 WARN TaskSetManager: Lost task 1.0 in stage 7.0 (TID
>>> 15, 192.168.118.75, executor 1): FetchFailed(null, shuffleId=7, mapId=-1,
>>> reduceId=1,
>>> message=
>>>
>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
>>> location for shuffle
>>> 7
>>>
>>>         at
>>> org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:882)
>>>
>>>         at
>>> org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:878)
>>>
>>>         at
>>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>
>>>         at
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>
>>>         at
>>> org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:878)
>>>
>>>         at
>>> org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:691)
>>>
>>>         at
>>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
>>>
>>>         at
>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>>>
>>>         at
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
>>>
>>>         at
>>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>>>
>>>         at
>>> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>>>
>>>         at
>>> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>>>
>>>         at
>>> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>>>
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>>>
>>>         at
>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>>>
>>>         at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>>
>>>         at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>>>
>>>         at
>>> org.apache.spark.scheduler.Task.run(Task.scala:123)
>>>
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>>
>>>         at
>>> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>>
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>>
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>
>>>         at
>>> java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>
>>> *Sent:* Friday, December 13, 2019 at 6:58 AM
>>> *From:* "Matthew K." <[email protected]>
>>> *To:* [email protected]
>>> *Cc:* dev <[email protected]>
>>>
>>> *Subject:* Re: Beam's job crashes on cluster
>>> Hi Kyle,
>>>
>>> This is the pipeleine options config (I replaced localhost with actual
>>> job server's IP address, and still receive the same error. Do workers need
>>> to talk to job server independent from spark executors?):
>>>
>>> options = PipelineOptions([
>>>         "--runner=PortableRunner",
>>>         "--job_endpoint=%s:8099" % ip_address,
>>>         "--environment_type=PROCESS",
>>>
>>> "--environment_config={\"command\":\"/opt/spark/beam/sdks/python/container/build/target/launcher/linux_amd64/boot\"}",
>>>         ""
>>>     ])
>>>
>>>
>>>
>>> *Sent:* Thursday, December 12, 2019 at 5:30 PM
>>> *From:* "Kyle Weaver" <[email protected]>
>>> *To:* dev <[email protected]>
>>> *Subject:* Re: Beam's job crashes on cluster
>>> Can you share the pipeline options you are using?
>>> Particularly environment_type and environment_config.
>>>
>>> On Thu, Dec 12, 2019 at 2:58 PM Matthew K. <[email protected]> wrote:
>>>
>>>> Running Beam on Spark cluster, it crashhes and I get the following
>>>> error (workers are on separate nodes, it works fine when workers are on the
>>>> same node as runner):
>>>>
>>>> > Task :runners:spark:job-server:runShadow FAILED
>>>> Exception in thread wait_until_finish_read:
>>>> Traceback (most recent call last):
>>>>   File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
>>>>     self.run()
>>>>   File "/usr/lib/python2.7/threading.py", line 754, in run
>>>>     self.__target(*self.__args, **self.__kwargs)
>>>>   File
>>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py",
>>>> line 411, in read_messages
>>>>     for message in self._message_stream:
>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line
>>>> 395, in next
>>>>     return self._next()
>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line
>>>> 561, in _next
>>>>     raise self
>>>> _Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>         status = StatusCode.UNAVAILABLE
>>>>         details = "Socket closed"
>>>>         debug_error_string =
>>>> "{"created":"@1576190515.361076583","description":"Error received from peer
>>>> ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket
>>>> closed","grpc_status":14}"
>>>> >
>>>> Traceback (most recent call last):
>>>>   File "/opt/spark/work-dir/beam_script.py", line 49, in <module>
>>>>     stats =
>>>> tfdv.generate_statistics_from_csv(data_location=DATA_LOCATION,
>>>> pipeline_options=options)
>>>>   File
>>>> "/usr/local/lib/python2.7/dist-packages/tensorflow_data_validation/utils/stats_gen_lib.py",
>>>> line 197, in generate_statistics_from_csv
>>>>     statistics_pb2.DatasetFeatureStatisticsList)))
>>>>   File
>>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 427,
>>>> in __exit__
>>>>     self.run().wait_until_finish()
>>>>   File
>>>> "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/portable_runner.py",
>>>> line 429, in wait_until_finish
>>>>     for state_response in self._state_stream:
>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line
>>>> 395, in next
>>>>     return self._next()
>>>>   File "/usr/local/lib/python2.7/dist-packages/grpc/_channel.py", line
>>>> 561, in _next
>>>>     raise self
>>>> grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
>>>>         status = StatusCode.UNAVAILABLE
>>>>         details = "Socket closed"
>>>>         debug_error_string =
>>>> "{"created":"@1576190515.361053677","description":"Error received from peer
>>>> ipv4:127.0.0.1:8099","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Socket
>>>> closed","grpc_status":14}"
>>>>
>>>

Reply via email to