[ 
https://issues.apache.org/jira/browse/SPARK-36932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427005#comment-17427005
 ] 

chendihao commented on SPARK-36932:
-----------------------------------

It is not related to the join operation. I have a simpler case that reproduces 
the issue with a single DataFrame.

{code:scala}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, 
StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Minimal reproduction for SPARK-36932 that uses a single DataFrame (no join).
 *
 * It builds a DataFrame whose schema contains two fields that share the name
 * "col1" but have different types (IntegerType and LongType), then applies
 * groupByKey followed by mapGroups with an explicit RowEncoder for that schema.
 */
object SchemaPuringIssue {

  def main(args: Array[String]): Unit = {

    // Local session; reuses an existing one if present.
    val spark = SparkSession.builder()
      .master("local")
      .getOrCreate()

    // Two rows of (Int, Long) matching the two-field schema below.
    val data1 = Seq(
      Row(1, 1l),
      Row(2, 2l))
    // Both fields are named "col1" but differ in type; this name collision is
    // the condition the report identifies as triggering the failure.
    val schema1 = StructType(List(
      StructField("col1", IntegerType),
      StructField("col1", LongType)))
    val df = spark.createDataFrame(spark.sparkContext.makeRDD(data1), schema1)
    // Shows the plain DataFrame before the grouping step is applied.
    df.show()

    import spark.implicits._

    // Group rows by the first (Int) column and keep, per group, the row with
    // the largest value in that column. The result encoder is built explicitly
    // from the DataFrame's own schema (with the duplicated field name).
    val distinct = df
      .groupByKey {
        row => row.getInt(0)
      }
      .mapGroups {
        case (_, iter) =>
          iter.maxBy(row => {
            row.getInt(0)
          })
      }(RowEncoder(df.schema))

    // NOTE(review): presumably this is where the "Failed to merge fields"
    // exception surfaces, since it is the first action on `distinct` — confirm.
    distinct.show()
  }

}
{code}

It seems that the Catalyst optimizer uses SchemaPruning, which raises an exception 
for a schema that contains fields with the same name but different types.

> Misuse "merge schema" when mapGroups
> ------------------------------------
>
>                 Key: SPARK-36932
>                 URL: https://issues.apache.org/jira/browse/SPARK-36932
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 3.0.0
>            Reporter: Wang Zekai
>            Priority: Major
>
> {code:java}
> // Test case for this bug
> val spark = SparkSession.builder().master("local[*]").getOrCreate()
> val data1 = Seq(
>   Row("0", 1),
>   Row("0", 2))
> val schema1 = StructType(List(
>   StructField("col0", StringType),
>   StructField("col1", IntegerType))
> )
> val data2 = Seq(
>   Row("0", 1),
>   Row("0", 2))
> val schema2 = StructType(List(
>   StructField("str0", StringType),
>   StructField("col0", IntegerType))
> )
> val df1 = spark.createDataFrame(spark.sparkContext.makeRDD(data1), schema1)
> val df2 = spark.createDataFrame(spark.sparkContext.makeRDD(data2), schema2)
> val joined = df1.join(df2, df1("col0") === df2("str0"), "left")
> import spark.implicits._
> val distinct = joined
>   .groupByKey {
>     row => row.getInt(1)
>   }
>   .mapGroups {
>     case (_, iter) =>
>         iter.maxBy(row => {
>           row.getInt(3)
>         })
>   }(RowEncoder(joined.schema))
> distinct.show(){code}
> {code:java}
>  // A part of errors
> Exception in thread "main" org.apache.spark.SparkException: Failed to merge 
> fields 'col0' and 'col0'. Failed to merge incompatible data types string and 
> int 
> at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593)
> at scala.Option.map(Option.scala:163)
> at 
> org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:585)
> at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted
> (StructType.scala:582)
> at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) 
> at org.apache.spark.sql.types.StructType$.merge(StructType.scala:582) 
> at org.apache.spark.sql.types.StructType.merge(StructType.scala:492)
> at org.apache.spark.sql.catalyst.expressions.SchemaPruning$.$
> anonfun$pruneDataSchema$2(SchemaPruning.scala:36)
> at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
> at 
> scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) 
> at scala.collection.immutable.List.foldLeft(List.scala:89) 
> at 
> scala.collection.LinearSeqOptimized.reduceLeft(LinearSeqOptimized.scala:140) 
> at 
> scala.collection.LinearSeqOptimized.reduceLeft$(LinearSeqOptimized.scala:138) 
> at scala.collection.immutable.List.reduceLeft(List.scala:89)
> {code}
> After left-joining two DataFrames whose schemas contain a field with the same 
> name but different types, we use groupByKey and mapGroups to get the result, 
> but it fails with the error above. Is this a mistake in my usage? If not, I 
> think it may be related to the schema merge at StructType.scala:593. How can 
> I turn off schema merging?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to