[ 
https://issues.apache.org/jira/browse/SPARK-46176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitalii Tymchyshyn updated SPARK-46176:
---------------------------------------
    Description: 
[https://github.com/apache/spark/blob/9987cab84bb05a61a5c0b43f94a733561a2e074a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L4459]
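 treats Dataset[X] as Dataset[Row] if a Row can be assigned to X (i.e., X is Row or a
supertype of Row). Unfortunately, Object is such a supertype, so an attempt to union
two kryo-encoded Dataset[Object] values produces a surprising result: each element
comes back as a Row wrapping the raw serialized bytes instead of the original object.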

The workaround is to wrap each Object in a container type that a Row cannot be assigned to (e.g., an Array), union the wrapped Datasets, and then unwrap the result.

Reproduction:
{code:java}
val sql = new org.apache.spark.sql.SQLContext(sc)
implicit val encoder = org.apache.spark.sql.Encoders.kryo[Object]
val ds = sql.createDataset(Seq("2".asInstanceOf[Object]))
val unionDs = ds.union(ds)
unionDs.takeAsList(10) {code}
Produces, in a notebook:
{code:java}
sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@76e23eff
encoder: org.apache.spark.sql.Encoder[Object] = class[value[0]: binary]
ds: org.apache.spark.sql.Dataset[Object] = [value: binary]
unionDs: org.apache.spark.sql.Dataset[Object] = [value: binary]
res20: java.util.List[Object] = [[[B@387e604f], [[B@601d8331]] {code}
Workaround:
{code:java}
implicit val arrayEncoder = org.apache.spark.sql.Encoders.kryo[Array[Object]]
val workaround = ds.map(x => Array(x))
val workaroundUnionDs = workaround.union(workaround).map(x => x(0))
workaroundUnionDs.takeAsList(10) {code}
Produces the expected result:
{code:java}
arrayEncoder: org.apache.spark.sql.Encoder[Array[Object]] = class[value[0]: binary]
workaround: org.apache.spark.sql.Dataset[Array[Object]] = [value: binary]
workaroundUnionDs: org.apache.spark.sql.Dataset[Object] = [value: binary]
res19: java.util.List[Object] = [2, 2] {code}

  was:
[https://github.com/apache/spark/blob/9987cab84bb05a61a5c0b43f94a733561a2e074a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L4459]
 treats Dataset[X] as Dataset[Row] if X can take Row. Unfortunatelly Object can 
take row, so a try to union two Dataset[Row] produces pretty strange result.

The workaround is to wrap Object into something that can't take Row (e.g. Array)

Reproduction:
{code:java}
val sql = new org.apache.spark.sql.SQLContext(sc)
implicit val encoder = org.apache.spark.sql.Encoders.kryo[Object]
val ds = sql.createDataset(Seq("2".asInstanceOf[Object]))
val unionDs = ds.union(ds)
unionDs.takeAsList(10) {code}
Produces in notebook
{code:java}
sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@76e23eff
encoder: org.apache.spark.sql.Encoder[Object] = class[value[0]: binary]
ds: org.apache.spark.sql.Dataset[Object] = [value: binary]
unionDs: org.apache.spark.sql.Dataset[Object] = [value: binary]
res20: java.util.List[Object] = [[[B@387e604f], [[B@601d8331]] {code}
Workaround:
{code:java}
implicit val arrayEncoder = org.apache.spark.sql.Encoders.kryo[Array[Object]]
val workaround = ds.map(x => Array(x))
val workaroundUnionDs = workaround.union(workaround).map(x => x(0))
workaroundUnionDs.takeAsList(10) {code}
Produces expected result:
{code:java}
arrayEncoder: org.apache.spark.sql.Encoder[Array[Object]] = class[value[0]: 
binary]
workaround: org.apache.spark.sql.Dataset[Array[Object]] = [value: binary]
workaroundUnionDs: org.apache.spark.sql.Dataset[Object] = [value: binary]
res19: java.util.List[Object] = [2, 2] {code}


> Union of two kryo-encoded Dataset[Object] produces rows of bytes
> ----------------------------------------------------------------
>
>                 Key: SPARK-46176
>                 URL: https://issues.apache.org/jira/browse/SPARK-46176
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.0
>            Reporter: Vitalii Tymchyshyn
>            Priority: Major
>
> [https://github.com/apache/spark/blob/9987cab84bb05a61a5c0b43f94a733561a2e074a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L4459]
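>  treats Dataset[X] as Dataset[Row] if a Row can be assigned to X (i.e., X is
> Row or a supertype of Row). Unfortunately, Object is such a supertype, so an
> attempt to union two kryo-encoded Dataset[Object] values produces a
> surprising result: each element comes back as a Row wrapping the raw
> serialized bytes instead of the original object.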
> The workaround is to wrap each Object in a container type that a Row cannot
> be assigned to (e.g., an Array), union the wrapped Datasets, and then unwrap
> the result.
> Reproduction:
> {code:java}
> val sql = new org.apache.spark.sql.SQLContext(sc)
> implicit val encoder = org.apache.spark.sql.Encoders.kryo[Object]
> val ds = sql.createDataset(Seq("2".asInstanceOf[Object]))
> val unionDs = ds.union(ds)
> unionDs.takeAsList(10) {code}
> Produces, in a notebook:
> {code:java}
> sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@76e23eff
> encoder: org.apache.spark.sql.Encoder[Object] = class[value[0]: binary]
> ds: org.apache.spark.sql.Dataset[Object] = [value: binary]
> unionDs: org.apache.spark.sql.Dataset[Object] = [value: binary]
> res20: java.util.List[Object] = [[[B@387e604f], [[B@601d8331]] {code}
> Workaround:
> {code:java}
> implicit val arrayEncoder = org.apache.spark.sql.Encoders.kryo[Array[Object]]
> val workaround = ds.map(x => Array(x))
> val workaroundUnionDs = workaround.union(workaround).map(x => x(0))
> workaroundUnionDs.takeAsList(10) {code}
> Produces the expected result:
> {code:java}
> arrayEncoder: org.apache.spark.sql.Encoder[Array[Object]] = class[value[0]: binary]
> workaround: org.apache.spark.sql.Dataset[Array[Object]] = [value: binary]
> workaroundUnionDs: org.apache.spark.sql.Dataset[Object] = [value: binary]
> res19: java.util.List[Object] = [2, 2] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to