How default partitioning in spark is deployed

2021-03-15 Thread Renganathan Mutthiah
Hi,

I have a question with respect to default partitioning in RDD.




case class Animal(id:Int, name:String)

val myRDD = session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
  Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"),
  Animal(5, "Chetah") ) ))

Console println myRDD.getNumPartitions

I am running the above piece of code on my laptop, which has 12 logical
cores. Hence I see that 12 partitions are created.

My understanding is that hash partitioning is used to determine which
object goes to which partition, so in this case the formula would be:
hashCode() % 12.
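
To make that assumption concrete, here is roughly what it would predict (an
illustrative sketch only, reusing the Animal case class above; it is not
necessarily how parallelize actually assigns elements):

// Illustration of the hash-partitioning assumption only.
val animals = Array(Animal(1, "Lion"), Animal(2, "Elephant"),
  Animal(3, "Jaguar"), Animal(4, "Tiger"), Animal(5, "Chetah"))

animals.foreach { a =>
  // floorMod keeps the result non-negative even for a negative hashCode
  val partition = java.lang.Math.floorMod(a.hashCode, 12)
  println(s"$a -> partition $partition")
}
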
But when I examine further, I see that all the objects are placed in the
last partition.

myRDD.foreachPartition( e => { println("--"); e.foreach(println) } )

The above code prints the output below (the first eleven partitions are
empty and the last one has all the objects; the dashed lines separate the
partition contents):
--
--
--
--
--
--
--
--
--
--
--
--
Animal(2,Elephant)
Animal(4,Tiger)
Animal(3,Jaguar)
Animal(5,Chetah)
Animal(1,Lion)

I don't know why this happens. Can you please help?

Thanks!


Re: How default partitioning in spark is deployed

2021-03-16 Thread Renganathan Mutthiah
Hi Mich,

Thanks for taking the time to look into my query. Yes, when we increase
the number of objects, all partitions start receiving data. I was actually
trying to understand what happens in my particular case.

Thanks!

On Tue, Mar 16, 2021 at 2:10 PM Mich Talebzadeh 
wrote:

> Hi,
>
> Well, as it appears, you have 5 entries in your data and 12 cores. The
> theory is that you run multiple tasks in parallel across multiple cores
> on a desktop, which applies to your case. The statistics are not there to
> give a meaningful interpretation of why Spark decided to put all the data
> in one partition. If an RDD has too many partitions, then task scheduling
> may take more time than the actual execution time. In summary, you just
> do not have enough statistics to draw a meaningful conclusion.
>
> Try to generate 100,000 rows and run your query and look at the pattern.
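>
> As a sketch of that experiment (assuming a spark-shell session where sc is
> available; the variable name and row count are just illustrative):
>
> val bigRDD = sc.parallelize(1 to 100000)
>
> // Count how many records land in each partition and print the distribution.
> bigRDD
>   .mapPartitionsWithIndex( (index, itr) => Iterator((index, itr.size)) )
>   .collect()
>   .foreach { case (index, count) => println(s"partition $index -> $count records") }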
>
> HTH
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>


Re: How default partitioning in spark is deployed

2021-03-16 Thread Renganathan Mutthiah
Hi Attila,

Thanks for looking into this!

I actually found the issue: the print statements misled me. The records are
indeed stored in different partitions.
Since the foreachPartition method is run in parallel by different threads,
they all printed the separator line at almost the same time, followed by the
data, which was also printed at almost the same time. This gave the
appearance that all the data is stored in a single partition.
When I run the code below, I can see that the objects are of course stored
in different partitions!

myRDD.mapPartitionsWithIndex( (index, itr) => {
  itr.foreach( e => println("Index : " + index + " " + e)); itr
}, true).collect()

This prints the following (the number after "Index :" is the partition
number):

Index : 9 Animal(4,Tiger)
Index : 4 Animal(2,Elephant)
Index : 11 Animal(5,Chetah)
Index : 2 Animal(1,Lion)
Index : 7 Animal(3,Jaguar)
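
A per-partition record count gives the same picture without depending on the
order in which the executor threads print (a small sketch reusing myRDD from
above, not part of the original run):

// Return one (partitionIndex, recordCount) pair per partition and collect
// the pairs back to the driver, so the output cannot interleave.
myRDD
  .mapPartitionsWithIndex( (index, itr) => Iterator((index, itr.size)) )
  .collect()
  .foreach { case (index, count) => println(s"Partition $index holds $count record(s)") }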

Thanks!

On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
piros.attila.zs...@gmail.com> wrote:

> Hi!
>
> This is weird. The code of foreachPartition
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L801-L806>
> leads to ParallelCollectionRDD
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L84-L107>,
> which ends in slice
> <https://github.com/apache/spark/blob/f643bd96593dc411cb0cca1c7a3f28f93765c9b6/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L116-L155>,
> where the most important part is the positions method:
>
> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>   (0 until numSlices).iterator.map { i =>
>     val start = ((i * length) / numSlices).toInt
>     val end = (((i + 1) * length) / numSlices).toInt
>     (start, end)
>   }
> }
>
> Because of the extra '(' you used in "parallelize( (Array", I thought
> some Scala implicit might generate a Seq with one Array in it.
> But in that case your output would contain an Array, so this must not be
> the case.
>
> 1) What Spark/Scala version are you using, and on what OS?
>
> 2) Can you reproduce this issue in the spark-shell?
>
> scala> case class Animal(id:Int, name:String)
> defined class Animal
>
> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"),
> Animal(5, "Chetah") ) ), 12)
> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
> parallelize at <console>:27
>
> scala> myRDD.foreachPartition( e => { println("--"); e.foreach(println) } )
> --
> --
> --
> Animal(1,Lion)
> --
> --
> Animal(2,Elephant)
> --
> --
> --
> Animal(3,Jaguar)
> --
> --
> Animal(4,Tiger)
> --
> --
> Animal(5,Chetah)
>
> scala> Console println myRDD.getNumPartitions
> 12
>
> 3) Can you please check in the spark-shell what happens when you paste the
> above method and call it like this:
>
> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>      |   (0 until numSlices).iterator.map { i =>
>      |     val start = ((i * length) / numSlices).toInt
>      |     val end = (((i + 1) * length) / numSlices).toInt
>      |     (start, end)
>      |   }
>      | }
> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>
> scala> positions(5, 12).foreach(println)
> (0,0)
> (0,0)
> (0,1)
> (1,1)
> (1,2)
> (2,2)
> (2,2)
> (2,3)
> (3,3)
> (3,4)
> (4,4)
> (4,5)
>
> As you can see, in my case the `positions` result is consistent with the
> `foreachPartition` output, and this should be deterministic.
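>
> A quick way to see which of those slices are non-empty (a small sketch
> reusing the positions method above; it is not part of the original code):
>
> // Slices with start < end actually receive elements.
> positions(5, 12).zipWithIndex
>   .filter { case ((start, end), _) => start < end }
>   .foreach { case ((start, end), idx) => println(s"partition $idx gets elements [$start, $end)") }
>
> This lists partitions 2, 4, 7, 9 and 11 with one element each, which
> matches the partition indices reported by mapPartitionsWithIndex earlier
> in this thread.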
>
> Best regards,
> Attila

Re: How default partitioning in spark is deployed

2021-03-16 Thread Renganathan Mutthiah
That's a very good idea, thanks for sharing, German!

On Tue, Mar 16, 2021 at 7:08 PM German Schiavon 
wrote:

> Hi all,
>
> I guess you could do something like this too:
>
> [image: Captura de pantalla 2021-03-16 a las 14.35.46.png]
>

Spark Architecture Question

2021-07-29 Thread Renganathan Mutthiah
Hi,

I have read in many materials (including the book Spark: The Definitive
Guide) that Spark is a compiler.

In my understanding, our program is used up to the point of DAG generation.
That portion can be written in any of the supported languages: Java, Scala,
R or Python. After that point (executing the DAG), the engine runs in Scala
only. This is why Spark is referred to as a compiler.

If the above is true, we need to install R / Python only on the driver
machine; the R / Python runtime is not needed on the worker nodes. Am I
correct?

Thanks!


Spark-SQL plugin into HIVE

2021-07-30 Thread Renganathan Mutthiah
Hi,


Hive has a metastore, and HiveServer2 listens for SQL requests; with the
help of the metastore, the query is executed and the result is passed back.
HiveServer2 is essentially a customised Thrift service. In this way, Hive
acts as a service, and from a programming language we can use Hive as a
database.

The relationship between Spark SQL and Hive, as I understand it, is this:

Spark SQL just utilises the Hive setup (HDFS file system, Hive metastore,
HiveServer2). When we invoke sbin/start-thriftserver.sh (present in the
Spark installation), we are supposed to provide the HiveServer2 hostname and
port number. Then, via Spark's beeline, we can create, drop and manipulate
tables in Hive. The API can be either Spark SQL or HiveQL. If we create or
drop a table, it will be clearly visible if we log into Hive and check (say
via Hive beeline or the Hive CLI). To put it in other words, changes made
via Spark can be seen in Hive tables.

My understanding is that Spark does not have its own metastore setup the way
Hive does. Spark just utilises the Hive setup, and the SQL execution simply
happens via the Spark SQL API.
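
For instance, this is roughly how I picture that setup from the Spark side
(a sketch only; it assumes Hive's hive-site.xml, pointing at the existing
metastore, is available on Spark's classpath, for example under
$SPARK_HOME/conf):

import org.apache.spark.sql.SparkSession

// A sketch only: reuse the existing Hive deployment as Spark's catalog.
val spark = SparkSession.builder()
  .appName("spark-sql-on-hive")
  .enableHiveSupport()   // use the Hive metastore instead of an in-memory catalog
  .getOrCreate()

// DDL issued here is recorded in the Hive metastore, so the table is also
// visible from Hive's own beeline / CLI.
spark.sql("CREATE TABLE IF NOT EXISTS animals (id INT, name STRING)")
spark.sql("SHOW TABLES").show()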

Is my understanding correct here?

Then I am a little confused about the usage of bin/spark-sql (which is also
present in the Spark installation). The documentation says that via this SQL
shell we can create tables just as we do above (via the Thrift server /
beeline). Now my question is: how is the metadata information maintained by
Spark in that case?

Or, like the first approach, can we make the spark-sql CLI communicate with
Hive (to be specific, Hive's HiveServer2)?
If yes, how can we do that?

Thanks!