Re: RDD Blocks skewing to just few executors

2015-03-20 Thread Alessandro Lulli
Hi All,

I'm experiencing the same issue with Spark 1.2.0 (not verified with previous versions).

Could you please help us with this?

Thanks
Alessandro

On Tue, Nov 18, 2014 at 1:40 AM, mtimper  wrote:

> Hi I'm running a standalone cluster with 8 worker servers.
> I'm developing a streaming app that is adding new lines of text to several
> different RDDs each batch interval. Each line has a well-randomized unique
> identifier that I'm trying to use for partitioning, since the data stream
> does contain duplicate lines. I'm doing the partitioning with this:
>
> val eventsByKey = streamRDD.map { event => (getUID(event), event) }
> val partionedEventsRdd = sparkContext.parallelize(eventsByKey.toSeq)
>   .partitionBy(new HashPartitioner(numPartions)).map(e => e._2)
>
> I'm adding to the existing RDD with this:
>
> val mergedRDD = currentRDD.zipPartitions(partionedEventsRdd, true) {
>   (currentIter, batchIter) =>
>     val uniqEvents = ListBuffer[String]()
>     val uids = Map[String, Boolean]()
>     Array(currentIter, batchIter).foreach { iter =>
>       iter.foreach { event =>
>         val uid = getUID(event)
>         if (!uids.contains(uid)) {
>           uids(uid) = true
>           uniqEvents += event
>         }
>       }
>     }
>     uniqEvents.iterator
> }
>
> val count = mergedRDD.count
>
> The reason I'm doing it this way is that when I was doing:
>
> val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartions).distinct
> val count = mergedRDD.count
>
> It would start taking a long time and producing a lot of shuffles.
>
> The zipPartitions approach does perform better, though after running an
> hour or so I start seeing this in the webUI.
>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n19112/Executors.png>
>
> As you can see most of the data is skewing to just 2 executors, with 1
> getting more than half the Blocks. These become a hotspot and eventually I
> start seeing OOM errors. I've tried this half a dozen times and the 'hot'
> executors change, but the skewing behavior does not.
>
> Any idea what is going on here?
>
> Thanks,
>
> Mike
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Blocks-skewing-to-just-few-executors-tp19112.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
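
A minimal, self-contained sketch of the co-partitioned merge described in the
message above (not the original code): getUID here is a placeholder extractor,
and the sketch assumes currentRDD already has numPartitions partitions produced
by the same HashPartitioner, so only the incoming batch gets shuffled.

import scala.collection.mutable
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Placeholder standing in for the getUID(event) used in the thread.
def getUID(event: String): String = event.takeWhile(_ != ',')

// Merge a new batch into the accumulated RDD without shuffling the existing
// state: partition the batch with the same HashPartitioner, then de-duplicate
// per partition with zipPartitions (preservesPartitioning = true).
def mergeBatch(current: RDD[String],
               batch: RDD[String],
               numPartitions: Int): RDD[String] = {
  val partitioner = new HashPartitioner(numPartitions)
  val partitionedBatch =
    batch.map(e => (getUID(e), e)).partitionBy(partitioner).values

  // zipPartitions requires both sides to have exactly numPartitions partitions.
  current.zipPartitions(partitionedBatch, preservesPartitioning = true) {
    (currentIter, batchIter) =>
      val seen = mutable.HashSet[String]()
      (currentIter ++ batchIter).filter(e => seen.add(getUID(e)))
  }
}

Note that HashPartitioner places a key by its hashCode modulo the partition
count, so well-randomized UIDs should spread evenly across partitions; even
partitioning, however, does not by itself control which executors end up
caching the resulting blocks, since that depends on where the tasks are
scheduled.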


Re: RDD Partition number

2015-02-20 Thread Alessandro Lulli
Hi All,

Thanks for your answers.
I have one more details to point out.

It is clear now how the partition number is defined for a file on HDFS.

However, suppose my dataset is replicated on all the machines at the same
absolute path, and each machine has a local filesystem such as ext3.

If I load the file into an RDD, how many partitions are defined in this
case, and why?

I found that Spark defines some number of partitions, say K. If I force the
partition count to a value <= K, my parameter is ignored.
If I set a value K* >= K, then Spark creates K* partitions.

Thanks for your help
Alessandro


On Thu, Feb 19, 2015 at 6:27 PM, Ted Yu  wrote:

> bq. *blocks being 64MB by default in HDFS*
>
>
> *In Hadoop 2.1+, the default block size has been increased.*
> See https://issues.apache.org/jira/browse/HDFS-4053
>
> Cheers
>
> On Thu, Feb 19, 2015 at 8:32 AM, Ted Yu  wrote:
>
>> What file system are you using ?
>>
>> If you use hdfs, the documentation you cited is pretty clear on how
>> partitions are determined.
>>
>> bq. file X replicated on 4 machines
>>
>> I don't think replication factor plays a role w.r.t. partitions.
>>
>> On Thu, Feb 19, 2015 at 8:05 AM, Alessandro Lulli 
>> wrote:
>>
>>> Hi All,
>>>
>>> Could you please help me understand how Spark defines the number of
>>> partitions of an RDD if it is not specified?
>>>
>>> I found the following in the documentation for file loaded from HDFS:
>>> *The textFile method also takes an optional second argument for
>>> controlling the number of partitions of the file. By default, Spark creates
>>> one partition for each block of the file (blocks being 64MB by default in
>>> HDFS), but you can also ask for a higher number of partitions by passing a
>>> larger value. Note that you cannot have fewer partitions than blocks*
>>>
>>> What is the rule for a file loaded from a local file system?
>>> For instance, I have a file X replicated on 4 machines. If I load the
>>> file X into an RDD, how many partitions are defined, and why?
>>>
>>> Thanks for your help on this
>>> Alessandro
>>>
>>
>>
>
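
A small sketch of the behaviour described above (the path and the value 64 are
placeholders): the optional second argument to sc.textFile is a minimum number
of partitions. Spark ends up with one partition per input split computed by the
Hadoop input format, which can never be fewer than the number of blocks (HDFS
blocks, or fixed-size chunks of a local file), so a requested value below that
count is ignored, while a larger value is honoured because splits can be
subdivided. The only way to get fewer partitions than blocks is to coalesce
after loading.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("partition-count"))

// Default: one partition per input split.
val rdd = sc.textFile("hdfs:///data/input.txt")
println(s"default partitions: ${rdd.partitions.length}")

// The second argument is a minimum; requesting more than the split count
// produces more (smaller) partitions, requesting fewer has no effect.
val wider = sc.textFile("hdfs:///data/input.txt", 64)
println(s"requested at least 64, got: ${wider.partitions.length}")

// Reducing the partition count has to happen after loading.
val narrower = rdd.coalesce(4)
println(s"after coalesce: ${narrower.partitions.length}")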


RDD Partition number

2015-02-19 Thread Alessandro Lulli
Hi All,

Could you please help me understand how Spark defines the number of
partitions of an RDD if it is not specified?

I found the following in the documentation for file loaded from HDFS:
*The textFile method also takes an optional second argument for controlling
the number of partitions of the file. By default, Spark creates one
partition for each block of the file (blocks being 64MB by default in
HDFS), but you can also ask for a higher number of partitions by passing a
larger value. Note that you cannot have fewer partitions than blocks*

What is the rule for a file loaded from a local file system?
For instance, I have a file X replicated on 4 machines. If I load file X
into an RDD, how many partitions are defined, and why?

Thanks for your help on this
Alessandro


Re: Job aborted due to stage failure: TID x failed for unknown reasons

2014-07-22 Thread Alessandro Lulli
Hi All,

Can someone help on this?

I'm encountering exactly the same issue in a very similar scenario with the
same Spark version.

Thanks
Alessandro


On Fri, Jul 18, 2014 at 8:30 PM, Shannon Quinn  wrote:

>  Hi all,
>
> I'm dealing with some strange error messages that I *think* comes down to
> a memory issue, but I'm having a hard time pinning it down and could use
> some guidance from the experts.
>
> I have a 2-machine Spark (1.0.1) cluster. Both machines have 8 cores; one
> has 16GB memory, the other 32GB (which is the master). My application
> involves computing pairwise pixel affinities in images, though the images
> I've tested so far only get as big as 1920x1200, and as small as 16x16.
>
> I did have to change a few memory and parallelism settings, otherwise I
> was getting explicit OutOfMemoryExceptions. In spark-default.conf:
>
> spark.executor.memory       14g
> spark.default.parallelism   32
> spark.akka.frameSize        1000
>
> In spark-env.sh:
>
> SPARK_DRIVER_MEMORY=10G
>
> With those settings, however, I get a bunch of WARN statements about "Lost
> TIDs" (no task is successfully completed) in addition to lost Executors,
> which are repeated 4 times until I finally get the following error message
> and crash:
>
> ---
>
> 14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0
> 14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at
> /home/user/Programming/PySpark-Affinities/affinity.py:243
> Traceback (most recent call last):
>   File "/home/user/Programming/PySpark-Affinities/affinity.py", line 243,
> in 
> lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]])
>   File
> "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py",
> line 583, in collect
> bytesInJava = self._jrdd.collect().iterator()
>   File
> "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
> line 537, in __call__
>   File
> "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py",
> line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0.0:13 failed 4 times, most recent failure: *TID 32 on host
> master.host.univ.edu  failed for unknown
> reason*
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 14/07/18 12:06:20 INFO DAGScheduler: Executor lost: 4 (epoch 4)
> 14/07/18 12:06:20 INFO BlockManagerMasterActor: Trying to remove executor
> 4 from BlockManagerMaster.
> 14/07/18 12:06:20 INFO BlockManagerMaster: Removed 4 successfully in
> removeExecutor
> user@master:~/Programming/PySpark-Affinities$
>
> ---
>
> If I run the really small image instead (16x16), it *appears* to run to
> completion (gives me the output I expect without any exceptions being
> thrown). However, in the stderr logs for the app that was run, it lists the
> state as "KILLED" with the final message a "ERROR
> CoarseGrainedExecutorBackend: Driver Disassociated". If I run any larger
> images, I get the exception I pasted above.
>
> Furthermore, if I just do a spark-submit with master=local[*], aside from
>
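
The same memory and parallelism settings quoted in the message above can also
be set programmatically; a sketch, with the application name as a placeholder
and the values copied from the thread. Driver memory cannot be changed this
way once the JVM has started, so it still belongs in spark-env.sh or on the
spark-submit command line (--driver-memory).

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("pairwise-affinities")        // placeholder name
  .set("spark.executor.memory", "14g")      // values from spark-defaults.conf
  .set("spark.default.parallelism", "32")   // as quoted in the message above
  .set("spark.akka.frameSize", "1000")
// SPARK_DRIVER_MEMORY=10G corresponds to: spark-submit --driver-memory 10g

val sc = new SparkContext(conf)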

Re: Incrementally add/remove vertices in GraphX

2014-03-19 Thread Alessandro Lulli
Hi All,

Thanks for your answer.

Regarding GraphX streaming:

   - Is there an issue or pull request to follow to keep track of the updates?
   - Where is it possible to find a description and details of what will be
   provided?


Thanks for your help and your time to answer my questions
Alessandro



On Wed, Mar 19, 2014 at 2:43 AM, Ankur Dave  wrote:

> As Matei said, there's currently no support for incrementally adding
> vertices or edges to their respective partitions. Doing this efficiently
> would require extensive modifications to GraphX, so for now, the only
> options are to rebuild the indices on every graph modification, or to use
> the subgraph operator if the modification only involves removing vertices
> and edges.
>
> However, Joey and I are working on GraphX streaming, which is currently in
> the very early stages but eventually will enable this.
>
> Ankur 
>
>
> On Tue, Mar 18, 2014 at 3:30 PM, Matei Zaharia wrote:
>
>> I just meant that you call union() before creating the RDDs that you pass
>> to new Graph(). If you call it after it will produce other RDDs.
>>
>> The Graph() constructor actually shuffles and "indexes" the data to make
>> graph operations efficient, so it's not too easy to add elements after. You
>> could access graph.vertices and graph.edges to build new RDDs, and then
>> call Graph() again to make a new graph. I've CCed Joey and Ankur to see if
>> they have further ideas on how to optimize this. It would be cool to
>> support more efficient union and subtracting of graphs once they've been
>> partitioned by GraphX.
>>
>> Matei
>>
>> On Mar 14, 2014, at 8:32 AM, alelulli  wrote:
>>
>> > Hi Matei,
>> >
>> > Could you please clarify why I must call union before creating the
>> > graph?
>> >
>> > What's the behavior if I call union / subtract after the creation?
>> > Are the added/removed vertices processed?
>> >
>> > For example, if I'm implementing an iterative algorithm and at the 5th
>> > step I need to add some vertices / edges, can I call union / subtract on
>> > the VertexRDD, EdgeRDD and Triplets?
>> >
>> > Thanks
>> > Alessandro
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Incrementally-add-remove-vertices-in-GraphX-tp2227p2695.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>
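
A minimal sketch of the rebuild-the-graph workaround described above (the Int
attribute types are arbitrary placeholders): additions go through union on the
underlying RDDs followed by constructing a new Graph, which re-partitions and
re-indexes everything, while removals can reuse the existing indices via
subgraph.

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Adding: union the new vertices/edges with the graph's RDDs and rebuild.
def addToGraph(graph: Graph[Int, Int],
               newVertices: RDD[(VertexId, Int)],
               newEdges: RDD[Edge[Int]]): Graph[Int, Int] =
  Graph(graph.vertices.union(newVertices), graph.edges.union(newEdges))

// Removing: subgraph filters out vertices (and their incident edges)
// without rebuilding the indices.
def removeVertices(graph: Graph[Int, Int],
                   toRemove: Set[VertexId]): Graph[Int, Int] =
  graph.subgraph(vpred = (id, _) => !toRemove.contains(id))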


Re: Incrementally add/remove vertices in GraphX

2014-03-17 Thread Alessandro Lulli
Hi All,

Is somebody looking into this?
I think this is related to the discussion "Are there any plans to develop
GraphX Streaming?".

Using union / subtract on a VertexRDD or EdgeRDD leads to the creation of a
new RDD, but NOT to the modification of the RDD inside the graph.
Is creating a new graph the only way to add / remove a vertex or edge?

Thanks
Alessandro


On Fri, Mar 14, 2014 at 4:32 PM, alelulli wrote:

> Hi Matei,
>
> Could you please clarify why I must call union before creating the graph?
>
> What's the behavior if I call union / subtract after the creation?
> Are the added/removed vertices processed?
>
> For example, if I'm implementing an iterative algorithm and at the 5th step
> I need to add some vertices / edges, can I call union / subtract on the
> VertexRDD, EdgeRDD and Triplets?
>
> Thanks
> Alessandro
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Incrementally-add-remove-vertices-in-GraphX-tp2227p2695.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Computation time increasing every super-step

2014-03-11 Thread Alessandro Lulli
Hi All,

I'm facing a performance degradation running an iterative algorithm built
using Spark 0.9 and GraphX.
I'm using org.apache.spark.graphx.Pregel to run the iterative algorithm.

My graph has 2395 vertices and 7462 edges.

Every superstep the computation time increases significantly. Steps 1-5
execute in the order of seconds, whereas steps beyond 10 take on the order of
tens of minutes, and the time keeps increasing.

In every step each vertex always executes the same actions and sends a
message to all of its neighbors. The graph's topology doesn't change during
execution.

I also tried checkpointing the vertices, edges and triplets at the end of
each step, but I'm encountering the same issue.

Could you please help me solve this issue?
Please let me know if I'm missing something or if you need additional details.

Thanks
Alessandro
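
One generic pattern that keeps per-iteration cost flat in a hand-rolled GraphX
loop is to cache each new graph, periodically truncate its lineage with RDD
checkpointing, and force materialization before releasing the previous
iteration. This is only a sketch of that pattern, not a confirmed fix for the
Pregel run above (Pregel in this version does not expose a checkpoint hook);
step stands for whatever one superstep computes, and sc.setCheckpointDir must
have been called beforehand.

import org.apache.spark.graphx.Graph

def iterate[VD, ED](initial: Graph[VD, ED],
                    step: Graph[VD, ED] => Graph[VD, ED],
                    numIterations: Int,
                    checkpointInterval: Int = 10): Graph[VD, ED] = {
  var graph = initial.cache()
  for (i <- 1 to numIterations) {
    val next = step(graph).cache()
    if (i % checkpointInterval == 0) {
      // Truncate the lineage so it does not keep growing with the iterations.
      next.vertices.checkpoint()
      next.edges.checkpoint()
    }
    // Materialize the new graph before dropping the previous one.
    next.vertices.count()
    next.edges.count()
    graph.unpersistVertices(blocking = false)
    graph.edges.unpersist(blocking = false)
    graph = next
  }
  graph
}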