Re: Spark driver getting out of memory

2016-07-24 Thread Raghava Mutharaju
Saurav,

We have the same issue. Our application runs fine on 32 nodes with 4 cores
each and 256 partitions, but gives an OOM on the driver when run on 64 nodes
with 512 partitions. Did you figure out the reason behind this behavior, or
the relation between the number of partitions and driver RAM usage?

Regards,
Raghava.


On Wed, Jul 20, 2016 at 2:08 AM, Saurav Sinha 
wrote:

> Hi,
>
> I have set driver memory to 10 GB, and the job ran with intermediate failures
> which Spark recovered from.
>
> But I still want to know whether driver RAM needs to be increased as the
> number of partitions increases, and what the ratio of number of partitions to
> driver RAM should be.
>
> @RK: I am using cache on the RDDs. Is this the reason for the high RAM utilization?
>
> Thanks,
> Saurav Sinha
>
> On Tue, Jul 19, 2016 at 10:14 PM, RK Aduri 
> wrote:
>
>> Just want to see if this helps.
>>
>> Are you doing heavy collects and persisting the result? If so, you might
>> want to parallelize that collection by converting it back to an RDD; see the
>> sketch below.
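As a minimal sketch of that suggestion (sc is assumed to be the SparkContext;
the RDD, path, and variable names here are hypothetical and not taken from the
job under discussion), the idea is to keep heavy per-record work on the
executors instead of collecting everything into the driver JVM, or to
re-parallelize a collected result if a driver-side copy is unavoidable:

// hypothetical example: avoid holding a large collected result on the driver
val bigRdd = sc.textFile("hdfs:///some/input")   // placeholder input path
  .map(line => (line.hashCode, line))

// driver-heavy pattern: the whole dataset is pulled into the driver JVM
// val collected = bigRdd.collect()

// lighter alternative: process records where they already live
bigRdd.foreachPartition { iter =>
  iter.foreach { case (k, v) => () /* process or write out each record */ }
}

// or, if a collected sequence truly must be reused, push it back out as an RDD
// val backAsRdd = sc.parallelize(collected, numSlices = 64)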
>>
>> Thanks,
>> RK
>>
>> On Tue, Jul 19, 2016 at 12:09 AM, Saurav Sinha 
>> wrote:
>>
>>> Hi Mich,
>>>
>>>    1. In what mode are you running Spark: standalone, yarn-client,
>>>    yarn-cluster, etc.?
>>>
>>> Ans: spark standalone
>>>
>>>    2. You have 4 nodes with each executor having 10G. How many actual
>>>    executors do you see in the UI (port 4040 by default)?
>>>
>>> Ans: There are 4 executors, and I am using 8 cores on each of them
>>> (--total-executor-cores 32)
>>>
>>>    3. What is master memory? Are you referring to driver memory? Maybe
>>>    I am misunderstanding this.
>>>
>>> Ans: Driver memory is set with --driver-memory 5g
>>>
>>>    4. The only real correlation I see with the driver memory is when
>>>    you are running in local mode, where the worker lives within the JVM
>>>    process that you start with spark-shell etc. In that case driver memory
>>>    matters. However, it appears that you are running in another mode with 4
>>>    nodes?
>>>
>>> Ans: I am running my job with spark-submit, and there is no OOM issue on my
>>> worker (executor) nodes; it is only happening in the driver application.
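For reference, a spark-submit invocation matching the setup described above
would look roughly like the following; the master URL, class name, and jar are
placeholders rather than values taken from this thread:

bin/spark-submit \
  --master spark://<master-host>:7077 \
  --class com.example.MyJob \
  --driver-memory 5g \
  --executor-memory 10g \
  --total-executor-cores 32 \
  my-job.jar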
>>>
>>> Thanks,
>>> Saurav Sinha
>>>
>>> On Tue, Jul 19, 2016 at 2:42 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 can you please clarify:


   1. In what mode are you running Spark: standalone, yarn-client,
   yarn-cluster, etc.?
   2. You have 4 nodes with each executor having 10G. How many actual
   executors do you see in the UI (port 4040 by default)?
   3. What is master memory? Are you referring to driver memory? Maybe
   I am misunderstanding this.
   4. The only real correlation I see with the driver memory is when
   you are running in local mode, where the worker lives within the JVM process
   that you start with spark-shell etc. In that case driver memory matters.
   However, it appears that you are running in another mode with 4 nodes?

 Can you get a snapshot of your environment tab in UI and send the
 output please?

 HTH


 Dr Mich Talebzadeh



 LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



 http://talebzadehmich.wordpress.com





 On 18 July 2016 at 11:50, Saurav Sinha  wrote:

> I have set --driver-memory 5g. I need to understand whether, as the number of
> partitions increases, driver memory needs to be increased. What would be the
> best ratio of number of partitions to driver memory?
>
> On Mon, Jul 18, 2016 at 4:07 PM, Zhiliang Zhu 
> wrote:
>
>> try to set --driver-memory xg, where x is as large as can be set.
>>
>>
>> On Monday, July 18, 2016 6:31 PM, Saurav Sinha <
>> sauravsinh...@gmail.com> wrote:
>>
>>
>> Hi,
>>
>> I am running a Spark job.
>>
>> Master memory: 5G
>> Executor memory: 10G (running on 4 nodes)
>>
>> My job is getting killed as the number of partitions increases to 20K.
>>
>> 16/07/18 14:53:13 INFO DAGScheduler: Got job 17 (foreachPartition at
>> WriteToKafka.java:45) with 13524 output partitions (allowLocal=false)
>> 16/07/18 14:53:13 INFO DAGScheduler: Final stage: ResultStage
>> 640(foreachPartition at WriteToKafka.java:45)
>> 16/07/18 14:53:13 INFO DAGScheduler: Parents of final stage:
>> List(ShuffleMapStage 518, ShuffleMapStage 639)
>> 16/07/18 14:53:23 INFO DAGScheduler: Missing parents: List()
>> 16/07/18 14:53:23 

Re: OOM on the driver after increasing partitions

2016-06-22 Thread Raghava Mutharaju
Thank you. Sure, if I find something I will post it.

Regards,
Raghava.


On Wed, Jun 22, 2016 at 7:43 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> I believe it would be information about tasks, partitions, task status, etc.
> I do not know the exact details, but I had an OOM on the driver with 512MB and
> increasing it did help. Someone else might be able to answer better about the
> exact memory usage of the driver.
>
> You also seem to use broadcast, which means sending something from the driver
> JVM. You can try taking a memory dump when your driver memory is about full,
> or set JVM args to take a dump automatically on an OutOfMemoryError (see the
> sketch below). Analyze it and share your findings :)
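A minimal sketch of that JVM-args approach (spark.driver.extraJavaOptions and
the HotSpot flags below are standard, but the dump path and memory value are
placeholders, not settings confirmed in this thread):

bin/spark-submit \
  --driver-memory 12g \
  --conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/driver-heap.hprof" \
  ...

The resulting .hprof file can then be opened in a heap analyzer such as
Eclipse MAT or jvisualvm to see which objects dominate the driver's memory.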
>
>
>
> On Wed, Jun 22, 2016 at 4:33 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Ok. Would you be able to shed more light on what exact metadata it manages
>> and what its relationship is with a larger number of partitions/nodes?
>>
>> There is one executor running on each node -- so there are 64 executors
>> in total. Each executor, including the driver, is given 12GB, and this is the
>> maximum available limit. So the other options are
>>
>> 1) Separate the driver from master, i.e., run them on two separate nodes
>> 2) Increase the RAM capacity on the driver/master node.
>>
>> Regards,
>> Raghava.
>>
>>
>> On Wed, Jun 22, 2016 at 7:05 PM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> Yes, the driver keeps a fair amount of metadata to manage scheduling across
>>> all your executors. I assume with 64 nodes you have more executors as well.
>>> A simple way to test is to increase driver memory.
>>>
>>> On Wed, Jun 22, 2016 at 10:10 AM, Raghava Mutharaju <
>>> m.vijayaragh...@gmail.com> wrote:
>>>
>>>> It is an iterative algorithm which uses map, mapPartitions, join,
>>>> union, filter, broadcast and count. The goal is to compute a set of tuples,
>>>> and in each iteration a few tuples are added to it. An outline is given below.
>>>>
>>>> 1) Start with initial set of tuples, T
>>>> 2) In each iteration compute deltaT, and add them to T, i.e., T = T +
>>>> deltaT
>>>> 3) Stop when current T size (count) is same as previous T size, i.e.,
>>>> deltaT is 0.
>>>>
>>>> Do you think something happens on the driver due to the application
>>>> logic when the partitions are increased?
>>>>
>>>> Regards,
>>>> Raghava.
>>>>
>>>>
>>>> On Wed, Jun 22, 2016 at 12:33 PM, Sonal Goyal <sonalgoy...@gmail.com>
>>>> wrote:
>>>>
>>>>> What does your application do?
>>>>>
>>>>> Best Regards,
>>>>> Sonal
>>>>> Founder, Nube Technologies <http://www.nubetech.co>
>>>>> Reifier at Strata Hadoop World
>>>>> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
>>>>> Reifier at Spark Summit 2015
>>>>> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>>>>>
>>>>> <http://in.linkedin.com/in/sonalgoyal>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
>>>>> m.vijayaragh...@gmail.com> wrote:
>>>>>
>>>>>> Hello All,
>>>>>>
>>>>>> We have a Spark cluster where the driver and master are running on the
>>>>>> same node. We are using the Spark Standalone cluster manager. If the number
>>>>>> of nodes (and the partitions) is increased, the same dataset that used to
>>>>>> run to completion on a smaller number of nodes now gives an out-of-memory
>>>>>> error on the driver.
>>>>>>
>>>>>> For example, a dataset that runs on 32 nodes with number of
>>>>>> partitions set to 256 completes whereas the same dataset when run on 64
>>>>>> nodes with number of partitions as 512 gives an OOM on the driver side.
>>>>>>
>>>>>> From what I read in the Spark documentation and other articles,
>>>>>> following are the responsibilities of the driver/master.
>>>>>>
>>>>>> 1) create spark context
>>>>>> 2) build DAG of operations
>>>>>> 3) schedule tasks
>>>>>>
>>>>>> I am guessing that 1) and 2) should

Re: OOM on the driver after increasing partitions

2016-06-22 Thread Raghava Mutharaju
Ok. Would you be able to shed more light on what exact metadata it manages and
what its relationship is with a larger number of partitions/nodes?

There is one executor running on each node -- so there are 64 executors in
total. Each executor, including the driver, is given 12GB, and this is the
maximum available limit. So the other options are

1) Separate the driver from master, i.e., run them on two separate nodes
2) Increase the RAM capacity on the driver/master node.

Regards,
Raghava.


On Wed, Jun 22, 2016 at 7:05 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Yes, the driver keeps a fair amount of metadata to manage scheduling across
> all your executors. I assume with 64 nodes you have more executors as well.
> A simple way to test is to increase driver memory.
>
> On Wed, Jun 22, 2016 at 10:10 AM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> It is an iterative algorithm which uses map, mapPartitions, join, union,
>> filter, broadcast and count. The goal is to compute a set of tuples, and in
>> each iteration a few tuples are added to it. An outline is given below.
>>
>> 1) Start with initial set of tuples, T
>> 2) In each iteration compute deltaT, and add them to T, i.e., T = T +
>> deltaT
>> 3) Stop when current T size (count) is same as previous T size, i.e.,
>> deltaT is 0.
>>
>> Do you think something happens on the driver due to the application logic
>> when the partitions are increased?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Wed, Jun 22, 2016 at 12:33 PM, Sonal Goyal <sonalgoy...@gmail.com>
>> wrote:
>>
>>> What does your application do?
>>>
>>> Best Regards,
>>> Sonal
>>> Founder, Nube Technologies <http://www.nubetech.co>
>>> Reifier at Strata Hadoop World
>>> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
>>> Reifier at Spark Summit 2015
>>> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>>>
>>> <http://in.linkedin.com/in/sonalgoyal>
>>>
>>>
>>>
>>> On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
>>> m.vijayaragh...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>>
>>>> We have a Spark cluster where the driver and master are running on the same
>>>> node. We are using the Spark Standalone cluster manager. If the number of
>>>> nodes (and the partitions) is increased, the same dataset that used to run to
>>>> completion on a smaller number of nodes now gives an out-of-memory error on
>>>> the driver.
>>>>
>>>> For example, a dataset that runs on 32 nodes with number of partitions
>>>> set to 256 completes whereas the same dataset when run on 64 nodes with
>>>> number of partitions as 512 gives an OOM on the driver side.
>>>>
>>>> From what I read in the Spark documentation and other articles,
>>>> following are the responsibilities of the driver/master.
>>>>
>>>> 1) create spark context
>>>> 2) build DAG of operations
>>>> 3) schedule tasks
>>>>
>>>> I am guessing that 1) and 2) should not change with respect to the number of
>>>> nodes/partitions. So is it that the driver gives an OOM because it has to
>>>> keep track of a lot more tasks?
>>>>
>>>> What could be the possible reasons behind the driver-side OOM when the
>>>> number of partitions are increased?
>>>>
>>>> Regards,
>>>> Raghava.
>>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>
>
>




-- 
Regards,
Raghava
http://raghavam.github.io


Re: OOM on the driver after increasing partitions

2016-06-22 Thread Raghava Mutharaju
It is an iterative algorithm which uses map, mapPartitions, join, union,
filter, broadcast and count. The goal is to compute a set of tuples, and in
each iteration a few tuples are added to it. An outline is given below, with a
minimal sketch of the loop after it.

1) Start with an initial set of tuples, T
2) In each iteration compute deltaT, and add it to T, i.e., T = T + deltaT
3) Stop when the current T size (count) is the same as the previous T size, i.e.,
deltaT is 0.
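A minimal sketch of that fixpoint loop, assuming the tuples are held in pair
RDDs; initialTuples, computeDelta, numPartitions, and hashPartitioner are
placeholders for the inputs and for the map/join/filter logic described above:

import org.apache.spark.storage.StorageLevel

var tuples = initialTuples                    // RDD[(Int, Int)], hash partitioned
var prevCount = 0L
var currCount = tuples.count()

while (currCount != prevCount) {
  // computeDelta stands in for the join/filter/distinct logic of the real job
  val deltaT = computeDelta(tuples)
  tuples = tuples.union(deltaT)
                 .distinct(numPartitions)
                 .partitionBy(hashPartitioner)
                 .persist(StorageLevel.MEMORY_AND_DISK)
  prevCount = currCount
  currCount = tuples.count()                  // action that materializes this iteration
}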

Do you think something happens on the driver due to the application logic when
the partitions are increased?

Regards,
Raghava.


On Wed, Jun 22, 2016 at 12:33 PM, Sonal Goyal <sonalgoy...@gmail.com> wrote:

> What does your application do?
>
> Best Regards,
> Sonal
> Founder, Nube Technologies <http://www.nubetech.co>
> Reifier at Strata Hadoop World
> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
> Reifier at Spark Summit 2015
> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Hello All,
>>
>> We have a Spark cluster where the driver and master are running on the same
>> node. We are using the Spark Standalone cluster manager. If the number of
>> nodes (and the partitions) is increased, the same dataset that used to run to
>> completion on a smaller number of nodes now gives an out-of-memory error on
>> the driver.
>>
>> For example, a dataset that runs on 32 nodes with number of partitions
>> set to 256 completes whereas the same dataset when run on 64 nodes with
>> number of partitions as 512 gives an OOM on the driver side.
>>
>> From what I read in the Spark documentation and other articles, following
>> are the responsibilities of the driver/master.
>>
>> 1) create spark context
>> 2) build DAG of operations
>> 3) schedule tasks
>>
>> I am guessing that 1) and 2) should not change with respect to the number of
>> nodes/partitions. So is it that the driver gives an OOM because it has to
>> keep track of a lot more tasks?
>>
>> What could be the possible reasons behind the driver-side OOM when the
>> number of partitions are increased?
>>
>> Regards,
>> Raghava.
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io


OOM on the driver after increasing partitions

2016-06-22 Thread Raghava Mutharaju
Hello All,

We have a Spark cluster where the driver and master are running on the same
node. We are using the Spark Standalone cluster manager. If the number of nodes
(and the partitions) is increased, the same dataset that used to run to
completion on a smaller number of nodes now gives an out-of-memory error on the
driver.

For example, a dataset that runs on 32 nodes with number of partitions set
to 256 completes whereas the same dataset when run on 64 nodes with number
of partitions as 512 gives an OOM on the driver side.

From what I read in the Spark documentation and other articles, the following
are the responsibilities of the driver/master.

1) create spark context
2) build DAG of operations
3) schedule tasks

I am guessing that 1) and 2) should not change with respect to the number of
nodes/partitions. So is it that the driver gives an OOM because it has to keep
track of a lot more tasks?

What could be the possible reasons behind the driver-side OOM when the
number of partitions are increased?

Regards,
Raghava.


Spark 2.0.0-snapshot: IllegalArgumentException: requirement failed: chunks must be non-empty

2016-05-13 Thread Raghava Mutharaju
I am able to run my application after I compiled the Spark source in the
following way:

./dev/change-scala-version.sh 2.11

./dev/make-distribution.sh --name spark-2.0.0-snapshot-bin-hadoop2.6 --tgz
-Phadoop-2.6 -DskipTests

But while the application is running I get the following exception, which I
was not getting with Spark 1.6.1. Any idea why this might be happening?

java.lang.IllegalArgumentException: requirement failed: chunks must be non-empty
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.util.io.ChunkedByteBuffer.<init>(ChunkedByteBuffer.scala:41)
at org.apache.spark.util.io.ChunkedByteBuffer.<init>(ChunkedByteBuffer.scala:52)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:580)
at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:514)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:601)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:653)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:329)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:280)
at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:100)
at org.apache.spark.rdd.PartitionerAwareUnionRDD$$anonfun$compute$1.apply(PartitionerAwareUnionRDD.scala:99)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.SetBuilder.$plus$plus$eq(SetBuilder.scala:20)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toSet(TraversableOnce.scala:304)
at scala.collection.AbstractIterator.toSet(Iterator.scala:1336)
at org.daselab.sparkel.SparkELHDFSTestCopy$$anonfun$45.apply(SparkELHDFSTestCopy.scala:392)
at org.daselab.sparkel.SparkELHDFSTestCopy$$anonfun$45.apply(SparkELHDFSTestCopy.scala:391)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:756)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:756)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

On Fri, May 13, 2016 at 6:33 AM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Thank you for the response.
>
> I used the following command to build from source
>
> build/mvn -Dhadoop.version=2.6.4 -Phadoop-2.6 -DskipTests clean package
>
> Would this put in the required jars in .ivy2 during the build process? If
> so, how can I make the spark distribution runnable, so that I can use it on
> other machines as well (make-distribution.sh no longer exists in Spark root
> folder)?
>
> For compiling my application, I put in the following lines in the build.sbt
>
> packAutoSettings
> val spark = "org.apache.spark" %% "spark-core" % "2.0.0-SNAPSHOT"
> val sparksql = "org.apache.spark" % "spark-sql_2.11" % "2.0.0-SNAPSHOT"
>
> lazy val root = (project in file(".")).
>   settings(
> name := "sparkel",
> version := "0.1.0",
> scalaVersion := "2.11.8",
> libraryDependencies += spark,
> libraryDependencies += sparksql
>   )
>
>
> Regards,
> Raghava.
>
>
> On Fri, May 13, 2016 at 12:23 AM, Luciano Resende <luckbr1...@gmail.com>
> wrote:
>
>> Spark has moved to build using Scala 2.11 by default in master/trunk.
>>
>> As for the 2.0.0-SNAPSHOT, it is actually the version of master/trunk and
>> you might be missing some modules/profiles for your build. What command did
&g

Re: sbt for Spark build with Scala 2.11

2016-05-13 Thread Raghava Mutharaju
Thank you for the response.

I used the following command to build from source

build/mvn -Dhadoop.version=2.6.4 -Phadoop-2.6 -DskipTests clean package

Would this put the required jars in .ivy2 during the build process? If so, how
can I make the Spark distribution runnable, so that I can use it on other
machines as well (make-distribution.sh no longer exists in the Spark root
folder)?

For compiling my application, I put the following lines in build.sbt:

packAutoSettings

val spark = "org.apache.spark" %% "spark-core" % "2.0.0-SNAPSHOT"
val sparksql = "org.apache.spark" % "spark-sql_2.11" % "2.0.0-SNAPSHOT"

lazy val root = (project in file(".")).
  settings(
    name := "sparkel",
    version := "0.1.0",
    scalaVersion := "2.11.8",
    libraryDependencies += spark,
    libraryDependencies += sparksql
  )


Regards,
Raghava.


On Fri, May 13, 2016 at 12:23 AM, Luciano Resende <luckbr1...@gmail.com>
wrote:

> Spark has moved to build using Scala 2.11 by default in master/trunk.
>
> As for the 2.0.0-SNAPSHOT, it is actually the version of master/trunk and
> you might be missing some modules/profiles for your build. What command did
> you use to build ?
>
> On Thu, May 12, 2016 at 9:01 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Hello All,
>>
>> I built Spark from the source code available at
>> https://github.com/apache/spark/. Although I haven't specified the
>> "-Dscala-2.11" option (to build with Scala 2.11), from the build messages I
>> see that it ended up using Scala 2.11. Now, for my application sbt, what
>> should be the spark version? I tried the following
>>
>> val spark = "org.apache.spark" %% "spark-core" % "2.0.0-SNAPSHOT"
>> val sparksql = "org.apache.spark" % "spark-sql_2.11" % "2.0.0-SNAPSHOT"
>>
>> and scalaVersion := "2.11.8"
>>
>> But this setting of spark version gives sbt error
>>
>> unresolved dependency: org.apache.spark#spark-core_2.11;2.0.0-SNAPSHOT
>>
>> I guess this is because the repository doesn't contain 2.0.0-SNAPSHOT.
>> Does this mean, the only option is to put all the required jars in the lib
>> folder (unmanaged dependencies)?
>>
>> Regards,
>> Raghava.
>>
>
>
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>



-- 
Regards,
Raghava
http://raghavam.github.io


sbt for Spark build with Scala 2.11

2016-05-12 Thread Raghava Mutharaju
Hello All,

I built Spark from the source code available at
https://github.com/apache/spark/. Although I haven't specified the
"-Dscala-2.11" option (to build with Scala 2.11), from the build messages I
see that it ended up using Scala 2.11. Now, for my application's sbt build, what
should the Spark version be? I tried the following:

val spark = "org.apache.spark" %% "spark-core" % "2.0.0-SNAPSHOT"
val sparksql = "org.apache.spark" % "spark-sql_2.11" % "2.0.0-SNAPSHOT"

and scalaVersion := "2.11.8"

But this setting of spark version gives sbt error

unresolved dependency: org.apache.spark#spark-core_2.11;2.0.0-SNAPSHOT

I guess this is because the repository doesn't contain 2.0.0-SNAPSHOT. Does
this mean the only option is to put all the required jars in the lib folder
(unmanaged dependencies)?
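One alternative to unmanaged jars, assuming the snapshot was built locally, is
to install the build into the local Maven repository and let sbt resolve it
from there; the commands below are standard Maven/sbt usage rather than
something confirmed in this thread, and the profiles should match whatever was
used for the original build:

build/mvn -Dhadoop.version=2.6.4 -Phadoop-2.6 -DskipTests clean install

and then, in build.sbt:

resolvers += Resolver.mavenLocal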

Regards,
Raghava.


Re: partitioner aware subtract

2016-05-10 Thread Raghava Mutharaju
Thank you for the response.

This does not work on the test case that I mentioned in the previous email.

val data1 = Seq((1 -> 2), (1 -> 5), (2 -> 3), (3 -> 20), (3 -> 16))
val data2 = Seq((1 -> 2), (3 -> 30), (3 -> 16), (5 -> 12))
val rdd1 = sc.parallelize(data1, 8)
val rdd2 = sc.parallelize(data2, 8)
val diff = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
  leftItr.filter(p => !rightItr.contains(p))
}
diff.collect().foreach(println)
(1,5)
(2,3)
(3,20)
(3,16)

(3, 16) shouldn't be in the diff. I guess this shows up because rdd2 is
smaller than rdd1, so rdd2's iterator (rightItr) would have been exhausted
before leftItr?
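That is the usual pitfall here: rightItr is a one-pass iterator, so calling
contains inside filter exhausts it on the first left element. A minimal
corrected sketch, assuming both RDDs are already hash partitioned by the same
partitioner so that matching keys land in matching partitions, materializes the
right side first:

val diff = rdd1.zipPartitions(rdd2) { (leftItr, rightItr) =>
  val rightSet = rightItr.toSet   // materialize once per partition
  leftItr.filter(p => !rightSet.contains(p))
}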

Anyway, we did the subtract in the following way:

using mapPartitions, group the values of rdd2 by key into a set. Then do a
left outer join of rdd1 with rdd2 and filter it. This preserves partitioning
and also takes advantage of the fact that both RDDs are already hash
partitioned. A sketch of this approach is given below.
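A minimal sketch of that approach, assuming rdd1 and rdd2 are RDD[(Int, Int)]
built with the same hashPartitioner; groupByKey with the existing partitioner
stands in here for the mapPartitions-based grouping mentioned above and does
not shuffle because the partitioner is unchanged:

// build a per-key set of rdd2's values; no shuffle since the partitioner matches
val rdd2Sets = rdd2.groupByKey(hashPartitioner).mapValues(_.toSet)

// keep (k, v) from rdd1 unless v appears in rdd2's set for k
val diff = rdd1.leftOuterJoin(rdd2Sets)
  .filter { case (_, (v, rightOpt)) => !rightOpt.exists(_.contains(v)) }
  .mapValues { case (v, _) => v }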

Regards,
Raghava.


On Tue, May 10, 2016 at 11:44 AM, Rishi Mishra <rmis...@snappydata.io>
wrote:

> As you have the same partitioner and number of partitions, you can probably
> use zipPartitions and provide a user-defined function to subtract.
>
> A very primitive example:
>
> val data1 = Seq(1->1,2->2,3->3,4->4,5->5,6->6,7->7)
> val data2 = Seq(1->1,2->2,3->3,4->4,5->5,6->6)
> val rdd1 = sc.parallelize(data1, 2)
> val rdd2 = sc.parallelize(data2, 2)
> val sum = rdd1.zipPartitions(rdd2){ (leftItr, rightItr) =>
>   leftItr.filter(p => !rightItr.contains(p))
> }
> sum.foreach(println)
>
>
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Mon, May 9, 2016 at 7:35 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> We tried that but couldn't figure out a way to efficiently filter it.
>> Lets take two RDDs.
>>
>> rdd1:
>>
>> (1,2)
>> (1,5)
>> (2,3)
>> (3,20)
>> (3,16)
>>
>> rdd2:
>>
>> (1,2)
>> (3,30)
>> (3,16)
>> (5,12)
>>
>> rdd1.leftOuterJoin(rdd2) and get rdd1.subtract(rdd2):
>>
>> (1,(2,Some(2)))
>> (1,(5,Some(2)))
>> (2,(3,None))
>> (3,(20,Some(30)))
>> (3,(20,Some(16)))
>> (3,(16,Some(30)))
>> (3,(16,Some(16)))
>>
>> case (x, (y, z)) => Apart from allowing z == None and filtering on y ==
>> z, we also should filter out (3, (16, Some(30))). How can we do that
>> efficiently without resorting to broadcast of any elements of rdd2?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, May 9, 2016 at 6:27 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> How about outer join?
>>> On 9 May 2016 13:18, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
>>> wrote:
>>>
>>>> Hello All,
>>>>
>>>> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
>>>> (number of partitions are same for both the RDDs). We would like to
>>>> subtract rdd2 from rdd1.
>>>>
>>>> The subtract code at
>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>>>> seems to group the elements of both the RDDs using (x, null) where x is the
>>>> element of the RDD and partition them. Then it makes use of
>>>> subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
>>>> case, is both key and value combined). In our case, both the RDDs are
>>>> already hash partitioned on the key of x. Can we take advantage of this by
>>>> having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
>>>> mapPartitions() for this?
>>>>
>>>> We tried to broadcast rdd2 and use mapPartitions. But this turns out to
>>>> be memory consuming and inefficient. We tried to do a local set difference
>>>> between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
>>>> use destroy() on the broadcasted value, but it does not help.
>>>>
>>>> The current subtract method is slow for us. rdd1 and rdd2 are around
>>>> 700MB each and the subtract takes around 14 seconds.
>>>>
>>>> Any ideas on this issue is highly appreciated.
>>>>
>>>> Regards,
>>>> Raghava.
>>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io


Re: partitioner aware subtract

2016-05-09 Thread Raghava Mutharaju
We tried that but couldn't figure out a way to efficiently filter it. Let's
take two RDDs.

rdd1:

(1,2)
(1,5)
(2,3)
(3,20)
(3,16)

rdd2:

(1,2)
(3,30)
(3,16)
(5,12)

rdd1.leftOuterJoin(rdd2) gives the following, from which we want to get rdd1.subtract(rdd2):

(1,(2,Some(2)))
(1,(5,Some(2)))
(2,(3,None))
(3,(20,Some(30)))
(3,(20,Some(16)))
(3,(16,Some(30)))
(3,(16,Some(16)))

case (x, (y, z)) => Apart from allowing z == None and filtering on y == z,
we also should filter out (3, (16, Some(30))). How can we do that
efficiently without resorting to broadcast of any elements of rdd2?

Regards,
Raghava.


On Mon, May 9, 2016 at 6:27 AM, ayan guha <guha.a...@gmail.com> wrote:

> How about outer join?
> On 9 May 2016 13:18, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
> wrote:
>
>> Hello All,
>>
>> We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key
>> (number of partitions are same for both the RDDs). We would like to
>> subtract rdd2 from rdd1.
>>
>> The subtract code at
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
>> seems to group the elements of both the RDDs using (x, null) where x is the
>> element of the RDD and partition them. Then it makes use of
>> subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
>> case, is both key and value combined). In our case, both the RDDs are
>> already hash partitioned on the key of x. Can we take advantage of this by
>> having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
>> mapPartitions() for this?
>>
>> We tried to broadcast rdd2 and use mapPartitions. But this turns out to
>> be memory consuming and inefficient. We tried to do a local set difference
>> between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
>> use destroy() on the broadcasted value, but it does not help.
>>
>> The current subtract method is slow for us. rdd1 and rdd2 are around
>> 700MB each and the subtract takes around 14 seconds.
>>
>> Any ideas on this issue is highly appreciated.
>>
>> Regards,
>> Raghava.
>>
>


-- 
Regards,
Raghava
http://raghavam.github.io


partitioner aware subtract

2016-05-08 Thread Raghava Mutharaju
Hello All,

We have two PairRDDs (rdd1, rdd2) which are hash partitioned on key (number
of partitions are same for both the RDDs). We would like to subtract rdd2
from rdd1.

The subtract code at
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
seems to group the elements of both the RDDs using (x, null) where x is the
element of the RDD and partition them. Then it makes use of
subtractByKey(). This way, RDDs have to be repartitioned on x (which in our
case, is both key and value combined). In our case, both the RDDs are
already hash partitioned on the key of x. Can we take advantage of this by
having a PairRDD/HashPartitioner-aware subtract? Is there a way to use
mapPartitions() for this?

We tried to broadcast rdd2 and use mapPartitions. But this turns out to be
memory consuming and inefficient. We tried to do a local set difference
between rdd1 and the broadcasted rdd2 (in mapPartitions of rdd1). We did
use destroy() on the broadcasted value, but it does not help.

The current subtract method is slow for us. rdd1 and rdd2 are around 700MB
each and the subtract takes around 14 seconds.

Any ideas on this issue are highly appreciated.

Regards,
Raghava.


Re: executor delay in Spark

2016-04-28 Thread Raghava Mutharaju
Hello Mike,

No problem, logs are useful to us anyway. Thank you for all the pointers.
We started off with examining only a single RDD but later on added a few
more. The persist-count-unpersist-count sequence is the dummy stage that you
suggested we use to avoid the initial scheduler delay (a minimal sketch of such
a stage is given below).
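A minimal sketch of such a synchronization stage, assuming sc is the
SparkContext; the numbers are arbitrary, since the only goal is to run one
cheap action so that every executor has registered and received tasks before
the real stages are scheduled:

// dummy stage: materialize a small throwaway RDD across all default partitions
sc.parallelize(1 to 10000, sc.defaultParallelism)
  .map(_ * 2)
  .count()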

Our issue is very similar to the one you posted:
http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-td16988.html.
We tried with spark.shuffle.reduceLocality.enabled=false and it helps in
certain cases. Were you able to fix that issue? We use Spark 1.6.0

We noticed the following
1) persisting an RDD seems to lead to unbalanced distribution of partitions
across the executors.
2) If one RDD has an all-nothing skew, then the rest of the RDDs that depend on
it also get an all-nothing skew.

Regards,
Raghava.


On Wed, Apr 27, 2016 at 10:20 AM, Mike Hynes <91m...@gmail.com> wrote:

> Hi Raghava,
>
> I'm terribly sorry about the end of my last email; that garbled
> sentence was garbled because it wasn't meant to exist; I wrote it on
> my phone, realized I wouldn't realistically have time to look into
> another set of logs deeply enough, and then mistook myself for having
> deleted it. Again, I'm very sorry for my error here.
>
> I did peek at your code, though, and think you could try the following:
> 0. The actions in your main method are many, and it will be hard to
> isolate a problem; I would recommend only examing *one* RDD at first,
> rather than six.
> 1. There is a lot of repetition for reading RDDs from textfiles
> sequentially; if you put those lines into two methods depending on RDD
> type, you will at least have one entry point to work with once you
> make a simplified test program.
> 2. In one part you persist, count, immediately unpersist, and then
> count again an RDD.. I'm not acquainted with this idiom, and I don't
> understand what that is to achieve. It strikes me suspect for
> triggering unusual garbage collection, which would, I think, only
> complicate your trace debugging.
>
> I've attached a python script that dumps relevant info from the Spark
> JSON logs into a CSV for easier analysis in you language of choice;
> hopefully it can aid in finer grained debugging (the headers of the
> fields it prints are listed in one of the functions).
>
> Mike
>
> On 4/25/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
> > Mike,
> >
> > We ran our program with 16, 32 and 64 partitions. The behavior was same
> as
> > before with 8 partitions. It was mixed -- for some RDDs we see an
> > all-nothing skew, but for others we see them getting split across the 2
> > worker nodes. In some cases, they start with even split and in later
> > iterations it goes to all-nothing split. Please find the logs attached.
> >
> > our program source code:
> >
> https://github.com/raghavam/sparkel/blob/275ecbb901a58592d8a70a8568dd95c839d46ecc/src/main/scala/org/daselab/sparkel/SparkELDAGAnalysis.scala
> >
> > We put in persist() statements for different RDDs just to check their
> skew.
> >
> > @Jeff, setting minRegisteredResourcesRatio did not help. Behavior was
> same
> > as before.
> >
> > Thank you for your time.
> >
> > Regards,
> > Raghava.
> >
> >
> > On Sun, Apr 24, 2016 at 7:17 PM, Mike Hynes <91m...@gmail.com> wrote:
> >
> >> Could you change numPartitions to {16, 32, 64} and run your program for
> >> each to see how many partitions are allocated to each worker? Let's see
> >> if
> >> you experience an all-nothing imbalance that way; if so, my guess is
> that
> >> something else is odd in your program logic or spark runtime
> environment,
> >> but if not and your executors all receive at least *some* partitions,
> >> then
> >> I still wouldn't rule out effects of scheduling delay. It's a simple
> >> test,
> >> but it could give some insight.
> >>
> >> Mike
> >>
> >> his could still be a  scheduling  If only one has *all* partitions,  and
> >> email me the log file? (If it's 10+ MB, just the first few thousand
> lines
> >> are fine).
> >> On Apr 25, 2016 7:07 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com
> >
> >> wrote:
> >>
> >>> Mike, All,
> >>>
> >>> It turns out that the second time we encountered the uneven-partition
> >>> issue is not due to spark-submit. It was resolved with the change in
> >>> placement of count().
> >>>
> >>> Case-1:
> >>>
> >>> val nu

Re: executor delay in Spark

2016-04-24 Thread Raghava Mutharaju
Mike, All,

It turns out that the second time we encountered the uneven-partition issue,
it was not due to spark-submit. It was resolved by changing the placement of
count().

Case-1:

val numPartitions = 8
// read uAxioms from HDFS, use hash partitioner on it and persist it
// read type1Axioms from HDFS, use hash partitioner on it and persist it
currDeltaURule1 = type1Axioms.join(uAxioms)
                             .values
                             .distinct(numPartitions)
                             .partitionBy(hashPartitioner)
currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
                                 .persist(StorageLevel.MEMORY_AND_DISK)
                                 .count()



The currDeltaURule1 RDD ends up with all the data on one node (there are 2
worker nodes). If we move the count(), the uneven-partition issue is resolved.

Case-2:

currDeltaURule1 = currDeltaURule1.setName("deltaURule1_" + loopCounter)
                                 .persist(StorageLevel.MEMORY_AND_DISK)



<rdd.count()> -- this rdd depends on currDeltaURule1 and it gets executed.
This resolved the uneven partitioning issue.

I don't see why moving an action to a later part of the code would affect the
partitioning. Are there other factors at play here that affect the
partitioning?

(Inconsistent) uneven partitioning leads to one machine getting overburdened
(in memory and number of tasks). We see a clear improvement in runtime when the
partitioning is even (which happens when the count is moved).

Any pointers in figuring out this issue is much appreciated.

Regards,
Raghava.




On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:

> Glad to hear that the problem was solvable! I have not seen delays of this
> type for later stages in jobs run by spark-submit, but I do not think it
> impossible if your stage has no lineage dependence on other RDDs.
>
> I'm CC'ing the dev list to report of other users observing load imbalance
> caused by unusual initial task scheduling. I don't know of ways to avoid
> this other than creating a dummy task to synchronize the executors, but
> hopefully someone from there can suggest other possibilities.
>
> Mike
> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
> wrote:
>
>> Mike,
>>
>> It turns out the executor delay, as you mentioned, is the cause. After we
>> introduced a dummy stage, partitioning was working fine. Does this delay
>> happen during later stages as well? We noticed the same behavior
>> (partitioning happens on spark-shell but not through spark-submit) at a
>> later stage also.
>>
>> Apart from introducing a dummy stage or running it from spark-shell, is
>> there any other option to fix this?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> When submitting a job with spark-submit, I've observed delays (up to
>>> 1--2 seconds) for the executors to respond to the driver in order to
>>> receive tasks in the first stage. The delay does not persist once the
>>> executors have been synchronized.
>>>
>>> When the tasks are very short, as may be your case (relatively small
>>> data and a simple map task like you have described), the 8 tasks in
>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>> the second executor won't have responded to the master before the
>>> first 4 tasks on the first executor have completed.
>>>
>>> To see if this is the cause in your particular case, you could try the
>>> following to confirm:
>>> 1. Examine the starting times of the tasks alongside their
>>> executor
>>> 2. Make a "dummy" stage execute before your real stages to
>>> synchronize the executors by creating and materializing any random RDD
>>> 3. Make the tasks longer, i.e. with some silly computational
>>> work.
>>>
>>> Mike
>>>
>>>
>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>> > Yes its the same data.
>>> >
>>> > 1) The number of partitions are the same (8, which is an argument to
>>> the
>>> > HashPartitioner). In the first case, these partitions are spread across
>>> > both the worker nodes. In the second case, all the partitions are on
>>> the
>>> > same node.
>>> > 2) What resources would be of interest here? Scala shell takes the
>>> default
>>> > parameters since we use "bin/spark-shell --master " to run
>>> the
>>> > scal

Re: executor delay in Spark

2016-04-22 Thread Raghava Mutharaju
Thank you. For now we plan to use spark-shell to submit jobs.

Regards,
Raghava.


On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:

> Glad to hear that the problem was solvable! I have not seen delays of this
> type for later stages in jobs run by spark-submit, but I do not think it
> impossible if your stage has no lineage dependence on other RDDs.
>
> I'm CC'ing the dev list to report of other users observing load imbalance
> caused by unusual initial task scheduling. I don't know of ways to avoid
> this other than creating a dummy task to synchronize the executors, but
> hopefully someone from there can suggest other possibilities.
>
> Mike
> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" <m.vijayaragh...@gmail.com>
> wrote:
>
>> Mike,
>>
>> It turns out the executor delay, as you mentioned, is the cause. After we
>> introduced a dummy stage, partitioning was working fine. Does this delay
>> happen during later stages as well? We noticed the same behavior
>> (partitioning happens on spark-shell but not through spark-submit) at a
>> later stage also.
>>
>> Apart from introducing a dummy stage or running it from spark-shell, is
>> there any other option to fix this?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> When submitting a job with spark-submit, I've observed delays (up to
>>> 1--2 seconds) for the executors to respond to the driver in order to
>>> receive tasks in the first stage. The delay does not persist once the
>>> executors have been synchronized.
>>>
>>> When the tasks are very short, as may be your case (relatively small
>>> data and a simple map task like you have described), the 8 tasks in
>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>> the second executor won't have responded to the master before the
>>> first 4 tasks on the first executor have completed.
>>>
>>> To see if this is the cause in your particular case, you could try the
>>> following to confirm:
>>> 1. Examine the starting times of the tasks alongside their
>>> executor
>>> 2. Make a "dummy" stage execute before your real stages to
>>> synchronize the executors by creating and materializing any random RDD
>>> 3. Make the tasks longer, i.e. with some silly computational
>>> work.
>>>
>>> Mike
>>>
>>>
>>> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>> > Yes its the same data.
>>> >
>>> > 1) The number of partitions are the same (8, which is an argument to
>>> the
>>> > HashPartitioner). In the first case, these partitions are spread across
>>> > both the worker nodes. In the second case, all the partitions are on
>>> the
>>> > same node.
>>> > 2) What resources would be of interest here? Scala shell takes the
>>> default
>>> > parameters since we use "bin/spark-shell --master " to run
>>> the
>>> > scala-shell. For the scala program, we do set some configuration
>>> options
>>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>>> > serializer.
>>> >
>>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>>> > RAM.1 executor runs on each worker node. Following configuration
>>> options
>>> > are set for the scala program -- perhaps we should move it to the spark
>>> > config file.
>>> >
>>> > Driver memory and executor memory are set to 12GB
>>> > parallelism is set to 8
>>> > Kryo serializer is used
>>> > Number of retainedJobs and retainedStages has been increased to check
>>> them
>>> > in the UI.
>>> >
>>> > What information regarding Spark Context would be of interest here?
>>> >
>>> > Regards,
>>> > Raghava.
>>> >
>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
>>> wrote:
>>> >
>>> >> If the data file is same then it should have similar distribution of
>>> >> keys.
>>> >> Few queries-
>>> >>
>>> >> 1. Did you compare the number of partitions in both the cases?
>>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>> >> Progr

executor delay in Spark

2016-04-22 Thread Raghava Mutharaju
Mike,

It turns out the executor delay, as you mentioned, is the cause. After we
introduced a dummy stage, partitioning was working fine. Does this delay
happen during later stages as well? We noticed the same behavior
(partitioning happens on spark-shell but not through spark-submit) at a
later stage also.

Apart from introducing a dummy stage or running it from spark-shell, is
there any other option to fix this?

Regards,
Raghava.


On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:

> When submitting a job with spark-submit, I've observed delays (up to
> 1--2 seconds) for the executors to respond to the driver in order to
> receive tasks in the first stage. The delay does not persist once the
> executors have been synchronized.
>
> When the tasks are very short, as may be your case (relatively small
> data and a simple map task like you have described), the 8 tasks in
> your stage may be allocated to only 1 executor in 2 waves of 4, since
> the second executor won't have responded to the master before the
> first 4 tasks on the first executor have completed.
>
> To see if this is the cause in your particular case, you could try the
> following to confirm:
> 1. Examine the starting times of the tasks alongside their executor
> 2. Make a "dummy" stage execute before your real stages to
> synchronize the executors by creating and materializing any random RDD
> 3. Make the tasks longer, i.e. with some silly computational work.
>
> Mike
>
>
> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
> > Yes its the same data.
> >
> > 1) The number of partitions are the same (8, which is an argument to the
> > HashPartitioner). In the first case, these partitions are spread across
> > both the worker nodes. In the second case, all the partitions are on the
> > same node.
> > 2) What resources would be of interest here? Scala shell takes the
> default
> > parameters since we use "bin/spark-shell --master " to run
> the
> > scala-shell. For the scala program, we do set some configuration options
> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> > serializer.
> >
> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
> > RAM.1 executor runs on each worker node. Following configuration options
> > are set for the scala program -- perhaps we should move it to the spark
> > config file.
> >
> > Driver memory and executor memory are set to 12GB
> > parallelism is set to 8
> > Kryo serializer is used
> > Number of retainedJobs and retainedStages has been increased to check
> them
> > in the UI.
> >
> > What information regarding Spark Context would be of interest here?
> >
> > Regards,
> > Raghava.
> >
> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com> wrote:
> >
> >> If the data file is same then it should have similar distribution of
> >> keys.
> >> Few queries-
> >>
> >> 1. Did you compare the number of partitions in both the cases?
> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
> >> Program being submitted?
> >>
> >> Also, can you please share the details of Spark Context, Environment and
> >> Executors when you run via Scala program?
> >>
> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> >> m.vijayaragh...@gmail.com> wrote:
> >>
> >>> Hello All,
> >>>
> >>> We are using HashPartitioner in the following way on a 3 node cluster
> (1
> >>> master and 2 worker nodes).
> >>>
> >>> val u =
> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
> >>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
> >>> x.toInt) } }).partitionBy(new
> HashPartitioner(8)).setName("u").persist()
> >>>
> >>> u.count()
> >>>
> >>> If we run this from the spark shell, the data (52 MB) is split across
> >>> the
> >>> two worker nodes. But if we put this in a scala program and run it,
> then
> >>> all the data goes to only one node. We have run it multiple times, but
> >>> this
> >>> behavior does not change. This seems strange.
> >>>
> >>> Is there some problem with the way we use HashPartitioner?
> >>>
> >>> Thanks in advance.
> >>>
> >>> Regards,
> >>> Raghava.
> >>>
> >>
> >>
> >
> >
> > --
> > Regards,
> > Raghava
> > http://raghavam.github.io
> >
>
>
> --
> Thanks,
> Mike
>



-- 
Regards,
Raghava
http://raghavam.github.io


Re: strange HashPartitioner behavior in Spark

2016-04-18 Thread Raghava Mutharaju
No. We specify it as a configuration option to spark-submit. Does that make a
difference?

Regards,
Raghava.


On Mon, Apr 18, 2016 at 9:56 AM, Sonal Goyal <sonalgoy...@gmail.com> wrote:

> Are you specifying your spark master in the scala program?
>
> Best Regards,
> Sonal
> Founder, Nube Technologies <http://www.nubetech.co>
> Reifier at Strata Hadoop World
> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
> Reifier at Spark Summit 2015
> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Mon, Apr 18, 2016 at 6:26 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Mike,
>>
>> We tried that. This map task is actually part of a larger set of
>> operations. I pointed out this map task since it involves partitionBy() and
>> we always use partitionBy() whenever partition-unaware shuffle operations
>> are performed (such as distinct). We in fact do not notice a change in the
>> distribution after several unrelated stages are executed and a significant
>> time has passed (nearly 10-15 minutes).
>>
>> I agree. We are not looking for partitions to go to specific nodes and
>> nor do we expect a uniform distribution of keys across the cluster. There
>> will be a skew. But it cannot be that all the data is on one node and
>> nothing on the other and no, the keys are not the same. They vary from 1 to
>> around 55000 (integers). What makes this strange is that it seems to work
>> fine on the spark shell (REPL).
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> A HashPartitioner will indeed partition based on the key, but you
>>> cannot know on *which* node that key will appear. Again, the RDD
>>> partitions will not necessarily be distributed evenly across your
>>> nodes because of the greedy scheduling of the first wave of tasks,
>>> particularly if those tasks have durations less than the initial
>>> executor delay. I recommend you look at your logs to verify if this is
>>> happening to you.
>>>
>>> Mike
>>>
>>> On 4/18/16, Anuj Kumar <anujs...@gmail.com> wrote:
>>> > Good point Mike +1
>>> >
>>> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
>>> >
>>> >> When submitting a job with spark-submit, I've observed delays (up to
>>> >> 1--2 seconds) for the executors to respond to the driver in order to
>>> >> receive tasks in the first stage. The delay does not persist once the
>>> >> executors have been synchronized.
>>> >>
>>> >> When the tasks are very short, as may be your case (relatively small
>>> >> data and a simple map task like you have described), the 8 tasks in
>>> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>> >> the second executor won't have responded to the master before the
>>> >> first 4 tasks on the first executor have completed.
>>> >>
>>> >> To see if this is the cause in your particular case, you could try the
>>> >> following to confirm:
>>> >> 1. Examine the starting times of the tasks alongside their
>>> >> executor
>>> >> 2. Make a "dummy" stage execute before your real stages to
>>> >> synchronize the executors by creating and materializing any random RDD
>>> >> 3. Make the tasks longer, i.e. with some silly computational
>>> >> work.
>>> >>
>>> >> Mike
>>> >>
>>> >>
>>> >> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
>>> >> > Yes its the same data.
>>> >> >
>>> >> > 1) The number of partitions are the same (8, which is an argument to
>>> >> > the
>>> >> > HashPartitioner). In the first case, these partitions are spread
>>> across
>>> >> > both the worker nodes. In the second case, all the partitions are on
>>> >> > the
>>> >> > same node.
>>> >> > 2) What resources would be of interest here? Scala shell takes the
>>> >> default
>>> >> > parameters since we use "bin/spark-shell --master " to
>>> run
>>> >> the
>>> >> > scala

Re: strange HashPartitioner behavior in Spark

2016-04-18 Thread Raghava Mutharaju
Mike,

We tried that. This map task is actually part of a larger set of
operations. I pointed out this map task since it involves partitionBy() and
we always use partitionBy() whenever partition-unaware shuffle operations
are performed (such as distinct). We in fact do not notice a change in the
distribution after several unrelated stages are executed and a significant
time has passed (nearly 10-15 minutes).

I agree. We are not looking for partitions to go to specific nodes, nor do we
expect a uniform distribution of keys across the cluster. There will be a
skew. But it cannot be that all the data is on one node and nothing on
the other and no, the keys are not the same. They vary from 1 to around
55000 (integers). What makes this strange is that it seems to work fine on
the spark shell (REPL).

Regards,
Raghava.


On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:

> A HashPartitioner will indeed partition based on the key, but you
> cannot know on *which* node that key will appear. Again, the RDD
> partitions will not necessarily be distributed evenly across your
> nodes because of the greedy scheduling of the first wave of tasks,
> particularly if those tasks have durations less than the initial
> executor delay. I recommend you look at your logs to verify if this is
> happening to you.
>
> Mike
>
> On 4/18/16, Anuj Kumar <anujs...@gmail.com> wrote:
> > Good point Mike +1
> >
> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
> >
> >> When submitting a job with spark-submit, I've observed delays (up to
> >> 1--2 seconds) for the executors to respond to the driver in order to
> >> receive tasks in the first stage. The delay does not persist once the
> >> executors have been synchronized.
> >>
> >> When the tasks are very short, as may be your case (relatively small
> >> data and a simple map task like you have described), the 8 tasks in
> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
> >> the second executor won't have responded to the master before the
> >> first 4 tasks on the first executor have completed.
> >>
> >> To see if this is the cause in your particular case, you could try the
> >> following to confirm:
> >> 1. Examine the starting times of the tasks alongside their
> >> executor
> >> 2. Make a "dummy" stage execute before your real stages to
> >> synchronize the executors by creating and materializing any random RDD
> >> 3. Make the tasks longer, i.e. with some silly computational
> >> work.
> >>
> >> Mike
> >>
> >>
> >> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
> >> > Yes its the same data.
> >> >
> >> > 1) The number of partitions are the same (8, which is an argument to
> >> > the
> >> > HashPartitioner). In the first case, these partitions are spread
> across
> >> > both the worker nodes. In the second case, all the partitions are on
> >> > the
> >> > same node.
> >> > 2) What resources would be of interest here? Scala shell takes the
> >> default
> >> > parameters since we use "bin/spark-shell --master " to run
> >> the
> >> > scala-shell. For the scala program, we do set some configuration
> >> > options
> >> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> >> > serializer.
> >> >
> >> > We are running this on Azure D3-v2 machines which have 4 cores and
> 14GB
> >> > RAM.1 executor runs on each worker node. Following configuration
> >> > options
> >> > are set for the scala program -- perhaps we should move it to the
> spark
> >> > config file.
> >> >
> >> > Driver memory and executor memory are set to 12GB
> >> > parallelism is set to 8
> >> > Kryo serializer is used
> >> > Number of retainedJobs and retainedStages has been increased to check
> >> them
> >> > in the UI.
> >> >
> >> > What information regarding Spark Context would be of interest here?
> >> >
> >> > Regards,
> >> > Raghava.
> >> >
> >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
> >> > wrote:
> >> >
> >> >> If the data file is same then it should have similar distribution of
> >> >> keys.
> >> >> Few queries-
> >> >>
> >>

Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Raghava Mutharaju
We are testing with 52MB, but it will go up to 20GB and more later on. The
cluster size is also not static; we will be growing it. But the issue here is
the behavior of HashPartitioner -- from what I understand, it should partition
the data based on the hash of the key, irrespective of the RAM size (which is
more than adequate for now). Yet the behavior differs between the spark-shell
and the Scala program.
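
That is indeed how HashPartitioner behaves: the partition id is a pure
function of the key's hashCode and the number of partitions, and has nothing
to do with RAM; which node ends up hosting a given partition is a separate
scheduling question, as Mike points out earlier in the thread. A quick
illustration, as a sketch that can be pasted into the shell:

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(8)
// The partition id depends only on the key, never on cluster resources:
(1 to 5).foreach(k => println(s"key $k -> partition ${p.getPartition(k)}"))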

We are not using YARN; it's the standalone version of Spark.

Regards,
Raghava.


On Mon, Apr 18, 2016 at 12:09 AM, Anuj Kumar <anujs...@gmail.com> wrote:

> A few params like spark.task.cpus and spark.cores.max will help. Also, for
> 52MB of data you need not allocate 12GB to the executors. Better to assign
> 512MB or so and increase the number of executors per worker node. Try
> reducing the executor memory to 512MB or so for this case.
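
In code, the suggestion above amounts to something like the following
SparkConf settings; this is only a sketch and the values are illustrative:

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", "512m")  // small executors are plenty for 52MB of input
  .set("spark.cores.max", "8")           // cap on the total cores the application takes
  .set("spark.task.cpus", "1")           // cores required per task (1 is the default)
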
>
> On Mon, Apr 18, 2016 at 9:07 AM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Yes its the same data.
>>
>> 1) The number of partitions are the same (8, which is an argument to the
>> HashPartitioner). In the first case, these partitions are spread across
>> both the worker nodes. In the second case, all the partitions are on the
>> same node.
>> 2) What resources would be of interest here? Scala shell takes the
>> default parameters since we use "bin/spark-shell --master " to
>> run the scala-shell. For the scala program, we do set some configuration
>> options such as driver memory (12GB), parallelism is set to 8 and we use
>> Kryo serializer.
>>
>> We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>> RAM.1 executor runs on each worker node. Following configuration options
>> are set for the scala program -- perhaps we should move it to the spark
>> config file.
>>
>> Driver memory and executor memory are set to 12GB
>> parallelism is set to 8
>> Kryo serializer is used
>> Number of retainedJobs and retainedStages has been increased to check
>> them in the UI.
>>
>> What information regarding Spark Context would be of interest here?
>>
>> Regards,
>> Raghava.
>>
>> On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com> wrote:
>>
>>> If the data file is same then it should have similar distribution of
>>> keys. Few queries-
>>>
>>> 1. Did you compare the number of partitions in both the cases?
>>> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>> Program being submitted?
>>>
>>> Also, can you please share the details of Spark Context, Environment and
>>> Executors when you run via Scala program?
>>>
>>> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>> m.vijayaragh...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>>
>>>> We are using HashPartitioner in the following way on a 3 node cluster
>>>> (1 master and 2 worker nodes).
>>>>
>>>> val u =
>>>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
>>>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>>>>
>>>> u.count()
>>>>
>>>> If we run this from the spark shell, the data (52 MB) is split across
>>>> the two worker nodes. But if we put this in a scala program and run it,
>>>> then all the data goes to only one node. We have run it multiple times, but
>>>> this behavior does not change. This seems strange.
>>>>
>>>> Is there some problem with the way we use HashPartitioner?
>>>>
>>>> Thanks in advance.
>>>>
>>>> Regards,
>>>> Raghava.
>>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io


Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Raghava Mutharaju
Yes, it's the same data.

1) The number of partitions is the same (8, which is an argument to the
HashPartitioner). In the first case, these partitions are spread across
both worker nodes. In the second case, all the partitions are on the
same node.
2) What resources would be of interest here? The Scala shell takes the default
parameters, since we use "bin/spark-shell --master " to run it. For the
Scala program, we do set some configuration options: driver memory is 12GB,
parallelism is set to 8, and we use the Kryo serializer.

We are running this on Azure D3-v2 machines, which have 4 cores and 14GB of
RAM. One executor runs on each worker node. The following configuration
options are set for the Scala program -- perhaps we should move them to the
Spark config file.

Driver memory and executor memory are set to 12GB.
Parallelism is set to 8.
The Kryo serializer is used.
The number of retainedJobs and retainedStages has been increased to check them
in the UI.

What information regarding Spark Context would be of interest here?

Regards,
Raghava.

On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com> wrote:

> If the data file is same then it should have similar distribution of keys.
> Few queries-
>
> 1. Did you compare the number of partitions in both the cases?
> 2. Did you compare the resource allocation for Spark Shell vs Scala
> Program being submitted?
>
> Also, can you please share the details of Spark Context, Environment and
> Executors when you run via Scala program?
>
> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Hello All,
>>
>> We are using HashPartitioner in the following way on a 3 node cluster (1
>> master and 2 worker nodes).
>>
>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>>
>> u.count()
>>
>> If we run this from the spark shell, the data (52 MB) is split across the
>> two worker nodes. But if we put this in a scala program and run it, then
>> all the data goes to only one node. We have run it multiple times, but this
>> behavior does not change. This seems strange.
>>
>> Is there some problem with the way we use HashPartitioner?
>>
>> Thanks in advance.
>>
>> Regards,
>> Raghava.
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io


strange HashPartitioner behavior in Spark

2016-04-17 Thread Raghava Mutharaju
Hello All,

We are using HashPartitioner in the following way on a 3-node cluster (1
master and 2 worker nodes).

val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()

u.count()

If we run this from the spark shell, the data (52 MB) is split across the
two worker nodes. But if we put this in a Scala program and run it, then
all the data goes to only one node. We have run it multiple times, and this
behavior does not change, which seems strange.

Is there some problem with the way we use HashPartitioner?

Thanks in advance.

Regards,
Raghava.
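
One way to confirm where the eight partitions of u actually land is to log the
hostname per partition; this diagnostic is only a sketch and not part of the
original program:

u.mapPartitionsWithIndex { (idx, it) =>
  Iterator((idx, java.net.InetAddress.getLocalHost.getHostName, it.size))
}.collect().sortBy(_._1).foreach(println)
// Each line prints (partition id, host name, number of records in that partition).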


DataFrames - Kryo registration issue

2016-03-10 Thread Raghava Mutharaju
Hello All,

If Kryo serialization is enabled, doesn't Spark take care of registering its
built-in classes, i.e., aren't we supposed to register only our custom
classes?

When using DataFrames, this does not seem to be the case. I had to register
the following classes

conf.registerKryoClasses(Array(
  classOf[org.apache.spark.sql.types.StructType],
  classOf[org.apache.spark.sql.types.StructField],
  classOf[Array[org.apache.spark.sql.types.StructField]],
  classOf[org.apache.spark.sql.types.LongType$],
  classOf[org.apache.spark.sql.types.Metadata],
  classOf[scala.collection.immutable.Map$EmptyMap$],
  classOf[org.apache.spark.sql.catalyst.InternalRow],
  classOf[Array[org.apache.spark.sql.catalyst.InternalRow]],
  classOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow],
  classOf[Array[org.apache.spark.sql.catalyst.expressions.UnsafeRow]],
  Class.forName("org.apache.spark.sql.execution.joins.UnsafeHashedRelation"),
  Class.forName("java.util.HashMap"),
  classOf[scala.reflect.ClassTag$$anon$1],
  Class.forName("java.lang.Class"),
  Class.forName("org.apache.spark.sql.execution.columnar.CachedBatch")))

I got the following exception

com.esotericsoftware.kryo.KryoException:
java.lang.IllegalArgumentException: Class is not registered: byte[][]

But byte is not a class in Scala, so I couldn't register it directly -- the
compiler complains that byte is not a class. How can I register byte[][] in
Scala?

Does this point to some other issue?

In some other posts, I noticed use of kryo.register(). In this case, how do
we pass the kryo object to SparkContext?

Thanks in advance.

Regards,
Raghava.
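
On the two questions above: byte[][] can be written from Scala as a nested
Array class, and a custom KryoRegistrator is the usual way to get hold of the
Kryo object. The sketch below assumes conf is the same SparkConf used above;
MyKryoRegistrator and its package name are invented for illustration:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// byte[][] is Array[Array[Byte]] on the Scala side:
conf.registerKryoClasses(Array(classOf[Array[Array[Byte]]]))

// Spark instantiates this class when spark.kryo.registrator is set and passes
// it the Kryo instance, so kryo.register() can be called directly:
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Array[Array[Byte]]])
  }
}
// conf.set("spark.kryo.registrator", "mypackage.MyKryoRegistrator")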


Dataset takes more memory compared to RDD

2016-02-12 Thread Raghava Mutharaju
Hello All,

I implemented an algorithm using both the RDD and the Dataset API (in
Spark 1.6). The Dataset version takes a lot more memory than the RDD one. Is
this normal? Even for very small input data, it runs out of memory and I
get a Java heap space exception.

I tried the Kryo serializer by registering the classes and I
set spark.kryo.registrationRequired to true. I get the following exception

com.esotericsoftware.kryo.KryoException:
java.lang.IllegalArgumentException: Class is not registered:
org.apache.spark.sql.types.StructField[]
Note: To register this class use:
kryo.register(org.apache.spark.sql.types.StructField[].class);

I tried registering it
using conf.registerKryoClasses(Array(classOf[StructField[]]))

But StructField[] is not valid Scala syntax, so this does not compile. Is
there any other way to register it? I have already registered StructField.

Regards,
Raghava.
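
The StructField[] in the Kryo message is just Java array syntax; from Scala
the same class can be named as Array[StructField], exactly the form used in
the registration list of the DataFrames thread above. A sketch, assuming conf
is the application's SparkConf:

import org.apache.spark.sql.types.StructField

conf.registerKryoClasses(Array(classOf[Array[StructField]]))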


Re: Dataset joinWith condition

2016-02-10 Thread Raghava Mutharaju
Thanks a lot Ted.

If the two columns are of different types, say Int and Long, would it then be
ds.select(expr("_2 / _1").as[(Int, Long)])?

Regards,
Raghava.


On Wed, Feb 10, 2016 at 5:19 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. I followed something similar $"a.x"
>
> Please use expr("...")
> e.g. if your DataSet has two columns, you can write:
>   ds.select(expr("_2 / _1").as[Int])
>
> where _1 refers to first column and _2 refers to second.
>
> On Tue, Feb 9, 2016 at 3:31 PM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Ted,
>>
>> Thank you for the pointer. That works, but what does a string prepended
>> with $ sign mean? Is it an expression?
>>
>> Could you also help me with the select() parameter syntax? I followed
>> something similar $"a.x" and it gives an error message that a TypedColumn
>> is expected.
>>
>> Regards,
>> Raghava.
>>
>>
>> On Tue, Feb 9, 2016 at 10:12 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Please take a look at:
>>> sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
>>>
>>> val ds1 = Seq(1, 2, 3).toDS().as("a")
>>> val ds2 = Seq(1, 2).toDS().as("b")
>>>
>>> checkAnswer(
>>>   ds1.joinWith(ds2, $"a.value" === $"b.value", "inner"),
>>>
>>> On Tue, Feb 9, 2016 at 7:07 AM, Raghava Mutharaju <
>>> m.vijayaragh...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>>
>>>> joinWith() method in Dataset takes a condition of type Column. Without
>>>> converting a Dataset to a DataFrame, how can we get a specific column?
>>>>
>>>> For eg: case class Pair(x: Long, y: Long)
>>>>
>>>> A, B are Datasets of type Pair and I want to join A.x with B.y
>>>>
>>>> A.joinWith(B, A.toDF().col("x") == B.toDF().col("y"))
>>>>
>>>> Is there a way to avoid using toDF()?
>>>>
>>>> I am having similar issues with the usage of filter(A.x == B.y)
>>>>
>>>> --
>>>> Regards,
>>>> Raghava
>>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Raghava
>> http://raghavam.github.io
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io
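
Putting Ted's pointers together with the Pair example from the original
question gives something like the following sketch; it assumes A and B are the
Dataset[Pair] values from that question and that sqlContext's implicits are
imported, which is what brings the $"..." interpolator into scope:

import sqlContext.implicits._   // provides $"..." (a ColumnName, i.e. a Column)

// A and B are the Dataset[Pair] values from the original question
val a = A.as("a")
val b = B.as("b")

// === builds a Column equality condition, so no toDF() is needed:
val joined = a.joinWith(b, $"a.x" === $"b.y")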


Dataset joinWith condition

2016-02-09 Thread Raghava Mutharaju
Hello All,

joinWith() method in Dataset takes a condition of type Column. Without
converting a Dataset to a DataFrame, how can we get a specific column?

For eg: case class Pair(x: Long, y: Long)

A, B are Datasets of type Pair and I want to join A.x with B.y

A.joinWith(B, A.toDF().col("x") == B.toDF().col("y"))

Is there a way to avoid using toDF()?

I am having similar issues with the usage of filter(A.x == B.y)

-- 
Regards,
Raghava


Re: Dataset joinWith condition

2016-02-09 Thread Raghava Mutharaju
Ted,

Thank you for the pointer. That works, but what does a string prefixed
with a $ sign mean? Is it an expression?

Could you also help me with the select() parameter syntax? I tried
something similar, $"a.x", and it gives an error message that a TypedColumn
is expected.

Regards,
Raghava.
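
As far as I can tell from Ted's examples, the $ prefix is the string
interpolator that comes in with the SQLContext implicits and builds a
ColumnName (an untyped Column), while Dataset.select() wants a TypedColumn,
which .as[T] on a Column provides. A small sketch along those lines, assuming
sqlContext is the active SQLContext:

import org.apache.spark.sql.functions.expr
import sqlContext.implicits._                  // $"..." (StringToColumn) and encoders

val c: org.apache.spark.sql.Column = $"a.x"    // $ builds a ColumnName, a plain Column
val t = expr("_2 / _1").as[Int]                // .as[T] yields the TypedColumn select() expects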


On Tue, Feb 9, 2016 at 10:12 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Please take a look at:
> sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
>
> val ds1 = Seq(1, 2, 3).toDS().as("a")
> val ds2 = Seq(1, 2).toDS().as("b")
>
> checkAnswer(
>   ds1.joinWith(ds2, $"a.value" === $"b.value", "inner"),
>
> On Tue, Feb 9, 2016 at 7:07 AM, Raghava Mutharaju <
> m.vijayaragh...@gmail.com> wrote:
>
>> Hello All,
>>
>> joinWith() method in Dataset takes a condition of type Column. Without
>> converting a Dataset to a DataFrame, how can we get a specific column?
>>
>> For eg: case class Pair(x: Long, y: Long)
>>
>> A, B are Datasets of type Pair and I want to join A.x with B.y
>>
>> A.joinWith(B, A.toDF().col("x") == B.toDF().col("y"))
>>
>> Is there a way to avoid using toDF()?
>>
>> I am having similar issues with the usage of filter(A.x == B.y)
>>
>> --
>> Regards,
>> Raghava
>>
>
>


-- 
Regards,
Raghava
http://raghavam.github.io