Re: GPU job in Spark 3

2021-04-15 Thread Gourav Sengupta
Hi,

Completely agree with Hao. If you are using YARN, see the EMR
documentation on how to enable the GPU as a resource in YARN before trying to
use it in Spark.

This is one of the most exciting features of Spark 3, and you can reap huge
benefits from it :)
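
For reference, the Spark-side settings involved are roughly the standard Spark 3
GPU-scheduling configs sketched below. This is a hedged illustration only: the
amounts and the discovery-script path are placeholders, and YARN itself still
needs the GPU resource type enabled as per the EMR/Hadoop docs.

from pyspark.sql import SparkSession

# Sketch of the Spark 3 GPU-scheduling configs typically set when running on
# YARN with the RAPIDS Accelerator. Values and the script path are placeholders.
spark = (
    SparkSession.builder
    .appName("gpu-on-yarn-sketch")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")      # RAPIDS SQL plugin
    .config("spark.executor.resource.gpu.amount", "1")          # GPUs requested per executor
    .config("spark.task.resource.gpu.amount", "0.25")           # fraction of a GPU per task
    .config("spark.executor.resource.gpu.discoveryScript",
            "/opt/sparkRapidsPlugin/getGpusResources.sh")       # placeholder path
    .getOrCreate()
)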


Regards,
Gourav Sengupta

On Fri, Apr 9, 2021 at 6:10 PM HaoZ  wrote:

> Hi Martin,
>
> I tested local mode with the RAPIDS Accelerator for Spark and it works fine
> for me.
> The only possible issue is CUDA 11.2; the supported CUDA version, as per
> https://nvidia.github.io/spark-rapids/docs/download.html, is 11.0.
>
> Here is a quick test using Spark local mode.
> Note: when testing this in local mode, I made sure there was nothing in
> spark-defaults.conf, so everything was clean.
>
> ==
> scala> val df = sc.makeRDD(1 to 100, 6).toDF
> df: org.apache.spark.sql.DataFrame = [value: int]
>
> scala> val df2 = sc.makeRDD(1 to 100, 6).toDF
> df2: org.apache.spark.sql.DataFrame = [value: int]
>
> scala> df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a"
> === $"b").count
> res0: Long = 100
> scala> df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a"
> === $"b").explain()
> == Physical Plan ==
> GpuColumnarToRow false
> +- GpuShuffledHashJoin [a#29], [b#31], Inner, GpuBuildRight, false
>:- GpuShuffleCoalesce 2147483647
>:  +- GpuColumnarExchange gpuhashpartitioning(a#29, 10),
> ENSURE_REQUIREMENTS, [id=#221]
>: +- GpuProject [value#2 AS a#29]
>:+- GpuRowToColumnar TargetSize(2147483647)
>:   +- *(1) SerializeFromObject [input[0, int, false] AS
> value#2]
>:  +- Scan[obj#1]
>+- GpuCoalesceBatches RequireSingleBatch
>   +- GpuShuffleCoalesce 2147483647
>  +- GpuColumnarExchange gpuhashpartitioning(b#31, 10),
> ENSURE_REQUIREMENTS, [id=#228]
> +- GpuProject [value#8 AS b#31]
>+- GpuRowToColumnar TargetSize(2147483647)
>   +- *(2) SerializeFromObject [input[0, int, false] AS
> value#8]
>  +- Scan[obj#7]
> ==
>
> Thanks,
> Hao
>
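
A local-mode test like the one above roughly requires the plugin jar on the
classpath and the plugin enabled. A minimal PySpark sketch, with the jar path as
a placeholder (it may not be sufficient on its own, depending on the plugin
version):

from pyspark.sql import SparkSession

# Hedged sketch of a local-mode session with the RAPIDS Accelerator enabled.
# The jar location is a placeholder; point it at the rapids-4-spark jar you downloaded.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("rapids-local-test")
    .config("spark.jars", "/path/to/rapids-4-spark_2.12.jar")   # placeholder path
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.rapids.sql.enabled", "true")
    .getOrCreate()
)

# If the plugin is active, explain() on a join like the one above should show Gpu* operators.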


Re: full SQL query graph not shown in monitoring when using cache

2021-04-15 Thread Mich Talebzadeh
Possible, but I cannot recall seeing it in 3.0.1.

Perhaps you just need to upgrade to 3.1.1, which is labelled as stable.

HTH

On Thu, 15 Apr 2021 at 14:53, Kalin Stoyanov  wrote:

> Yes, it seems to show the full graph in 3.1.1; maybe it was an issue in
> 3.0.1 and was fixed.
>

Re: full SQL query graph not shown in monitoring when using cache

2021-04-15 Thread Kalin Stoyanov
Yes, it seems to show the full graph in 3.1.1; maybe it was an issue in
3.0.1 and was fixed.

On Thu, Apr 15, 2021 at 4:44 PM Mich Talebzadeh 
wrote:

> Mine is 3.1.1
>
>
> >>> df.show()
> +---+---+---+
> | id|  a|  b|
> +---+---+---+
> |  1| 24| 32|
> +---+---+---+
>
>
> Mine shows fuller details (DAGs), including the optimiser printout, as
> attached.
>
> The full explain is below
>
>  Details
>
> == Physical Plan ==
> CollectLimit (4)
> +- * Project (3)
>+- * ColumnarToRow (2)
>   +- InMemoryTableScan (1)
> +- InMemoryRelation (2)
>   +- * HashAggregate (9)
>  +- Exchange (8)
> +- * HashAggregate (7)
>+- CartesianProduct Inner (6)
>   :- * Scan ExistingRDD (3)
>   +- * Project (5)
>  +- * Scan ExistingRDD (4)
>
>
> (1) InMemoryTableScan
> Output [3]: [id#0L, sum(a)#26L, sum(b)#27L]
> Arguments: [id#0L, sum(a)#26L, sum(b)#27L]
>
> (2) InMemoryRelation
> Arguments: [id#0L, sum(a)#26L, sum(b)#27L],
> CachedRDDBuilder(org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer@3bbae4ee,
> StorageLevel(disk, memory, deserialized, 1 replicas),
> *(4) HashAggregate(keys=[id#0L], functions=[sum(a#1L), sum(b#2L)], output=[id#0L, sum(a)#26L, sum(b)#27L])
> +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#30]
>    +- *(3) HashAggregate(keys=[id#0L], functions=[partial_sum(a#1L), partial_sum(b#2L)], output=[id#0L, sum#33L, sum#34L])
>       +- CartesianProduct
>          :- *(1) Scan ExistingRDD[id#0L,a#1L,b#2L]
>          +- *(2) Project
>             +- *(2) Scan ExistingRDD[id#0L,a#1L,b#2L]
> ,None)
>
> (3) Scan ExistingRDD [codegen id : 1]
> Output [3]: [id#0L, a#1L, b#2L]
> Arguments: [id#0L, a#1L, b#2L], MapPartitionsRDD[7] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
>
> (4) Scan ExistingRDD [codegen id : 2]
> Output [3]: [id#0L, a#1L, b#2L]
> Arguments: [id#0L, a#1L, b#2L], MapPartitionsRDD[7] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
>
> (5) Project [codegen id : 2]
> Output: []
> Input [3]: [id#0L, a#1L, b#2L]
>
> (6) CartesianProduct
> Join condition: None
>
> (7) HashAggregate [codegen id : 3]
> Input [3]: [id#0L, a#1L, b#2L]
> Keys [1]: [id#0L]
> Functions [2]: [partial_sum(a#1L), partial_sum(b#2L)]
> Aggregate Attributes [2]: [sum#31L, sum#32L]
> Results [3]: [id#0L, sum#33L, sum#34L]
>
> (8) Exchange
> Input [3]: [id#0L, sum#33L, sum#34L]
> Arguments: hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#30]
>
> (9) HashAggregate [codegen id : 4]
> Input [3]: [id#0L, sum#33L, sum#34L]
> Keys [1]: [id#0L]
> Functions [2]: [sum(a#1L), sum(b#2L)]
> Aggregate Attributes [2]: [sum(a#1L)#24L, sum(b#2L)#25L]
> Results [3]: [id#0L, sum(a#1L)#24L AS sum(a)#26L, sum(b#2L)#25L AS sum(b)#27L]
>
> (2) ColumnarToRow [codegen id : 1]
> Input [3]: [id#0L, sum(a)#26L, sum(b)#27L]
>
> (3) Project [codegen id : 1]
> Output [3]: [cast(id#0L as string) AS id#131, cast((sum(a)#26L * 2) as string) AS a#132, cast((sum(b)#27L * 2) as string) AS b#133]
> Input [3]: [id#0L, sum(a)#26L, sum(b)#27L]
>
> (4) CollectLimit
> Input [3]: [id#131, a#132, b#133]
> Arguments: 21
>
>
> HTH,
>
>
> Mich
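
The numbered plan layout quoted above is what Spark 3's formatted explain mode
prints; a minimal sketch, assuming a Spark 3.x DataFrame named df:

# Formatted explain (Spark 3.x) numbers each operator and lists its details,
# as in the plan quoted above.
df.explain(mode="formatted")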

Re: full SQL query graph not shown in monitoring when using cache

2021-04-15 Thread Kalin Stoyanov
Hi Mohamadreza,

This is not happening for me with the example I showed - there is just this
one SQL query logged, and everything before the InMemoryTableScan from the
physical plan is not present in the graph above it. Here's the log itself
if you want to see it.

Regards,
Kalin

On Thu, Apr 15, 2021 at 4:26 PM Mohamadreza Rostami <
mohamadrezarosta...@gmail.com> wrote:

> Hi
> When you cache a DataFrame, the first time you call an action (such as a SQL
> query) on that DataFrame, you can see that all of the transformations are run.
> In subsequent action calls, those transformations are served from the cache, and
> Spark runs only the transformations applied after the cache. This is the meaning
> of the cache in Spark.
>
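
A minimal PySpark sketch of that behaviour (illustrative only; the DataFrames and
column names here are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cache-sketch").getOrCreate()

df = spark.range(10).withColumnRenamed("id", "a")
agg = df.groupBy("a").count().cache()   # cache() is lazy: nothing runs yet

agg.count()                             # first action: the full lineage runs and the result is cached
agg.selectExpr("a * 2 AS a2").show()    # later actions scan the cached data (InMemoryTableScan)
                                        # and only run transformations applied after cache()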


local-1618475380597.lz4
Description: Binary data


full SQL query graph not shown in monitoring when using cache

2021-04-15 Thread Kalin Stoyanov
Hi all,

I noticed something a bit strange: when working with a cached DF, the SQL
query details graph starts from where the cache takes place and doesn't
show the transformations before it. For example, this code

>>> df = sc.parallelize([[1,2,3],[1,4,5]]).toDF(['id','a','b'])
>>> renameCols = [f"`{col}` as `{col}_other`" for col in df.columns]
>>> df_cart = df.crossJoin(df.selectExpr(renameCols))
>>> df = df_cart.groupBy("id").sum("a", "b")
>>> df = df.cache()
>>> df = df.selectExpr("id", "`sum(a)` * 2 as a", "`sum(b)` * 2 as b")
>>> df.show()

produces 1 query with this physical plan

== Physical Plan ==
CollectLimit 21
+- *(1) Project [cast(id#0L as string) AS id#58, cast((sum(a)#26L * 2) as string) AS a#59, cast((sum(b)#27L * 2) as string) AS b#60]
   +- *(1) ColumnarToRow
      +- InMemoryTableScan [id#0L, sum(a)#26L, sum(b)#27L]
            +- InMemoryRelation [id#0L, sum(a)#26L, sum(b)#27L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(4) HashAggregate(keys=[id#0L], functions=[sum(a#1L), sum(b#2L)], output=[id#0L, sum(a)#26L, sum(b)#27L])
                     +- Exchange hashpartitioning(id#0L, 4), true, [id=#30]
                        +- *(3) HashAggregate(keys=[id#0L], functions=[partial_sum(a#1L), partial_sum(b#2L)], output=[id#0L, sum#33L, sum#34L])
                           +- CartesianProduct
                              :- *(1) Scan ExistingRDD[id#0L,a#1L,b#2L]
                              +- *(2) Project
                                 +- *(2) Scan ExistingRDD[id#0L,a#1L,b#2L]

But the visual graph representation is just

[image: image.png]
Is this something that's done on purpose? I'd rather see the whole thing...
This is on Spark 3.0.1.
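
For what it's worth, even on 3.0.1 the full lineage feeding the cache is still
visible in the textual plan above (the HashAggregate/Exchange/CartesianProduct
under InMemoryRelation), even though the SQL-tab graph starts at the
InMemoryTableScan; for example:

# Printing the plan shows everything under InMemoryRelation,
# unlike the truncated graph in the UI.
df.explain(True)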