[ https://issues.apache.org/jira/browse/SPARK-36932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17427005#comment-17427005 ]
chendihao commented on SPARK-36932: ----------------------------------- It is not related to join operation. I have a simpler case to reproduce the issue with a single dataframe. {code:scala} import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object SchemaPuringIssue { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local") .getOrCreate() val data1 = Seq( Row(1, 1l), Row(2, 2l)) val schema1 = StructType(List( StructField("col1", IntegerType), StructField("col1", LongType))) val df = spark.createDataFrame(spark.sparkContext.makeRDD(data1), schema1) df.show() import spark.implicits._ val distinct = df .groupByKey { row => row.getInt(0) } .mapGroups { case (_, iter) => iter.maxBy(row => { row.getInt(0) }) }(RowEncoder(df.schema)) distinct.show() } } {code} It seems that the Catalyst Optimizer uses SchemaPruning and it raises an exception for the schema with the same name but different types. 
> Misuse "merge schema" when mapGroups > ------------------------------------ > > Key: SPARK-36932 > URL: https://issues.apache.org/jira/browse/SPARK-36932 > Project: Spark > Issue Type: Bug > Components: Java API > Affects Versions: 3.0.0 > Reporter: Wang Zekai > Priority: Major > > {code:java} > // Test case for this bug > val spark = SparkSession.builder().master("local[*]").getOrCreate() > val data1 = Seq( > Row("0", 1), > Row("0", 2)) > val schema1 = StructType(List( > StructField("col0", StringType), > StructField("col1", IntegerType)) > ) > val data2 = Seq( > Row("0", 1), > Row("0", 2)) > val schema2 = StructType(List( > StructField("str0", StringType), > StructField("col0", IntegerType)) > ) > val df1 = spark.createDataFrame(spark.sparkContext.makeRDD(data1), schema1) > val df2 = spark.createDataFrame(spark.sparkContext.makeRDD(data2), schema2) > val joined = df1.join(df2, df1("col0") === df2("str0"), "left") > import spark.implicits._ > val distinct = joined > .groupByKey { > row => row.getInt(1) > } > .mapGroups { > case (_, iter) => > iter.maxBy(row => { > row.getInt(3) > }) > }(RowEncoder(joined.schema)) > distinct.show(){code} > {code:java} > // A part of errors > Exception in thread "main" org.apache.spark.SparkException: Failed to merge > fields 'col0' and 'col0'. 
Failed to merge incompatible data types string and > int > at > org.apache.spark.sql.types.StructType$.$anonfun$merge$2(StructType.scala:593) > at scala.Option.map(Option.scala:163) > at > org.apache.spark.sql.types.StructType$.$anonfun$merge$1(StructType.scala:585) > at org.apache.spark.sql.types.StructType$.$anonfun$merge$1$adapted > (StructType.scala:582) > at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) > at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) > at org.apache.spark.sql.types.StructType$.merge(StructType.scala:582) > at org.apache.spark.sql.types.StructType.merge(StructType.scala:492) > at org.apache.spark.sql.catalyst.expressions.SchemaPruning$.$ > anonfun$pruneDataSchema$2(SchemaPruning.scala:36) > at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) > at > scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) > at scala.collection.immutable.List.foldLeft(List.scala:89) > at > scala.collection.LinearSeqOptimized.reduceLeft(LinearSeqOptimized.scala:140) > at > scala.collection.LinearSeqOptimized.reduceLeft$(LinearSeqOptimized.scala:138) > at scala.collection.immutable.List.reduceLeft(List.scala:89) > {code} > After left-joining two dataframes which have two schemas with the same name but > different types, we use groupByKey and mapGroups to get the result. But it > will make some mistakes. Is it my grammatical mistake? If not, I think it > may be related to schema merge in StructType.scala: 593. How can I turn off > schema merging? -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org