[ https://issues.apache.org/jira/browse/SPARK-26436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manish updated SPARK-26436:
---------------------------
    Description: 
There seems to be a bug in the groupByKey API when it is applied to a Dataset that resulted from a previous groupByKey and flatMapGroups invocation.

In such cases groupByKey throws the following exception:

java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
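
The exception itself can be reproduced in isolation with the Row API alone (a minimal sketch, based on my reading of the Row API rather than Spark internals): Row.fromSeq builds a generic Row with no schema attached, so any accessor that resolves a field by name fails on it, while positional access works:

{code}
import org.apache.spark.sql.Row

val r = Row.fromSeq(Seq("1", "One", 2))
r.getString(0)        // fine: positional access needs no schema
r.getAs[String]("_1") // throws: fieldIndex on a Row without schema is undefined
{code}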

 

The Dataset does have a valid schema, and groupBy("key") and repartition($"key") calls on the same Dataset and key succeed.
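
For comparison, untyped operations on the same intermediate Dataset (the transform value defined in the repro below, whose key column is _1) complete without error, presumably because they resolve the column against the Dataset's schema rather than calling fieldIndex on individual rows:

{code}
transform.groupBy("_1").count().show()
transform.repartition($"_1").show()
{code}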

 

Following is the code that reproduces the scenario:

 
{code}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

import scala.collection.mutable.ListBuffer

object Test {

  def main(args: Array[String]): Unit = {

    val values = List(List("1", "One"), List("1", "Two"), List("2", "Three"), List("2", "4"))
      .map(x => (x(0), x(1)))
    val session = SparkSession.builder.config("spark.master", "local").getOrCreate
    import session.implicits._

    val dataFrame = values.toDF
    dataFrame.show()
    dataFrame.printSchema()

    // Extend the original schema with a non-nullable Count column.
    val newSchema = StructType(dataFrame.schema.fields :+ StructField("Count", IntegerType, false))
    val expr = RowEncoder.apply(newSchema)

    // First groupByKey/flatMapGroups: emit every row of a group extended
    // with the group size. This step works as expected.
    val transform = dataFrame.groupByKey(row => row.getAs[String]("_1"))
      .flatMapGroups((key, inputItr) => {
        val inputSeq = inputItr.toSeq
        val counter = inputSeq.size
        val listBuff = new ListBuffer[Row]()
        for (row <- inputSeq) {
          listBuff += Row.fromSeq(row.toSeq ++ Array[Int](counter))
        }
        listBuff.iterator
      })(expr)

    transform.show

    val newSchema1 = StructType(transform.schema.fields :+ StructField("Count1", IntegerType, false))
    val expr1 = RowEncoder.apply(newSchema1)

    // Second groupByKey on the result of the first one: the key function
    // row.getAs[String]("_1") throws here, apparently because the Row
    // objects built with Row.fromSeq above carry no schema.
    val transform2 = transform.groupByKey(row => row.getAs[String]("_1"))
      .flatMapGroups((key, inputItr) => {
        val inputSeq = inputItr.toSeq
        val counter = inputSeq.size
        val listBuff = new ListBuffer[Row]()
        for (row <- inputSeq) {
          listBuff += Row.fromSeq(row.toSeq ++ Array[Int](counter))
        }
        listBuff.iterator
      })(expr1)

    transform2.show
  }
}

Test.main(null)
{code}
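
A possible workaround until this is fixed (a sketch only, not verified beyond the reasoning above): extract the grouping key by position instead of by name in the second groupByKey, so fieldIndex is never consulted on the schema-less rows:

{code}
// Same second stage as above, but with positional key access.
val transform2 = transform.groupByKey(row => row.getString(0))
  .flatMapGroups((key, inputItr) => {
    val inputSeq = inputItr.toSeq
    inputSeq.map(r => Row.fromSeq(r.toSeq ++ Array[Int](inputSeq.size))).iterator
  })(expr1)
{code}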
 


> Dataframe resulting from a groupByKey and flatMapGroups operation throws 
> java.lang.UnsupportedOperationException when groupByKey is applied to it.
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26436
>                 URL: https://issues.apache.org/jira/browse/SPARK-26436
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Manish
>            Priority: Major
>


