Managing Dataset API Partitions - Spark 2.0

2016-09-07 Thread ANDREA SPINA
Hi everyone,
I'd like to test some algorithms with the Dataset API offered by Spark 2.0.0.

So I was wondering: *what is the best way to manage Dataset partitions?*

E.g., in the data-reading phase, what I usually do is the following:

// RDD
// if I want to set a custom minimum number of partitions
val data = sc.textFile(inputPath, numPartitions)

// if I want to reshape my RDD into a new number of partitions at some point
val reshaped = data.repartition(newNumPartitions)

// Dataset API
// with the Dataset API, I call the repartition method directly on the dataset
spark.read.text(inputPath).repartition(newNumberOfPartitions)
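For completeness, a minimal sketch of the other variants I found in the 2.0 Dataset API (the "value" column name comes from spark.read.text; the partition counts are placeholders):

import spark.implicits._

// reduce the number of partitions without a full shuffle
val fewer = spark.read.text(inputPath).coalesce(newNumberOfPartitions)

// hash-partition rows by an expression, here the single "value" column of a text Dataset
val byColumn = spark.read.text(inputPath).repartition(newNumberOfPartitions, $"value")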

So I'd be glad to know: are there any new recommended ways to customize
Dataset partitioning, either in the reading phase or at some later point?

Thank you so much.
Andrea
-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)


java.lang.OutOfMemoryError Spark MLlib ALS matrix factorization

2016-09-01 Thread ANDREA SPINA
 9360, cloud-15, partition 375,PROCESS_LOCAL, 2268 bytes)
16/09/01 00:55:20 INFO TaskSetManager: Finished task 91.0 in stage 12.0
(TID 9093) in 1356978 ms on cloud-15 (120/720)
16/09/01 00:56:07 INFO TaskSetManager: Starting task 384.0 in stage 12.0
(TID 9361, cloud-24, partition 384,PROCESS_LOCAL, 2268 bytes)
16/09/01 00:56:07 WARN TaskSetManager: Lost task 18.0 in stage 12.0 (TID
9027, cloud-24): FetchFailed(BlockManagerId(2, cloud-22, 40528),
shuffleId=22, mapId=671, reduceId=18, message=
org.apache.spark.shuffle.FetchFailedException:
java.io.FileNotFoundException:
/data/1/peel/spark/tmp/spark-6225354a-22f0-45dd-aff0-76051ad609ed/executor-d5fbc621-341c-4fc9-bedc-c292dc7f038a/blockmgr-c8b40f38-99a9-4060-823d-50b502bd9f91/25/shuffle_22_671_0.index
(No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:191)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:298)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

I'm running with *spark-1.6.2*. I really can't figure out the reason behind
that.

My code simply calls the library as follows:

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.storage.StorageLevel

// configure the ALS matrix factorization
val als = new ALS()
  .setIntermediateRDDStorageLevel(storageLevel)
  .setBlocks(numTasks)
  .setLambda(0.1)
  .setRank(50)
  .setIterations(10)
  .setSeed(42)

// train the model, persist it, and stop the context
val model = als.run(ratings)
model.save(sc, outputPath)
sc.stop()

where:
- *ratings* is the input RDD (parallelized with *numTasks* partitions),
containing (uid, iid, rate) rows for about 8e6 users, 1e6 items, and about
5.6e9 ratings (700 per user on average); a sketch of how it could be built
follows this list
- *numTasks*: currently 240 * 3 (= numOfCores * 3)
- *storageLevel*: MEMORY_AND_DISK
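For context, a sketch of how the ratings RDD could be built (the
"uid,iid,rate" file layout and the ratingsPath name are illustrative
assumptions):

import org.apache.spark.mllib.recommendation.Rating

val ratings = sc.textFile(ratingsPath, numTasks).map { line =>
  // each row carries a user id, an item id, and a rating
  val Array(uid, iid, rate) = line.split(',')
  Rating(uid.toInt, iid.toInt, rate.toDouble)
}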

I made several attempts:
- lowering the number of blocks: 1) numTasks, 2) 240 (numOfCores), 3)
letting the MLlib implementation set it
- changing the storage level to MEMORY_ONLY

I would have tried varying spark.shuffle.memoryFraction as well, but I read
it has been deprecated since Spark 1.6.
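If it's relevant, these are the unified-memory properties that replace it in
1.6 (the values shown are the 1.6 defaults; changing them is only a guess on
my part):

spark.memory.fraction = 0.75
spark.memory.storageFraction = 0.5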

I'm running on a 15-node cluster - 16 CPUs and 32GB of memory per node -
with the following relevant properties:

spark.executor.memory = 28672m
spark.driver.memory = 28672m
spark.daemon.memory = 28672m
spark.driver.maxResultSize = 0
spark.network.timeout = 3000s

Any help will be appreciated. Thank you.

-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)


Re: Issue with Spark on 25 nodes cluster

2016-07-13 Thread ANDREA SPINA
Hi,
I solved it by increasing the Akka timeout.
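For reference, a sketch of the property I mean (spark.akka.timeout exists in
Spark 1.4 and takes a value in seconds; the number here is just an
assumption):

# raise the Akka communication timeout (default 100 seconds)
spark.akka.timeout = "600"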
All the best,


Issue with Spark on 25 nodes cluster

2016-06-28 Thread ANDREA SPINA
Hello everyone,

I am running some experiments with Spark 1.4.0 on a ~80GiB dataset located
on hdfs-2.7.1. The environment is a 25 nodes cluster, 16 cores per node. I
set the following params:

spark.master = "spark://"${runtime.hostname}":7077"

# 28 GiB of memory
spark.executor.memory = "28672m"
spark.worker.memory = "28672m"
spark.driver.memory = "2048m"

spark.driver.maxResultSize = "0"

I ran some scaling experiments, varying the number of machines.
Experiments with the full set of 25 nodes and with 20 nodes complete
successfully, while experiments on 5-node and 10-node environments
relentlessly fail: during the run the Spark executors begin to accumulate
failed tasks across different stages and finally end with the following trace:

16/06/28 03:11:09 INFO DAGScheduler: Job 14 failed: reduce at
sGradientDescent.scala:229, took 1778.508309 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 212 in stage 14.0 failed 4 times, most recent
failure: Lost task 212.3 in stage 14.0 (TID 12278, 130.149.21.19):
java.io.IOException: Connection from /130.149.21.16:35997 closed
at
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
at
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at
io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738)
at
io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Here
<https://dl.dropboxusercontent.com/u/78598929/spark-hadoop-org.apache.spark.deploy.master.Master-1-cloud-11.log>
is the full Master log.
In addition, each Worker receives signal SIGTERM: 15.

I can't figure out a solution either.
Thank you. Regards,

Andrea


-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)