[jira] [Updated] (SPARK-44378) Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.

2023-07-12 Thread Priyanka Raju (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Priyanka Raju updated SPARK-44378:
--
Issue Type: Bug  (was: Question)

> Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.
> --
>
> Key: SPARK-44378
> URL: https://issues.apache.org/jira/browse/SPARK-44378
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Submit
>Affects Versions: 3.1.2
>Reporter: Priyanka Raju
>Priority: Major
>  Labels: aqe
> Attachments: Screenshot 2023-07-11 at 9.36.14 AM.png, Screenshot 
> 2023-07-11 at 9.36.19 AM.png, image2.png
>
>
> We have a few Spark Scala jobs currently running in production. Most jobs use 
> Datasets and DataFrames. A small piece of code in our custom library makes RDD 
> calls, for example to check whether a DataFrame is empty: 
> df.rdd.getNumPartitions == 0
> When I enable AQE for these jobs, the .rdd call becomes a separate job of its 
> own, and the entire DAG is executed twice, taking twice as long. This does 
> not happen when AQE is disabled. Why does this happen, and what is the best 
> way to fix the issue?
>  
> Sample code to reproduce the issue:
>  
>  
> {code:java}
> import org.apache.spark.sql._ 
>   case class Record(
> id: Int,
> name: String
>  )
>  
> val partCount = 4
> val input1 = (0 until 100).map(part => Record(part, "a"))
>  
> val input2 = (100 until 110).map(part => Record(part, "c"))
>  
> implicit val enc: Encoder[Record] = Encoders.product[Record]
>  
> val ds1 = spark.createDataset(
>   spark.sparkContext
> .parallelize(input1, partCount)
> )
>  
> val ds2 = spark.createDataset(
>   spark.sparkContext
> .parallelize(input2, partCount)
> )
>  
> val ds3 = ds1.join(ds2, Seq("id"))
> val l = ds3.count()
>  
> val incomingPartitions = ds3.rdd.getNumPartitions
> log.info(s"Num partitions ${incomingPartitions}")
>   {code}
>  
> Spark UI for the same job with AQE: !Screenshot 2023-07-11 at 9.36.14 AM.png!
>  
> Spark UI for the same job without AQE:
>  
> !Screenshot 2023-07-11 at 9.36.19 AM.png!
>  
> This is causing an unexpected regression when we try to enable AQE for our 
> jobs in production. We use Spark 3.1 in production, but I can see the same 
> behavior in Spark 3.2 from the console as well.
>  
> !image2.png!
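A minimal sketch of two possible workarounds, assuming Spark 3.1+ and the `ds1`/`ds2` from the repro above. This is not a confirmed fix: the caching behavior under AQE and the `Dataset.isEmpty` alternative should be verified against your Spark version.

{code:java}
// Option 1: cache the join result before the first action, so the extra
// job triggered by .rdd reuses the materialized data instead of
// re-running the whole DAG.
val ds3 = ds1.join(ds2, Seq("id")).persist()
val l = ds3.count()                                // materializes and caches ds3
val incomingPartitions = ds3.rdd.getNumPartitions  // served from the cache
ds3.unpersist()

// Option 2: avoid the RDD conversion entirely for emptiness checks
// (Dataset.isEmpty is available since Spark 2.4).
val isEmpty = ds3.isEmpty
{code}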



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-44378) Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.

2023-07-11 Thread Priyanka Raju (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Priyanka Raju updated SPARK-44378:
--
Attachment: image2.png




[jira] [Updated] (SPARK-44378) Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.

2023-07-11 Thread Priyanka Raju (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Priyanka Raju updated SPARK-44378:
--
Description: 

[jira] [Updated] (SPARK-44378) Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.

2023-07-11 Thread Priyanka Raju (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Priyanka Raju updated SPARK-44378:
--
Description: 

[jira] [Updated] (SPARK-44378) Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.

2023-07-11 Thread Priyanka Raju (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Priyanka Raju updated SPARK-44378:
--
Attachment: Screenshot 2023-07-11 at 9.36.19 AM.png




[jira] [Updated] (SPARK-44378) Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.

2023-07-11 Thread Priyanka Raju (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Priyanka Raju updated SPARK-44378:
--
Attachment: Screenshot 2023-07-11 at 9.36.14 AM.png




[jira] [Updated] (SPARK-44378) Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.

2023-07-11 Thread Priyanka Raju (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Priyanka Raju updated SPARK-44378:
--
Description: 

[jira] [Created] (SPARK-44378) Jobs that have join & have .rdd calls get executed 2x when AQE is enabled.

2023-07-11 Thread Priyanka Raju (Jira)
Priyanka Raju created SPARK-44378:
-

 Summary: Jobs that have join & have .rdd calls get executed 2x 
when AQE is enabled.
 Key: SPARK-44378
 URL: https://issues.apache.org/jira/browse/SPARK-44378
 Project: Spark
  Issue Type: Question
  Components: Spark Submit
Affects Versions: 3.1.2
Reporter: Priyanka Raju





[jira] [Updated] (SPARK-31695) BigDecimal setScale is not working in Spark UDF

2020-05-12 Thread Saravanan Raju (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-31695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saravanan Raju updated SPARK-31695:
---
Description: 
I was trying to convert a JSON column to a map. I tried a UDF to convert the 
JSON to a map, but it is not working as expected:
{code:java}
import org.apache.spark.sql.functions.udf
import org.json4s.jackson.JsonMethods.parse  // json4s is bundled with Spark
import spark.implicits._                     // assumes a SparkSession named spark

val df1 = Seq("{\"k\":10.004}").toDF("json")

// Parse the JSON string and set each numeric value's scale to 6
def udfJsonStrToMapDecimal = udf { (jsonStr: String) =>
  val jsonMap = parse(jsonStr).values.asInstanceOf[Map[String, Any]]
  jsonMap.map { case (k, v) =>
    (k, BigDecimal.decimal(v.asInstanceOf[Double]).setScale(6))
  }
}

val f = df1.withColumn("map", udfJsonStrToMapDecimal($"json"))

scala> f.printSchema
root
 |-- json: string (nullable = true)
 |-- map: map (nullable = true)
 |    |-- key: string
 |    |-- value: decimal(38,18) (valueContainsNull = true)
{code}
 

*Instead of decimal(38,6), it converts the value to decimal(38,18).*
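`setScale` itself appears to be working; a likely explanation (worth verifying) is that Spark maps a Scala `BigDecimal` returned from a UDF to Catalyst's default `DecimalType(38,18)`, because the JVM type carries no precision/scale information, so the per-value `setScale(6)` is lost in the inferred schema. A pure-Scala check, with no Spark involved, that the value really does get scale 6:

{code:java}
// setScale(6) produces a BigDecimal with scale 6 in plain Scala;
// the decimal(38,18) shown by f.printSchema comes from Spark's
// type mapping, not from this call.
val v = BigDecimal.decimal(10.004).setScale(6)
println(v.scale) // 6
println(v)       // 10.004000
{code}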



> BigDecimal setScale is not working in Spark UDF
> ---
>
> Key: SPARK-31695
> URL: https://issues.apache.org/jira/browse/SPARK-31695
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.3.4
>Reporter: Saravanan Raju
>Priority: Major
>






[jira] [Created] (SPARK-31695) BigDecimal setScale is not working in Spark UDF

2020-05-12 Thread Saravanan Raju (Jira)
Saravanan Raju created SPARK-31695:
--

 Summary: BigDecimal setScale is not working in Spark UDF
 Key: SPARK-31695
 URL: https://issues.apache.org/jira/browse/SPARK-31695
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, SQL
Affects Versions: 2.3.4
Reporter: Saravanan Raju





[jira] [Created] (SPARK-29156) Hive has appending data as part of cdc, In write mode we should be able to write only changes captured to teradata or datasource.

2019-09-18 Thread raju (Jira)
raju created SPARK-29156:


 Summary: Hive has appending data as part of cdc, In write mode we 
should be able to write only changes captured to teradata or datasource.
 Key: SPARK-29156
 URL: https://issues.apache.org/jira/browse/SPARK-29156
 Project: Spark
  Issue Type: New Feature
  Components: Tests
Affects Versions: 2.4.3
 Environment: spark 2.3.2

dataiku

aws emr
Reporter: raju


In general, change data captures are appended to Hive tables. We have a 
scenario where, when connecting to Teradata or another data source, only the 
changes captured as updates should be written to the target. We are unable to 
do this with the overwrite and append modes.


