Re: [ANNOUNCE] Announcing Apache Spark 2.4.0

2018-11-08 Thread Swapnil Shinde
Great news.. thank you very much!

On Thu, Nov 8, 2018, 5:19 PM Stavros Kontopoulos <stavros.kontopou...@lightbend.com> wrote:

> Awesome!
>
> On Thu, Nov 8, 2018 at 9:36 PM, Jules Damji  wrote:
>
>> Indeed!
>>
>> Sent from my iPhone
>> Pardon the dumb thumb typos :)
>>
>> On Nov 8, 2018, at 11:31 AM, Dongjoon Hyun 
>> wrote:
>>
>> Finally, thank you all. Especially, thanks to the release manager,
>> Wenchen!
>>
>> Bests,
>> Dongjoon.
>>
>>
>> On Thu, Nov 8, 2018 at 11:24 AM Wenchen Fan  wrote:
>>
>>> + user list
>>>
>>> On Fri, Nov 9, 2018 at 2:20 AM Wenchen Fan  wrote:
>>>
 resend

 On Thu, Nov 8, 2018 at 11:02 PM Wenchen Fan 
 wrote:

>
>
> -- Forwarded message -
> From: Wenchen Fan 
> Date: Thu, Nov 8, 2018 at 10:55 PM
> Subject: [ANNOUNCE] Announcing Apache Spark 2.4.0
> To: Spark dev list 
>
>
> Hi all,
>
> Apache Spark 2.4.0 is the fifth release in the 2.x line. This release
> adds Barrier Execution Mode for better integration with deep learning
> frameworks, introduces 30+ built-in and higher-order functions to make it
> easier to work with complex data types, improves the K8s integration, and
> adds experimental Scala 2.12 support. Other major updates include the
> built-in Avro data source, the Image data source, flexible streaming sinks,
> elimination of the 2GB block size limitation during transfer, and Pandas
> UDF improvements. In addition, this release continues to focus on
> usability, stability, and polish while resolving around 1100 tickets.
>
> We'd like to thank our contributors and users for their contributions
> and early feedback to this release. This release would not have been
> possible without you.
>
> To download Spark 2.4.0, head over to the download page:
> http://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-2-4-0.html
>
> Thanks,
> Wenchen
>
> PS: If you see any issues with the release notes, webpage or published
> artifacts, please contact me directly off-list.
>

>
>
>
>
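
For readers who have not tried these features yet, here is a minimal sketch of
two of the additions mentioned in the announcement above (the session setup,
paths, and column names are made up for illustration; the Avro module may need
to be added to the classpath, e.g. via --packages):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("spark-2.4-features").getOrCreate()

    // New built-in higher-order function: apply a lambda to each array element.
    spark.sql("SELECT transform(array(1, 2, 3), x -> x + 1) AS incremented").show()

    // Avro data source now maintained inside Spark itself.
    val events = spark.read.format("avro").load("/tmp/events.avro")
    events.write.format("avro").save("/tmp/events-copy.avro")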


Spark scala development in Sbt vs Maven

2018-03-05 Thread Swapnil Shinde
Hello
   SBT's incremental compilation has been a huge plus for building Spark + Scala
applications for some time. It seems Maven can also support incremental
compilation with the Zinc server. Considering that, I am interested in the
community's experience -

1. The Spark documentation says SBT is used by many contributors for day-to-day
development, mainly because of incremental compilation. Considering that Maven
supports incremental compilation through Zinc, do contributors prefer to switch
from SBT to Maven?

2. Any issues or learning experiences with Maven + Zinc?

3. Any other reasons to use SBT over Maven for Scala development?

I understand SBT has many other advantages over Maven, like cross-version
publishing, but incremental compilation is the major need for us. I am mainly
interested in why Spark contributors/committers prefer SBT for day-to-day
development.

Any help and advice would help us direct our evaluation in the right
direction.

Thanks
Swapnil
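
For context on the sbt side, a minimal build.sbt sketch for a Spark application
(the Scala and Spark versions shown are assumptions, not recommendations); with
a definition like this, sbt's incremental compilation via Zinc works out of the
box, with no extra configuration:

    // build.sbt -- minimal sketch; versions are illustrative only.
    name := "spark-app"
    version := "0.1.0"
    scalaVersion := "2.11.12"

    // "provided" keeps Spark itself out of the assembly jar when running on a cluster.
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.1" % "provided"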


Minimum cost flow problem solving in Spark

2017-09-13 Thread Swapnil Shinde
Hello
Has anyone used Spark to solve minimum cost flow problems? I am quite new
to combinatorial optimization algorithms, so any help, suggestions, or
library pointers are very much appreciated.

Thanks
Swapnil


Re: Inconsistent results with combineByKey API

2017-09-05 Thread Swapnil Shinde
Ping. Can someone please confirm whether this is an issue or not?

-
Swapnil

On Thu, Aug 31, 2017 at 12:27 PM, Swapnil Shinde <swapnilushi...@gmail.com>
wrote:

> Hello All
>
> I am observing some strange results with the aggregateByKey API, which is
> implemented with combineByKey. I am not sure whether this is by design or a
> bug -
>
> I created this toy example, but the same problem can be observed on large
> datasets as well -
>
> case class ABC(key: Int, c1: Int, c2: Int)
> case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)
>
> // Create an RDD and make sure it has only 2 partitions for this example.
> // With 2 partitions there is a high chance that all values for a key end up
> // in the same partition.
> val a = sc.makeRDD[ABC](List(ABC(1, 10, 20), ABC(1, 10, 20), ABC(2, 20, 40),
>   ABC(2, 20, 40))).coalesce(2)
>
> Now, I am running aggregateByKey, grouping by key to sum c1 and c2 but
> returning an ABCoutput with a new 'desc' property.
>
> val b = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
>   (x1: ABCoutput, x2: ABC) =>
>     ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>   (m1: ABCoutput, m2: ABCoutput) =>
>     ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
>
> The above query may return results like this -
> [image: Inline image 1]
>
> It means that for a key whose values are all in the same partition, the
> mergeCombiners function (the one that returns ABCoutput with desc=final)
> was never invoked.
>
> I expected the mergeCombiners function to be invoked every time, which is
> not happening. Correct me if I am wrong, but is this expected behavior?
>
> Further debugging shows that it works fine if I create the input RDD with
> more partitions (which increases the chance of rows with the same key
> landing in different partitions).
>
> val b = a.repartition(20).keyBy(x => x.key)
>   .aggregateByKey(ABCoutput(0, "initial", 0, 0))(
>     (x1: ABCoutput, x2: ABC) =>
>       ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>     (m1: ABCoutput, m2: ABCoutput) =>
>       ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
> [image: Inline image 2]
>
> One more thing to mention - if I make sure my input RDD is already
> partitioned, then it simply runs the aggregation with mapPartitions (here
> <https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L93>).
> This makes sense for the aggregation itself, as all values for a given key
> are in the same partition. However, I have something in my mergeCombiners
> function that I would like to run, and it won't get invoked.
> Traditional MapReduce allows different combiner and reduce functions, and it
> is guaranteed that reduce is always invoked. I can see that running
> aggregations with no shuffle has performance gains, but the API seems
> confusing/misleading: a user might expect mergeCombiners to be invoked when
> in reality it isn't. It would be great if the API designers could shed some
> light on this.
>
> import org.apache.spark.HashPartitioner
>
> val b = a.keyBy(x => x.key).partitionBy(new HashPartitioner(20))
>   .aggregateByKey(ABCoutput(0, "initial", 0, 0))(
>     (x1: ABCoutput, x2: ABC) =>
>       ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>     (m1: ABCoutput, m2: ABCoutput) =>
>       ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
>
> [image: Inline image 3]
>
> The above examples show this behavior with aggregateByKey, but the same
> thing can be observed with combineByKey as well.
>
> val b = a.keyBy(x => x.key).combineByKey(
>   (x: ABC) => ABCoutput(x.key, "initial", x.c1, x.c2),
>   (x1: ABCoutput, x2: ABC) =>
>     ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
>   (x1: ABCoutput, x2: ABCoutput) =>
>     ABCoutput(x1.key, "final", x1.c1Sum + x2.c1Sum, x1.c2Sum + x2.c2Sum))
>
> [image: Inline image 4]
>
>
> Please let me know if you need any further information, and correct me if
> my understanding of the API is wrong.
>
> Thanks
> Swapnil
>
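
Assuming the behavior above is indeed by design (no shuffle means the combiner
merge can be skipped), one workaround is to keep the "final" step out of
mergeCombiners entirely and apply it afterwards with mapValues, so it runs
exactly once per key regardless of partitioning. A sketch of that idea,
restating the sample data from the thread (spark-shell style, with sc in scope):

    case class ABC(key: Int, c1: Int, c2: Int)
    case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)

    val a = sc.makeRDD(List(ABC(1, 10, 20), ABC(1, 10, 20), ABC(2, 20, 40),
      ABC(2, 20, 40))).coalesce(2)

    // Both seqOp and combOp only accumulate; neither claims to be the "final" step.
    val summed = a.keyBy(_.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
      (acc, x) => ABCoutput(x.key, "intermediate", acc.c1Sum + x.c1, acc.c2Sum + x.c2),
      (m1, m2) => ABCoutput(m1.key, "intermediate", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))

    // The finalization runs once per key, whether or not a shuffle happened.
    val b = summed.mapValues(v => v.copy(desc = "final"))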


Inconsistent results with combineByKey API

2017-08-31 Thread Swapnil Shinde
Hello All

I am observing some strange results with the aggregateByKey API, which is
implemented with combineByKey. I am not sure whether this is by design or a bug -

I created this toy example, but the same problem can be observed on large
datasets as well -

case class ABC(key: Int, c1: Int, c2: Int)
case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)

// Create an RDD and make sure it has only 2 partitions for this example.
// With 2 partitions there is a high chance that all values for a key end up
// in the same partition.
val a = sc.makeRDD[ABC](List(ABC(1, 10, 20), ABC(1, 10, 20), ABC(2, 20, 40),
  ABC(2, 20, 40))).coalesce(2)

Now, I am running aggregateByKey, grouping by key to sum c1 and c2 but
returning an ABCoutput with a new 'desc' property.

val b = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0, "initial", 0, 0))(
  (x1: ABCoutput, x2: ABC) =>
    ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
  (m1: ABCoutput, m2: ABCoutput) =>
    ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))

The above query may return results like this -
[image: Inline image 1]

It means that for a key whose values are all in the same partition, the
mergeCombiners function (the one that returns ABCoutput with desc=final)
was never invoked.

I expected the mergeCombiners function to be invoked every time, which is
not happening. Correct me if I am wrong, but is this expected behavior?

Further debugging shows that it works fine if I create the input RDD with
more partitions (which increases the chance of rows with the same key
landing in different partitions).

val b = a.repartition(20).keyBy(x => x.key)
  .aggregateByKey(ABCoutput(0, "initial", 0, 0))(
    (x1: ABCoutput, x2: ABC) =>
      ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
    (m1: ABCoutput, m2: ABCoutput) =>
      ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
[image: Inline image 2]

One more thing to mention - if I make sure my input RDD is already
partitioned, then it simply runs the aggregation with mapPartitions (here
<https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L93>).
This makes sense for the aggregation itself, as all values for a given key
are in the same partition. However, I have something in my mergeCombiners
function that I would like to run, and it won't get invoked.
Traditional MapReduce allows different combiner and reduce functions, and it
is guaranteed that reduce is always invoked. I can see that running
aggregations with no shuffle has performance gains, but the API seems
confusing/misleading: a user might expect mergeCombiners to be invoked when
in reality it isn't. It would be great if the API designers could shed some
light on this.

import org.apache.spark.HashPartitioner

val b = a.keyBy(x => x.key).partitionBy(new HashPartitioner(20))
  .aggregateByKey(ABCoutput(0, "initial", 0, 0))(
    (x1: ABCoutput, x2: ABC) =>
      ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
    (m1: ABCoutput, m2: ABCoutput) =>
      ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))

[image: Inline image 3]

The above examples show this behavior with aggregateByKey, but the same
thing can be observed with combineByKey as well.

val b = a.keyBy(x => x.key).combineByKey(
  (x: ABC) => ABCoutput(x.key, "initial", x.c1, x.c2),
  (x1: ABCoutput, x2: ABC) =>
    ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum + x2.c2),
  (x1: ABCoutput, x2: ABCoutput) =>
    ABCoutput(x1.key, "final", x1.c1Sum + x2.c1Sum, x1.c2Sum + x2.c2Sum))

[image: Inline image 4]


Please let me know if you need any further information, and correct me if my
understanding of the API is wrong.

Thanks
Swapnil
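
A quick way to predict when the merge step will be skipped is to inspect the
partitioner before calling aggregateByKey; as the PairRDDFunctions code linked
above shows, when the keyed RDD already carries the same partitioner the
aggregation runs with mapPartitions and mergeCombiners is never invoked. A
small sketch of that check, assuming the RDD a from above is still in scope:

    import org.apache.spark.HashPartitioner

    val keyed = a.keyBy(_.key)
    println(keyed.partitioner)          // None: aggregateByKey will shuffle

    val prePartitioned = keyed.partitionBy(new HashPartitioner(20))
    println(prePartitioned.partitioner) // Some(HashPartitioner): aggregateByKey reuses it,
                                        // so there is no shuffle and no combiner merge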


CSV output with JOBUUID

2017-05-10 Thread Swapnil Shinde
Hello
I am using spark-2.0.1 and saw that the CSV file format includes the job
UUID in its output file names:
https://github.com/apache/spark/blob/v2.0.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala#L191

I want to avoid having the job UUID in the CSV output. Is there any property
(maybe spark.sql.sources.writeJobUUID) that can skip this UUID in the output
part filenames?

Furthermore, I couldn't find how this job UUID is used in spark-2.1.0. We are
planning to upgrade very soon, so I wanted to know how the 2.1.0 code handles
it.

Any help or pointers are very much appreciated!

Thanks
Swapnil
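
If renaming the output after the job is acceptable, one hedged workaround that
sidesteps the question entirely is to rename the part files with the Hadoop
FileSystem API once the write finishes (outputPath and the target naming scheme
below are assumptions for illustration; this is not a Spark configuration):

    import org.apache.hadoop.fs.{FileSystem, Path}

    val outputPath = "/tmp/csv-output"   // hypothetical output directory
    val fs = FileSystem.get(sc.hadoopConfiguration)

    // Rename the part files written by Spark to a simple sequential scheme.
    fs.listStatus(new Path(outputPath))
      .filter(_.getPath.getName.startsWith("part-"))
      .sortBy(_.getPath.getName)
      .zipWithIndex
      .foreach { case (status, i) =>
        fs.rename(status.getPath, new Path(outputPath, f"part-$i%05d.csv"))
      }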


Re: Huge partitioning job takes longer to close after all tasks finished

2017-03-08 Thread Swapnil Shinde
Thank you, liu. Can you please explain what you mean by enabling the Spark
fault-tolerance mechanism?
I observed that after all tasks finish, Spark spends time consolidating the
matching partitions from all tasks on the file system, e.g.
task1 - partition1, partition2, partition3
task2 - partition1, partition2, partition3

Then, after task1 and task2 finish, Spark merges partition1 from task1 and
task2 to create the final partition1. This takes longer when we have a large
number of files. I am not sure whether there is a way to tell Spark not to
concatenate the partitions from each task.

Thanks
Swapnil


On Tue, Mar 7, 2017 at 10:47 PM, cht liu <liucht...@gmail.com> wrote:

> Did you enable the Spark fault-tolerance mechanism (RDD checkpointing)? At
> the end of the job it will start a separate job to write the checkpoint data
> to the file system, for high availability, before persisting.
>
> 2017-03-08 2:45 GMT+08:00 Swapnil Shinde <swapnilushi...@gmail.com>:
>
>> Hello all
>>    I have a Spark job that reads parquet data and partitions it based on
>> one of the columns. I made sure the partitions are equally distributed and
>> not skewed. My code looks like this -
>>
>> datasetA.write.partitionBy("column1").parquet(outputPath)
>>
>> Execution plan -
>> [image: Inline image 1]
>>
>> All tasks (~12,000) finish in 30-35 mins, but it takes another 40-45 mins
>> to close the application. I am not sure what Spark is doing after all tasks
>> are processed successfully.
>> I checked the thread dumps (using the UI executor tab) on a few executors
>> but couldn't find anything major. Overall, a few shuffle-client threads are
>> "RUNNABLE" and a few dispatched-* threads are "WAITING".
>>
>> Please let me know what Spark is doing at this stage (after all tasks have
>> finished) and any way I can optimize it.
>>
>> Thanks
>> Swapnil
>>
>>
>>
>


Huge partitioning job takes longer to close after all tasks finished

2017-03-07 Thread Swapnil Shinde
Hello all
   I have a Spark job that reads parquet data and partitions it based on one
of the columns. I made sure the partitions are equally distributed and not
skewed. My code looks like this -

datasetA.write.partitionBy("column1").parquet(outputPath)

Execution plan -
[image: Inline image 1]

All tasks (~12,000) finish in 30-35 mins, but it takes another 40-45 mins to
close the application. I am not sure what Spark is doing after all tasks are
processed successfully.
I checked the thread dumps (using the UI executor tab) on a few executors but
couldn't find anything major. Overall, a few shuffle-client threads are
"RUNNABLE" and a few dispatched-* threads are "WAITING".

Please let me know what Spark is doing at this stage (after all tasks have
finished) and any way I can optimize it.

Thanks
Swapnil
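
One pattern that often shortens this tail is to cut down the number of files
produced per partition value before the write, so the post-task commit phase
has far fewer files to move. A sketch of that idea, assuming the same datasetA,
column1, and outputPath as above (whether it helps depends on the data volume
per partition, and repartitioning on a skewed column can create stragglers):

    import org.apache.spark.sql.functions.col

    // One shuffle partition per distinct value of column1, so each output
    // directory gets a single larger file instead of one file per task.
    datasetA
      .repartition(col("column1"))
      .write
      .partitionBy("column1")
      .parquet(outputPath)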


Spark shuffle: FileNotFound exception

2016-12-03 Thread Swapnil Shinde
Hello All
I am facing a FileNotFoundException for a shuffle index file when running
a job with large data. The same job runs fine with smaller datasets. These are
my cluster specifications -

No of nodes - 19
Total cores - 380
Memory per executor - 32G
Spark 1.6 mapr version
spark.shuffle.service.enabled - false

 I am running the job with 28G memory, 50 executors, and 1 core per
executor. The job is failing at a stage with a DataFrame explode, where each
row gets multiplied into 6 rows. Here are the exception details -

Caused by: java.lang.RuntimeException: java.io.FileNotFoundException:
/tmp/hadoop-mapr/nm-local-dir/usercache/sshinde/appcache/application_1480622725467_0071/blockmgr-3b2051f5-81c8-40a5-a332-9d32b4586a5d/38/shuffle_14_229_0.index
(No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:191)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:291)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

I tried the configurations below but nothing worked out -
conf.set("spark.io.compression.codec", "lz4")
conf.set("spark.network.timeout", "1000s")
conf.set("spark.sql.shuffle.partitions", "2500")
spark.yarn.executor.memoryOverhead should already be high enough given the
32g of executor memory (10% of 32g).
  I also increased the number of partitions up to 15000.
  I checked the YARN logs briefly and nothing stands out apart from the above
exception.


Please let me know if there is something I am missing, or any alternatives
to make large-data jobs run. Thank you.

Thanks
Swapnil
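
For reference, the knobs discussed above expressed as a single SparkConf sketch
(the values are placeholders, not a verified fix for this job; enabling the
external shuffle service additionally requires the YARN auxiliary service to be
set up on the NodeManagers):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.yarn.executor.memoryOverhead", "4096") // MB of off-heap headroom per executor
      .set("spark.shuffle.service.enabled", "true")      // serve shuffle files outside executors
      .set("spark.sql.shuffle.partitions", "2500")
      .set("spark.network.timeout", "1000s")
      .set("spark.io.compression.codec", "lz4")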


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Swapnil Shinde
I am using Spark 1.6.3, and below is the real plan (a, b, c above were just
for illustration purposes).

== Physical Plan ==
Project [ltt#3800 AS ltt#3814,CASE WHEN isnull(etv_demo_id#3813) THEN
mr_demo_id#3801 ELSE etv_demo_id#3813 AS etv_demo_id#3815]
+- SortMergeOuterJoin [mr_demoname#3802,mr_demo_id#3801],
[mr_demoname#3810,mr_demo_id#3811], LeftOuter, None
   :- Sort [mr_demoname#3802 ASC,mr_demo_id#3801 ASC], false, 0
   :  +- TungstenExchange
hashpartitioning(mr_demoname#3802,mr_demo_id#3801,200), None
   : +- Project [_1#3797 AS ltt#3800,_2#3798 AS mr_demo_id#3801,_3#3799
AS mr_demoname#3802]
   :+- Scan ExistingRDD[_1#3797,_2#3798,_3#3799]
   +- Sort [mr_demoname#3810 ASC,mr_demo_id#3811 ASC], false, 0
  +- TungstenExchange
hashpartitioning(mr_demoname#3810,mr_demo_id#3811,200), None
 +- Project [mr_demoname#3810,mr_demo_id#3811,etv_demo_id#3813]
+- Project [demogroup#3803 AS mr_demoname#3810,demovalue#3804
AS mr_demo_id#3811,demoname#3805 AS
mr_demo_value#3812,demovalue_etv_map#3806 AS etv_demo_id#3813]
   +- Filter ((map_type#3809 = master_roster_to_etv) && NOT
(demogroup#3803 = gender_age_id))
  +- Scan
ExistingRDD[demogroup#3803,demovalue#3804,demoname#3805,demovalue_etv_map#3806,demoname_etv_map#3807,demovalue_old_map#3808,map_type#3809]


Thanks
Swapnil

On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang <bewang.t...@gmail.com> wrote:

> Could you post the result of explain (`c.explain`)? If it is a broadcast
> join, you will see it in the explain output.
>
> On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde <swapnilushi...@gmail.com
> > wrote:
>
>> Hello
>> I am trying a broadcast join on DataFrames, but it is still doing a
>> SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold
>> higher, but still no luck.
>>
>> Related piece of code -
>>   val c = a.join(broadcast(b), "id")
>>
>> On a side note, SizeEstimator.estimate(b) is really high (460956584 bytes)
>> compared to the data it contains; b has just 85 rows and around 4964 bytes.
>> Help is very much appreciated!!
>> Help is very much appreciated!!
>>
>> Thanks
>> Swapnil
>>
>>
>>
>
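
For completeness, the broadcast hint as it is usually written against Spark
1.6, with the import that provides the function; checking the plan with
explain() shows whether it took effect (when the hint is honored, the plan
shows a BroadcastHashJoin instead of the SortMergeJoin above; a and b are the
DataFrames from the message):

    import org.apache.spark.sql.functions.broadcast

    val c = a.join(broadcast(b), "id")
    c.explain()   // expect BroadcastHashJoin when the hint is honored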


Dataframe broadcast join hint not working

2016-11-26 Thread Swapnil Shinde
Hello
I am trying a broadcast join on DataFrames, but it is still doing a
SortMergeJoin. I even tried setting spark.sql.autoBroadcastJoinThreshold
higher, but still no luck.

Related piece of code -
  val c = a.join(broadcast(b), "id")

On a side note, SizeEstimator.estimate(b) is really high (460956584 bytes)
compared to the data it contains; b has just 85 rows and around 4964 bytes.
Help is very much appreciated!!

Thanks
Swapnil


No plan for broadcastHint

2015-10-02 Thread Swapnil Shinde
Hello
I am trying to do an inner join with a broadcast hint and am getting the
exception below.
I tried to increase "sqlContext.conf.autoBroadcastJoinThreshold" but still had
no luck.

Code snippet -

val dpTargetUvOutput = pweCvfMUVDist.as("a")
  .join(broadcast(sourceAssgined.as("b")), $"a.web_id" === $"b.source_id")
  .selectExpr("b.web_id AS web_id",
              "b.source_id AS source_id",
              "a.gender_age_id AS gender_age_id",
              "a.hh_size AS hh_size",
              "a.M_proj AS M_proj",
              "a.cvf_uv_proj AS cvf_uv_proj")

Stack trace -

15/10/02 14:38:45 INFO spark.SparkContext: Created broadcast 19 from
persist at UVModellingMain.scala:76
Exception in thread "main" java.lang.AssertionError: assertion failed: No plan
for BroadcastHint
 InMemoryRelation [web_id#1128,level_id#1129,program_id#1130,date_day#1131,day_bin#1132,show_time#1133,genre#1134,date_diff#1135,source_id#1136],
 true, 1, StorageLevel(true, false, false, false, 1),
 (TungstenProject [_1#1119 AS web_id#1128,_2#1120 AS level_id#1129,_3#1121 AS program_id#1130,_4#1122 AS date_day#1131,_5#1123 AS day_bin#1132,_6#1124 AS show_time#1133,_7#1125 AS genre#1134,_8#1126 AS date_diff#1135,_9#1127 AS source_id#1136]), None

at scala.Predef$.assert(Predef.scala:179)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:346)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:109)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:346)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:138)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:346)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:
at
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 

Re: Spark driver locality

2015-08-28 Thread Swapnil Shinde
Thanks..
On Aug 28, 2015 4:55 AM, Rishitesh Mishra rishi80.mis...@gmail.com
wrote:

 Hi Swapnil,

 1. All the task scheduling/retries happen from the driver. So you are right
 that a lot of communication happens from the driver to the cluster. It all
 depends on how you want to set up your Spark application, i.e. whether your
 application has direct access to the Spark cluster or is routed through a
 gateway machine. You can take your decision accordingly.

 2. I am not familiar with NFS layer concurrency, but parallel reads should
 be OK, I think. Someone with knowledge of how NFS works should correct me
 if I am wrong.


 On Fri, Aug 28, 2015 at 1:12 AM, Swapnil Shinde swapnilushi...@gmail.com
 wrote:

 Thanks Rishitesh !!
 1. I get that driver doesn't need to be on master but there is lot of
 communication between driver and cluster. That's why co-located gateway was
 recommended. How much is the impact of driver not being co-located with
 cluster?

 4. How does hdfs split get assigned to worker node to read data from
 remote hadoop cluster? I am more interested to know how mapr NFS layer is
 accessed in parallel.

 -
 Swapnil


 On Thu, Aug 27, 2015 at 2:53 PM, Rishitesh Mishra 
 rishi80.mis...@gmail.com wrote:

 Hi Swapnil,
 Let me try to answer some of the questions. Answers inline. Hope it
 helps.

 On Thursday, August 27, 2015, Swapnil Shinde swapnilushi...@gmail.com
 wrote:

 Hello
 I am new to spark world and started to explore recently in standalone
 mode. It would be great if I get clarifications on below doubts-

 1. Driver locality - It is mentioned in documentation that client
 deploy-mode is not good if machine running spark-submit is not co-located
 with worker machines. cluster mode is not available with standalone
 clusters. Therefore, do we have to submit all applications on master
 machine? (Assuming we don't have separate co-located gateway machine)


 No. In standalone mode, too, your master and driver machines can be
 different.

 The driver should have access to the master as well as the worker machines.



 2. How does above driver locality work with spark shell running on
 local machine ?


 The Spark shell itself acts as the driver. This means your local machine
 should have access to all the cluster machines.


 3. I am little confused with role of driver program. Does driver do any
 computation in spark app life cycle? For instance, in simple row count app,
 worker node calculates local row counts. Does driver sum up local row
 counts? In short where does reduce phase runs in this case?


 The role of the driver is to coordinate with the cluster manager for the
 initial resource allocation. After that it schedules tasks onto the executors
 assigned to it. It does not do any computation (unless the application itself
 does something on its own). The reduce phase is also a bunch of tasks, which
 get assigned to one or more executors.


 4. In case of accessing hdfs data over network, do worker nodes read
 data in parallel? How does hdfs data over network get accessed in spark
 application?



 Yes. Every worker will get a split to read, and they read their own splits in
 parallel. This means all worker nodes should have access to the Hadoop file
 system.



 Sorry if these questions were already discussed..

 Thanks
 Swapnil






Spark driver locality

2015-08-27 Thread Swapnil Shinde
Hello
I am new to spark world and started to explore recently in standalone mode.
It would be great if I get clarifications on below doubts-

1. Driver locality - It is mentioned in the documentation that client
deploy-mode is not good if the machine running spark-submit is not co-located
with the worker machines, and cluster mode is not available with standalone
clusters. Therefore, do we have to submit all applications on the master
machine? (Assuming we don't have a separate co-located gateway machine.)

2. How does the above driver locality work with the spark shell running on a
local machine?

3. I am a little confused about the role of the driver program. Does the driver
do any computation in the Spark app life cycle? For instance, in a simple row
count app, worker nodes calculate local row counts. Does the driver sum up the
local row counts? In short, where does the reduce phase run in this case?

4. When accessing HDFS data over the network, do worker nodes read the data
in parallel? How does HDFS data over the network get accessed in a Spark
application?

Sorry if these questions were already discussed.

Thanks
Swapnil


Re: Spark driver locality

2015-08-27 Thread Swapnil Shinde
Thanks, Rishitesh!
1. I get that the driver doesn't need to be on the master, but there is a lot
of communication between the driver and the cluster. That's why a co-located
gateway was recommended. How big is the impact of the driver not being
co-located with the cluster?

4. How does an HDFS split get assigned to a worker node when reading data from
a remote Hadoop cluster? I am more interested in how the MapR NFS layer is
accessed in parallel.

-
Swapnil


On Thu, Aug 27, 2015 at 2:53 PM, Rishitesh Mishra rishi80.mis...@gmail.com
wrote:

 Hi Swapnil,
 Let me try to answer some of the questions. Answers inline. Hope it helps.

 On Thursday, August 27, 2015, Swapnil Shinde swapnilushi...@gmail.com
 wrote:

 Hello
 I am new to spark world and started to explore recently in standalone
 mode. It would be great if I get clarifications on below doubts-

 1. Driver locality - It is mentioned in documentation that client
 deploy-mode is not good if machine running spark-submit is not co-located
 with worker machines. cluster mode is not available with standalone
 clusters. Therefore, do we have to submit all applications on master
 machine? (Assuming we don't have separate co-located gateway machine)


 No. In standalone mode, too, your master and driver machines can be
 different.

 The driver should have access to the master as well as the worker machines.



 2. How does above driver locality work with spark shell running on local
 machine ?


 The Spark shell itself acts as the driver. This means your local machine
 should have access to all the cluster machines.


 3. I am little confused with role of driver program. Does driver do any
 computation in spark app life cycle? For instance, in simple row count app,
 worker node calculates local row counts. Does driver sum up local row
 counts? In short where does reduce phase runs in this case?


 The role of the driver is to coordinate with the cluster manager for the
 initial resource allocation. After that it schedules tasks onto the executors
 assigned to it. It does not do any computation (unless the application itself
 does something on its own). The reduce phase is also a bunch of tasks, which
 get assigned to one or more executors.


 4. In case of accessing hdfs data over network, do worker nodes read data
 in parallel? How does hdfs data over network get accessed in spark
 application?



 Yes. Every worker will get a split to read, and they read their own splits in
 parallel. This means all worker nodes should have access to the Hadoop file
 system.



 Sorry if these questions were already discussed..

 Thanks
 Swapnil