spark performance non-linear response

2015-10-07 Thread Yadid Ayzenberg

Hi All,

I'm using Spark 1.4.1 to analyze a largish data set (several gigabytes of data). The RDD is partitioned into 2048 partitions which are more or less equal and entirely cached in RAM.
I evaluated the performance on several cluster sizes, and am witnessing a non-linear (power) performance improvement as the cluster size increases (plot below). Each node has 4 cores and each worker is configured to use 10GB of RAM.


[Plot: Spark performance vs. cluster size]

I would expect a more linear response given the number of partitions and 
the fact that all of the data is cached.

Can anyone suggest what I should tweak in order to improve the performance?
Or perhaps provide an explanation for the behavior I'm witnessing?

Yadid


Re: spark performance non-linear response

2015-10-07 Thread Yadid Ayzenberg

Additional missing relevant information:

I'm running a transformation; there are no shuffles occurring, and at the end I'm performing a lookup of 4 partitions on the driver.
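
For concreteness, here is a minimal sketch of the access pattern described above (a fully cached pair RDD, a map-only transformation, and a small lookup whose result lands on the driver). The input path, keying, and names are placeholders, not the actual application code:

import org.apache.spark.{SparkConf, SparkContext}

object CachedLookupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cached-lookup-sketch"))

    // 2048 roughly equal partitions, fully cached in RAM (as described above).
    val pairs = sc.textFile("hdfs:///path/to/input")   // placeholder input
      .map(line => (line.take(16), line))              // placeholder key extraction
      .repartition(2048)
      .cache()
    pairs.count()                                      // materialize the cache

    // Map-only transformation: no shuffle is triggered.
    val transformed = pairs.mapValues(_.length)

    // Small lookup evaluated on the driver at the end of the job.
    val hits = transformed.lookup("some-key")          // placeholder key
    println("matches: " + hits.size)

    sc.stop()
  }
}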




On 10/7/15 11:26 AM, Yadid Ayzenberg wrote:

Hi All,

I'm using Spark 1.4.1 to analyze a largish data set (several gigabytes of data). The RDD is partitioned into 2048 partitions which are more or less equal and entirely cached in RAM.
I evaluated the performance on several cluster sizes, and am witnessing a non-linear (power) performance improvement as the cluster size increases (plot below). Each node has 4 cores and each worker is configured to use 10GB of RAM.


[Plot: Spark performance vs. cluster size]

I would expect a more linear response given the number of partitions 
and the fact that all of the data is cached.
Can anyone suggest what I should tweak in order to improve the 
performance?

Or perhaps provide an explanation for the behavior I'm witnessing?

Yadid




Re: spark 1.4.1 - LZFException

2015-09-03 Thread Yadid Ayzenberg

Hi Akhil,

No, it seems I have plenty more disk space available on that node.
I looked at the logs, and one minute before that exception I see the following:


15/09/03 12:51:39 ERROR TransportChannelHandler: Connection to 
/x.y.z.w:44892 has been quiet for 120000 ms while there are outstanding 
requests. Assuming connection is dead; please adjust 
spark.network.timeout if this is wrong.
15/09/03 12:51:39 ERROR TransportResponseHandler: Still have 8 requests 
outstanding when connection from /18.85.28.197:44892 is closed
15/09/03 12:51:39 ERROR OneForOneBlockFetcher: Failed while starting 
block fetches

java.io.IOException: Connection from /x.y.z.w:44892 closed
at 
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
at 
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at 
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at 
io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738)
at 
io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606)
at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)

at java.lang.Thread.run(Thread.java:745)

Do you think that is related to the problem?
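
For reference, a minimal sketch (values are illustrative, not a recommendation from this thread) of how the spark.network.timeout mentioned in that error can be raised:

import org.apache.spark.SparkConf

// The default in Spark 1.4.x is 120s; a larger value is shown purely as an example.
val conf = new SparkConf()
  .setAppName("my-app")                    // placeholder app name
  .set("spark.network.timeout", "300s")

// Equivalent on submit (also illustrative):
//   spark-submit --conf spark.network.timeout=300s ...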

Yadid

On 8/28/15 1:31 AM, Akhil Das wrote:
Is it filling up your disk space? Can you look a bit more into the 
executor logs to see what's going on?


Thanks
Best Regards

On Sun, Aug 23, 2015 at 1:27 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:




Hi All,

We have a Spark standalone cluster running 1.4.1 and we are
setting spark.io.compression.codec to lzf.
I have a long-running interactive application which behaves as
normal, but after a few days I get the following exception in
multiple jobs. Any ideas on what could be causing this?

Yadid



Job aborted due to stage failure: Task 27 in stage 286.0 failed 4 times, 
most recent failure: Lost task 27.3 in stage 286.0 (TID 516817, xx.yy.zz.ww): 
com.esotericsoftware.kryo.KryoException: com.ning.compress.lzf.LZFException: 
Corrupt input data, block did not start with 2 byte signature ('ZV') followed 
by type byte, 2-byte length)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
at 
org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
at 
org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:200)
at 
org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:197)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionItera

spark 1.4.1 - LZFException

2015-08-22 Thread Yadid Ayzenberg



Hi All,

We have a Spark standalone cluster running 1.4.1 and we are setting 
spark.io.compression.codec to lzf.
I have a long-running interactive application which behaves as normal, 
but after a few days I get the following exception in multiple jobs. Any 
ideas on what could be causing this?
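
For context, a minimal sketch (not from this thread) of how the codec setting above is typically applied; the alternative codec in the comment is purely illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("my-app")                          // placeholder app name
  .set("spark.io.compression.codec", "lzf")      // the setting described above
// Another codec, e.g. "snappy", could be set the same way:
// .set("spark.io.compression.codec", "snappy")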


Yadid



Job aborted due to stage failure: Task 27 in stage 286.0 failed 4 times, most 
recent failure: Lost task 27.3 in stage 286.0 (TID 516817, xx.yy.zz.ww): 
com.esotericsoftware.kryo.KryoException: com.ning.compress.lzf.LZFException: 
Corrupt input data, block did not start with 2 byte signature ('ZV') followed 
by type byte, 2-byte length)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:182)
at 
org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:169)
at 
org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:200)
at 
org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:197)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.ning.compress.lzf.LZFException: Corrupt input data, block did 
not start with 2 byte signature ('ZV') followed by type byte, 2-byte length)
at 
com.ning.compress.lzf.ChunkDecoder._reportCorruptHeader(ChunkDecoder.java:267)
at 
com.ning.compress.lzf.impl.UnsafeChunkDecoder.decodeChunk(UnsafeChunkDecoder.java:55)
at 
com.ning.compress.lzf.LZFInputStream.readyBuffer(LZFInputStream.java:363)
at com.ning.compress.lzf.LZFInputStream.read(LZFInputStream.java:193)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
... 37 more





Re: Change delimiter when collecting SchemaRDD

2014-08-29 Thread yadid ayzenberg
Thanks Michael, that makes total sense.
It works perfectly.

Yadid


On Thu, Aug 28, 2014 at 9:19 PM, Michael Armbrust mich...@databricks.com
wrote:

 The comma is just the way the default toString works for Row objects.
  Since SchemaRDDs are also RDDs, you can do arbitrary transformations on
 the Row objects that are returned.

 For example, if you'd rather the delimiter was '|':

 sql("SELECT * FROM src").map(_.mkString("|")).collect()


 On Thu, Aug 28, 2014 at 7:58 AM, yadid ayzenberg ya...@media.mit.edu
 wrote:

 Hi All,

 Is there any way to change the delimiter from being a comma ?
 Some of the strings in my data contain commas as well, making it very
 difficult to parse the results.

 Yadid





Change delimiter when collecting SchemaRDD

2014-08-28 Thread yadid ayzenberg
Hi All,

Is there any way to change the delimiter from being a comma ?
Some of the strings in my data contain commas as well, making it very
difficult to parse the results.

Yadid


Losing Executors on cluster with RDDs of 100GB

2014-08-22 Thread Yadid Ayzenberg

Hi all,

I have a Spark cluster of 30 machines, 16GB / 8 cores each, running in 
standalone mode. Previously my application was working well (several 
RDDs, the largest being around 50G).
When I started processing larger amounts of data (RDDs of 100G), my app 
is losing executors. I'm currently just loading them from a database, 
repartitioning, and persisting to disk (with replication x2).
I have spark.executor.memory=9G, memoryFraction=0.5, 
spark.worker.timeout=120, spark.akka.askTimeout=30, 
spark.storage.blockManagerHeartBeatMs=3.
I haven't changed the default worker memory, so it's at 512m (should 
this be larger?)
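
For clarity, the settings listed above expressed as a SparkConf (a sketch: the property behind the shorthand "memoryFraction" is my assumption, and the heartbeat value is reproduced exactly as written in this message):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "9g")
  .set("spark.storage.memoryFraction", "0.5")          // assuming the storage fraction is meant
  .set("spark.akka.askTimeout", "30")
  .set("spark.storage.blockManagerHeartBeatMs", "3")   // value reproduced as written above
// spark.worker.timeout=120 is a standalone-mode master/worker setting and is normally
// configured on the cluster daemons rather than per application.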


I've been getting the following messages from my app:

 [error] o.a.s.s.TaskSchedulerImpl - Lost executor 3 on myserver1: 
worker lost
[error] o.a.s.s.TaskSchedulerImpl - Lost executor 13 on myserver2: 
Unknown executor exit code (137) (died from signal 9?)
[error] a.r.EndpointWriter - AssociationError 
[akka.tcp://spark@master:59406] ->
[akka.tcp://sparkExecutor@myserver2:32955]: Error [Association failed 
with [akka.tcp://sparkExecutor@myserver2:32955]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://sparkexecu...@myserver2.com:32955]
Caused by: 
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
Connection refused: myserver2/198.18.102.160:32955

]
[error] a.r.EndpointWriter - AssociationError 
[akka.tcp://spark@master:59406] -> [akka.tcp://spark@myserver1:53855]:
Error [Association failed with [akka.tcp://spark@myserver1:53855]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://spark@myserver1:53855]
Caused by: 
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
Connection refused: myserver1/198.18.102.160:53855

]

The worker logs and executor logs do not contain errors. Any ideas what 
the problem is ?


Yadid




Re: possible typos in spark 1.0 documentation

2014-05-31 Thread Yadid Ayzenberg

Yep, I just issued a pull request.

Yadid


On 5/31/14, 1:25 PM, Patrick Wendell wrote:

1. ctx is an instance of JavaSQLContext but the textFile method is called as
a member of ctx.
According to the API, JavaSQLContext does not have such a member, so I'm
guessing this should be sc instead.

Yeah, I think you are correct.


2. In that same code example the object sqlCtx is referenced, but it is
never instantiated in the code.
should this be ctx?

Also correct.

I think it would be good to be consistent and always have ctx refer
to a JavaSparkContext and have sqlCtx refer to a JavaSQLContext.

Any interest in creating a pull request for this? We'd be happy to
accept the change.

- Patrick




possible typos in spark 1.0 documentation

2014-05-30 Thread Yadid Ayzenberg

Congrats on the new 1.0 release. Amazing work !

It looks like there may be some typos in the latest 
http://spark.apache.org/docs/latest/sql-programming-guide.html


In the "Running SQL on RDDs" section, when choosing the Java example:

1. ctx is an instance of JavaSQLContext but the textFile method is 
called as a member of ctx.
According to the API, JavaSQLContext does not have such a member, so I'm 
guessing this should be sc instead.


2. In that same code example the object sqlCtx is referenced, but it is 
never instantiated in the code.

Should this be ctx?

Cheers,

Yadid



Re: NoSuchMethodError: breeze.linalg.DenseMatrix

2014-05-04 Thread Yadid Ayzenberg
An additional option 4) Use SparkContext.addJar() and have the 
application ship your jar to all the nodes.
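
A minimal sketch of that option (the jar path and version are placeholders, not from this thread):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("addjar-sketch"))
// Ship the dependency jar from the driver to every executor at runtime.
sc.addJar("/path/to/breeze_2.10-0.7.jar")   // placeholder path and version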



Yadid

On 5/4/14, 4:07 PM, DB Tsai wrote:
If you add the breeze dependency in your build.sbt project, it will 
not be available to all the workers.


There are a couple of options: 1) use sbt assembly to package breeze into 
your application jar; 2) manually copy the breeze jar onto all the nodes 
and have it on the classpath; 3) Spark 1.0 has the breeze jar in the 
Spark flat assembly jar, so you don't need to add the breeze dependency 
yourself.
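
(As an illustration of option 1, a build.sbt sketch follows; the versions and the sbt-assembly setup are assumptions for the Spark 1.0 / Scala 2.10 era, not taken from this thread.)

// build.sbt (sketch): breeze goes into the fat jar, Spark itself is provided by the cluster.
name := "my-spark-app"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
  "org.scalanlp"     %% "breeze"     % "0.7"
)

// Running "sbt assembly" (with the sbt-assembly plugin enabled) then produces a single
// application jar that already contains breeze.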



Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Sun, May 4, 2014 at 4:07 AM, wxhsdp <wxh...@gmail.com> wrote:


Hi,
  I'm trying to use the breeze linalg library for matrix operations in
my Spark code. I already added a dependency
  on breeze in my build.sbt and packaged my code successfully.

  When I run in local mode (sbt run local...), everything is OK.

  But when I turn to standalone mode (sbt run
spark://127.0.0.1:7077...),
an error occurs:

14/05/04 18:56:29 WARN scheduler.TaskSetManager: Loss was due to
java.lang.NoSuchMethodError
java.lang.NoSuchMethodError:

breeze.linalg.DenseMatrix$.implOpMulMatrix_DMD_DMD_eq_DMD()Lbreeze/linalg/operators/DenseMatrixMultiplyStuff$implOpMulMatrix_DMD_DMD_eq_DMD$;

  In my opinion, everything needed is packaged into the jar file,
isn't it?
  And has anyone used breeze before? Is it good for matrix operations?









Re: Strange lookup behavior. Possible bug?

2014-04-30 Thread Yadid Ayzenberg

Dear Sparkers,

Has anyone got any insight on this ? I am really stuck.

Yadid


On 4/28/14, 11:28 AM, Yadid Ayzenberg wrote:

Thanks for your answer.
I tried running on a single machine - master and worker on one host. I 
get exactly the same results.
Very little CPU activity on the machine in question. The web UI shows 
a single task and its state is RUNNING. it will remain so indefinitely.

I have a single partition, and its size is 1626.2 MB

Currently the RDD has 200 elements, but I have tried it with 20 and 
the behavior is the same.

The key is of the form:  (0,52fb9aff3004f07d1a87c8ea)
Where the first number in the tuple is always 0, and the second one is 
some string that can appear more than once.


The RDD is created by using the newAPIHadoopRDD.

Any additional info I can provide?

Yadid




On 4/28/14 10:46 AM, Daniel Darabos wrote:
That is quite mysterious, and I do not think we have enough 
information to answer. JavaPairRDD<String, Tuple2>.lookup() works 
fine on a remote Spark cluster:


$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 10, 3).map(x => ((x%3).toString, (x, x%3))))

scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an 
executor. I guess it is likely possible, though it has not happened 
to me. I would recommend running on a single machine in the 
standalone setup. Start the master and worker on the same machine, 
run the application there too. This should eliminate network 
configuration problems.


If you still see the issue, I'd check whether the task has really 
completed. What do you see on the web UI? Is the executor using CPU?


Good luck.




On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:


Can someone please suggest how I can move forward with this?
My spark version is 0.9.1.
The big challenge is that this issue is not recreated when
running in local mode. What could be the difference?

I would really appreciate any pointers, as currently the job
just hangs.




On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:

Some additional information - maybe this rings a bell with
someone:

I suspect this happens when the lookup returns more than one
value.
For 0 and 1 values, the function behaves as you would expect.

Anyone ?



On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

Hi All,

I'm running a lookup on a JavaPairRDD<String, Tuple2>.
When running on a local machine the lookup is successful.
However, when running a standalone cluster with the exact
same dataset, one of the tasks never ends
(constantly in RUNNING status).
When viewing the worker log, it seems that the task has
finished successfully:

14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0
locally
14/04/25 13:40:38 INFO Executor: Serialized size of
result for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2
directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs
indefinitely.

If I execute a count prior to the lookup, I get the
correct number, which suggests that the cluster is
operating as expected.

The exact same scenario works with a different type of
key (Tuple2): JavaPairRDD<Tuple2, Tuple2>.

Any ideas on how to debug this problem ?

Thanks,

Yadid










Re: Strange lookup behavior. Possible bug?

2014-04-28 Thread Yadid Ayzenberg

Thanks for your answer.
I tried running on a single machine - master and worker on one host. I 
get exactly the same results.
Very little CPU activity on the machine in question. The web UI shows a 
single task and its state is RUNNING; it will remain so indefinitely.

I have a single partition, and its size is 1626.2 MB

Currently the RDD has 200 elements, but I have tried it with 20 and the 
behavior is the same.

The key is of the form:  (0,52fb9aff3004f07d1a87c8ea)
Where the first number in the tuple is always 0, and the second one is 
some string that can appear more than once.


The RDD is created by using the newAPIHadoopRDD.

Any additional info I can provide?

Yadid




On 4/28/14 10:46 AM, Daniel Darabos wrote:
That is quite mysterious, and I do not think we have enough 
information to answer. JavaPairRDD<String, Tuple2>.lookup() works fine 
on a remote Spark cluster:


$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 10, 3).map(x => ((x%3).toString, (x, x%3))))

scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an 
executor. I guess it is likely possible, though it has not happened to 
me. I would recommend running on a single machine in the standalone 
setup. Start the master and worker on the same machine, run the 
application there too. This should eliminate network configuration 
problems.


If you still see the issue, I'd check whether the task has really 
completed. What do you see on the web UI? Is the executor using CPU?


Good luck.




On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:


Can someone please suggest how I can move forward with this?
My spark version is 0.9.1.
The big challenge is that this issue is not recreated when running
in local mode. What could be the difference?

I would really appreciate any pointers, as currently the job
just hangs.




On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:

Some additional information - maybe this rings a bell with
someone:

I suspect this happens when the lookup returns more than one
value.
For 0 and 1 values, the function behaves as you would expect.

Anyone ?



On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

Hi All,

I'm running a lookup on a JavaPairRDD<String, Tuple2>.
When running on a local machine the lookup is successful.
However, when running a standalone cluster with the exact
same dataset, one of the tasks never ends (constantly in
RUNNING status).
When viewing the worker log, it seems that the task has
finished successfully:

14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0
locally
14/04/25 13:40:38 INFO Executor: Serialized size of result
for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2
directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs
indefinitely.

If I execute a count prior to the lookup, I get the
correct number, which suggests that the cluster is
operating as expected.

The exact same scenario works with a different type of key
(Tuple2): JavaPairRDD<Tuple2, Tuple2>.

Any ideas on how to debug this problem ?

Thanks,

Yadid








Re: Strange lookup behavior. Possible bug?

2014-04-28 Thread Yadid Ayzenberg

Could this be related to the size of the lookup result ?

I tried to recreate a similar scenario in the spark shell, which causes 
an exception:


scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 4, 3).map(x => ((0, "52fb9b1a3004f07d1a87c8f3"), Seq.fill(40)(Random.nextFloat))))
rdd: org.apache.spark.api.java.JavaPairRDD[(Int, String),Seq[Float]] = 
org.apache.spark.api.java.JavaPairRDD@1481cb6e

scala> rdd.count()
res53: Long = 4

scala> rdd.lookup((0, "52fb9b1a3004f07d1a87c8f3"))
org.apache.spark.SparkException: Job aborted: Task 39.0:2 failed 4 times 
(most recent failure: Exception failure: 
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 3, 
required: 4)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)

at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




On 4/28/14 11:28 AM, Yadid Ayzenberg wrote:

Thanks for your answer.
I tried running on a single machine - master and worker on one host. I 
get exactly the same results.
Very little CPU activity on the machine in question. The web UI shows 
a single task and its state is RUNNING. it will remain so indefinitely.

I have a single partition, and its size is 1626.2 MB

Currently the RDD has 200 elements, but I have tried it with 20 and 
the behavior is the same.

The key is of the form:  (0,52fb9aff3004f07d1a87c8ea)
Where the first number in the tuple is always 0, and the second one is 
some string that can appear more than once.


The RDD is created by using the newAPIHadoopRDD.

Any additional info I can provide?

Yadid




On 4/28/14 10:46 AM, Daniel Darabos wrote:
That is quite mysterious, and I do not think we have enough 
information to answer. JavaPairRDD<String, Tuple2>.lookup() works 
fine on a remote Spark cluster:


$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 10, 3).map(x => ((x%3).toString, (x, x%3))))

scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an 
executor. I guess it is likely possible, though it has not happened 
to me. I would recommend running on a single machine in the 
standalone setup. Start the master and worker on the same machine, 
run the application there too. This should eliminate network 
configuration problems.


If you still see the issue, I'd check whether the task has really 
completed. What do you see on the web UI? Is the executor using CPU?


Good luck.




On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg <ya...@media.mit.edu> wrote:


Can someone please suggest how I can move forward with this?
My spark version is 0.9.1.
The big challenge is that this issue is not recreated when
running in local mode. What could be the difference?

I would really appreciate any pointers, as currently the job
just hangs.




On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:

Some additional information - maybe this rings a bell with
someone:

I suspect this happens when the lookup returns more than one
value.
For 0 and 1 values, the function behaves as you would expect.

Anyone ?



On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

Hi All,

I'm running a lookup on a JavaPairRDD<String, Tuple2>

Re: Strange lookup behavior. Possible bug?

2014-04-27 Thread Yadid Ayzenberg

Can someone please suggest how I can move forward with this?
My spark version is 0.9.1.
The big challenge is that this issue is not recreated when running in 
local mode. What could be the difference?


I would really appreciate any pointers, as currently the job just hangs.



On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:

Some additional information - maybe this rings a bell with someone:

I suspect this happens when the lookup returns more than one value.
For 0 and 1 values, the function behaves as you would expect.

Anyone ?



On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

Hi All,

I'm running a lookup on a JavaPairRDD<String, Tuple2>.
When running on a local machine the lookup is successful. However, 
when running a standalone cluster with the exact same dataset, one 
of the tasks never ends (constantly in RUNNING status).
When viewing the worker log, it seems that the task has finished 
successfully:


14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 
10896794

14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely.

If I execute a count prior to the lookup, I get the correct number, 
which suggests that the cluster is operating as expected.


The exact same scenario works with a different type of key (Tuple2): 
JavaPairRDD<Tuple2, Tuple2>.


Any ideas on how to debug this problem ?

Thanks,

Yadid







Strange lookup behavior. Possible bug?

2014-04-25 Thread Yadid Ayzenberg

Hi All,

I'm running a lookup on a JavaPairRDD<String, Tuple2>.
When running on a local machine the lookup is successful. However, when 
running a standalone cluster with the exact same dataset, one of the 
tasks never ends (constantly in RUNNING status).
When viewing the worker log, it seems that the task has finished 
successfully:


14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely.

If I execute a count prior to the lookup, I get the correct number, 
which suggests that the cluster is operating as expected.


The exact same scenario works with a different type of key (Tuple2): 
JavaPairRDD<Tuple2, Tuple2>.


Any ideas on how to debug this problem ?

Thanks,

Yadid



Re: Strange lookup behavior. Possible bug?

2014-04-25 Thread Yadid Ayzenberg

Some additional information - maybe this rings a bell with someone:

I suspect this happens when the lookup returns more than one value.
For 0 and 1 values, the function behaves as you would expect.

Anyone ?



On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

Hi All,

I'm running a lookup on a JavaPairRDD<String, Tuple2>.
When running on a local machine the lookup is successful. However, 
when running a standalone cluster with the exact same dataset, one of 
the tasks never ends (constantly in RUNNING status).
When viewing the worker log, it seems that the task has finished 
successfully:


14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 
10896794

14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely.

If I execute a count prior to the lookup, I get the correct number, 
which suggests that the cluster is operating as expected.


The exact same scenario works with a different type of key (Tuple2): 
JavaPairRDD<Tuple2, Tuple2>.


Any ideas on how to debug this problem ?

Thanks,

Yadid