Re: pyspark streaming crashes

2016-01-04 Thread Antony Mayi
Just for reference: in my case this problem is caused by this bug:
https://issues.apache.org/jira/browse/SPARK-12617

On Monday, 21 December 2015, 14:32, Antony Mayi <antonym...@yahoo.com> 
wrote:
 
 

I noticed it might be related to longer GC pauses (1-2 s) - the crash usually occurs right
after such a pause. Could that be causing the Python-Java gateway to time out?
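
For the record, the knobs I have been playing with so far are the G1 pause target on the
executors and the generic network timeout; a minimal sketch of how they can be passed from
pyspark (the values are illustrative, not a recommendation, and driver-side JVM options
generally have to go on spark-submit / spark-defaults rather than into SparkConf):

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    conf = (SparkConf()
            .setAppName("streaming-gc-tuning")
            # cap G1 pause times on the executors (illustrative values)
            .set("spark.executor.extraJavaOptions",
                 "-XX:+UseG1GC -XX:MaxGCPauseMillis=500")
            # give RPC/shuffle connections more slack than the 120s default
            .set("spark.network.timeout", "300s"))

    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)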

On Sunday, 20 December 2015, 23:05, Antony Mayi <antonym...@yahoo.com> 
wrote:
 
 

 Hi,
can anyone please help me troubleshoot this problem: I have a streaming pyspark
application (Spark 1.5.2 on yarn-client) which keeps crashing after a few hours.
It doesn't seem to be running out of memory on either the driver or the executors.
driver error:
py4j.protocol.Py4JJavaError: An error occurred while calling o1.awaitTermination.
: java.io.IOException: py4j.Py4JException: Error while obtaining a new communication channel
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
        at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)

(all) executors error:
  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError


GC (using G1GC) debugging just before crash:
driver:      [Eden: 2316.0M(2316.0M)->0.0B(2318.0M) Survivors: 140.0M->138.0M Heap: 3288.7M(4096.0M)->675.5M(4096.0M)]
executor(s): [Eden: 2342.0M(2342.0M)->0.0B(2378.0M) Survivors: 52.0M->34.0M Heap: 3601.7M(4096.0M)->1242.7M(4096.0M)]

thanks.


Re: driver OOM due to io.netty.buffer items not getting finalized

2015-12-23 Thread Antony Mayi
FYI: after further troubleshooting I logged this as
https://issues.apache.org/jira/browse/SPARK-12511

On Tuesday, 22 December 2015, 18:16, Antony Mayi <antonym...@yahoo.com> 
wrote:
 
 

 I narrowed it down to the problem described, for example, here:
https://bugs.openjdk.java.net/browse/JDK-6293787
It is the mass finalization of zip Inflater/Deflater objects, which can't keep up with the
rate at which these instances are garbage collected. As the JDK bug report (not accepted as
a bug) suggests, this is a consequence of suboptimal destruction of the instances.
Not sure where the zip comes from - for all the compressors used in Spark I was using the
default snappy codec.
I am now trying to disable all the spark.*.compress options and so far this seems to have
improved things dramatically: the finalization looks to be keeping up and the heap is stable.
Any input is still welcome!
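
For reference, a minimal sketch of what "disabling all the spark.*.compress options" amounts
to in pyspark terms (these are the stock compression switches; whether each of them actually
matters for the leak is still an open question):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("no-compress-test")
            # turn off the individual compression switches Spark exposes
            .set("spark.broadcast.compress", "false")
            .set("spark.rdd.compress", "false")
            .set("spark.shuffle.compress", "false")
            .set("spark.shuffle.spill.compress", "false"))
    sc = SparkContext(conf=conf)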

On Tuesday, 22 December 2015, 12:17, Ted Yu <yuzhih...@gmail.com> wrote:
 
 

 This might be related but the jmap output there looks different:
http://stackoverflow.com/questions/32537965/huge-number-of-io-netty-buffer-poolthreadcachememoryregioncacheentry-instances

On Tue, Dec 22, 2015 at 2:59 AM, Antony Mayi <antonym...@yahoo.com.invalid> 
wrote:

I have a streaming app (pyspark 1.5.2 on YARN) that's crashing due to a driver (JVM part,
not Python) OOM - no matter how big a heap is assigned, it eventually runs out.
When checking the heap, it is all taken by "byte" items of io.netty.buffer.PoolThreadCache.
The number of io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry instances is constant,
yet the number of [B "bytes" keeps growing, as does the number of Finalizer instances.
When checking the Finalizer instances, they are all ZipFile$ZipFileInputStream and
ZipFile$ZipFileInflaterInputStream:
 num     #instances         #bytes  class name
------------------------------------------------
   1:        123556      278723776  [B
   2:        258988       10359520  java.lang.ref.Finalizer
   3:        174620        9778720  java.util.zip.Deflater
   4:         66684        7468608  org.apache.spark.executor.TaskMetrics
   5:         80070        7160112  [C
   6:        282624        6782976  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   7:        206371        4952904  java.lang.Long
The platform is using netty 3.6.6 and OpenJDK 1.8 (tried 1.7 as well, same issue).
Would anyone have a clue how to troubleshoot this further?
thx.




driver OOM due to io.netty.buffer items not getting finalized

2015-12-22 Thread Antony Mayi
I have a streaming app (pyspark 1.5.2 on YARN) that's crashing due to a driver (JVM part,
not Python) OOM - no matter how big a heap is assigned, it eventually runs out.
When checking the heap, it is all taken by "byte" items of io.netty.buffer.PoolThreadCache.
The number of io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry instances is constant,
yet the number of [B "bytes" keeps growing, as does the number of Finalizer instances.
When checking the Finalizer instances, they are all ZipFile$ZipFileInputStream and
ZipFile$ZipFileInflaterInputStream:
 num     #instances         #bytes  class name
------------------------------------------------
   1:        123556      278723776  [B
   2:        258988       10359520  java.lang.ref.Finalizer
   3:        174620        9778720  java.util.zip.Deflater
   4:         66684        7468608  org.apache.spark.executor.TaskMetrics
   5:         80070        7160112  [C
   6:        282624        6782976  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   7:        206371        4952904  java.lang.Long
The platform is using netty 3.6.6 and OpenJDK 1.8 (tried 1.7 as well, same issue).
Would anyone have a clue how to troubleshoot this further?
thx.

Re: driver OOM due to io.netty.buffer items not getting finalized

2015-12-22 Thread Antony Mayi
I narrowed it down to the problem described, for example, here:
https://bugs.openjdk.java.net/browse/JDK-6293787
It is the mass finalization of zip Inflater/Deflater objects, which can't keep up with the
rate at which these instances are garbage collected. As the JDK bug report (not accepted as
a bug) suggests, this is a consequence of suboptimal destruction of the instances.
Not sure where the zip comes from - for all the compressors used in Spark I was using the
default snappy codec.
I am now trying to disable all the spark.*.compress options and so far this seems to have
improved things dramatically: the finalization looks to be keeping up and the heap is stable.
Any input is still welcome!

On Tuesday, 22 December 2015, 12:17, Ted Yu <yuzhih...@gmail.com> wrote:
 
 

 This might be related but the jmap output there looks different:
http://stackoverflow.com/questions/32537965/huge-number-of-io-netty-buffer-poolthreadcachememoryregioncacheentry-instances

On Tue, Dec 22, 2015 at 2:59 AM, Antony Mayi <antonym...@yahoo.com.invalid> 
wrote:

I have a streaming app (pyspark 1.5.2 on YARN) that's crashing due to a driver (JVM part,
not Python) OOM - no matter how big a heap is assigned, it eventually runs out.
When checking the heap, it is all taken by "byte" items of io.netty.buffer.PoolThreadCache.
The number of io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry instances is constant,
yet the number of [B "bytes" keeps growing, as does the number of Finalizer instances.
When checking the Finalizer instances, they are all ZipFile$ZipFileInputStream and
ZipFile$ZipFileInflaterInputStream:
 num     #instances         #bytes  class name
------------------------------------------------
   1:        123556      278723776  [B
   2:        258988       10359520  java.lang.ref.Finalizer
   3:        174620        9778720  java.util.zip.Deflater
   4:         66684        7468608  org.apache.spark.executor.TaskMetrics
   5:         80070        7160112  [C
   6:        282624        6782976  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
   7:        206371        4952904  java.lang.Long
The platform is using netty 3.6.6 and OpenJDK 1.8 (tried 1.7 as well, same issue).
Would anyone have a clue how to troubleshoot this further?
thx.




Re: pyspark streaming crashes

2015-12-21 Thread Antony Mayi
I noticed it might be related to longer GC pauses (1-2 s) - the crash usually occurs right
after such a pause. Could that be causing the Python-Java gateway to time out?

On Sunday, 20 December 2015, 23:05, Antony Mayi <antonym...@yahoo.com> 
wrote:
 
 

 Hi,
can anyone please help me troubleshoot this problem: I have a streaming pyspark
application (Spark 1.5.2 on yarn-client) which keeps crashing after a few hours.
It doesn't seem to be running out of memory on either the driver or the executors.
driver error:
py4j.protocol.Py4JJavaError: An error occurred while calling o1.awaitTermination.
: java.io.IOException: py4j.Py4JException: Error while obtaining a new communication channel
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
        at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)

(all) executors error:
  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError


GC (using G1GC) debugging just before crash:
driver:      [Eden: 2316.0M(2316.0M)->0.0B(2318.0M) Survivors: 140.0M->138.0M Heap: 3288.7M(4096.0M)->675.5M(4096.0M)]
executor(s): [Eden: 2342.0M(2342.0M)->0.0B(2378.0M) Survivors: 52.0M->34.0M Heap: 3601.7M(4096.0M)->1242.7M(4096.0M)]

thanks.

 
  

pyspark streaming crashes

2015-12-20 Thread Antony Mayi
Hi,
can anyone please help me troubleshoot this problem: I have a streaming pyspark
application (Spark 1.5.2 on yarn-client) which keeps crashing after a few hours.
It doesn't seem to be running out of memory on either the driver or the executors.
driver error:
py4j.protocol.Py4JJavaError: An error occurred while calling o1.awaitTermination.
: java.io.IOException: py4j.Py4JException: Error while obtaining a new communication channel
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
        at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77)
        at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)

(all) executors error:
  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/worker.py", line 136, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/u04/yarn/local/usercache/das/appcache/application_1450337892069_0336/container_1450337892069_0336_01_08/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError


GC (using G1GC) debugging just before crash:
driver:      [Eden: 2316.0M(2316.0M)->0.0B(2318.0M) Survivors: 140.0M->138.0M Heap: 3288.7M(4096.0M)->675.5M(4096.0M)]
executor(s): [Eden: 2342.0M(2342.0M)->0.0B(2378.0M) Survivors: 52.0M->34.0M Heap: 3601.7M(4096.0M)->1242.7M(4096.0M)]

thanks.

imposed dynamic resource allocation

2015-12-11 Thread Antony Mayi
Hi,
using Spark 1.5.2 on YARN (client mode), I was trying out dynamic resource allocation, but
it seems that once it is enabled by the first app, any following application is managed that
way as well, even if it explicitly disables it.
Example:
1) YARN configured with org.apache.spark.network.yarn.YarnShuffleService as the spark_shuffle aux class
2) the first app does not specify dynamic allocation / shuffle service - it runs as expected with static executors
3) a second application enables spark.dynamicAllocation.enabled and spark.shuffle.service.enabled - it is dynamic as expected
4) another app does not enable (and even explicitly disables) dynamic allocation / shuffle service, yet executors are still being added/removed dynamically throughout its runtime
5) restarting the nodemanagers resets this
Is this a known issue, or have I missed something? Can dynamic resource allocation be
enabled per application?
Thanks,
Antony.
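
For context, this is roughly what the "explicitly disabling" application in step 4 sets
(a sketch; the static executor count is only illustrative):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("static-executors")
            .setMaster("yarn-client")
            # opt out of dynamic allocation for this application only
            .set("spark.dynamicAllocation.enabled", "false")
            .set("spark.shuffle.service.enabled", "false")
            # illustrative static sizing
            .set("spark.executor.instances", "10"))
    sc = SparkContext(conf=conf)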

synchronizing streams of different kafka topics

2015-11-17 Thread Antony Mayi
Hi,
I have two streams coming from two different Kafka topics. The two topics contain
time-related events but are quite asymmetric in volume. I obviously need to process them in
sync to get the time-related events together, but at the same processing rate: if the
heavier stream starts backlogging, events from the smaller stream arrive ahead of the
related events that are still sitting in the backlog of the heavy stream.
Is there any way to have the smaller stream processed at a slower rate so that its events
come together with the related events of the heavy stream?
Thanks,Antony.
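
To make the setup concrete, here is a minimal sketch of the two streams joined per batch on
a coarse time bucket taken from the event payload. It is purely illustrative (topic names,
broker address and payload layout are assumptions) and it does not by itself fix the rate
mismatch - spark.streaming.kafka.maxRatePerPartition is the only built-in rate cap I am
aware of for the direct stream, and it applies to all partitions alike:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="two-topic-sync")
    ssc = StreamingContext(sc, 10)

    kafka_params = {"metadata.broker.list": "broker:9092"}  # assumed broker
    heavy = KafkaUtils.createDirectStream(ssc, ["heavy_topic"], kafka_params)
    light = KafkaUtils.createDirectStream(ssc, ["light_topic"], kafka_params)

    def by_minute(kv):
        # assumed payload layout: "<epoch_seconds>,<rest of the event>"
        ts = int(kv[1].split(",", 1)[0])
        return (ts // 60, kv[1])

    # per-batch join on the minute bucket; related events only meet if they
    # arrive in the same batch, which is exactly what breaks once one topic
    # starts backlogging
    joined = heavy.map(by_minute).join(light.map(by_minute))
    joined.pprint()

    ssc.start()
    ssc.awaitTermination()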

IF in SQL statement

2015-05-16 Thread Antony Mayi
Hi,
is it expected that I can't reference a column inside an IF statement like this:
sctx.sql("SELECT name, IF(ts > 0, price, 0) FROM table").collect()

I get an error:
org.apache.spark.sql.AnalysisException: unresolved operator 'Project [name#0,if ((CAST(ts#1, DoubleType) > CAST(0, DoubleType))) price#2 else 0 AS c1#378];

It works fine if I use a literal value instead of the column reference:
sctx.sql("SELECT name, IF(ts > 0, 1, 0) FROM table").collect()

thx,Antony.
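
For what it's worth, the same conditional spelled with CASE WHEN (which Spark SQL also
accepts) is one thing worth trying - a sketch assuming the same table and columns as above:

    # equivalent conditional expressed with CASE WHEN instead of IF
    sctx.sql("SELECT name, CASE WHEN ts > 0 THEN price ELSE 0 END FROM table").collect()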


shuffle data taking immense disk space during ALS

2015-02-23 Thread Antony Mayi
Hi,
This has already been briefly discussed here in the past, but there seem to be more
questions...
I am running a bigger ALS task with ~40GB of input data (~3 billion ratings). The data is
partitioned into 512 partitions and I am also using a default parallelism of 512. ALS runs
with rank=100, iters=15, on Spark 1.2.0.
The issue is the volume of temporary data stored on disk during the processing. You can see
the effect here: http://picpaste.com/disk-UKGFOlte.png
It stores 12TB(!) of data until it reaches the 90% threshold, at which point YARN kills it.
I have a checkpoint directory set, so allegedly it should be clearing the temporary data,
but I am not sure that is happening (although there is one visible drop).
Is there any solution for this? 12TB of temporary data not getting cleaned up seems wrong.
Thanks,Antony. 
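
For reference, this is roughly the shape of the job (a sketch only - the checkpoint path
and the ratings input are placeholders, not the real ones):

    from pyspark import SparkContext
    from pyspark.mllib.recommendation import ALS

    sc = SparkContext(appName="als-disk-usage")
    # checkpointing is what should allow old shuffle/lineage data to be dropped
    sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")  # placeholder path

    def parse(line):
        user, item, value = line.split(",")
        return (int(user), int(item), float(value))

    # placeholder input; 512 partitions to match the default parallelism
    ratings = sc.textFile("hdfs:///data/ratings").map(parse).repartition(512)

    model = ALS.trainImplicit(ratings, rank=100, iterations=15)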

Re: storing MatrixFactorizationModel (pyspark)

2015-02-20 Thread Antony Mayi
Well, I understand the math (having the two feature vectors), but the Python
MatrixFactorizationModel object seems to be just a wrapper around the Java class, so I am
not sure how to extract the two RDDs?
thx,
Antony.

 On Thursday, 19 February 2015, 16:32, Ilya Ganelin ilgan...@gmail.com 
wrote:
   
 

 Yep, the matrix model has two RDDs of vectors representing the decomposed matrix. You can
save these to disk and reuse them.
On Thu, Feb 19, 2015 at 2:19 AM Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
when getting the model out of ALS.train it would be beneficial to store it (to disk) so the
model can be reused later for any following predictions. I am using pyspark and I had no
luck pickling it, either using the standard pickle module or even dill.
Does anyone have a solution for this (note it is pyspark)?
thank you,
Antony.



Re: loads of memory still GC overhead limit exceeded

2015-02-20 Thread Antony Mayi
Hi Ilya,
thanks for your insight, this was the right clue. I had default parallelism already set,
but it was quite low (hundreds), and moreover the number of partitions of the input RDD was
low as well, so the chunks were really too big. Increased parallelism and repartitioning
seem to be helping...
Thanks!
Antony.
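
Concretely, the change amounts to something like this (a sketch; load_ratings() is a
placeholder for the real input, and 512 is just the value I happen to be using):

    from pyspark import SparkConf, SparkContext
    from pyspark.mllib.recommendation import ALS

    conf = (SparkConf()
            .setAppName("als-parallelism")
            # keep partition counts from collapsing inside the ALS shuffles
            .set("spark.default.parallelism", "512"))
    sc = SparkContext(conf=conf)

    ratings = load_ratings(sc)          # placeholder for the actual input
    ratings = ratings.repartition(512)  # make sure the input itself is not too coarse

    model = ALS.trainImplicit(ratings, rank=100, iterations=15)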

 On Thursday, 19 February 2015, 16:45, Ilya Ganelin ilgan...@gmail.com 
wrote:
   
 

 Hi Antony - you are seeing a problem that I ran into. The underlying issue is your default
parallelism setting. What's happening is that within ALS certain RDD operations end up
changing the number of partitions your data has. For example, if you start with an RDD of
300 partitions, then unless default parallelism is set you'll eventually end up with an RDD
of something like 20 partitions while the algorithm executes. Consequently, your giant data
set is now stored across a much smaller number of partitions, so each partition is huge.
Then, when a shuffle requires serialization, you run out of heap space trying to serialize
it. The solution should be as simple as setting the default parallelism setting.

This is referenced in a JIRA I can't find at the moment. 
On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Now, with spark.shuffle.io.preferDirectBufs reverted (to true), I am again getting GC
overhead limit exceeded:
=== spark stdout ===
15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
=== yarn log (same) ===
15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
=== yarn nodemanager ===
2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 143
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08
Antony.

 

 On Thursday, 19 February 2015, 11:54, Antony Mayi 
antonym...@yahoo.com.INVALID wrote:
   
 

 It is from within the ALS.trainImplicit() call. BTW, the exception varies between this
"GC overhead limit exceeded" and "Java heap space" (which I guess is just a different
outcome of the same problem).
Just tried another run and here are the logs (filtered) - note I tried this run with
spark.shuffle.io.preferDirectBufs=false, so this might be a slightly different issue from
my previous case (going to revert now):
=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379

storing MatrixFactorizationModel (pyspark)

2015-02-19 Thread Antony Mayi
Hi,
when getting the model out of ALS.train it would be beneficial to store it (to disk) so the
model can be reused later for any following predictions. I am using pyspark and I had no
luck pickling it, either using the standard pickle module or even dill.
Does anyone have a solution for this (note it is pyspark)?
thank you,
Antony.

loads of memory still GC overhead limit exceeded

2015-02-19 Thread Antony Mayi
Hi,
I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running Spark 1.2.0 in
yarn-client mode with the following layout:
spark.executor.cores=4
spark.executor.memory=28G
spark.yarn.executor.memoryOverhead=4096

I am submitting a bigger ALS trainImplicit task (rank=100, iters=15) on a dataset with
~3 billion ratings, using 25 executors. At some point an executor crashes with:
15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage 51.0 (TID 7259)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.reflect.Array.newInstance(Array.java:75)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
So the GC overhead limit exceeded is pretty clear and would suggest running out of memory.
Since I have 1TB of RAM available, this must rather be due to some suboptimal configuration.
Can anyone please point me in some direction on how to tackle this?
Thanks,
Antony.

Re: loads of memory still GC overhead limit exceeded

2015-02-19 Thread Antony Mayi
Based on the Spark UI I am running 25 executors for sure - why would you expect four? I
submit the task with --num-executors 25 and I get 6-7 executors running per host (using
more, smaller executors allows me better cluster utilization when running parallel Spark
sessions, which is not the case for this reported issue - for now I am using the cluster
exclusively).
thx,
Antony.

 On Thursday, 19 February 2015, 11:02, Sean Owen so...@cloudera.com wrote:
   
 

 This should result in 4 executors, not 25. They should be able to
execute 4*4 = 16 tasks simultaneously. You have them grab 4*32 = 128GB
of RAM, not 1TB.

It still feels like this shouldn't be running out of memory, not by a
long shot though. But just pointing out potential differences between
what you are expecting and what you are configuring.

On Thu, Feb 19, 2015 at 9:56 AM, Antony Mayi
antonym...@yahoo.com.invalid wrote:
 Hi,

 I have 4 very powerful boxes (256GB RAM, 32 cores each). I am running spark
 1.2.0 in yarn-client mode with following layout:

 spark.executor.cores=4
 spark.executor.memory=28G
 spark.yarn.executor.memoryOverhead=4096

 I am submitting bigger ALS trainImplicit task (rank=100, iters=15) on a
 dataset with ~3 billion of ratings using 25 executors. At some point some
 executor crashes with:

 15/02/19 05:41:06 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at
 org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
        at
 org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
 15/02/19 05:41:06 ERROR executor.Executor: Exception in task 131.0 in stage
 51.0 (TID 7259)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.reflect.Array.newInstance(Array.java:75)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)
        at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
        at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)

 So the GC overhead limit exceeded is pretty clear and would suggest running
 out of memory. Since I have 1TB of RAM available this must be rather due to
 some config inoptimality.

 Can anyone please point me to some directions how to tackle this?

 Thanks,
 Antony.

   

Re: loads of memory still GC overhead limit exceeded

2015-02-19 Thread Antony Mayi
 It is from within the ALS.trainImplicit() call. BTW, the exception varies between this
"GC overhead limit exceeded" and "Java heap space" (which I guess is just a different
outcome of the same problem).
Just tried another run and here are the logs (filtered) - note I tried this run with
spark.shuffle.io.preferDirectBufs=false, so this might be a slightly different issue from
my previous case (going to revert now):
=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6
15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
=== yarn log ===
15/02/19 10:15:05 WARN executor.Executor: Told to re-register on heartbeat
15/02/19 10:16:02 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/02/19 10:16:02 WARN server.TransportChannelHandler: Exception in connection from /192.168.1.92:45633
io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Java heap space
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:280)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
=== yarn nodemanager log ===
2015-02-19 10:16:45,146 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20284 for container-id container_1424204221358_0012_01_16: 28.5 GB of 32 GB physical memory used; 29.1 GB of 67.2 GB virtual memory used
2015-02-19 10:16:45,163 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 20273 for container-id container_1424204221358_0012_01_20: 28.5 GB of 32 GB physical memory used; 29.2 GB of 67.2 GB virtual memory used
2015-02-19 10:16:46,621 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0012_01_08 is : 143
2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0012_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 10:16:46,621 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0012_01_08

thanks for any help,Antony.








ps. could that be Java 8 related? 

 On Thursday, 19 February 2015, 11:25, Sean Owen so...@cloudera.com wrote:
   
 

 Oh OK you are saying you are requesting 25 executors and getting them,
got it. You can consider making fewer, bigger executors to pool rather
than split up your memory, but at some point it becomes
counter-productive. 32GB is a fine executor size.

So you have ~8GB available per task which seems like plenty. Something
else is at work here. Is this error from your code's stages or from ALS?

On Thu, Feb 19, 2015 at 10:07 AM, Antony Mayi antonym...@yahoo.com wrote:
 based on spark UI I am running 25 executors for sure. why would you expect
 four? I submit the task with --num-executors 25 and I get 6-7 executors
 running per host (using

Re: loads of memory still GC overhead limit exceeded

2015-02-19 Thread Antony Mayi
Now, with spark.shuffle.io.preferDirectBufs reverted (to true), I am again getting GC
overhead limit exceeded:
=== spark stdout ===
15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
=== yarn log (same) ===
15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
=== yarn nodemanager ===
2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 143
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08
Antony.

 

 On Thursday, 19 February 2015, 11:54, Antony Mayi 
antonym...@yahoo.com.INVALID wrote:
   
 

 It is from within the ALS.trainImplicit() call. BTW, the exception varies between this
"GC overhead limit exceeded" and "Java heap space" (which I guess is just a different
outcome of the same problem).
Just tried another run and here are the logs (filtered) - note I tried this run with
spark.shuffle.io.preferDirectBufs=false, so this might be a slightly different issue from
my previous case (going to revert now):
=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 18.0 (TID 5354, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 82.0 in stage 18.0 (TID 5404, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.92): ExecutorLostFailure (executor 6 lost)
15/02/19 10:16:44 ERROR cluster.YarnClientSchedulerBackend: Asked to remove non-existent executor 6
15/02/19 10:16:54 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 18.0 (TID 5328, 192.168.1.90): FetchFailed(BlockManagerId(6, 192.168.1.92, 54289), shuffleId=6, mapId=227, reduceId=6, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /192.168.1.92:54289
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply

Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Antony Mayi
thanks, that looks promising, but I can't find any reference giving me more details - can
you please point me to something? Also, is it possible to force GC from pyspark (as I am
using pyspark)?
thanks,
Antony.
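
For completeness, the only way I can see to trigger a JVM GC from pyspark is going through
the py4j gateway to java.lang.System.gc() on the driver - a hedged sketch, since sc._jvm is
an internal handle rather than a public API:

    # sc is an existing SparkContext; sc._jvm is pyspark's py4j view of the
    # driver JVM (internal attribute, so treat this as a workaround only)
    sc._jvm.System.gc()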

 On Monday, 16 February 2015, 21:05, Tathagata Das 
tathagata.das1...@gmail.com wrote:
   
 

 Correct, brute force clean up is not useful. Since Spark 1.0, Spark can do 
automatic cleanup of files based on which RDDs are used/garbage collected by 
JVM. That would be the best way, but depends on the JVM GC characteristics. If 
you force a GC periodically in the driver that might help you get rid of files 
in the workers that are not needed.
TD
On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

spark.cleaner.ttl is not the right way - it seems to be really designed for streaming.
Although it keeps the disk usage under control, it also causes loss of RDDs and broadcasts
that are required later, leading to a crash.
Is there any other way?
thanks,
Antony.

 On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 spark.cleaner.ttl ? 

 On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 Hi,
I am running a bigger ALS job on Spark 1.2.0 on YARN (CDH 5.3.0) - the ALS is using about
3 billion ratings and I am doing several trainImplicit() runs in a loop within one Spark
session. I have a four node cluster with 3TB of disk space on each. Before starting the
job, less than 8% of the disk space is used; while the ALS is running I can see the disk
usage growing rapidly, mainly because of files being stored under
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about
10 hours the disk usage hits 90% and YARN kills the particular containers.
Am I missing some cleanup somewhere while looping over the several trainImplicit() calls?
Taking 4*3TB of disk space seems immense.
thanks for any help,
Antony.


Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Antony Mayi
spark.cleaner.ttl is not the right way - it seems to be really designed for streaming.
Although it keeps the disk usage under control, it also causes loss of RDDs and broadcasts
that are required later, leading to a crash.
Is there any other way?
thanks,
Antony.

 On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 spark.cleaner.ttl ? 

 On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 Hi,
I am running a bigger ALS job on Spark 1.2.0 on YARN (CDH 5.3.0) - the ALS is using about
3 billion ratings and I am doing several trainImplicit() runs in a loop within one Spark
session. I have a four node cluster with 3TB of disk space on each. Before starting the
job, less than 8% of the disk space is used; while the ALS is running I can see the disk
usage growing rapidly, mainly because of files being stored under
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about
10 hours the disk usage hits 90% and YARN kills the particular containers.
Am I missing some cleanup somewhere while looping over the several trainImplicit() calls?
Taking 4*3TB of disk space seems immense.
thanks for any help,
Antony.


spark-local dir running out of space during long ALS run

2015-02-15 Thread Antony Mayi
Hi,
I am running a bigger ALS job on Spark 1.2.0 on YARN (CDH 5.3.0) - the ALS is using about
3 billion ratings and I am doing several trainImplicit() runs in a loop within one Spark
session. I have a four node cluster with 3TB of disk space on each. Before starting the
job, less than 8% of the disk space is used; while the ALS is running I can see the disk
usage growing rapidly, mainly because of files being stored under
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about
10 hours the disk usage hits 90% and YARN kills the particular containers.
Am I missing some cleanup somewhere while looping over the several trainImplicit() calls?
Taking 4*3TB of disk space seems immense.
thanks for any help,
Antony.

Re: spark-local dir running out of space during long ALS run

2015-02-15 Thread Antony Mayi
spark.cleaner.ttl ? 

 On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 Hi,
I am running a bigger ALS job on Spark 1.2.0 on YARN (CDH 5.3.0) - the ALS is using about
3 billion ratings and I am doing several trainImplicit() runs in a loop within one Spark
session. I have a four node cluster with 3TB of disk space on each. Before starting the
job, less than 8% of the disk space is used; while the ALS is running I can see the disk
usage growing rapidly, mainly because of files being stored under
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about
10 hours the disk usage hits 90% and YARN kills the particular containers.
Am I missing some cleanup somewhere while looping over the several trainImplicit() calls?
Taking 4*3TB of disk space seems immense.
thanks for any help,
Antony.

 
   

pyspark importing custom module

2015-02-06 Thread Antony Mayi
Hi,
is there a way to use a custom Python module that is available to all executors under
PYTHONPATH (without needing to upload it using sc.addPyFile())? It is a bit weird that the
module is present on all nodes yet the Spark tasks can't use it (references to its objects
are serialized and sent to all executors, but since the module doesn't get imported there,
the calls fail).
thanks,
Antony.
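
For reference, the two workarounds I know of, as a hedged sketch - the module name and
install path are placeholders, and whether spark.executorEnv.PYTHONPATH wins over the
PYTHONPATH that Spark itself sets up for its workers may depend on the cluster manager:

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setAppName("custom-module")
            # ask Spark to extend the executors' environment (placeholder path)
            .set("spark.executorEnv.PYTHONPATH", "/opt/mylibs"))
    sc = SparkContext(conf=conf)

    # or ship the module explicitly so the workers import the shipped copy
    sc.addPyFile("/opt/mylibs/mymodule.py")  # placeholder module

    import mymodule  # fine on the driver; workers pick it up once shipped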

pyspark and and page allocation failures due to memory fragmentation

2015-01-30 Thread Antony Mayi
Hi,
when running a big map-reduce operation with pyspark (in this particular case using lots of
sets and set operations in the map tasks, so likely allocating and freeing loads of pages)
I eventually get the kernel error 'python: page allocation failure: order:10, mode:0x2000d0'
plus a very verbose dump, which I can reduce to the following snippet:

Node 1 Normal: 3601*4kB (UEM) 3159*8kB (UEM) 1669*16kB (UEM) 763*32kB (UEM) 1451*64kB (UEM) 15*128kB (UM) 1*256kB (U) 0*512kB 0*1024kB 0*2048kB 0*4096kB = 185836kB
...
SLAB: Unable to allocate memory on node 1 (gfp=0xd0)
  cache: size-4194304, object size: 4194304, order: 10

So simply the memory got fragmented and there are no higher-order pages left. The
interesting thing is that no error is thrown by Spark itself - the processing just gets
stuck without any error (only the kernel dmesg explains what happened in the background).
Any kernel experts out there with advice on how to avoid this? I have tried a few vm
options but still no joy.
Running Spark 1.2.0 (CDH 5.3.0) on kernel 3.8.13.
thanks,
Antony.

java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-01-27 Thread Antony Mayi
Hi,
I am using spark.yarn.executor.memoryOverhead=8192, yet executors are still crashing with
this error.
Does that mean I genuinely do not have enough RAM, or is this a matter of config tuning?
Other config options used:
spark.storage.memoryFraction=0.3
SPARK_EXECUTOR_MEMORY=14G
Running Spark 1.2.0 as yarn-client on a cluster of 10 nodes (the workload is ALS
trainImplicit on a ~15GB dataset).
thanks for any ideas,
Antony.
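
(For what it's worth: on YARN each executor container request works out to roughly
spark.executor.memory + spark.yarn.executor.memoryOverhead, i.e. 14G + 8G = 22G per
executor here, so the NodeManagers have to be able to grant containers of that size.)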

Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-01-27 Thread Antony Mayi
)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
 

 On Wednesday, 28 January 2015, 0:01, Guru Medasani gdm...@outlook.com 
wrote:
   
 

 Can you attach the logs where this is failing?
From:  Sven Krasser kras...@gmail.com
Date:  Tuesday, January 27, 2015 at 4:50 PM
To:  Guru Medasani gdm...@outlook.com
Cc:  Sandy Ryza sandy.r...@cloudera.com, Antony Mayi antonym...@yahoo.com, 
user@spark.apache.org user@spark.apache.org
Subject:  Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Since it's an executor running OOM it doesn't look like a container being 
killed by YARN to me. As a starting point, can you repartition your job into 
smaller tasks?
-Sven

On Tue, Jan 27, 2015 at 2:34 PM, Guru Medasani gdm...@outlook.com wrote:

Hi Anthony,
What is the setting of the total amount of memory in MB that can be allocated 
to containers on your NodeManagers?
yarn.nodemanager.resource.memory-mb
Can you check this above configuration in yarn-site.xml used by the node 
manager process?
-Guru Medasani
From:  Sandy Ryza sandy.r...@cloudera.com
Date:  Tuesday, January 27, 2015 at 3:33 PM
To:  Antony Mayi antonym...@yahoo.com
Cc:  user@spark.apache.org user@spark.apache.org
Subject:  Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

Hi Antony,
If you look in the YARN NodeManager logs, do you see that it's killing the 
executors?  Or are they crashing for a different reason?
-Sandy
On Tue, Jan 27, 2015 at 12:43 PM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
I am using spark.yarn.executor.memoryOverhead=8192 yet getting executors 
crashed with this error.
does that mean I have genuinely not enough RAM or is this matter of config 
tuning?
other config options used:spark.storage.memoryFraction=0.3
SPARK_EXECUTOR_MEMORY=14G
running spark 1.2.0 as yarn-client on cluster of 10 nodes (the workload is ALS 
trainImplicit on ~15GB dataset)
thanks for any ideas,Antony.





-- 
http://sites.google.com/site/krasser/?utm_source=sig



HW imbalance

2015-01-26 Thread Antony Mayi
Hi,
is it possible to mix hosts with (significantly) different specs within a cluster (without
wasting the extra resources)? For example, having 10 nodes with 36GB RAM/10 CPUs and now
adding 3 hosts with 128GB RAM/10 CPUs - is there a way for Spark executors to utilize the
extra memory (my understanding is that all Spark executors must have the same memory)?
thanks,
Antony.

Re: HW imbalance

2015-01-26 Thread Antony Mayi
I should have said that I am running as yarn-client. All I can see is specifying the
generic executor memory, which is then used in all containers.

 On Monday, 26 January 2015, 16:48, Charles Feduke 
charles.fed...@gmail.com wrote:
   
 

 You should look at using Mesos. This should abstract away the individual hosts into a pool
of resources and make the different physical specifications manageable.

I haven't tried configuring Spark Standalone mode to have different specs on different
machines, but based on spark-env.sh.template:
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. -Dx=y)
it looks like you should be able to mix. (It's not clear to me whether SPARK_WORKER_MEMORY
is uniform across the cluster or applies to the machine where the config file resides.)

On Mon Jan 26 2015 at 8:07:51 AM Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
is it possible to mix hosts with (significantly) different specs within a 
cluster (without wasting the extra resources)? for example having 10 nodes with 
36GB RAM/10CPUs now trying to add 3 hosts with 128GB/10CPUs - is there a way to 
utilize the extra memory by spark executors (as my understanding is all spark 
executors must have same memory).
thanks,Antony.


 


Re: ALS.trainImplicit running out of mem when using higher rank

2015-01-17 Thread Antony Mayi
The values are for sure applied as expected - confirmed on the Spark UI environment page...
They come from my defaults, configured via 'spark.yarn.executor.memoryOverhead=8192' (yes,
now increased even more) in /etc/spark/conf/spark-defaults.conf and
'export SPARK_EXECUTOR_MEMORY=24G' in /etc/spark/conf/spark-env.sh.
Antony.

 On Saturday, 17 January 2015, 11:32, Sean Owen so...@cloudera.com wrote:
   
 

 I'm not sure how you are setting these values though. Where is
spark.yarn.executor.memoryOverhead=6144 ? Env variables aren't the
best way to set configuration either. Again have a look at
http://spark.apache.org/docs/latest/running-on-yarn.html

... --executor-memory 22g --conf
spark.yarn.executor.memoryOverhead=2g ... should do it, off the top
of my head. That should reserve 24g from YARN.
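
For completeness, the same reservation expressed in code rather than on the spark-submit
command line - a sketch; on YARN these have to be set before the SparkContext is created
(and the overhead value is given in MB in this Spark version):

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .set("spark.executor.memory", "22g")
            .set("spark.yarn.executor.memoryOverhead", "2048"))  # MB
    sc = SparkContext(conf=conf)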

On Sat, Jan 17, 2015 at 5:29 AM, Antony Mayi antonym...@yahoo.com wrote:
 although this helped to improve it significantly I still run into this
 problem despite increasing the spark.yarn.executor.memoryOverhead vastly:

 export SPARK_EXECUTOR_MEMORY=24G
 spark.yarn.executor.memoryOverhead=6144

 yet getting this:
 2015-01-17 04:47:40,389 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=30211,containerID=container_1421451766649_0002_01_115969] is
 running beyond physical memory limits. Current usage: 30.1 GB of 30 GB
 physical memory used; 33.0 GB of 63.0 GB virtual memory used. Killing
 container.

 is there anything more I can do?

 thanks,
 Antony.


 On Monday, 12 January 2015, 8:21, Antony Mayi antonym...@yahoo.com wrote:



 this seems to have sorted it, awesome, thanks for great help.
 Antony.


 On Sunday, 11 January 2015, 13:02, Sean Owen so...@cloudera.com wrote:



 I would expect the size of the user/item feature RDDs to grow linearly
 with the rank, of course. They are cached, so that would drive cache
 memory usage on the cluster.

 This wouldn't cause executors to fail for running out of memory
 though. In fact, your error does not show the task failing for lack of
 memory. What it shows is that YARN thinks the task is using a little
 bit more memory than it said it would, and killed it.

 This happens sometimes with JVM-based YARN jobs since a JVM configured
 to use X heap ends up using a bit more than X physical memory if the
 heap reaches max size. So there's a bit of headroom built in and
 controlled by spark.yarn.executor.memoryOverhead
 (http://spark.apache.org/docs/latest/running-on-yarn.html) You can try
 increasing it to a couple GB.


 On Sun, Jan 11, 2015 at 9:43 AM, Antony Mayi
 antonym...@yahoo.com.invalid wrote:
 the question really is whether this is expected that the memory
 requirements
 grow rapidly with the rank... as I would expect memory is rather O(1)
 problem with dependency only on the size of input data.

 if this is expected is there any rough formula to determine the required
 memory based on ALS input and parameters?

 thanks,
 Antony.


 On Saturday, 10 January 2015, 10:47, Antony Mayi antonym...@yahoo.com
 wrote:



 the actual case looks like this:
 * spark 1.1.0 on yarn (cdh 5.2.1)
 * ~8-10 executors, 36GB phys RAM per host
 * input RDD is roughly 3GB containing ~150-200M items (and this RDD is
 made
 persistent using .cache())
 * using pyspark

 yarn is configured with the limit yarn.nodemanager.resource.memory-mb of
 33792 (33GB), spark is set to be:
 SPARK_EXECUTOR_CORES=6
 SPARK_EXECUTOR_INSTANCES=9
 SPARK_EXECUTOR_MEMORY=30G

 when using higher rank (above 20) for ALS.trainImplicit the executor runs
 after some time (~hour) of execution out of the yarn limit and gets
 killed:

 2015-01-09 17:51:27,130 WARN

 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=27125,containerID=container_1420871936411_0002_01_23]
 is
 running beyond physical memory limits. Current usage: 31.2 GB of 31 GB
 physical memory used; 34.7 GB of 65.1 GB virtual memory used. Killing
 container.

 thanks for any ideas,
 Antony.



 On Saturday, 10 January 2015, 10:11, Antony Mayi antonym...@yahoo.com
 wrote:



 the memory requirements seem to be rapidly growing hen using higher
 rank...
 I am unable to get over 20 without running out of memory. is this
 expected?
 thanks, Antony.







 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




remote Akka client disassociated - some timeout?

2015-01-16 Thread Antony Mayi
Hi,
I believe this is some kind of timeout problem, but I can't figure out how to increase it.
I am running Spark 1.2.0 on YARN (all from CDH 5.3.0). I submit a Python task which first
loads a big RDD from HBase - I can see in the screen output all executors firing up, then
no more logging output for the next two minutes, after which I get plenty of:

15/01/16 17:35:16 ERROR cluster.YarnClientClusterScheduler: Lost executor 7 on node01: remote Akka client disassociated
15/01/16 17:35:16 INFO scheduler.TaskSetManager: Re-queueing tasks for 7 from TaskSet 1.0
15/01/16 17:35:16 WARN scheduler.TaskSetManager: Lost task 32.0 in stage 1.0 (TID 17, node01): ExecutorLostFailure (executor 7 lost)
15/01/16 17:35:16 WARN scheduler.TaskSetManager: Lost task 34.0 in stage 1.0 (TID 25, node01): ExecutorLostFailure (executor 7 lost)

This points to some ~120 s timeout being hit while the nodes are loading the big RDD?
Any ideas how to get around it?
FYI, I already use the following options without any success:
    spark.core.connection.ack.wait.timeout: 600
    spark.akka.timeout: 1000

thanks,
Antony.



Re: ALS.trainImplicit running out of mem when using higher rank

2015-01-16 Thread Antony Mayi
Although this helped to improve things significantly, I still run into this problem despite
increasing spark.yarn.executor.memoryOverhead vastly:
export SPARK_EXECUTOR_MEMORY=24G
spark.yarn.executor.memoryOverhead=6144

yet getting this:
2015-01-17 04:47:40,389 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=30211,containerID=container_1421451766649_0002_01_115969] is running beyond physical memory limits. Current usage: 30.1 GB of 30 GB physical memory used; 33.0 GB of 63.0 GB virtual memory used. Killing container.

Is there anything more I can do?
thanks,
Antony.

 On Monday, 12 January 2015, 8:21, Antony Mayi antonym...@yahoo.com wrote:
   
 

 This seems to have sorted it, awesome - thanks for the great help.
Antony.

 On Sunday, 11 January 2015, 13:02, Sean Owen so...@cloudera.com wrote:
   
 

 I would expect the size of the user/item feature RDDs to grow linearly
with the rank, of course. They are cached, so that would drive cache
memory usage on the cluster.

This wouldn't cause executors to fail for running out of memory
though. In fact, your error does not show the task failing for lack of
memory. What it shows is that YARN thinks the task is using a little
bit more memory than it said it would, and killed it.

This happens sometimes with JVM-based YARN jobs since a JVM configured
to use X heap ends up using a bit more than X physical memory if the
heap reaches max size. So there's a bit of headroom built in and
controlled by spark.yarn.executor.memoryOverhead
(http://spark.apache.org/docs/latest/running-on-yarn.html) You can try
increasing it to a couple GB.


On Sun, Jan 11, 2015 at 9:43 AM, Antony Mayi
antonym...@yahoo.com.invalid wrote:
 the question really is whether this is expected that the memory requirements
 grow rapidly with the rank... as I would expect memory is rather O(1)
 problem with dependency only on the size of input data.

 if this is expected is there any rough formula to determine the required
 memory based on ALS input and parameters?

 thanks,
 Antony.


 On Saturday, 10 January 2015, 10:47, Antony Mayi antonym...@yahoo.com
 wrote:



 the actual case looks like this:
 * spark 1.1.0 on yarn (cdh 5.2.1)
 * ~8-10 executors, 36GB phys RAM per host
 * input RDD is roughly 3GB containing ~150-200M items (and this RDD is made
 persistent using .cache())
 * using pyspark

 yarn is configured with the limit yarn.nodemanager.resource.memory-mb of
 33792 (33GB), spark is set to be:
 SPARK_EXECUTOR_CORES=6
 SPARK_EXECUTOR_INSTANCES=9
 SPARK_EXECUTOR_MEMORY=30G

 when using higher rank (above 20) for ALS.trainImplicit the executor runs
 after some time (~hour) of execution out of the yarn limit and gets killed:

 2015-01-09 17:51:27,130 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=27125,containerID=container_1420871936411_0002_01_23] is
 running beyond physical memory limits. Current usage: 31.2 GB of 31 GB
 physical memory used; 34.7 GB of 65.1 GB virtual memory used. Killing
 container.

 thanks for any ideas,
 Antony.



 On Saturday, 10 January 2015, 10:11, Antony Mayi antonym...@yahoo.com
 wrote:



 the memory requirements seem to be rapidly growing hen using higher rank...
 I am unable to get over 20 without running out of memory. is this expected?
 thanks, Antony.






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



 


 
   

java.io.IOException: sendMessageReliably failed without being ACK'd

2015-01-14 Thread Antony Mayi
Hi,
running spark 1.1.0 in yarn-client mode (cdh 5.2.1) on a XEN-based cloud and 
randomly getting my executors failing on errors like the one below. I suspect it is 
some cloud networking issue (a XEN driver bug?) but am wondering if there is any 
spark/yarn workaround that I could use to mitigate it?

Thanks, Antony
15/01/14 10:36:44 ERROR storage.BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(node02,40868)
java.io.IOException: sendMessageReliably failed without being ACK'd
        at org.apache.spark.network.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:865)
        at org.apache.spark.network.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:861)
        at org.apache.spark.network.ConnectionManager$MessageStatus.markDone(ConnectionManager.scala:66)
        at org.apache.spark.network.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:457)
        at org.apache.spark.network.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:455)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.network.ConnectionManager.removeConnection(ConnectionManager.scala:455)
        at org.apache.spark.network.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:434)
        at org.apache.spark.network.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:434)
        at org.apache.spark.network.Connection.callOnCloseCallback(Connection.scala:156)
        at org.apache.spark.network.Connection.close(Connection.scala:128)
        at org.apache.spark.network.ConnectionManager.removeConnection(ConnectionManager.scala:476)
        at org.apache.spark.network.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:434)
        at org.apache.spark.network.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:434)
        at org.apache.spark.network.Connection.callOnCloseCallback(Connection.scala:156)
        at org.apache.spark.network.Connection.close(Connection.scala:128)
        at org.apache.spark.network.ReceivingConnection.read(Connection.scala:491)
        at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:662)



Re: ALS.trainImplicit running out of mem when using higher rank

2015-01-11 Thread Antony Mayi
the question really is whether it is expected that the memory requirements 
grow rapidly with the rank... as I would expect memory to be rather an O(1) 
problem, depending only on the size of the input data.

if this is expected, is there any rough formula to determine the required memory 
based on the ALS input and parameters?

thanks, Antony. 

 On Saturday, 10 January 2015, 10:47, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 the actual case looks like this:
* spark 1.1.0 on yarn (cdh 5.2.1)
* ~8-10 executors, 36GB phys RAM per host
* input RDD is roughly 3GB containing ~150-200M items (and this RDD is made persistent using .cache())
* using pyspark

yarn is configured with the limit yarn.nodemanager.resource.memory-mb of 33792 
(33GB), spark is set to be:
SPARK_EXECUTOR_CORES=6
SPARK_EXECUTOR_INSTANCES=9
SPARK_EXECUTOR_MEMORY=30G

when using higher rank (above 20) for ALS.trainImplicit the executor runs out of 
the yarn limit after some time (~1 hour) of execution and gets killed:
2015-01-09 17:51:27,130 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=27125,containerID=container_1420871936411_0002_01_23] is 
running beyond physical memory limits. Current usage: 31.2 GB of 31 GB physical 
memory used; 34.7 GB of 65.1 GB virtual memory used. Killing container.
thanks for any ideas, Antony.
 

 On Saturday, 10 January 2015, 10:11, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 the memory requirements seem to be rapidly growing when using higher rank... I 
am unable to get over 20 without running out of memory. is this expected?
thanks, Antony. 

 


 
   

Re: ALS.trainImplicit running out of mem when using higher rank

2015-01-11 Thread Antony Mayi
this seems to have sorted it, awesome, thanks for the great help. Antony. 

 On Sunday, 11 January 2015, 13:02, Sean Owen so...@cloudera.com wrote:
   
 

 I would expect the size of the user/item feature RDDs to grow linearly
with the rank, of course. They are cached, so that would drive cache
memory usage on the cluster.

This wouldn't cause executors to fail for running out of memory
though. In fact, your error does not show the task failing for lack of
memory. What it shows is that YARN thinks the task is using a little
bit more memory than it said it would, and killed it.

This happens sometimes with JVM-based YARN jobs since a JVM configured
to use X heap ends up using a bit more than X physical memory if the
heap reaches max size. So there's a bit of headroom built in and
controlled by spark.yarn.executor.memoryOverhead
(http://spark.apache.org/docs/latest/running-on-yarn.html) You can try
increasing it to a couple GB.


On Sun, Jan 11, 2015 at 9:43 AM, Antony Mayi
antonym...@yahoo.com.invalid wrote:
 the question really is whether this is expected that the memory requirements
 grow rapidly with the rank... as I would expect memory is rather O(1)
 problem with dependency only on the size of input data.

 if this is expected is there any rough formula to determine the required
 memory based on ALS input and parameters?

 thanks,
 Antony.


 On Saturday, 10 January 2015, 10:47, Antony Mayi antonym...@yahoo.com
 wrote:



 the actual case looks like this:
 * spark 1.1.0 on yarn (cdh 5.2.1)
 * ~8-10 executors, 36GB phys RAM per host
 * input RDD is roughly 3GB containing ~150-200M items (and this RDD is made
 persistent using .cache())
 * using pyspark

 yarn is configured with the limit yarn.nodemanager.resource.memory-mb of
 33792 (33GB), spark is set to be:
 SPARK_EXECUTOR_CORES=6
 SPARK_EXECUTOR_INSTANCES=9
 SPARK_EXECUTOR_MEMORY=30G

 when using higher rank (above 20) for ALS.trainImplicit the executor runs
 after some time (~hour) of execution out of the yarn limit and gets killed:

 2015-01-09 17:51:27,130 WARN
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=27125,containerID=container_1420871936411_0002_01_23] is
 running beyond physical memory limits. Current usage: 31.2 GB of 31 GB
 physical memory used; 34.7 GB of 65.1 GB virtual memory used. Killing
 container.

 thanks for any ideas,
 Antony.



 On Saturday, 10 January 2015, 10:11, Antony Mayi antonym...@yahoo.com
 wrote:



 the memory requirements seem to be rapidly growing when using higher rank...
 I am unable to get over 20 without running out of memory. is this expected?
 thanks, Antony.






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



 


ALS.trainImplicit running out of mem when using higher rank

2015-01-10 Thread Antony Mayi
the memory requirements seem to be rapidly growing when using higher rank... I 
am unable to get over 20 without running out of memory. is this expected?
thanks, Antony. 

Re: ALS.trainImplicit running out of mem when using higher rank

2015-01-10 Thread Antony Mayi
the actual case looks like this:
* spark 1.1.0 on yarn (cdh 5.2.1)
* ~8-10 executors, 36GB phys RAM per host
* input RDD is roughly 3GB containing ~150-200M items (and this RDD is made persistent using .cache())
* using pyspark

yarn is configured with the limit yarn.nodemanager.resource.memory-mb of 33792 
(33GB), spark is set to be:
SPARK_EXECUTOR_CORES=6
SPARK_EXECUTOR_INSTANCES=9
SPARK_EXECUTOR_MEMORY=30G

when using higher rank (above 20) for ALS.trainImplicit the executor runs out of 
the yarn limit after some time (~1 hour) of execution and gets killed:

2015-01-09 17:51:27,130 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=27125,containerID=container_1420871936411_0002_01_23] is 
running beyond physical memory limits. Current usage: 31.2 GB of 31 GB physical 
memory used; 34.7 GB of 65.1 GB virtual memory used. Killing container.

thanks for any ideas, Antony.
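
for context, a minimal sketch of the kind of pyspark job being described; the input 
path, field layout and parameter values are illustrative assumptions, not the 
original code:

from pyspark.mllib.recommendation import ALS

# illustrative input: (user, item, strength) triples parsed from a CSV on HDFS
ratings = (sc.textFile("hdfs:///data/events.csv")
             .map(lambda line: line.split(","))
             .map(lambda f: (int(f[0]), int(f[1]), float(f[2])))
             .cache())

# rank values above ~20 are where the executors start exceeding the YARN limit
model = ALS.trainImplicit(ratings, rank=20, iterations=10)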
 

 On Saturday, 10 January 2015, 10:11, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 the memory requirements seem to be rapidly growing when using higher rank... I 
am unable to get over 20 without running out of memory. is this expected?
thanks, Antony. 

 
   

spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD

2015-01-07 Thread Antony Mayi
Hi,
I am using newAPIHadoopRDD to load an RDD from hbase (using pyspark running as 
yarn-client) - pretty much the standard case demonstrated in 
hbase_inputformat.py from the examples... the thing is that when trying the very 
same code on spark 1.2 I am getting the error below, which based on similar 
cases on other forums suggests an incompatibility between MR1 and MR2.

why would this now start happening? is that due to some change in resolving 
the classpath which now picks up MR2 jars first where before it was MR1?

is there any workaround for this?

thanks, Antony.
the error:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
        at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
        at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
        at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500)
        at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
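
for reference, a hedged sketch of the "standard case" referred to above (the 
hbase_inputformat.py pattern from the Spark examples); the table name and zookeeper 
quorum are placeholders, not values from the thread:

# reading an HBase table via TableInputFormat with the stock example converters
conf = {"hbase.zookeeper.quorum": "localhost",
        "hbase.mapreduce.inputtable": "test"}
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    conf=conf)
print(hbase_rdd.first())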
 

Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD

2015-01-07 Thread Antony Mayi
thanks, I found the issue, I was including 
/usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar in 
the classpath - this was breaking it. now using a custom jar with just the python 
converters and everything works like a charm.

thanks, antony. 

 On Wednesday, 7 January 2015, 23:57, Sean Owen so...@cloudera.com wrote:
   
 

 Yes, the distribution is certainly fine and built for Hadoop 2. It sounds like 
you are inadvertently including Spark code compiled for Hadoop 1 when you run 
your app. The general idea is to use the cluster's copy at runtime. Those with 
more pyspark experience might be able to give more useful directions about how 
to fix that.
On Wed, Jan 7, 2015 at 1:46 PM, Antony Mayi antonym...@yahoo.com wrote:

this is the official cloudera-compiled stack cdh 5.3.0 - nothing has been done by 
me, and I presume they are pretty good at building it, so I still suspect the 
classpath now gets resolved in a different way?

thx, Antony. 

 On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com wrote:
   
 

 Problems like this are always due to having code compiled for Hadoop 1.x run 
against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime 
Hadoop 2.x is used.
A common cause is actually bundling Spark / Hadoop classes with your app, when 
the app should just use the Spark / Hadoop provided by the cluster. It could 
also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster.
On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
I am using newAPIHadoopRDD to load RDD from hbase (using pyspark running as 
yarn-client) - pretty much the standard case demonstrated in the 
hbase_inputformat.py from the examples... the thing is that when trying the very 
same code on spark 1.2 I am getting the error below, which based on similar 
cases on other forums suggests an incompatibility between MR1 and MR2.
why would this now start happening? is that due to some changes in resolving 
the classpath which now picks up MR2 jars first while before it was MR1?
is there any workaround for this?
thanks,Antony.
the error:
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.: 
java.lang.IncompatibleClassChangeError: Found interface 
org.apache.hadoop.mapreduce.JobContext, but class was expected at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at 
scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at 
org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at 
scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at 
org.apache.spark.rdd.RDD.take(RDD.scala:1060) at 
org.apache.spark.rdd.RDD.first(RDD.scala:1093) at 
org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) at 
org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500) at 
org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at 
py4j.Gateway.invoke(Gateway.java:259) at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at 
py4j.GatewayConnection.run(GatewayConnection.java:207) at 
java.lang.Thread.run(Thread.java:745)
 



 




 
   

Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD

2015-01-07 Thread Antony Mayi
this is the official cloudera-compiled stack cdh 5.3.0 - nothing has been done by 
me, and I presume they are pretty good at building it, so I still suspect the 
classpath now gets resolved in a different way?

thx, Antony. 

 On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com wrote:
   
 

 Problems like this are always due to having code compiled for Hadoop 1.x run 
against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at runtime 
Hadoop 2.x is used.
A common cause is actually bundling Spark / Hadoop classes with your app, when 
the app should just use the Spark / Hadoop provided by the cluster. It could 
also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x cluster.
On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
I am using newAPIHadoopRDD to load RDD from hbase (using pyspark running as 
yarn-client) - pretty much the standard case demonstrated in the 
hbase_inputformat.py from the examples... the thing is that when trying the very 
same code on spark 1.2 I am getting the error below, which based on similar 
cases on other forums suggests an incompatibility between MR1 and MR2.
why would this now start happening? is that due to some changes in resolving 
the classpath which now picks up MR2 jars first while before it was MR1?
is there any workaround for this?
thanks,Antony.
the error:
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.: 
java.lang.IncompatibleClassChangeError: Found interface 
org.apache.hadoop.mapreduce.JobContext, but class was expected at 
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at 
scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at 
org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at 
scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at 
org.apache.spark.rdd.RDD.take(RDD.scala:1060) at 
org.apache.spark.rdd.RDD.first(RDD.scala:1093) at 
org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202) at 
org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500) at 
org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at 
py4j.Gateway.invoke(Gateway.java:259) at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at 
py4j.GatewayConnection.run(GatewayConnection.java:207) at 
java.lang.Thread.run(Thread.java:745)
 



 
   

pyspark executor PYTHONPATH

2015-01-02 Thread Antony Mayi
Hi,
I am running spark 1.1.0 on yarn. I have a custom set of modules installed in the 
same location on each executor node and I am wondering how I can pass PYTHONPATH 
to the executors so that they can use the modules.
I've tried this:

spark-env.sh:
export PYTHONPATH=/tmp/test/

spark-defaults.conf:
spark.executorEnv.PYTHONPATH=/tmp/test/

the test package /tmp/test/pkg contains __init__.py and mod.py:

def test(x):
    return x
from the pyspark shell I can import the module pkg.mod without any issues:

$$$ import pkg.mod
$$$ print pkg.mod.test(1)
1

also the path is correctly set:

$$$ print os.environ['PYTHONPATH']
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip:/usr/lib/spark/python/:/tmp/test/
$$$ print sys.path
['', '/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip', '/usr/lib/spark/python', '/tmp/test/', ... ]

it is even seen by the executors:

$$$ sc.parallelize(range(4)).map(lambda x: os.environ['PYTHONPATH']).collect()
['/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/',
 '/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/',
 '/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/',
 '/u02/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/']

yet it fails when actually using the module on the executor:

$$$ sc.parallelize(range(4)).map(pkg.mod.test).collect()
...
ImportError: No module named mod
...
any idea how to achieve this? I don't want to use sc.addPyFile as these are big 
packages and they are installed everywhere anyway...

thank you, Antony.




Re: pyspark executor PYTHONPATH

2015-01-02 Thread Antony Mayi
ok, I see now what's happening - pkg.mod.test is serialized by reference 
and nothing actually tries to import pkg.mod on the executors, so the 
reference is broken.

so how can I get pkg.mod imported on the executors?

thanks, Antony. 
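
a hedged workaround sketch, assuming (as described above) that the package really is 
installed under /tmp/test/ on every executor node: do the import inside the function 
that is shipped to the executors, making sure the path is on sys.path there:

def run_test(x):
    import sys
    if "/tmp/test/" not in sys.path:
        sys.path.insert(0, "/tmp/test/")  # make the python worker see the locally installed package
    import pkg.mod                        # resolved on the executor at call time
    return pkg.mod.test(x)

sc.parallelize(range(4)).map(run_test).collect()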

 On Friday, 2 January 2015, 13:49, Antony Mayi antonym...@yahoo.com wrote:
   
 

 Hi,
I am running spark 1.1.0 on yarn. I have custom set of modules installed under 
same location on each executor node and wondering how can I pass the executors 
the PYTHONPATH so that they can use the modules.
I've tried this:
spark-env.sh:export PYTHONPATH=/tmp/test/

spark-defaults.conf:spark.executorEnv.PYTHONPATH=/tmp/test/


/tmp/test/pkg:__init__.pymod.py:  def test(x):
      return x
from the pyspark shell I can import the module pkg.mod without any issues:
$$$ import pkg.mod$$$ print pkg.mod.test(1)1
also the path is correctly set:
$$$ print 
os.environ['PYTHONPATH']/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip:/usr/lib/spark/python/:/tmp/test/
$$$ print sys.path['', '/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip', 
'/usr/lib/spark/python', '/tmp/test/', ... ]

it even is seen by the executors:
$$$ sc.parallelize(range(4)).map(lambda x: 
os.environ['PYTHONPATH']).collect()['/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/',
 
'/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/',
 
'/u01/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/',
 
'/u02/yarn/local/usercache/user/filecache/24/spark-assembly-1.1.0-cdh5.2.1-hadoop2.5.0-cdh5.2.1.jar:/tmp/test/:/tmp/test/']
yet it fails when actually using the module on the executor:$$$ 
sc.parallelize(range(4)).map(pkg.mod.test).collect()...ImportError: No module 
named mod...
any idea how to achieve this? don't want to use the sc.addPyFile as this is big 
packages and they are installed everywhere anyway...
thank you,Antony.




 
   

saveAsNewAPIHadoopDataset against hbase hanging in pyspark 1.2.0

2014-12-24 Thread Antony Mayi
Hi,
have been using this without any issues with spark 1.1.0 but after upgrading to 
1.2.0, saving an RDD from pyspark using saveAsNewAPIHadoopDataset into HBase just 
hangs - even when testing with the example from the stock hbase_outputformat.py.
anyone having the same issue? (and able to solve it?)
using hbase 0.98.6 and yarn-client mode.
thanks, Antony.


Re: saveAsNewAPIHadoopDataset against hbase hanging in pyspark 1.2.0

2014-12-24 Thread Antony Mayi
this is it (jstack of the particular yarn container) - http://pastebin.com/eAdiUYKK
thanks, Antony. 

 On Wednesday, 24 December 2014, 16:34, Ted Yu yuzhih...@gmail.com wrote:
   
 

 bq. even when testing with the example from the stock hbase_outputformat.py
Can you take jstack of the above and pastebin it ?
Thanks
On Wed, Dec 24, 2014 at 4:49 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
have been using this without any issues with spark 1.1.0 but after upgrading to 
1.2.0 saving a RDD from pyspark using saveAsNewAPIHadoopDataset into HBase just 
hangs - even when testing with the example from the stock hbase_outputformat.py.
anyone having same issue? (and able to solve?)
using hbase 0.98.6 and yarn-client mode.
thanks,Antony.




 
   

Re: saveAsNewAPIHadoopDataset against hbase hanging in pyspark 1.2.0

2014-12-24 Thread Antony Mayi
I just ran it by hand from the pyspark shell. here are the steps:

pyspark --jars /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar

conf = {"hbase.zookeeper.quorum": "localhost",
        "hbase.mapred.outputtable": "test",
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
sc.parallelize([['testkey', 'f1', 'testqual', 'testval']], 1).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
        conf=conf,
        keyConverter=keyConv,
        valueConverter=valueConv)

then it spills a few INFO level messages about submitting a task etc. but then it 
just hangs. the very same code runs fine on spark 1.1.0 - the records get stored 
in hbase.

thanks, Antony.

 

 On Thursday, 25 December 2014, 0:37, Ted Yu yuzhih...@gmail.com wrote:
   
 

 I went over the jstack but didn't find any call related to hbase or 
zookeeper.Do you find anything important in the logs ?
Looks like container launcher was waiting for the script to return some result:
   
   -         at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:715)
   -         at org.apache.hadoop.util.Shell.runCommand(Shell.java:524)

On Wed, Dec 24, 2014 at 3:11 PM, Antony Mayi antonym...@yahoo.com wrote:

this is it (jstack of particular yarn container) - http://pastebin.com/eAdiUYKK
thanks, Antony. 

 On Wednesday, 24 December 2014, 16:34, Ted Yu yuzhih...@gmail.com wrote:
   
 

 bq. even when testing with the example from the stock hbase_outputformat.py
Can you take jstack of the above and pastebin it ?
Thanks
On Wed, Dec 24, 2014 at 4:49 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
have been using this without any issues with spark 1.1.0 but after upgrading to 
1.2.0 saving a RDD from pyspark using saveAsNewAPIHadoopDataset into HBase just 
hangs - even when testing with the example from the stock hbase_outputformat.py.
anyone having same issue? (and able to solve?)
using hbase 0.98.6 and yarn-client mode.
thanks,Antony.




 




 
   

Re: saveAsNewAPIHadoopDataset against hbase hanging in pyspark 1.2.0

2014-12-24 Thread Antony Mayi
I am running it in yarn-client mode and I believe hbase-client is part of the 
spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar which I am submitting at 
launch.
adding another jstack taken during the hang - http://pastebin.com/QDQrBw70 - 
this one is of the CoarseGrainedExecutorBackend process and it does reference 
hbase and zookeeper.

thanks, Antony. 

 On Thursday, 25 December 2014, 1:38, Ted Yu yuzhih...@gmail.com wrote:
   
 

 bq. hbase.zookeeper.quorum: localhost
You are running hbase cluster in standalone mode ?Is hbase-client jar in the 
classpath ?
Cheers
On Wed, Dec 24, 2014 at 4:11 PM, Antony Mayi antonym...@yahoo.com wrote:

I just run it by hand from pyspark shell. here is the steps:
pyspark --jars 
/usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar
 conf = {hbase.zookeeper.quorum: localhost,
...         hbase.mapred.outputtable: test,...         
mapreduce.outputformat.class: 
org.apache.hadoop.hbase.mapreduce.TableOutputFormat,...         
mapreduce.job.output.key.class: 
org.apache.hadoop.hbase.io.ImmutableBytesWritable,...         
mapreduce.job.output.value.class: org.apache.hadoop.io.Writable} keyConv 
= 
org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter
 valueConv = 
org.apache.spark.examples.pythonconverters.StringListToPutConverter 
sc.parallelize([['testkey', 'f1', 'testqual', 'testval']], 1).map(lambda x: 
(x[0], x)).saveAsNewAPIHadoopDataset(...         conf=conf,...         
keyConverter=keyConv,...         valueConverter=valueConv)
then it spills few of the INFO level messages about submitting a task etc but 
then it just hangs. very same code runs ok on spark 1.1.0 - the records gets 
stored in hbase.
thanks,Antony.

 

 On Thursday, 25 December 2014, 0:37, Ted Yu yuzhih...@gmail.com wrote:
   
 

 I went over the jstack but didn't find any call related to hbase or 
zookeeper.Do you find anything important in the logs ?
Looks like container launcher was waiting for the script to return some result:
   
   -         at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:715)
   -         at org.apache.hadoop.util.Shell.runCommand(Shell.java:524)

On Wed, Dec 24, 2014 at 3:11 PM, Antony Mayi antonym...@yahoo.com wrote:

this is it (jstack of particular yarn container) - http://pastebin.com/eAdiUYKK
thanks, Antony. 

 On Wednesday, 24 December 2014, 16:34, Ted Yu yuzhih...@gmail.com wrote:
   
 

 bq. even when testing with the example from the stock hbase_outputformat.py
Can you take jstack of the above and pastebin it ?
Thanks
On Wed, Dec 24, 2014 at 4:49 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
have been using this without any issues with spark 1.1.0 but after upgrading to 
1.2.0 saving a RDD from pyspark using saveAsNewAPIHadoopDataset into HBase just 
hangs - even when testing with the example from the stock hbase_outputformat.py.
anyone having same issue? (and able to solve?)
using hbase 0.98.6 and yarn-client mode.
thanks,Antony.




 




 




 
   

Re: saveAsNewAPIHadoopDataset against hbase hanging in pyspark 1.2.0

2014-12-24 Thread Antony Mayi
also hbase itself works ok:

hbase(main):006:0> scan 'test'
ROW                COLUMN+CELL
 key1              column=f1:asd, timestamp=1419463092904, value=456
1 row(s) in 0.0250 seconds

hbase(main):007:0> put 'test', 'testkey', 'f1:testqual', 'testval'
0 row(s) in 0.0170 seconds

hbase(main):008:0> scan 'test'
ROW                COLUMN+CELL
 key1              column=f1:asd, timestamp=1419463092904, value=456
 testkey           column=f1:testqual, timestamp=1419487275905, value=testval
2 row(s) in 0.0270 seconds
 

 On Thursday, 25 December 2014, 6:58, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 I am running it in yarn-client mode and I believe hbase-client is part of the 
spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar which I am submitting at 
launch.
adding another jstack taken during the hanging - http://pastebin.com/QDQrBw70 - 
this is of the CoarseGrainedExecutorBackend process - this one is referencing 
hbase and zookeeper.
thanks,Antony. 

 On Thursday, 25 December 2014, 1:38, Ted Yu yuzhih...@gmail.com wrote:
   
 

 bq. hbase.zookeeper.quorum: localhost
You are running hbase cluster in standalone mode ?Is hbase-client jar in the 
classpath ?
Cheers
On Wed, Dec 24, 2014 at 4:11 PM, Antony Mayi antonym...@yahoo.com wrote:

I just run it by hand from pyspark shell. here is the steps:
pyspark --jars 
/usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar
 conf = {hbase.zookeeper.quorum: localhost,
...         hbase.mapred.outputtable: test,...         
mapreduce.outputformat.class: 
org.apache.hadoop.hbase.mapreduce.TableOutputFormat,...         
mapreduce.job.output.key.class: 
org.apache.hadoop.hbase.io.ImmutableBytesWritable,...         
mapreduce.job.output.value.class: org.apache.hadoop.io.Writable} keyConv 
= 
org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter
 valueConv = 
org.apache.spark.examples.pythonconverters.StringListToPutConverter 
sc.parallelize([['testkey', 'f1', 'testqual', 'testval']], 1).map(lambda x: 
(x[0], x)).saveAsNewAPIHadoopDataset(...         conf=conf,...         
keyConverter=keyConv,...         valueConverter=valueConv)
then it spills few of the INFO level messages about submitting a task etc but 
then it just hangs. very same code runs ok on spark 1.1.0 - the records gets 
stored in hbase.
thanks,Antony.

 

 On Thursday, 25 December 2014, 0:37, Ted Yu yuzhih...@gmail.com wrote:
   
 

 I went over the jstack but didn't find any call related to hbase or 
zookeeper.Do you find anything important in the logs ?
Looks like container launcher was waiting for the script to return some result:
   
   -         at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.parseExecResult(Shell.java:715)
   -         at org.apache.hadoop.util.Shell.runCommand(Shell.java:524)

On Wed, Dec 24, 2014 at 3:11 PM, Antony Mayi antonym...@yahoo.com wrote:

this is it (jstack of particular yarn container) - http://pastebin.com/eAdiUYKK
thanks, Antony. 

 On Wednesday, 24 December 2014, 16:34, Ted Yu yuzhih...@gmail.com wrote:
   
 

 bq. even when testing with the example from the stock hbase_outputformat.py
Can you take jstack of the above and pastebin it ?
Thanks
On Wed, Dec 24, 2014 at 4:49 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,
have been using this without any issues with spark 1.1.0 but after upgrading to 
1.2.0 saving a RDD from pyspark using saveAsNewAPIHadoopDataset into HBase just 
hangs - even when testing with the example from the stock hbase_outputformat.py.
anyone having same issue? (and able to solve?)
using hbase 0.98.6 and yarn-client mode.
thanks,Antony.




 




 




 


 
   

Re: custom python converter from HBase Result to tuple

2014-12-22 Thread Antony Mayi
using hbase 0.98.6
there is no stack trace, just this short error.

I just noticed it does fall back to toString as the message says, since this is what 
I get back in python:

hbase_rdd.collect()
[(u'key1', u'List(cf1:12345:14567890, cf2:123:14567896)')]

so the question is why it falls back to toString?

thanks, Antony.
 

 On Monday, 22 December 2014, 20:09, Ted Yu yuzhih...@gmail.com wrote:
   
 

 Which HBase version are you using ?
Can you show the full stack trace ?
Cheers
On Mon, Dec 22, 2014 at 11:02 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

Hi,

can anyone please give me some help with how to write a custom converter of hbase data 
to (for example) tuples of ((family, qualifier, value), ) for pyspark:

I was trying something like this (here trying to get tuples of 
(family:qualifier:value, )):


class HBaseResultToTupleConverter extends Converter[Any, List[String]] {
  override def convert(obj: Any): List[String] = {
    val result = obj.asInstanceOf[Result]
    result.rawCells().map(cell =>
      List(Bytes.toString(CellUtil.cloneFamily(cell)),
        Bytes.toString(CellUtil.cloneQualifier(cell)),
        Bytes.toString(CellUtil.cloneValue(cell))).mkString(":")
    ).toList
  }
}


but then I get a error:

14/12/22 16:27:40 WARN python.SerDeUtil:
Failed to pickle Java object as value: $colon$colon, falling back
to 'toString'. Error: couldn't introspect javabean: 
java.lang.IllegalArgumentException: wrong number of arguments


does anyone have a hint?

Thanks,
Antony.
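
for context, a hedged sketch of how such a converter would be wired in from pyspark 
once it is compiled into a jar passed via --jars; it differs from the stock 
hbase_inputformat.py pattern only in the valueConverter, and the package name 
com.example plus the table/quorum values are assumptions, not details from the thread:

conf = {"hbase.zookeeper.quorum": "localhost",
        "hbase.mapreduce.inputtable": "test"}
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    valueConverter="com.example.HBaseResultToTupleConverter",  # hypothetical fully-qualified name
    conf=conf)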

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





 
   

cartesian on pyspark not paralleised

2014-12-05 Thread Antony Mayi
Hi,

using pyspark 1.1.0 on YARN 2.5.0. all operations run nicely in parallel - I 
can see multiple python processes spawned on each nodemanager, but for some 
reason when running cartesian there is only a single python process running on 
each node. the task indicates thousands of partitions, so I don't understand 
why it is not running with higher parallelism. the performance is obviously 
poor, although the other operations run fine.

any idea how to improve this?

thank you,
Antony.
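
a hedged sketch of one thing worth trying (not from the original thread): force a 
reshuffle of the cartesian output so the work is spread over more partitions and 
hence more python workers; rdd_a, rdd_b and score_pair are placeholders for the 
real inputs and per-pair work:

pairs = rdd_a.cartesian(rdd_b)

# reshuffle so downstream python workers get evenly sized partitions;
# the multiplier is illustrative, tune it to the cluster
pairs = pairs.repartition(sc.defaultParallelism * 4)

result = pairs.map(score_pair).collect()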

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org