[ 
https://issues.apache.org/jira/browse/SPARK-26436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manish updated SPARK-26436:
---------------------------
    Description: 
There appears to be a bug in the groupByKey API when it is applied to a Dataset 
that is itself the result of an earlier groupByKey and flatMapGroups invocation.

In such cases groupByKey throws the following exception:

java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
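
For context, getAs[String]("_1") resolves the field name through Row.fieldIndex, 
which requires the Row to carry a schema, and rows built with Row.fromSeq (as in 
the repro below) carry none. A minimal illustration of just that behaviour, 
separate from the repro:

{code:java}
import org.apache.spark.sql.Row

// A Row built from a bare sequence has no schema attached.
val r = Row.fromSeq(Seq("1", "One", 2))

r.getString(0)        // works: positional getters never consult the schema
r.getAs[String]("_1") // throws: fieldIndex on a Row without schema is undefined
{code}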

 

The DataFrame nevertheless has a valid schema, and both groupBy("key") and 
repartition($"key") calls on the same DataFrame and key succeed.

 

The following code reproduces the scenario:

 
{code:java}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import scala.collection.mutable.ListBuffer

object Test {
  def main(args: Array[String]): Unit = {
    val values = List(List("1", "One"), List("1", "Two"), List("2", "Three"), List("2", "4"))
      .map(x => (x(0), x(1)))
    val session = SparkSession.builder.config("spark.master", "local").getOrCreate
    import session.implicits._

    val dataFrame = values.toDF
    dataFrame.show()
    dataFrame.printSchema()

    // Extend the schema with an Int column and build a row encoder for it.
    val newSchema = StructType(dataFrame.schema.fields ++ Array(StructField("Count", IntegerType, false)))
    val expr = RowEncoder.apply(newSchema)

    // First groupByKey + flatMapGroups: append the group size to every row. Works fine.
    val transform = dataFrame.groupByKey(row => row.getAs[String]("_1"))
      .flatMapGroups((key, inputItr) => {
        val inputSeq = inputItr.toSeq
        val counter = inputSeq.size
        val listBuff = new ListBuffer[Row]()
        for (row <- inputSeq) {
          listBuff += Row.fromSeq(row.toSeq ++ Array[Int](counter))
        }
        listBuff.iterator
      })(expr)
    transform.show

    val newSchema1 = StructType(transform.schema.fields ++ Array(StructField("Count1", IntegerType, false)))
    val expr1 = RowEncoder.apply(newSchema1)

    // Second groupByKey, this time on the result of flatMapGroups:
    // row.getAs[String]("_1") in the key function throws
    // java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
    val transform2 = transform.groupByKey(row => row.getAs[String]("_1"))
      .flatMapGroups((key, inputItr) => {
        val inputSeq = inputItr.toSeq
        val counter = inputSeq.size
        val listBuff = new ListBuffer[Row]()
        for (row <- inputSeq) {
          listBuff += Row.fromSeq(row.toSeq ++ Array[Int](counter))
        }
        listBuff.iterator
      })(expr1)
    transform2.show
  }
}
{code}
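
A possible workaround (an untested sketch, reusing transform and expr1 from the 
repro above): keying by ordinal instead of by field name avoids the fieldIndex 
lookup entirely, since positional getters do not need a schema:

{code:java}
// Hypothetical workaround: getString(0) instead of getAs[String]("_1").
val transform2 = transform.groupByKey(row => row.getString(0))
  .flatMapGroups((key, inputItr) => {
    val inputSeq = inputItr.toSeq
    inputSeq.map(row => Row.fromSeq(row.toSeq ++ Array[Int](inputSeq.size))).iterator
  })(expr1)
transform2.show
{code}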
 

> Dataframe as a result of groupByKey and flatMapGroups operation throws 
> java.lang.UnsupportedOperationException when groupByKey is applied on it.
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26436
>                 URL: https://issues.apache.org/jira/browse/SPARK-26436
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Manish
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
