Re: pyspark streaming crashes
Just for reference, in my case this problem is caused by this bug: https://issues.apache.org/jira/browse/SPARK-12617

On Monday, 21 December 2015, 14:32, Antony Mayi <antonym...@yahoo.com> wrote:

I noticed it might be related to longer GC pauses (1-2 sec) - the crash usually occurs right after such a pause. Could that be causing the Python-Java gateway to time out?

On Sunday, 20 December 2015, 23:05, Antony Mayi <antonym...@yahoo.com> wrote:

Hi,

can anyone please help me troubleshoot this problem: I have a streaming pyspark application (spark 1.5.2 on yarn-client) which keeps crashing after a few hours. It doesn't seem to be running out of memory on either the driver or the executors.

driver error:

py4j.protocol.Py4JJavaError: An error occurred while calling o1.awaitTermination.
: java.io.IOException: py4j.Py4JException: Error while obtaining a new communication channel
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
        at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)

(all) executors error:

  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError

GC (using G1GC) debugging just before the crash:

driver:
[Eden: 2316.0M(2316.0M)->0.0B(2318.0M) Survivors: 140.0M->138.0M Heap: 3288.7M(4096.0M)->675.5M(4096.0M)]

executor(s):
[Eden: 2342.0M(2342.0M)->0.0B(2378.0M) Survivors: 52.0M->34.0M Heap: 3601.7M(4096.0M)->1242.7M(4096.0M)]

thanks.
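For context, the G1GC logging quoted above is enabled through the standard extraJavaOptions settings - roughly like this (a sketch only; the exact set of print flags is from memory, only -XX:+UseG1GC is essential):

spark-submit \
  --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps" \
  ...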
Re: driver OOM due to io.netty.buffer items not getting finalized
FYI, after further troubleshooting I am logging this as https://issues.apache.org/jira/browse/SPARK-12511

On Tuesday, 22 December 2015, 18:16, Antony Mayi <antonym...@yahoo.com> wrote:

I narrowed it down to the problem described for example here: https://bugs.openjdk.java.net/browse/JDK-6293787

It is the mass finalization of zip Inflater/Deflater objects which can't keep up with the rate at which these instances are being garbage collected. As the JDK bug report (not accepted as a bug) suggests, this comes down to suboptimal destruction of the instances. Not sure where the zip comes from - for all the compressors used in spark I was using the default snappy codec.

I am trying to disable all the spark.*.compress options and so far this has dramatically improved things - the finalization looks to be keeping up and the heap is stable.

Any input is still welcome!

On Tuesday, 22 December 2015, 12:17, Ted Yu <yuzhih...@gmail.com> wrote:

This might be related but the jmap output there looks different:
http://stackoverflow.com/questions/32537965/huge-number-of-io-netty-buffer-poolthreadcachememoryregioncacheentry-instances

On Tue, Dec 22, 2015 at 2:59 AM, Antony Mayi <antonym...@yahoo.com.invalid> wrote:

I have a streaming app (pyspark 1.5.2 on yarn) that's crashing due to driver (JVM part, not Python) OOM - no matter how big a heap is assigned, it eventually runs out. When checking the heap, it is all taken by "byte" items of io.netty.buffer.PoolThreadCache. The number of io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry instances is constant, yet the number of [B "bytes" keeps growing, as does the number of Finalizer instances. When checking the Finalizer instances, they are all ZipFile$ZipFileInputStream and ZipFile$ZipFileInflaterInputStream:

 num     #instances         #bytes  class name
----------------------------------------------
   1:        123556      278723776  [B
   2:        258988       10359520  java.lang.ref.Finalizer
   3:        174620        9778720  java.util.zip.Deflater
   4:         66684        7468608  org.apache.spark.executor.TaskMetrics
   5:         80070        7160112  [C
   6:        282624        6782976  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   7:        206371        4952904  java.lang.Long

the platform is using netty 3.6.6 and openjdk 1.8 (tried on 1.7 as well with the same issue).

would anyone have a clue how to troubleshoot further?

thx.
driver OOM due to io.netty.buffer items not getting finalized
I have a streaming app (pyspark 1.5.2 on yarn) that's crashing due to driver (JVM part, not Python) OOM - no matter how big a heap is assigned, it eventually runs out. When checking the heap, it is all taken by "byte" items of io.netty.buffer.PoolThreadCache. The number of io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry instances is constant, yet the number of [B "bytes" keeps growing, as does the number of Finalizer instances. When checking the Finalizer instances, they are all ZipFile$ZipFileInputStream and ZipFile$ZipFileInflaterInputStream:

 num     #instances         #bytes  class name
----------------------------------------------
   1:        123556      278723776  [B
   2:        258988       10359520  java.lang.ref.Finalizer
   3:        174620        9778720  java.util.zip.Deflater
   4:         66684        7468608  org.apache.spark.executor.TaskMetrics
   5:         80070        7160112  [C
   6:        282624        6782976  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   7:        206371        4952904  java.lang.Long

the platform is using netty 3.6.6 and openjdk 1.8 (tried on 1.7 as well with the same issue).

would anyone have a clue how to troubleshoot further?

thx.
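For reference, the class histogram above was taken from the live driver JVM roughly like this (a sketch - the PID is a placeholder):

jmap -histo:live <driver-pid> | head -20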
Re: driver OOM due to io.netty.buffer items not getting finalized
I narrowed it down to the problem described for example here: https://bugs.openjdk.java.net/browse/JDK-6293787

It is the mass finalization of zip Inflater/Deflater objects which can't keep up with the rate at which these instances are being garbage collected. As the JDK bug report (not accepted as a bug) suggests, this comes down to suboptimal destruction of the instances. Not sure where the zip comes from - for all the compressors used in spark I was using the default snappy codec.

I am trying to disable all the spark.*.compress options and so far this has dramatically improved things - the finalization looks to be keeping up and the heap is stable.

Any input is still welcome!

On Tuesday, 22 December 2015, 12:17, Ted Yu <yuzhih...@gmail.com> wrote:

This might be related but the jmap output there looks different:
http://stackoverflow.com/questions/32537965/huge-number-of-io-netty-buffer-poolthreadcachememoryregioncacheentry-instances

On Tue, Dec 22, 2015 at 2:59 AM, Antony Mayi <antonym...@yahoo.com.invalid> wrote:

I have a streaming app (pyspark 1.5.2 on yarn) that's crashing due to driver (JVM part, not Python) OOM - no matter how big a heap is assigned, it eventually runs out. When checking the heap, it is all taken by "byte" items of io.netty.buffer.PoolThreadCache. The number of io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry instances is constant, yet the number of [B "bytes" keeps growing, as does the number of Finalizer instances. When checking the Finalizer instances, they are all ZipFile$ZipFileInputStream and ZipFile$ZipFileInflaterInputStream:

 num     #instances         #bytes  class name
----------------------------------------------
   1:        123556      278723776  [B
   2:        258988       10359520  java.lang.ref.Finalizer
   3:        174620        9778720  java.util.zip.Deflater
   4:         66684        7468608  org.apache.spark.executor.TaskMetrics
   5:         80070        7160112  [C
   6:        282624        6782976  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   7:        206371        4952904  java.lang.Long

the platform is using netty 3.6.6 and openjdk 1.8 (tried on 1.7 as well with the same issue).

would anyone have a clue how to troubleshoot further?

thx.
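For reference, the spark.*.compress options I am disabling look roughly like this in spark-defaults.conf (a sketch; property names as per the Spark 1.5 configuration docs):

spark.broadcast.compress        false
spark.rdd.compress              false
spark.shuffle.compress          false
spark.shuffle.spill.compress    false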
Re: pyspark streaming crashes
I noticed it might be related to longer GC pauses (1-2 sec) - the crash usually occurs right after such a pause. Could that be causing the Python-Java gateway to time out?

On Sunday, 20 December 2015, 23:05, Antony Mayi <antonym...@yahoo.com> wrote:

Hi,

can anyone please help me troubleshoot this problem: I have a streaming pyspark application (spark 1.5.2 on yarn-client) which keeps crashing after a few hours. It doesn't seem to be running out of memory on either the driver or the executors.

driver error:

py4j.protocol.Py4JJavaError: An error occurred while calling o1.awaitTermination.
: java.io.IOException: py4j.Py4JException: Error while obtaining a new communication channel
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
        at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)

(all) executors error:

  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError

GC (using G1GC) debugging just before the crash:

driver:
[Eden: 2316.0M(2316.0M)->0.0B(2318.0M) Survivors: 140.0M->138.0M Heap: 3288.7M(4096.0M)->675.5M(4096.0M)]

executor(s):
[Eden: 2342.0M(2342.0M)->0.0B(2378.0M) Survivors: 52.0M->34.0M Heap: 3601.7M(4096.0M)->1242.7M(4096.0M)]

thanks.
pyspark streaming crashes
Hi,

can anyone please help me troubleshoot this problem: I have a streaming pyspark application (spark 1.5.2 on yarn-client) which keeps crashing after a few hours. It doesn't seem to be running out of memory on either the driver or the executors.

driver error:

py4j.protocol.Py4JJavaError: An error occurred while calling o1.awaitTermination.
: java.io.IOException: py4j.Py4JException: Error while obtaining a new communication channel
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
        at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)

(all) executors error:

  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError

GC (using G1GC) debugging just before the crash:

driver:
[Eden: 2316.0M(2316.0M)->0.0B(2318.0M) Survivors: 140.0M->138.0M Heap: 3288.7M(4096.0M)->675.5M(4096.0M)]

executor(s):
[Eden: 2342.0M(2342.0M)->0.0B(2378.0M) Survivors: 52.0M->34.0M Heap: 3601.7M(4096.0M)->1242.7M(4096.0M)]

thanks.
imposed dynamic resource allocation
Hi,

using spark 1.5.2 on yarn (client mode) I was trying to use dynamic resource allocation, but it seems that once it is enabled by the first app, every following application is managed that way, even when explicitly disabling it.

example:
1) yarn is configured with org.apache.spark.network.yarn.YarnShuffleService as the spark_shuffle aux class
2) running a first app that doesn't specify dynamic allocation / shuffle service - it runs as expected with static executors
3) running a second application that enables spark.dynamicAllocation.enabled and spark.shuffle.service.enabled - it is dynamic as expected
4) running another app that doesn't enable (and even explicitly disables) dynamic allocation / shuffle service - still the executors are being added/removed dynamically throughout the runtime
5) restarting the nodemanagers to reset this

Is this a known issue or have I missed something? Can dynamic resource allocation be enabled per application? (How I toggle it is sketched below.)

Thanks,
Antony.
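For reference, a sketch of how I am toggling the settings per application on the command line (the application script name and executor count are placeholders, and I assume nothing overrides these in spark-defaults.conf):

# app from step 3 - dynamic, as expected:
spark-submit --master yarn-client \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  app.py

# app from step 4 - expected to be static, but still behaves dynamically:
spark-submit --master yarn-client \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.service.enabled=false \
  --num-executors 10 \
  app.py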
synchronizing streams of different kafka topics
Hi,

I have two streams coming from two different kafka topics. The two topics contain time-related events but are quite asymmetric in volume. I obviously need to process them in sync so the time-related events arrive together, but at the same processing rate: if the heavier stream starts backlogging, the events from the smaller stream will arrive ahead of the relevant events still sitting in the backlog of the heavy stream.

Is there any way to have the smaller stream processed at a slower rate so that its events come together with the heavy stream?

Thanks,
Antony.
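For context, a minimal sketch of how the two streams are currently consumed (assuming the direct Kafka API; topic names, broker address and batch interval are placeholders). The only rate knob I'm aware of, spark.streaming.kafka.maxRatePerPartition, applies to all partitions of all topics equally rather than per topic, hence the question:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName='two-topics')
ssc = StreamingContext(sc, batchDuration=10)

kafka_params = {'metadata.broker.list': 'broker1:9092'}  # placeholder broker
heavy = KafkaUtils.createDirectStream(ssc, ['heavy-topic'], kafka_params)  # high-volume topic
small = KafkaUtils.createDirectStream(ssc, ['small-topic'], kafka_params)  # low-volume topic

# both streams share the same batch interval, so once the heavy one
# starts backlogging, the small one runs ahead of it
heavy.union(small).pprint()

ssc.start()
ssc.awaitTermination()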
IF in SQL statement
Hi,

is it expected that I can't reference a column inside of an IF statement like this:

sctx.sql("SELECT name, IF(ts>0, price, 0) FROM table").collect()

I get an error:

org.apache.spark.sql.AnalysisException: unresolved operator 'Project [name#0,if ((CAST(ts#1, DoubleType) > CAST(0, DoubleType))) price#2 else 0 AS c1#378];

It works ok if I use a value instead of the column ref:

sctx.sql("SELECT name, IF(ts>0, 1, 0) FROM table").collect()

thx,
Antony.
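Not sure whether the parser accepts it here, but the workaround I could try is spelling the same thing out as a CASE expression (just a sketch of the alternative query, same table and columns as above):

sctx.sql("SELECT name, CASE WHEN ts > 0 THEN price ELSE 0 END FROM table").collect()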
shuffle data taking immense disk space during ALS
Hi,

This has already been briefly discussed here in the past but there seem to be more questions...

I am running a bigger ALS task with input data of ~40GB (~3 billion ratings). The data is partitioned into 512 partitions and I am also using default parallelism set to 512. The ALS runs with rank=100, iters=15, on spark 1.2.0.

The issue is the volume of temporary data stored on disk during the processing. You can see the effect here: http://picpaste.com/disk-UKGFOlte.png

It stores 12TB(!) of data until it reaches the 90% threshold, at which point yarn kills it. I have a checkpoint directory set (sketched below), so allegedly it should be clearing the temp data, but I am not sure that's happening (although there is one drop visible).

Is there any solution for this? 12TB of temp data not getting cleaned seems wrong.

Thanks,
Antony.
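For reference, a minimal sketch of how the job is set up - the checkpoint and input paths are placeholders, the rank/iterations are the values quoted above:

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS

sc = SparkContext(appName='als-job')
sc.setCheckpointDir('hdfs:///tmp/als-checkpoints')  # placeholder path

# ratings: RDD of (user, product, rating), kept at 512 partitions
ratings = sc.pickleFile('hdfs:///data/ratings').repartition(512)  # placeholder input
model = ALS.trainImplicit(ratings, rank=100, iterations=15)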
Re: storing MatrixFactorizationModel (pyspark)
well, I understand the math (having the two vectors), but the python MatrixFactorizationModel object seems to be just a wrapper around a java class, so I am not sure how to extract the two RDDs?

thx,
Antony.

On Thursday, 19 February 2015, 16:32, Ilya Ganelin <ilgan...@gmail.com> wrote:

Yep. The matrix model has two RDD vectors representing the decomposed matrix. You can save these to disk and reuse them.

On Thu, Feb 19, 2015 at 2:19 AM Antony Mayi <antonym...@yahoo.com.invalid> wrote:

Hi,

when getting the model out of ALS.train it would be beneficial to store it (to disk) so the model can be reused later for any following predictions. I am using pyspark and I had no luck pickling it, either using the standard pickle module or even dill.

does anyone have a solution for this (note it is pyspark)?

thank you,
Antony.
Re: loads of memory still GC overhead limit exceeded
Hi Ilya,

thanks for your insight, this was the right clue. I had default parallelism already set, but it was quite low (hundreds), and moreover the number of partitions of the input RDD was low as well, so the chunks were really too big. Increased parallelism and repartitioning seem to be helping...

Thanks!
Antony.

On Thursday, 19 February 2015, 16:45, Ilya Ganelin <ilgan...@gmail.com> wrote:

Hi Anthony - you are seeing a problem that I ran into. The underlying issue is your default parallelism setting. What's happening is that within ALS certain RDD operations end up changing the number of partitions you have of your data. For example if you start with an RDD of 300 partitions, unless default parallelism is set while the algorithm executes you'll eventually get an RDD with something like 20 partitions. Consequently, your giant data set is now stored across a much smaller number of partitions so each partition is huge. Then, when a shuffle requires serialization you run out of heap space trying to serialize it. The solution should be as simple as setting the default parallelism setting. This is referenced in a JIRA I can't find at the moment.

On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi <antonym...@yahoo.com.invalid> wrote:

now with spark.shuffle.io.preferDirectBufs reverted (to true) I am again getting GC overhead limit exceeded:

=== spark stdout ===
15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn log (same) ===
15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn nodemanager ===
2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 143
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08

Antony.

On Thursday, 19 February 2015, 11:54, Antony Mayi <antonym...@yahoo.com.INVALID> wrote:

it is from within the ALS.trainImplicit() call. btw. the exception varies between this GC overhead limit exceeded and Java heap space (which I guess is just a different outcome of the same problem).

just tried another run and here are the logs (filtered) - note I tried this run with spark.shuffle.io.preferDirectBufs=false so this might be a slightly different issue from my previous case (going to revert now):

=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379
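For reference, the parallelism changes mentioned at the top of this reply look roughly like this (a sketch only; 512 matches the partition count I use elsewhere, the exact number is still being tuned and the input path is a placeholder):

from pyspark import SparkConf, SparkContext

# raise default parallelism so the ALS-internal shuffles keep a sane partition count
conf = SparkConf().set('spark.default.parallelism', '512')
sc = SparkContext(conf=conf)

# also repartition the input so the individual partitions stay small
ratings = sc.pickleFile('hdfs:///data/ratings')  # placeholder input
ratings = ratings.repartition(512)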
storing MatrixFactorizationModel (pyspark)
Hi,

when getting the model out of ALS.train it would be beneficial to store it (to disk) so the model can be reused later for any following predictions. I am using pyspark and I had no luck pickling it, either using the standard pickle module or even dill.

does anyone have a solution for this (note it is pyspark)?

thank you,
Antony.
loads of memory still GC overhead limit exceeded
Hi,

I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running spark 1.2.0 in yarn-client mode with the following layout:

spark.executor.cores=4
spark.executor.memory=28G
spark.yarn.executor.memoryOverhead=4096

I am submitting a bigger ALS trainImplicit task (rank=100, iters=15) on a dataset with ~3 billion ratings, using 25 executors. At some point an executor crashes with:

15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage 51.0 (TID 7259)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.reflect.Array.newInstance(Array.java:75)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)

So the GC overhead limit exceeded is pretty clear and would suggest running out of memory. Since I have 1TB of RAM available, this must rather be due to some suboptimal config. Can anyone please point me in some direction how to tackle this?

Thanks,
Antony.
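For completeness, the submit command corresponding to the layout above looks roughly like this (a sketch; the application script name is a placeholder):

spark-submit --master yarn-client \
  --num-executors 25 \
  --executor-cores 4 \
  --executor-memory 28G \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  als_job.py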
Re: loads of memory still GC overhead limit exceeded
based on the spark UI I am running 25 executors for sure. why would you expect four? I submit the task with --num-executors 25 and I get 6-7 executors running per host (using more, smaller executors allows me better cluster utilization when running parallel spark sessions - which is not the case in this reported issue, for now I am using the cluster exclusively).

thx,
Antony.

On Thursday, 19 February 2015, 11:02, Sean Owen <so...@cloudera.com> wrote:

This should result in 4 executors, not 25. They should be able to execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB of RAM, not 1TB.

It still feels like this shouldn't be running out of memory, not by a long shot though. But just pointing out potential differences between what you are expecting and what you are configuring.

On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi <antonym...@yahoo.com.invalid> wrote:

Hi,

I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running spark 1.2.0 in yarn-client mode with the following layout:

spark.executor.cores=4
spark.executor.memory=28G
spark.yarn.executor.memoryOverhead=4096

I am submitting a bigger ALS trainImplicit task (rank=100, iters=15) on a dataset with ~3 billion ratings, using 25 executors. At some point an executor crashes with:

15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage 51.0 (TID 7259)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.reflect.Array.newInstance(Array.java:75)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)

So the GC overhead limit exceeded is pretty clear and would suggest running out of memory. Since I have 1TB of RAM available, this must rather be due to some suboptimal config. Can anyone please point me in some direction how to tackle this?

Thanks,
Antony.
Re: loads of memory still GC overhead limit exceeded
it is from within the ALS.trainImplicit() call. btw. the exception varies between this GC overhead limit exceeded and Java heap space (which I guess is just a different outcome of the same problem).

just tried another run and here are the logs (filtered) - note I tried this run with spark.shuffle.io.preferDirectBufs=false so this might be a slightly different issue from my previous case (going to revert now):

=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6
15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

=== yarn log ===
15/02/19 10:15:05 WARN executor.Executor: Told to re-register on heartbeat
15/02/19 10:16:02 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/02/19 10:16:02 WARN server.TransportChannelHandler: Exception in connection from /192.168.1.92:45633
io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Java heap space
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:280)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)

=== yarn nodemanager log ===
2015-02-19 10:16:45,146 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20284 for container-id container_1424204221358_0012_01_16: 28.5 GB of 32 GB physical memory used; 29.1 GB of 67.2 GB virtual memory used
2015-02-19 10:16:45,163 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20273 for container-id container_1424204221358_0012_01_20: 28.5 GB of 32 GB physical memory used; 29.2 GB of 67.2 GB virtual memory used
2015-02-19 10:16:46,621 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0012_01_08 is : 143
2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0012_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0012_01_08

thanks for any help,
Antony.

ps. could that be Java 8 related?

On Thursday, 19 February 2015, 11:25, Sean Owen <so...@cloudera.com> wrote:

Oh OK you are saying you are requesting 25 executors and getting them, got it. You can consider making fewer, bigger executors to pool rather than split up your memory, but at some point it becomes counter-productive. 32GB is a fine executor size.

So you have ~8GB available per task which seems like plenty. Something else is at work here. Is this error from your code's stages or ALS?

On Thu, Feb 19, 2015 at 10:07 AM, Antony Mayi <antonym...@yahoo.com> wrote:

based on the spark UI I am running 25 executors for sure. why would you expect four? I submit the task with --num-executors 25 and I get 6-7 executors running per host (using
Re: loads of memory still GC overhead limit exceeded
now with spark.shuffle.io.preferDirectBufs reverted (to true) I am again getting GC overhead limit exceeded:

=== spark stdout ===
15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn log (same) ===
15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn nodemanager ===
2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 143
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08

Antony.

On Thursday, 19 February 2015, 11:54, Antony Mayi <antonym...@yahoo.com.INVALID> wrote:

it is from within the ALS.trainImplicit() call. btw. the exception varies between this GC overhead limit exceeded and Java heap space (which I guess is just a different outcome of the same problem).

just tried another run and here are the logs (filtered) - note I tried this run with spark.shuffle.io.preferDirectBufs=false so this might be a slightly different issue from my previous case (going to revert now):

=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6
15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply
Re: spark-local dir running out of space during long ALS run
thanks, that looks promising but I can't find any reference giving me more details - can you please point me to something? Also, is it possible to force GC from pyspark (as I am using pyspark)?

thanks,
Antony.

On Monday, 16 February 2015, 21:05, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Correct, brute force clean up is not useful. Since Spark 1.0, Spark can do automatic cleanup of files based on which RDDs are used/garbage collected by the JVM. That would be the best way, but it depends on the JVM GC characteristics. If you force a GC periodically in the driver, that might help you get rid of files in the workers that are not needed.

TD

On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi <antonym...@yahoo.com.invalid> wrote:

spark.cleaner.ttl is not the right way - it seems to be really designed for streaming. Although it keeps the disk usage under control, it also causes loss of rdds and broadcasts that are required later, leading to a crash. Is there any other way?

thanks,
Antony.

On Sunday, 15 February 2015, 21:42, Antony Mayi <antonym...@yahoo.com> wrote:

spark.cleaner.ttl ?

On Sunday, 15 February 2015, 18:23, Antony Mayi <antonym...@yahoo.com> wrote:

Hi,

I am running a bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - the ALS is using about 3 billion ratings and I am doing several trainImplicit() runs in a loop within one spark session. I have a four node cluster with 3TB of disk space on each. Before starting the job less than 8% of the disk space is used. While the ALS is running I can see the disk usage rapidly growing, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and yarn kills the particular containers.

Am I missing doing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4*3TB of disk space seems immense.

thanks for any help,
Antony.
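To make the question concrete, this is roughly what I have in mind for forcing a GC from the pyspark driver between the trainImplicit() runs (a sketch only; it assumes the py4j gateway exposes java.lang.System, which is what sc._jvm appears to provide):

import gc

def force_gc(sc):
    # drop unreachable Python-side RDD wrappers first so the JVM references
    # behind them become collectable
    gc.collect()
    # then ask the driver JVM for a GC so the context cleaner can drop
    # shuffle/broadcast files that are no longer referenced
    sc._jvm.System.gc()

I would call this e.g. once per loop iteration between the trainImplicit() calls.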
Re: spark-local dir running out of space during long ALS run
spark.cleaner.ttl is not the right way - it seems to be really designed for streaming. Although it keeps the disk usage under control, it also causes loss of rdds and broadcasts that are required later, leading to a crash. Is there any other way?

thanks,
Antony.

On Sunday, 15 February 2015, 21:42, Antony Mayi <antonym...@yahoo.com> wrote:

spark.cleaner.ttl ?

On Sunday, 15 February 2015, 18:23, Antony Mayi <antonym...@yahoo.com> wrote:

Hi,

I am running a bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - the ALS is using about 3 billion ratings and I am doing several trainImplicit() runs in a loop within one spark session. I have a four node cluster with 3TB of disk space on each. Before starting the job less than 8% of the disk space is used. While the ALS is running I can see the disk usage rapidly growing, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and yarn kills the particular containers.

Am I missing doing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4*3TB of disk space seems immense.

thanks for any help,
Antony.
spark-local dir running out of space during long ALS run
Hi,

I am running a bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - the ALS is using about 3 billion ratings and I am doing several trainImplicit() runs in a loop within one spark session. I have a four node cluster with 3TB of disk space on each. Before starting the job less than 8% of the disk space is used. While the ALS is running I can see the disk usage rapidly growing, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and yarn kills the particular containers.

Am I missing doing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4*3TB of disk space seems immense.

thanks for any help,
Antony.
Re: spark-local dir running out of space during long ALS run
spark.cleaner.ttl ?

On Sunday, 15 February 2015, 18:23, Antony Mayi <antonym...@yahoo.com> wrote:

Hi,

I am running a bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - the ALS is using about 3 billion ratings and I am doing several trainImplicit() runs in a loop within one spark session. I have a four node cluster with 3TB of disk space on each. Before starting the job less than 8% of the disk space is used. While the ALS is running I can see the disk usage rapidly growing, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and yarn kills the particular containers.

Am I missing doing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4*3TB of disk space seems immense.

thanks for any help,
Antony.
pyspark importing custom module
Hi,

is there a way to use a custom python module that is already available to all executors under PYTHONPATH (without needing to upload it using sc.addPyFile())? It is a bit weird that this module is on all nodes yet the spark tasks can't use it - references to its objects are serialized and sent to all executors, but since the module doesn't get imported the calls fail.

thanks,
Antony.
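For illustration, the two approaches I have been comparing (a sketch; the module name and path are placeholders):

from pyspark import SparkConf, SparkContext

# what I'd like to rely on - the module is already installed on every node,
# so just extend the executor Python path:
conf = SparkConf().setExecutorEnv('PYTHONPATH', '/opt/mylibs')  # placeholder path
sc = SparkContext(conf=conf)

# what works today - shipping the module with the job:
# sc.addPyFile('/opt/mylibs/mymodule.py')  # placeholder module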
pyspark and and page allocation failures due to memory fragmentation
Hi,

When running a big mapreduce operation with pyspark (in this particular case using lots of sets and set operations in the map tasks, so likely allocating and freeing loads of pages) I eventually get the kernel error 'python: page allocation failure: order:10, mode:0x2000d0' plus a very verbose dump which I can reduce to the following snippet:

Node 1 Normal: 3601*4kB (UEM) 3159*8kB (UEM) 1669*16kB (UEM) 763*32kB (UEM) 1451*64kB (UEM) 15*128kB (UM) 1*256kB (U) 0*512kB 0*1024kB 0*2048kB 0*4096kB = 185836kB
...
SLAB: Unable to allocate memory on node 1 (gfp=0xd0)
  cache: size-4194304, object size: 4194304, order: 10

So simply the memory got fragmented and there are no higher-order pages left. The interesting thing is that there is no error thrown by spark itself - the processing just gets stuck without any error (only the kernel dmesg explains what happened in the background).

Any kernel experts out there with advice on how to avoid this? I have tried a few vm options but still no joy.

running spark 1.2.0 (cdh 5.3.0) on kernel 3.8.13

thanks,
Antony.
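For completeness, the kind of vm knobs I have been experimenting with so far (just a sketch of what I tried, not a recommendation - the values are guesses):

# keep a larger reserve of free pages so the kernel defragments earlier
sysctl -w vm.min_free_kbytes=1048576

# trigger manual compaction before kicking off the job
echo 1 > /proc/sys/vm/compact_memory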
java.lang.OutOfMemoryError: GC overhead limit exceeded
Hi,

I am using spark.yarn.executor.memoryOverhead=8192 yet I am getting executors crashing with this error. Does that mean I genuinely don't have enough RAM or is this a matter of config tuning?

other config options used:
spark.storage.memoryFraction=0.3
SPARK_EXECUTOR_MEMORY=14G

running spark 1.2.0 as yarn-client on a cluster of 10 nodes (the workload is ALS trainImplicit on a ~15GB dataset)

thanks for any ideas,
Antony.
Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)

On Wednesday, 28 January 2015, 0:01, Guru Medasani <gdm...@outlook.com> wrote:

Can you attach the logs where this is failing?

From: Sven Krasser <kras...@gmail.com>
Date: Tuesday, January 27, 2015 at 4:50 PM
To: Guru Medasani <gdm...@outlook.com>
Cc: Sandy Ryza <sandy.r...@cloudera.com>, Antony Mayi <antonym...@yahoo.com>, user@spark.apache.org <user@spark.apache.org>
Subject: Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Since it's an executor running OOM it doesn't look like a container being killed by YARN to me. As a starting point, can you repartition your job into smaller tasks?
-Sven

On Tue, Jan 27, 2015 at 2:34 PM, Guru Medasani <gdm...@outlook.com> wrote:

Hi Anthony,

What is the setting of the total amount of memory in MB that can be allocated to containers on your NodeManagers?

yarn.nodemanager.resource.memory-mb

Can you check this configuration in the yarn-site.xml used by the node manager process?

-Guru Medasani

From: Sandy Ryza <sandy.r...@cloudera.com>
Date: Tuesday, January 27, 2015 at 3:33 PM
To: Antony Mayi <antonym...@yahoo.com>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Hi Antony,

If you look in the YARN NodeManager logs, do you see that it's killing the executors? Or are they crashing for a different reason?

-Sandy

On Tue, Jan 27, 2015 at 12:43 PM, Antony Mayi <antonym...@yahoo.com.invalid> wrote:

Hi,

I am using spark.yarn.executor.memoryOverhead=8192 yet I am getting executors crashing with this error. Does that mean I genuinely don't have enough RAM or is this a matter of config tuning?

other config options used:
spark.storage.memoryFraction=0.3
SPARK_EXECUTOR_MEMORY=14G

running spark 1.2.0 as yarn-client on a cluster of 10 nodes (the workload is ALS trainImplicit on a ~15GB dataset)

thanks for any ideas,
Antony.

--
http://sites.google.com/site/krasser/?utm_source=sig
HW imbalance
Hi,

is it possible to mix hosts with (significantly) different specs within a cluster (without wasting the extra resources)? For example, having 10 nodes with 36GB RAM/10 CPUs and now trying to add 3 hosts with 128GB/10 CPUs - is there a way to utilize the extra memory for spark executors (as my understanding is that all spark executors must have the same memory)?

thanks,
Antony.
Re: HW imbalance
should have said I am running as yarn-client. All I can see is specifying the generic executor memory that is then used in all containers.

On Monday, 26 January 2015, 16:48, Charles Feduke <charles.fed...@gmail.com> wrote:

You should look at using Mesos. This should abstract away the individual hosts into a pool of resources and make the different physical specifications manageable.

I haven't tried configuring Spark Standalone mode to have different specs on different machines but based on spark-env.sh.template:

# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. -Dx=y)

it looks like you should be able to mix. (It's not clear to me whether SPARK_WORKER_MEMORY is uniform across the cluster or for the machine where the config file resides.)

On Mon Jan 26 2015 at 8:07:51 AM Antony Mayi <antonym...@yahoo.com.invalid> wrote:

Hi,

is it possible to mix hosts with (significantly) different specs within a cluster (without wasting the extra resources)? For example, having 10 nodes with 36GB RAM/10 CPUs and now trying to add 3 hosts with 128GB/10 CPUs - is there a way to utilize the extra memory for spark executors (as my understanding is that all spark executors must have the same memory)?

thanks,
Antony.
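For the yarn-client case I am describing, the per-host container capacity is the piece I'd expect to vary - a sketch of what I mean, assuming yarn-site.xml is maintained per node (the values are illustrative for the two host classes above):

<!-- yarn-site.xml on the 36GB hosts -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>32768</value>
</property>

<!-- yarn-site.xml on the 128GB hosts -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>122880</value>
</property>

With that, the bigger hosts would simply run more of the same-sized executors rather than bigger ones.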
Re: ALS.trainImplicit running out of mem when using higher rank
the values are for sure applied as expected - confirmed using the spark UI environment page... They come from my defaults, configured using 'spark.yarn.executor.memoryOverhead=8192' (yes, now increased even more) in /etc/spark/conf/spark-defaults.conf and 'export SPARK_EXECUTOR_MEMORY=24G' in /etc/spark/conf/spark-env.sh

Antony.

On Saturday, 17 January 2015, 11:32, Sean Owen <so...@cloudera.com> wrote:

I'm not sure how you are setting these values though. Where is spark.yarn.executor.memoryOverhead=6144? Env variables aren't the best way to set configuration either. Again have a look at http://spark.apache.org/docs/latest/running-on-yarn.html

... --executor-memory 22g --conf spark.yarn.executor.memoryOverhead=2g ...

should do it, off the top of my head. That should reserve 24g from YARN.

On Sat, Jan 17, 2015 at 5:29 AM, Antony Mayi <antonym...@yahoo.com> wrote:

although this helped to improve it significantly, I still run into this problem despite increasing spark.yarn.executor.memoryOverhead vastly:

export SPARK_EXECUTOR_MEMORY=24G
spark.yarn.executor.memoryOverhead=6144

yet getting this:

2015-01-17 04:47:40,389 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=30211,containerID=container_1421451766649_0002_01_115969] is running beyond physical memory limits. Current usage: 30.1 GB of 30 GB physical memory used; 33.0 GB of 63.0 GB virtual memory used. Killing container.

is there anything more I can do?

thanks,
Antony.

On Monday, 12 January 2015, 8:21, Antony Mayi <antonym...@yahoo.com> wrote:

this seems to have sorted it, awesome, thanks for the great help.
Antony.

On Sunday, 11 January 2015, 13:02, Sean Owen <so...@cloudera.com> wrote:

I would expect the size of the user/item feature RDDs to grow linearly with the rank, of course. They are cached, so that would drive cache memory usage on the cluster.

This wouldn't cause executors to fail for running out of memory though. In fact, your error does not show the task failing for lack of memory. What it shows is that YARN thinks the task is using a little bit more memory than it said it would, and killed it.

This happens sometimes with JVM-based YARN jobs since a JVM configured to use X heap ends up using a bit more than X physical memory if the heap reaches max size. So there's a bit of headroom built in and controlled by spark.yarn.executor.memoryOverhead (http://spark.apache.org/docs/latest/running-on-yarn.html)

You can try increasing it to a couple GB.

On Sun, Jan 11, 2015 at 9:43 AM, Antony Mayi <antonym...@yahoo.com.invalid> wrote:

the question really is whether it is expected that the memory requirements grow rapidly with the rank... as I would expect memory to be rather an O(1) problem, with a dependency only on the size of the input data. If this is expected, is there any rough formula to determine the required memory based on the ALS input and parameters?

thanks,
Antony.

On Saturday, 10 January 2015, 10:47, Antony Mayi <antonym...@yahoo.com> wrote:

the actual case looks like this:
* spark 1.1.0 on yarn (cdh 5.2.1)
* ~8-10 executors, 36GB phys RAM per host
* input RDD is roughly 3GB containing ~150-200M items (and this RDD is made persistent using .cache())
* using pyspark

yarn is configured with the limit yarn.nodemanager.resource.memory-mb of 33792 (33GB), spark is set to be:

SPARK_EXECUTOR_CORES=6
SPARK_EXECUTOR_INSTANCES=9
SPARK_EXECUTOR_MEMORY=30G

when using a higher rank (above 20) for ALS.trainImplicit the executor runs out of the yarn limit after some time (~1 hour) of execution and gets killed:

2015-01-09 17:51:27,130 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=27125,containerID=container_1420871936411_0002_01_23] is running beyond physical memory limits. Current usage: 31.2 GB of 31 GB physical memory used; 34.7 GB of 65.1 GB virtual memory used. Killing container.

thanks for any ideas,
Antony.

On Saturday, 10 January 2015, 10:11, Antony Mayi <antonym...@yahoo.com> wrote:

the memory requirements seem to be growing rapidly when using a higher rank... I am unable to get over 20 without running out of memory. is this expected?

thanks,
Antony.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
remote Akka client disassociated - some timeout?
Hi,

I believe this is some kind of timeout problem but I can't figure out how to increase it. I am running spark 1.2.0 on yarn (all from cdh 5.3.0). I submit a python task which first loads a big RDD from hbase - I can see in the screen output all executors fire up, then there is no more logging output for the next two minutes, after which I get plenty of:

15/01/16 17:35:16 ERROR cluster.YarnClientClusterScheduler: Lost executor 7 on node01: remote Akka client disassociated
15/01/16 17:35:16 INFO scheduler.TaskSetManager: Re-queueing tasks for 7 from TaskSet 1.0
15/01/16 17:35:16 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 1.0 (TID 17, node01): ExecutorLostFailure (executor 7 lost)
15/01/16 17:35:16 WARN scheduler.TaskSetManager: Lost task 34.0 in stage 1.0 (TID 25, node01): ExecutorLostFailure (executor 7 lost)

This points to some timeout of ~120 secs while the nodes are loading the big RDD? Any ideas how to get around it?

fyi I already use the following options without any success:

spark.core.connection.ack.wait.timeout: 600
spark.akka.timeout: 1000

thanks,
Antony.
Re: ALS.trainImplicit running out of mem when using higher rank
although this helped to improve it significantly, I still run into this problem despite increasing spark.yarn.executor.memoryOverhead vastly:

export SPARK_EXECUTOR_MEMORY=24G
spark.yarn.executor.memoryOverhead=6144

yet getting this:

2015-01-17 04:47:40,389 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=30211,containerID=container_1421451766649_0002_01_115969] is running beyond physical memory limits. Current usage: 30.1 GB of 30 GB physical memory used; 33.0 GB of 63.0 GB virtual memory used. Killing container.

is there anything more I can do?

thanks,
Antony.

On Monday, 12 January 2015, 8:21, Antony Mayi <antonym...@yahoo.com> wrote:

this seems to have sorted it, awesome, thanks for the great help.
Antony.

On Sunday, 11 January 2015, 13:02, Sean Owen <so...@cloudera.com> wrote:

I would expect the size of the user/item feature RDDs to grow linearly with the rank, of course. They are cached, so that would drive cache memory usage on the cluster.

This wouldn't cause executors to fail for running out of memory though. In fact, your error does not show the task failing for lack of memory. What it shows is that YARN thinks the task is using a little bit more memory than it said it would, and killed it.

This happens sometimes with JVM-based YARN jobs since a JVM configured to use X heap ends up using a bit more than X physical memory if the heap reaches max size. So there's a bit of headroom built in and controlled by spark.yarn.executor.memoryOverhead (http://spark.apache.org/docs/latest/running-on-yarn.html)

You can try increasing it to a couple GB.

On Sun, Jan 11, 2015 at 9:43 AM, Antony Mayi <antonym...@yahoo.com.invalid> wrote:

the question really is whether it is expected that the memory requirements grow rapidly with the rank... as I would expect memory to be rather an O(1) problem, with a dependency only on the size of the input data. If this is expected, is there any rough formula to determine the required memory based on the ALS input and parameters?

thanks,
Antony.

On Saturday, 10 January 2015, 10:47, Antony Mayi <antonym...@yahoo.com> wrote:

the actual case looks like this:
* spark 1.1.0 on yarn (cdh 5.2.1)
* ~8-10 executors, 36GB phys RAM per host
* input RDD is roughly 3GB containing ~150-200M items (and this RDD is made persistent using .cache())
* using pyspark

yarn is configured with the limit yarn.nodemanager.resource.memory-mb of 33792 (33GB), spark is set to be:

SPARK_EXECUTOR_CORES=6
SPARK_EXECUTOR_INSTANCES=9
SPARK_EXECUTOR_MEMORY=30G

when using a higher rank (above 20) for ALS.trainImplicit the executor runs out of the yarn limit after some time (~1 hour) of execution and gets killed:

2015-01-09 17:51:27,130 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=27125,containerID=container_1420871936411_0002_01_23] is running beyond physical memory limits. Current usage: 31.2 GB of 31 GB physical memory used; 34.7 GB of 65.1 GB virtual memory used. Killing container.

thanks for any ideas,
Antony.

On Saturday, 10 January 2015, 10:11, Antony Mayi <antonym...@yahoo.com> wrote:

the memory requirements seem to be growing rapidly when using a higher rank... I am unable to get over 20 without running out of memory. is this expected?

thanks,
Antony.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
java.io.IOException: sendMessageReliably failed without being ACK'd
Hi,

running spark 1.1.0 in yarn-client mode (cdh 5.2.1) on a XEN-based cloud and randomly getting my executors failing on errors like below. I suspect it is some cloud networking issue (XEN driver bug?) but wondering if there is any spark/yarn workaround that I could use to mitigate it?

Thanks,
Antony

15/01/14 10:36:44 ERROR storage.BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(node02,40868)
java.io.IOException: sendMessageReliably failed without being ACK'd
        at org.apache.spark.network.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:865)
        at org.apache.spark.network.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:861)
        at org.apache.spark.network.ConnectionManager$MessageStatus.markDone(ConnectionManager.scala:66)
        at org.apache.spark.network.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:457)
        at org.apache.spark.network.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:455)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.network.ConnectionManager.removeConnection(ConnectionManager.scala:455)
        at org.apache.spark.network.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:434)
        at org.apache.spark.network.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:434)
        at org.apache.spark.network.Connection.callOnCloseCallback(Connection.scala:156)
        at org.apache.spark.network.Connection.close(Connection.scala:128)
        at org.apache.spark.network.ConnectionManager.removeConnection(ConnectionManager.scala:476)
        at org.apache.spark.network.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:434)
        at org.apache.spark.network.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:434)
        at org.apache.spark.network.Connection.callOnCloseCallback(Connection.scala:156)
        at org.apache.spark.network.Connection.close(Connection.scala:128)
        at org.apache.spark.network.ReceivingConnection.read(Connection.scala:491)
        at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:662)
Re: ALS.trainImplicit running out of mem when using higher rank
the question really is whether it is expected that the memory requirements grow rapidly with the rank... as I would expect memory to be rather an O(1) problem, with a dependency only on the size of the input data. If this is expected, is there any rough formula to determine the required memory based on the ALS input and parameters?

thanks,
Antony.

On Saturday, 10 January 2015, 10:47, Antony Mayi <antonym...@yahoo.com> wrote:

the actual case looks like this:
* spark 1.1.0 on yarn (cdh 5.2.1)
* ~8-10 executors, 36GB phys RAM per host
* input RDD is roughly 3GB containing ~150-200M items (and this RDD is made persistent using .cache())
* using pyspark

yarn is configured with the limit yarn.nodemanager.resource.memory-mb of 33792 (33GB), spark is set to be:

SPARK_EXECUTOR_CORES=6
SPARK_EXECUTOR_INSTANCES=9
SPARK_EXECUTOR_MEMORY=30G

when using a higher rank (above 20) for ALS.trainImplicit the executor runs out of the yarn limit after some time (~1 hour) of execution and gets killed:

2015-01-09 17:51:27,130 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=27125,containerID=container_1420871936411_0002_01_23] is running beyond physical memory limits. Current usage: 31.2 GB of 31 GB physical memory used; 34.7 GB of 65.1 GB virtual memory used. Killing container.

thanks for any ideas,
Antony.

On Saturday, 10 January 2015, 10:11, Antony Mayi <antonym...@yahoo.com> wrote:

the memory requirements seem to be growing rapidly when using a higher rank... I am unable to get over 20 without running out of memory. is this expected?

thanks,
Antony.
Re: ALS.trainImplicit running out of mem when using higher rank
this seems to have sorted it, awesome, thanks for the great help. Antony. On Sunday, 11 January 2015, 13:02, Sean Owen so...@cloudera.com wrote: I would expect the size of the user/item feature RDDs to grow linearly with the rank, of course. They are cached, so that would drive cache memory usage on the cluster. This wouldn't cause executors to fail for running out of memory though. In fact, your error does not show the task failing for lack of memory. What it shows is that YARN thinks the task is using a little bit more memory than it said it would, and killed it. This happens sometimes with JVM-based YARN jobs since a JVM configured to use X heap ends up using a bit more than X physical memory if the heap reaches max size. So there's a bit of headroom built in and controlled by spark.yarn.executor.memoryOverhead (http://spark.apache.org/docs/latest/running-on-yarn.html) You can try increasing it to a couple GB. On Sun, Jan 11, 2015 at 9:43 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: the question really is whether this is expected that the memory requirements grow rapidly with the rank... as I would expect memory is rather an O(1) problem with dependency only on the size of input data. if this is expected is there any rough formula to determine the required memory based on ALS input and parameters? thanks, Antony. On Saturday, 10 January 2015, 10:47, Antony Mayi antonym...@yahoo.com wrote: the actual case looks like this: * spark 1.1.0 on yarn (cdh 5.2.1) * ~8-10 executors, 36GB phys RAM per host * input RDD is roughly 3GB containing ~150-200M items (and this RDD is made persistent using .cache()) * using pyspark yarn is configured with the limit yarn.nodemanager.resource.memory-mb of 33792 (33GB), spark is set to be: SPARK_EXECUTOR_CORES=6 SPARK_EXECUTOR_INSTANCES=9 SPARK_EXECUTOR_MEMORY=30G when using higher rank (above 20) for ALS.trainImplicit the executor runs after some time (~hour) of execution out of the yarn limit and gets killed: 2015-01-09 17:51:27,130 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=27125,containerID=container_1420871936411_0002_01_23] is running beyond physical memory limits. Current usage: 31.2 GB of 31 GB physical memory used; 34.7 GB of 65.1 GB virtual memory used. Killing container. thanks for any ideas, Antony. On Saturday, 10 January 2015, 10:11, Antony Mayi antonym...@yahoo.com wrote: the memory requirements seem to be rapidly growing when using higher rank... I am unable to get over 20 without running out of memory. is this expected? thanks, Antony.
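A minimal sketch of the fix Sean describes, assuming the 33 GB node limit mentioned above (the exact numbers are illustrative, not what was actually deployed): reserve a few GB of headroom between the executor heap and the YARN container size, e.g. in spark-defaults.conf:

    # extra container memory YARN allows on top of the executor heap, in MB (the default is only 384)
    spark.yarn.executor.memoryOverhead   2048

and keep SPARK_EXECUTOR_MEMORY low enough (say 28g instead of 30g) that heap plus overhead stays under yarn.nodemanager.resource.memory-mb.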
ALS.trainImplicit running out of mem when using higher rank
the memory requirements seem to be rapidly growing when using higher rank... I am unable to get over 20 without running out of memory. is this expected? thanks, Antony.
Re: ALS.trainImplicit running out of mem when using higher rank
the actual case looks like this: * spark 1.1.0 on yarn (cdh 5.2.1) * ~8-10 executors, 36GB phys RAM per host * input RDD is roughly 3GB containing ~150-200M items (and this RDD is made persistent using .cache()) * using pyspark; yarn is configured with the limit yarn.nodemanager.resource.memory-mb of 33792 (33GB), spark is set to: SPARK_EXECUTOR_CORES=6 SPARK_EXECUTOR_INSTANCES=9 SPARK_EXECUTOR_MEMORY=30G when using higher rank (above 20) for ALS.trainImplicit the executor runs out of the yarn limit after some time (~hour) of execution and gets killed: 2015-01-09 17:51:27,130 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=27125,containerID=container_1420871936411_0002_01_23] is running beyond physical memory limits. Current usage: 31.2 GB of 31 GB physical memory used; 34.7 GB of 65.1 GB virtual memory used. Killing container. thanks for any ideas, Antony. On Saturday, 10 January 2015, 10:11, Antony Mayi antonym...@yahoo.com wrote: the memory requirements seem to be rapidly growing when using higher rank... I am unable to get over 20 without running out of memory. is this expected? thanks, Antony.
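For reference, a minimal pyspark sketch of the kind of call being discussed (the input path, the line parsing and the parameter values are illustrative assumptions, not the actual job):

    from pyspark.mllib.recommendation import ALS

    # ratings: RDD of (user, item, value) tuples; for trainImplicit the value is a confidence/count
    ratings = sc.textFile("hdfs:///path/to/events.csv") \
                .map(lambda line: tuple(int(v) for v in line.split(",")))
    # rank above ~20 is where the executors start exceeding the YARN limit in this thread
    model = ALS.trainImplicit(ratings, rank=20, iterations=10, alpha=0.01)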
spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD
Hi, I am using newAPIHadoopRDD to load an RDD from hbase (using pyspark running as yarn-client) - pretty much the standard case demonstrated in the hbase_inputformat.py from examples... the thing is that when trying the very same code on spark 1.2 I am getting the error below which, based on similar cases on other forums, suggests an incompatibility between MR1 and MR2. why would this now start happening? is that due to some changes in resolving the classpath which now picks up MR2 jars first while before it was MR1? is there any workaround for this? thanks, Antony. the error: py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.rdd.RDD.first(RDD.scala:1093) at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500) at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD
thanks, I found the issue: I was including /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar in the classpath - this was breaking it. now using a custom jar with just the python converters and all works like a charm. thanks, antony. On Wednesday, 7 January 2015, 23:57, Sean Owen so...@cloudera.com wrote: Yes, the distribution is certainly fine and built for Hadoop 2. It sounds like you are inadvertently including Spark code compiled for Hadoop 1 when you run your app. The general idea is to use the cluster's copy at runtime. Those with more pyspark experience might be able to give more useful directions about how to fix that. On Wed, Jan 7, 2015 at 1:46 PM, Antony Mayi antonym...@yahoo.com wrote: this is official cloudera compiled stack cdh 5.3.0 - nothing has been done by me and I presume they are pretty good in building it so I still suspect it now gets the classpath resolved in a different way? thx, Antony. On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com wrote: Problems like this are always due to having code compiled for Hadoop 1.x run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime Hadoop 2.x is used. A common cause is actually bundling Spark / Hadoop classes with your app, when the app should just use the Spark / Hadoop provided by the cluster. It could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster. On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, I am using newAPIHadoopRDD to load RDD from hbase (using pyspark running as yarn-client) - pretty much the standard case demonstrated in the hbase_inputformat.py from examples... the thing is that when trying the very same code on spark 1.2 I am getting the error below which based on similar cases on other forums suggests incompatibility between MR1 and MR2. why would this now start happening? is that due to some changes in resolving the classpath which now picks up MR2 jars first while before it was MR1? is there any workaround for this? thanks, Antony.
the error: py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.rdd.RDD.first(RDD.scala:1093) at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500) at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
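A hedged sketch of the approach described in the resolution above (the jar path and name are made up): build a small jar containing only the org.apache.spark.examples.pythonconverters classes you need and pass just that, so no Hadoop-1-compiled Spark classes end up on the classpath:

    pyspark --jars /opt/jars/hbase-pythonconverters.jar   # hypothetical jar with only the converter classes, no bundled Spark/Hadoop code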
Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD
this is official cloudera compiled stack cdh 5.3.0 - nothing has been done by me and I presume they are pretty good in building it so I still suspect it now gets the classpath resolved in different way? thx,Antony. On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com wrote: Problems like this are always due to having code compiled for Hadoop 1.x run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime Hadoop 2.x is used. A common cause is actually bundling Spark / Hadoop classes with your app, when the app should just use the Spark / Hadoop provided by the cluster. It could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster. On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, I am using newAPIHadoopRDD to load RDD from hbase (using pyspark running as yarn-client) - pretty much the standard case demonstrated in the hbase_inputformat.py from examples... the thing is the when trying the very same code on spark 1.2 I am getting the error bellow which based on similar cases on another forums suggest incompatibility between MR1 and MR2. why would this now start happening? is that due to some changes in resolving the classpath which now picks up MR2 jars first while before it was MR1? is there any workaround for this? thanks,Antony. the error: py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at org.apache.spark.rdd.RDD.take(RDD.scala:1060) at org.apache.spark.rdd.RDD.first(RDD.scala:1093) at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500) at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
pyspark executor PYTHONPATH
Hi, I am running spark 1.1.0 on yarn. I have a custom set of modules installed under the same location on each executor node and am wondering how I can pass the executors the PYTHONPATH so that they can use the modules. I've tried this: spark-env.sh: export PYTHONPATH=/tmp/test/ spark-defaults.conf: spark.executorEnv.PYTHONPATH=/tmp/test/ /tmp/test/pkg contains: __init__.py and mod.py: def test(x): return x from the pyspark shell I can import the module pkg.mod without any issues: $$$ import pkg.mod $$$ print pkg.mod.test(1) 1 also the path is correctly set: $$$ print os.environ['PYTHONPATH'] /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip:/usr/lib/spark/python/:/tmp/test/ $$$ print sys.path ['', '/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip', '/usr/lib/spark/python', '/tmp/test/', ... ] it is even seen by the executors: $$$ sc.parallelize(range(4)).map(lambda x: os.environ['PYTHONPATH']).collect() ['/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/', '/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/', '/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/', '/u02/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/'] yet it fails when actually using the module on the executor: $$$ sc.parallelize(range(4)).map(pkg.mod.test).collect() ... ImportError: No module named mod ... any idea how to achieve this? I don't want to use sc.addPyFile as these are big packages and they are installed everywhere anyway... thank you, Antony.
Re: pyspark executor PYTHONPATH
ok, I see now what's happening - the pkg.mod.test is serialized by reference and there is nothing actually trying to import pkg.mod on the executors so the reference is broken. so how can I get the pkg.mod imported on the executors? thanks,Antony. On Friday, 2 January 2015, 13:49, Antony Mayi antonym...@yahoo.com wrote: Hi, I am running spark 1.1.0 on yarn. I have custom set of modules installed under same location on each executor node and wondering how can I pass the executors the PYTHONPATH so that they can use the modules. I've tried this: spark-env.sh:export PYTHONPATH=/tmp/test/ spark-defaults.conf:spark.executorEnv.PYTHONPATH=/tmp/test/ /tmp/test/pkg:__init__.pymod.py: def test(x): return x from the pyspark shell I can import the module pkg.mod without any issues: $$$ import pkg.mod$$$ print pkg.mod.test(1)1 also the path is correctly set: $$$ print os.environ['PYTHONPATH']/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip:/usr/lib/spark/python/:/tmp/test/ $$$ print sys.path['', '/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip', '/usr/lib/spark/python', '/tmp/test/', ... ] it even is seen by the executors: $$$ sc.parallelize(range(4)).map(lambda x: os.environ['PYTHONPATH']).collect()['/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/', '/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/', '/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/', '/u02/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/'] yet it fails when actually using the module on the executor:$$$ sc.parallelize(range(4)).map(pkg.mod.test).collect()...ImportError: No module named mod... any idea how to achieve this? don't want to use the sc.addPyFile as this is big packages and they are installed everywhere anyway... thank you,Antony.
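One common workaround, assuming the package really is installed on every executor node as described: do the import inside the function that gets shipped to the executors, so the import is resolved on the executor (using its own PYTHONPATH) rather than being pickled as a module reference from the driver:

    def run_test(x):
        import pkg.mod      # imported on the executor at call time, via the executor's PYTHONPATH
        return pkg.mod.test(x)

    sc.parallelize(range(4)).map(run_test).collect()   # expected: [0, 1, 2, 3]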
saveAsNewAPIHadoopDataset against hbase hanging in pyspark 1.2.0
Hi, I have been using this without any issues with spark 1.1.0 but after upgrading to 1.2.0 saving an RDD from pyspark using saveAsNewAPIHadoopDataset into HBase just hangs - even when testing with the example from the stock hbase_outputformat.py. anyone having the same issue? (and able to solve it?) using hbase 0.98.6 and yarn-client mode. thanks, Antony.
Re: saveAsNewAPIHadoopDataset against hbase hanging in pyspark 1.2.0
this is it (jstack of particular yarn container) - http://pastebin.com/eAdiUYKK thanks, Antony. On Wednesday, 24 December 2014, 16:34, Ted Yu yuzhih...@gmail.com wrote: bq. even when testing with the example from the stock hbase_outputformat.py Can you take jstack of the above and pastebin it ? Thanks On Wed, Dec 24, 2014 at 4:49 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, have been using this without any issues with spark 1.1.0 but after upgrading to 1.2.0 saving a RDD from pyspark using saveAsNewAPIHadoopDataset into HBase just hangs - even when testing with the example from the stock hbase_outputformat.py. anyone having same issue? (and able to solve?) using hbase 0.98.6 and yarn-client mode. thanks,Antony.
Re: saveAsNewAPIHadoopDataset against hbase hanging in pyspark 1.2.0
I just ran it by hand from the pyspark shell. here are the steps: pyspark --jars /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar conf = {"hbase.zookeeper.quorum": "localhost", ... "hbase.mapred.outputtable": "test", ... "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", ... "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable", ... "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" sc.parallelize([['testkey', 'f1', 'testqual', 'testval']], 1).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset( ... conf=conf, ... keyConverter=keyConv, ... valueConverter=valueConv) then it spills a few INFO level messages about submitting a task etc but then it just hangs. the very same code runs ok on spark 1.1.0 - the records get stored in hbase. thanks, Antony. On Thursday, 25 December 2014, 0:37, Ted Yu yuzhih...@gmail.com wrote: I went over the jstack but didn't find any call related to hbase or zookeeper. Do you find anything important in the logs ? Looks like container launcher was waiting for the script to return some result: - at org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:715) - at org.apache.hadoop.util.Shell.runCommand(Shell.java:524) On Wed, Dec 24, 2014 at 3:11 PM, Antony Mayi antonym...@yahoo.com wrote: this is it (jstack of particular yarn container) - http://pastebin.com/eAdiUYKK thanks, Antony. On Wednesday, 24 December 2014, 16:34, Ted Yu yuzhih...@gmail.com wrote: bq. even when testing with the example from the stock hbase_outputformat.py Can you take jstack of the above and pastebin it ? Thanks On Wed, Dec 24, 2014 at 4:49 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, have been using this without any issues with spark 1.1.0 but after upgrading to 1.2.0 saving a RDD from pyspark using saveAsNewAPIHadoopDataset into HBase just hangs - even when testing with the example from the stock hbase_outputformat.py. anyone having same issue? (and able to solve?) using hbase 0.98.6 and yarn-client mode. thanks, Antony.
Re: saveAsNewAPIHadoopDataset against hbase hanging in pyspark 1.2.0
I am running it in yarn-client mode and I believe hbase-client is part of the spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar which I am submitting at launch. adding another jstack taken during the hanging - http://pastebin.com/QDQrBw70 - this is of the CoarseGrainedExecutorBackend process - this one is referencing hbase and zookeeper. thanks,Antony. On Thursday, 25 December 2014, 1:38, Ted Yu yuzhih...@gmail.com wrote: bq. hbase.zookeeper.quorum: localhost You are running hbase cluster in standalone mode ?Is hbase-client jar in the classpath ? Cheers On Wed, Dec 24, 2014 at 4:11 PM, Antony Mayi antonym...@yahoo.com wrote: I just run it by hand from pyspark shell. here is the steps: pyspark --jars /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar conf = {hbase.zookeeper.quorum: localhost, ... hbase.mapred.outputtable: test,... mapreduce.outputformat.class: org.apache.hadoop.hbase.mapreduce.TableOutputFormat,... mapreduce.job.output.key.class: org.apache.hadoop.hbase.io.ImmutableBytesWritable,... mapreduce.job.output.value.class: org.apache.hadoop.io.Writable} keyConv = org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter valueConv = org.apache.spark.examples.pythonconverters.StringListToPutConverter sc.parallelize([['testkey', 'f1', 'testqual', 'testval']], 1).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(... conf=conf,... keyConverter=keyConv,... valueConverter=valueConv) then it spills few of the INFO level messages about submitting a task etc but then it just hangs. very same code runs ok on spark 1.1.0 - the records gets stored in hbase. thanks,Antony. On Thursday, 25 December 2014, 0:37, Ted Yu yuzhih...@gmail.com wrote: I went over the jstack but didn't find any call related to hbase or zookeeper.Do you find anything important in the logs ? Looks like container launcher was waiting for the script to return some result: - at org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:715) - at org.apache.hadoop.util.Shell.runCommand(Shell.java:524) On Wed, Dec 24, 2014 at 3:11 PM, Antony Mayi antonym...@yahoo.com wrote: this is it (jstack of particular yarn container) - http://pastebin.com/eAdiUYKK thanks, Antony. On Wednesday, 24 December 2014, 16:34, Ted Yu yuzhih...@gmail.com wrote: bq. even when testing with the example from the stock hbase_outputformat.py Can you take jstack of the above and pastebin it ? Thanks On Wed, Dec 24, 2014 at 4:49 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, have been using this without any issues with spark 1.1.0 but after upgrading to 1.2.0 saving a RDD from pyspark using saveAsNewAPIHadoopDataset into HBase just hangs - even when testing with the example from the stock hbase_outputformat.py. anyone having same issue? (and able to solve?) using hbase 0.98.6 and yarn-client mode. thanks,Antony.
Re: saveAsNewAPIHadoopDataset against hbase hanging in pyspark 1.2.0
also hbase itself works ok: hbase(main):006:0 scan 'test'ROW COLUMN+CELL key1 column=f1:asd, timestamp=1419463092904, value=456 1 row(s) in 0.0250 seconds hbase(main):007:0 put 'test', 'testkey', 'f1:testqual', 'testval'0 row(s) in 0.0170 seconds hbase(main):008:0 scan 'test'ROW COLUMN+CELL key1 column=f1:asd, timestamp=1419463092904, value=456 testkey column=f1:testqual, timestamp=1419487275905, value=testval 2 row(s) in 0.0270 seconds On Thursday, 25 December 2014, 6:58, Antony Mayi antonym...@yahoo.com wrote: I am running it in yarn-client mode and I believe hbase-client is part of the spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar which I am submitting at launch. adding another jstack taken during the hanging - http://pastebin.com/QDQrBw70 - this is of the CoarseGrainedExecutorBackend process - this one is referencing hbase and zookeeper. thanks,Antony. On Thursday, 25 December 2014, 1:38, Ted Yu yuzhih...@gmail.com wrote: bq. hbase.zookeeper.quorum: localhost You are running hbase cluster in standalone mode ?Is hbase-client jar in the classpath ? Cheers On Wed, Dec 24, 2014 at 4:11 PM, Antony Mayi antonym...@yahoo.com wrote: I just run it by hand from pyspark shell. here is the steps: pyspark --jars /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar conf = {hbase.zookeeper.quorum: localhost, ... hbase.mapred.outputtable: test,... mapreduce.outputformat.class: org.apache.hadoop.hbase.mapreduce.TableOutputFormat,... mapreduce.job.output.key.class: org.apache.hadoop.hbase.io.ImmutableBytesWritable,... mapreduce.job.output.value.class: org.apache.hadoop.io.Writable} keyConv = org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter valueConv = org.apache.spark.examples.pythonconverters.StringListToPutConverter sc.parallelize([['testkey', 'f1', 'testqual', 'testval']], 1).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(... conf=conf,... keyConverter=keyConv,... valueConverter=valueConv) then it spills few of the INFO level messages about submitting a task etc but then it just hangs. very same code runs ok on spark 1.1.0 - the records gets stored in hbase. thanks,Antony. On Thursday, 25 December 2014, 0:37, Ted Yu yuzhih...@gmail.com wrote: I went over the jstack but didn't find any call related to hbase or zookeeper.Do you find anything important in the logs ? Looks like container launcher was waiting for the script to return some result: - at org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:715) - at org.apache.hadoop.util.Shell.runCommand(Shell.java:524) On Wed, Dec 24, 2014 at 3:11 PM, Antony Mayi antonym...@yahoo.com wrote: this is it (jstack of particular yarn container) - http://pastebin.com/eAdiUYKK thanks, Antony. On Wednesday, 24 December 2014, 16:34, Ted Yu yuzhih...@gmail.com wrote: bq. even when testing with the example from the stock hbase_outputformat.py Can you take jstack of the above and pastebin it ? Thanks On Wed, Dec 24, 2014 at 4:49 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, have been using this without any issues with spark 1.1.0 but after upgrading to 1.2.0 saving a RDD from pyspark using saveAsNewAPIHadoopDataset into HBase just hangs - even when testing with the example from the stock hbase_outputformat.py. anyone having same issue? (and able to solve?) using hbase 0.98.6 and yarn-client mode. thanks,Antony.
Re: custom python converter from HBase Result to tuple
using hbase 0.98.6 there is no stack trace, just this short error. just noticed it does the fallback to toString as in the message, as this is what I get back in python: hbase_rdd.collect() [(u'key1', u'List(cf1:12345:14567890, cf2:123:14567896)')] so the question is why it falls back to toString? thanks, Antony. On Monday, 22 December 2014, 20:09, Ted Yu yuzhih...@gmail.com wrote: Which HBase version are you using ? Can you show the full stack trace ? Cheers On Mon, Dec 22, 2014 at 11:02 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi, can anyone please give me some help with how to write a custom converter of hbase data to (for example) tuples of ((family, qualifier, value), ) for pyspark: I was trying something like this (here producing tuples of (family:qualifier:value, )): class HBaseResultToTupleConverter extends Converter[Any, List[String]] { override def convert(obj: Any): List[String] = { val result = obj.asInstanceOf[Result] result.rawCells().map(cell => List(Bytes.toString(CellUtil.cloneFamily(cell)), Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell))).mkString(":") ).toList } } but then I get an error: 14/12/22 16:27:40 WARN python.SerDeUtil: Failed to pickle Java object as value: $colon$colon, falling back to 'toString'. Error: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments does anyone have a hint? Thanks, Antony.
cartesian on pyspark not parallelised
Hi, using pyspark 1.1.0 on YARN 2.5.0. all operations run nicely in parallel - I can see multiple python processes spawned on each nodemanager - but for some reason when running cartesian there is only a single python process running on each node. the task is indicating thousands of partitions so I don't understand why it is not running with higher parallelism. the performance is obviously poor even though the other operations are fine. any idea how to improve this? thank you, Antony.
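One thing that may help (an untested assumption for this particular job, not a confirmed fix): force a shuffle after the cartesian so the per-pair work is redistributed over many tasks and hence many python workers:

    # small self-contained illustration; rdd_a/rdd_b stand in for the real inputs
    rdd_a = sc.parallelize(range(1000), 100)
    rdd_b = sc.parallelize(range(1000), 100)
    pairs = rdd_a.cartesian(rdd_b).repartition(sc.defaultParallelism * 4)  # repartition forces a shuffle to spread the pairs
    result = pairs.map(lambda ab: ab[0] * ab[1]).count()                   # downstream work now runs across many tasks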