[ https://issues.apache.org/jira/browse/SPARK-47193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879050#comment-17879050 ]

Bruce Robbins edited comment on SPARK-47193 at 9/13/24 5:13 PM:
----------------------------------------------------------------

[~dongjoon] 

I started to work on it, but it's not done.

By the way, as far as I know, this is not a regression. I am pretty sure it's 
always been this way.

If someone else wants to work on it to get a fix into 3.5.3, that's fine with 
me. Otherwise, I will keep banging at it.

Here's what I've done so far:

I came up with an approach to ensure that the SQL config is propagated to the
executor. It is mostly coded but still needs some finishing touches. This
approach also handles the case where the config changes after the RDD was
obtained from Dataset#rdd. However, it includes changes to core, which I would
prefer not to touch:

[bigger change|https://github.com/apache/spark/compare/master...bersprockets:spark:action_wrapper?expand=1]
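
A minimal, Spark-free sketch of the idea behind the bigger change. It assumes the core of that idea is "read the session's SQL config when the action runs, then make it visible to the task", not "read it when the RDD was obtained". This is not the code on the linked branch. Session, LazyRows, render and activeConf are hypothetical names, not Spark APIs. A ThreadLocal only models executor-side config state.

{code:scala}
// Hypothetical, Spark-free model: the config is read when the "action" runs.
// Session, LazyRows, render and activeConf are illustrative names, not Spark APIs.
object ActionTimeConfSketch {
  // Driver-side session whose config can change at any time.
  final class Session(var conf: Map[String, String])

  // Stand-in for the config visible to code running inside a task.
  val activeConf: ThreadLocal[Map[String, String]] = new ThreadLocal[Map[String, String]] {
    override def initialValue(): Map[String, String] = Map.empty
  }

  // Task-side code: uses whatever config is active, else a default.
  def render(i: Int): String = {
    val tz = activeConf.get().getOrElse("tz", "default")
    s"${i}@${tz}"
  }

  // Loosely analogous to the RDD returned by Dataset#rdd: nothing runs until collect().
  final class LazyRows(data: Seq[Int], session: Session) {
    def collect(): List[String] = {
      val confAtAction = session.conf // read now, at action time
      val saved = activeConf.get()
      activeConf.set(confAtAction)
      try data.map(render).toList
      finally activeConf.set(saved) // restore the previous config
    }
  }
}
{code}

For example, suppose LazyRows is created while "tz" is "UTC", then "tz" is set to "America/Los_Angeles" before collect() is called. In that case every row is rendered with "America/Los_Angeles", because the config is only read when the action runs.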

Then I came up with a smaller solution, which also ensures that the SQL config 
is propagated to the executor, but it doesn't handle the case where the config 
changes after the RDD was obtained from Dataset#rdd (i.e., it uses a snapshot 
of the config at the time Dataset#rdd is called).

[smaller change|https://github.com/apache/spark/compare/master...bersprockets:spark:iterator_wrapper?expand=1]
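
A similar Spark-free sketch of the snapshot behavior described for the smaller change. It assumes an iterator wrapper, as the branch name iterator_wrapper suggests, that makes a config captured at Dataset#rdd time active while rows are produced. This is not the code on the linked branch. Session, ConfIterator, toLazyRows, render and activeConf are hypothetical names. The ThreadLocal only models executor-side state.

{code:scala}
// Hypothetical, Spark-free model: the config is snapshotted when the "RDD" is created.
// Session, ConfIterator, toLazyRows, render and activeConf are illustrative names, not Spark APIs.
object SnapshotConfSketch {
  final class Session(var conf: Map[String, String])

  // Stand-in for the config visible to code running inside a task.
  val activeConf: ThreadLocal[Map[String, String]] = new ThreadLocal[Map[String, String]] {
    override def initialValue(): Map[String, String] = Map.empty
  }

  def render(i: Int): String = {
    val tz = activeConf.get().getOrElse("tz", "default")
    s"${i}@${tz}"
  }

  // Iterator wrapper: `conf` is active during each hasNext/next call, so lazily
  // mapped elements are computed with it; the previous value is restored afterwards.
  final class ConfIterator[A](underlying: Iterator[A], conf: Map[String, String]) extends Iterator[A] {
    private def withConf[T](body: => T): T = {
      val saved = activeConf.get()
      activeConf.set(conf)
      try body
      finally activeConf.set(saved)
    }
    override def hasNext: Boolean = withConf(underlying.hasNext)
    override def next(): A = withConf(underlying.next())
  }

  // Analogue of Dataset#rdd: the config is captured once, here; later changes are not seen.
  def toLazyRows(data: Seq[Int], session: Session): () => Iterator[String] = {
    val snapshot = session.conf
    () => new ConfIterator(data.iterator.map(render), snapshot)
  }
}
{code}

Here, suppose the rows are created while "tz" is "UTC", and "tz" is then changed to "America/Los_Angeles" before iterating. The rows are still rendered with "UTC", which is the limitation noted above.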

Edit: Also, as it turns out, neither of my solutions seems to solve the
reporter's specific example; they only solve my "distilled" example. I need to
track down why that is.




> Converting dataframe to rdd results in data loss
> ------------------------------------------------
>
>                 Key: SPARK-47193
>                 URL: https://issues.apache.org/jira/browse/SPARK-47193
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0, 3.5.1
>            Reporter: Ivan Bova
>            Priority: Critical
>              Labels: correctness
>         Attachments: device.csv, deviceClass.csv, deviceType.csv, 
> language.csv, location.csv, location1.csv, timeZoneLookup.csv, user.csv, 
> userLocation.csv, userProfile.csv
>
>
> I have 10 CSV files and need to create a mapping from them. After all of the
> joins, the DataFrame contains all expected rows, but the RDD from this
> DataFrame contains only half of them.
> {code:scala}
> // spark-shell session; spark.implicits._ is already imported there
> import java.sql.Timestamp
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.functions.lit
> import spark.implicits._
>
> case class MyUserProfileMessage(UserId: Int, Email: String, FirstName: String, LastName: String, LanguageId: Option[Int])
> case class MyLanguageMessage(LanguageId: Int, LanguageLocaleId: String)
> case class MyDeviceMessage(DeviceId1: String, Created: Option[Timestamp], UpdatedDate: Timestamp, DeviceId2: String, DeviceName: String, LocationId: Option[Int], DeviceTypeId: Option[Int], DeviceClassId: Int, UserId1: Option[Int])
> case class MyDeviceClassMessage(DeviceClassId: Int, DeviceClassName: String)
> case class MyDeviceTypeMessage(DeviceTypeId: Int, DeviceTypeName: String)
> case class MyLocation1(LocationId1: Int, LocationId: Int, Latitude: Option[Double], Longitude: Option[Double], Radius: Option[Double], CreatedDate: Timestamp)
> case class MyTimeZoneLookupMessage(TimeZoneId: Int, ZoneName: String)
> case class MyUserLocationMessage(UserId: Int, LocationId: Int, LocationName: String, Status: Int, CreatedDate: Timestamp)
> case class MyUserMessage(UserId: Int, Created: Option[Timestamp], Deleted: Option[Timestamp], Active: Option[Boolean], ActivatedDate: Option[Timestamp])
> case class MyLocationMessage(LocationId: Int, IsDeleted: Option[Boolean], Address1: String, Address2: String, City: String, State: String, Country: String, ZipCode: String, Feature2Enabled: Option[Boolean], LocationStatus: Option[Int], Location1Enabled: Option[Boolean], LocationKey: String, UpdatedDateTime: Timestamp, CreatedDate: Timestamp, Feature1Enabled: Option[Boolean], Level: Option[Int], TimeZone: Option[Int])
>
> // Each file: header row, lines starting with '#' skipped, the string "null" read as null,
> // and a schema derived from the matching case class.
> val userProfile = spark.read.option("header", "true").option("comment", "#").option("nullValue", "null").schema(Encoders.product[MyUserProfileMessage].schema).csv("userProfile.csv").as[MyUserProfileMessage]
> val language = spark.read.option("header", "true").option("comment", "#").option("nullValue", "null").schema(Encoders.product[MyLanguageMessage].schema).csv("language.csv").as[MyLanguageMessage]
> val device = spark.read.option("header", "true").option("comment", "#").option("nullValue", "null").schema(Encoders.product[MyDeviceMessage].schema).csv("device.csv").as[MyDeviceMessage]
> val deviceClass = spark.read.option("header", "true").option("comment", "#").option("nullValue", "null").schema(Encoders.product[MyDeviceClassMessage].schema).csv("deviceClass.csv").as[MyDeviceClassMessage]
> val deviceType = spark.read.option("header", "true").option("comment", "#").option("nullValue", "null").schema(Encoders.product[MyDeviceTypeMessage].schema).csv("deviceType.csv").as[MyDeviceTypeMessage]
> val location1 = spark.read.option("header", "true").option("comment", "#").option("nullValue", "null").schema(Encoders.product[MyLocation1].schema).csv("location1.csv").as[MyLocation1]
> val timeZoneLookup = spark.read.option("header", "true").option("comment", "#").option("nullValue", "null").schema(Encoders.product[MyTimeZoneLookupMessage].schema).csv("timeZoneLookup.csv").as[MyTimeZoneLookupMessage]
> val userLocation = spark.read.option("header", "true").option("comment", "#").option("nullValue", "null").schema(Encoders.product[MyUserLocationMessage].schema).csv("userLocation.csv").as[MyUserLocationMessage]
> val user = spark.read.option("header", "true").option("comment", "#").option("nullValue", "null").schema(Encoders.product[MyUserMessage].schema).csv("user.csv").as[MyUserMessage]
> val location = spark.read.option("header", "true").option("comment", "#").option("nullValue", "null").schema(Encoders.product[MyLocationMessage].schema).csv("location.csv").as[MyLocationMessage]
>
> val result = user
>   .join(userProfile, user("UserId") === userProfile("UserId"), "inner")
>   .join(language, userProfile("LanguageId") === language("LanguageId"), "left")
>   .join(userLocation, user("UserId") === userLocation("UserId"), "inner")
>   .join(location, userLocation("LocationId") === location("LocationId"), "inner")
>   .join(device, location("LocationId") === device("LocationId"), "inner")
>   .join(deviceType, device("DeviceTypeId") === deviceType("DeviceTypeId"), "inner")
>   .join(deviceClass, device("DeviceClassId") === deviceClass("DeviceClassId"), "inner")
>   .join(timeZoneLookup, timeZoneLookup("TimeZoneId") === location("TimeZone"), "left")
>   .join(location1, location("LocationId") === location1("LocationId"), "left")
>   .where(
>     device("UserId1").isNull
>       && (user("Active") === lit(true) || user("ActivatedDate").isNotNull)
>   )
>   .dropDuplicates()
>
> println("df count = " + result.count())
> println("rdd count = " + result.rdd.count())
> result.show(false)
> println("------")
> result.rdd.foreach(println)
> {code}
> output:
> {code:java}
> df count = 8
> rdd count = 4
> +------+-------------------+-------------------+------+-------------------+------+------+----------+---------+----------+----------+----------------+------+----------+------------+------+-----------+----------+---------+----------+----------+-----+------+--------+-------+---------------+--------------+----------------+-----------+-------------------+-------------------+---------------+-----+--------+---------+-------------------+-------------------+----------+-----------+----------+------------+-------------+-------+------------+--------------+-------------+---------------+----------+--------+-----------+----------+--------+---------+------+-------------------+
> |UserId|Created            |Deleted            |Active|ActivatedDate      |UserId|Email |FirstName |LastName |LanguageId|LanguageId|LanguageLocaleId|UserId|LocationId|LocationName|Status|CreatedDate|LocationId|IsDeleted|Address1  |Address2  |City |State |Country |ZipCode|Feature2Enabled|LocationStatus|Location1Enabled|LocationKey|UpdatedDateTime    |CreatedDate        |Feature1Enabled|Level|TimeZone|DeviceId1|Created            |UpdatedDate        |DeviceId2 |DeviceName |LocationId|DeviceTypeId|DeviceClassId|UserId1|DeviceTypeId|DeviceTypeName|DeviceClassId|DeviceClassName|TimeZoneId|ZoneName|LocationId1|LocationId|Latitude|Longitude|Radius|CreatedDate        |
> +------+-------------------+-------------------+------+-------------------+------+------+----------+---------+----------+----------+----------------+------+----------+------------+------+-----------+----------+---------+----------+----------+-----+------+--------+-------+---------------+--------------+----------------+-----------+-------------------+-------------------+---------------+-----+--------+---------+-------------------+-------------------+----------+-----------+----------+------------+-------------+-------+------------+--------------+-------------+---------------+----------+--------+-----------+----------+--------+---------+------+-------------------+
> |1     |2021-11-22 11:27:27|2021-11-25 11:27:27|false |2021-11-22 11:27:27|1     |email1|firstName1|lastName1|1         |1         |It              |1     |1         |Location1   |NULL  |NULL       |1         |false    |address1_1|address2_1|City1|State1|Country1|code1  |true           |1             |true            |LocKey1    |2021-11-16 11:27:27|2021-11-16 11:27:27|false          |1    |1       |device3  |2021-11-18 11:27:27|2021-11-19 11:27:27|DeviceId23|DeviceName3|1         |3           |3            |NULL   |3           |type3         |3            |class3         |1         |Zone1   |1          |1         |12.32   |43.23    |14.2  |2021-11-21 11:27:27|
> |1     |2021-11-22 11:27:27|2021-11-25 11:27:27|false |2021-11-22 11:27:27|1     |email1|firstName1|lastName1|1         |1         |It              |1     |1         |Location1   |NULL  |NULL       |1         |false    |address1_1|address2_1|City1|State1|Country1|code1  |true           |1             |true            |LocKey1    |2021-11-16 11:27:27|2021-11-16 11:27:27|false          |1    |1       |device1  |2021-11-16 11:27:27|2021-11-17 11:27:27|DeviceId21|DeviceName1|1         |1           |1            |NULL   |1           |type1         |1            |class1         |1         |Zone1   |1          |1         |12.32   |43.23    |14.2  |2021-11-21 11:27:27|
> |2     |2021-11-22 11:27:27|NULL               |true  |2021-11-22 11:27:27|2     |email2|firstName2|lastName2|2         |2         |En              |2     |1         |Location1   |NULL  |NULL       |1         |false    |address1_1|address2_1|City1|State1|Country1|code1  |true           |1             |true            |LocKey1    |2021-11-16 11:27:27|2021-11-16 11:27:27|false          |1    |1       |device3  |2021-11-18 11:27:27|2021-11-19 11:27:27|DeviceId23|DeviceName3|1         |3           |3            |NULL   |3           |type3         |3            |class3         |1         |Zone1   |1          |1         |12.32   |43.23    |14.2  |2021-11-21 11:27:27|
> |2     |2021-11-22 11:27:27|NULL               |true  |2021-11-22 11:27:27|2     |email2|firstName2|lastName2|2         |2         |En              |2     |1         |Location1   |NULL  |NULL       |1         |false    |address1_1|address2_1|City1|State1|Country1|code1  |true           |1             |true            |LocKey1    |2021-11-16 11:27:27|2021-11-16 11:27:27|false          |1    |1       |device1  |2021-11-16 11:27:27|2021-11-17 11:27:27|DeviceId21|DeviceName1|1         |1           |1            |NULL   |1           |type1         |1            |class1         |1         |Zone1   |1          |1         |12.32   |43.23    |14.2  |2021-11-21 11:27:27|
> |3     |2021-11-22 11:27:27|NULL               |true  |2021-11-22 11:27:27|3     |email3|firstName3|lastName3|3         |3         |DE              |3     |2         |Location2   |NULL  |NULL       |2         |false    |address1_2|address2_2|City2|State2|Country2|code2  |true           |2             |true            |LocKey2    |2021-11-17 11:27:27|2021-11-17 11:27:27|false          |1    |1       |device4  |2021-11-25 11:27:27|NULL               |DeviceId24|DeviceName4|2         |1           |2            |NULL   |1           |type1         |2            |class2         |1         |Zone1   |3          |2         |14.32   |45.23    |16.2  |2021-11-23 11:27:27|
> |3     |2021-11-22 11:27:27|NULL               |true  |2021-11-22 11:27:27|3     |email3|firstName3|lastName3|3         |3         |DE              |3     |2         |Location2   |NULL  |NULL       |2         |false    |address1_2|address2_2|City2|State2|Country2|code2  |true           |2             |true            |LocKey2    |2021-11-17 11:27:27|2021-11-17 11:27:27|false          |1    |1       |device2  |2021-11-17 11:27:27|2021-11-18 11:27:27|DeviceId22|DeviceName2|2         |2           |2            |NULL   |2           |type2         |2            |class2         |1         |Zone1   |3          |2         |14.32   |45.23    |16.2  |2021-11-23 11:27:27|
> |4     |2021-11-22 11:27:27|NULL               |NULL  |2021-11-22 11:27:27|4     |email4|firstName4|lastName4|NULL      |NULL      |NULL            |4     |1         |Location1   |NULL  |NULL       |1         |false    |address1_1|address2_1|City1|State1|Country1|code1  |true           |1             |true            |LocKey1    |2021-11-16 11:27:27|2021-11-16 11:27:27|false          |1    |1       |device3  |2021-11-18 11:27:27|2021-11-19 11:27:27|DeviceId23|DeviceName3|1         |3           |3            |NULL   |3           |type3         |3            |class3         |1         |Zone1   |1          |1         |12.32   |43.23    |14.2  |2021-11-21 11:27:27|
> |4     |2021-11-22 11:27:27|NULL               |NULL  |2021-11-22 11:27:27|4     |email4|firstName4|lastName4|NULL      |NULL      |NULL            |4     |1         |Location1   |NULL  |NULL       |1         |false    |address1_1|address2_1|City1|State1|Country1|code1  |true           |1             |true            |LocKey1    |2021-11-16 11:27:27|2021-11-16 11:27:27|false          |1    |1       |device1  |2021-11-16 11:27:27|2021-11-17 11:27:27|DeviceId21|DeviceName1|1         |1           |1            |NULL   |1           |type1         |1            |class1         |1         |Zone1   |1          |1         |12.32   |43.23    |14.2  |2021-11-21 11:27:27|
> +------+-------------------+-------------------+------+-------------------+------+------+----------+---------+----------+----------+----------------+------+----------+------------+------+-----------+----------+---------+----------+----------+-----+------+--------+-------+---------------+--------------+----------------+-----------+-------------------+-------------------+---------------+-----+--------+---------+-------------------+-------------------+----------+-----------+----------+------------+-------------+-------+------------+--------------+-------------+---------------+----------+--------+-----------+----------+--------+---------+------+-------------------+
> [2,null,null,true,null,2,email2,firstName2,lastName2,2,2,En,2,1,Location1,null,null,1,false,address1_1,address2_1,City1,State1,Country1,code1,true,1,true,LocKey1,null,null,false,1,1,device3,null,null,DeviceId23,DeviceName3,1,3,3,null,3,type3,3,class3,1,Zone1,1,1,12.32,43.23,14.2,null]
> [2,null,null,true,null,2,email2,firstName2,lastName2,2,2,En,2,1,Location1,null,null,1,false,address1_1,address2_1,City1,State1,Country1,code1,true,1,true,LocKey1,null,null,false,1,1,device1,null,null,DeviceId21,DeviceName1,1,1,1,null,1,type1,1,class1,1,Zone1,1,1,12.32,43.23,14.2,null]
> [3,null,null,true,null,3,email3,firstName3,lastName3,3,3,DE,3,2,Location2,null,null,2,false,address1_2,address2_2,City2,State2,Country2,code2,true,2,true,LocKey2,null,null,false,1,1,device4,null,null,DeviceId24,DeviceName4,2,1,2,null,1,type1,2,class2,1,Zone1,3,2,14.32,45.23,16.2,null]
> [3,null,null,true,null,3,email3,firstName3,lastName3,3,3,DE,3,2,Location2,null,null,2,false,address1_2,address2_2,City2,State2,Country2,code2,true,2,true,LocKey2,null,null,false,1,1,device2,null,null,DeviceId22,DeviceName2,2,2,2,null,2,type2,2,class2,1,Zone1,3,2,14.32,45.23,16.2,null]
>  {code}
> DataFrame count and show work as expected, but the RDD is missing the first two
> and last two records (the rows for UserId 1 and UserId 4).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
