[ https://issues.apache.org/jira/browse/SPARK-26436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Manish updated SPARK-26436:
---------------------------
Description:

There appears to be a bug in the groupByKey API when it is applied to a Dataset that resulted from a previous groupByKey and flatMapGroups invocation. In that case groupByKey throws:

java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.

This happens even though the Dataset has a valid schema, and even though groupBy("key") and repartition($"key") calls on the same Dataset and key succeed.

The following code reproduces the scenario:

{code:java}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

import scala.collection.mutable.ListBuffer

object Test {
  def main(args: Array[String]): Unit = {
    val values = List(List("1", "One"), List("1", "Two"), List("2", "Three"), List("2", "4"))
      .map(x => (x(0), x(1)))
    val session = SparkSession.builder.config("spark.master", "local").getOrCreate
    import session.implicits._

    val dataFrame = values.toDF
    dataFrame.show()
    dataFrame.printSchema()

    // First pass: append a Count column holding each group's size. This works.
    val newSchema = StructType(dataFrame.schema.fields ++
      Array(StructField("Count", IntegerType, false)))
    val expr = RowEncoder.apply(newSchema)
    val transform = dataFrame
      .groupByKey(row => row.getAs[String]("_1"))
      .flatMapGroups((key, inputItr) => {
        val inputSeq = inputItr.toSeq
        val counter = inputSeq.size
        val listBuff = new ListBuffer[Row]()
        for (row <- inputSeq) {
          listBuff += Row.fromSeq(row.toSeq ++ Array[Int](counter))
        }
        listBuff.iterator
      })(expr)
    transform.show

    // Second pass: the identical operation applied to the first pass's result.
    val newSchema1 = StructType(transform.schema.fields ++
      Array(StructField("Count1", IntegerType, false)))
    val expr1 = RowEncoder.apply(newSchema1)
    val transform2 = transform
      .groupByKey(row => row.getAs[String]("_1"))
      .flatMapGroups((key, inputItr) => {
        val inputSeq = inputItr.toSeq
        val counter = inputSeq.size
        val listBuff = new ListBuffer[Row]()
        for (row <- inputSeq) {
          listBuff += Row.fromSeq(row.toSeq ++ Array[Int](counter))
        }
        listBuff.iterator
      })(expr1)
    // Fails here with:
    // java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
    transform2.show
  }
}
{code}
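As the description notes, the untyped API succeeds on the same data, and the exception comes out of name-based field lookup in the key extractor. Continuing inside Test.main after the first pass, a minimal sketch for comparison; the positional-key variant is an assumption of this write-up, not something the report verifies:

{code:java}
// Per the report, untyped grouping and repartitioning on the same Dataset succeed:
transform.groupBy("_1").count().show()
transform.repartition($"_1").show()

// Hypothetical workaround (unverified assumption): extract the grouping key by
// position. row.getString(0) reads by index and never calls Row.fieldIndex,
// which is the call that throws on a schema-less Row.
val transform2Positional = transform
  .groupByKey(row => row.getString(0))
  .flatMapGroups((key, inputItr) => {
    val inputSeq = inputItr.toSeq
    inputSeq.map(row => Row.fromSeq(row.toSeq ++ Array[Int](inputSeq.size))).iterator
  })(expr1)
transform2Positional.show
{code}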
> Dataframe resulting from a groupByKey and flatMapGroups operation throws
> java.lang.UnsupportedOperationException when groupByKey is applied to it.
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-26436
>                 URL: https://issues.apache.org/jira/browse/SPARK-26436
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Manish
>            Priority: Major
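For context on the message itself: in Spark's Row API, fieldIndex(name) is only defined when the row carries a schema; the schema-less base implementation throws exactly this UnsupportedOperationException, and Row.fromSeq builds a plain GenericRow without one. A minimal spark-shell illustration of that Row behavior (it shows where the message originates, not necessarily the full root cause inside groupByKey):

{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("_1", StringType), StructField("_2", StringType)))

// Row.fromSeq attaches no schema, so name-based access cannot resolve an index:
val bare = Row.fromSeq(Seq("1", "One"))
bare.getString(0)           // positional access works: "1"
// bare.getAs[String]("_1") // would throw java.lang.UnsupportedOperationException:
                            // fieldIndex on a Row without schema is undefined.

// The same values wrapped with a schema resolve field names fine:
val withSchema = new GenericRowWithSchema(Array[Any]("1", "One"), schema)
withSchema.getAs[String]("_1")  // "1"
{code}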
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org