[ https://issues.apache.org/jira/browse/SPARK-34417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Amandeep Sharma updated SPARK-34417:
------------------------------------
    Description: 
Code to reproduce the issue:
{code:java}
import org.apache.spark.sql.SparkSession

object ColumnNameWithDot {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.appName("Simple Application")
      .config("spark.master", "local").getOrCreate()

    spark.sparkContext.setLogLevel("OFF")

    import spark.implicits._
    val df = Seq(("abc", 23), ("def", 44), (null, 0)).toDF("ColWith.Dot", "Col")
    df.na.fill(Map("`ColWith.Dot`" -> "na"))
      .show()

  }
}
{code}
*Analysis*

*------------------------------PART-I-----------------------------------*

I debugged the Spark code. The failure is caused by a bug in the spark-catalyst code at

[https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268]

The function in question resolves the column, per its code comments, in the following order until a match is found:
 * Consider pattern dbName.tableName.columnName
 * Consider tableName.columnName
 * Consider everything as columnName

However, in the third step the implementation considers only the first name part for resolution. It should join all the parts with a dot (.), as the sketch below illustrates.
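
A minimal sketch of the difference (the {{nameParts}} value here is hypothetical, showing how the repro's column name would be split):
{code:java}
// Hypothetical walk-through of the third resolution step for "ColWith.Dot".
// By this point the back-ticks are stripped and the name is split on dots:
val nameParts = Seq("ColWith", "Dot")

// Current behaviour: only the head is looked up as the column name,
// so "ColWith" never matches any attribute of the DataFrame.
val current = nameParts.head           // "ColWith"

// Intended behaviour: joining all parts back with a dot yields the name
// of the attribute created by toDF("ColWith.Dot", "Col").
val intended = nameParts.mkString(".") // "ColWith.Dot"
{code}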

*------------------------------PART-II-----------------------------------*

If the column name is not quoted with back-ticks, the call fails at
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L400]

If it is quoted, the condition at
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L413]
is false because *k* is still back-tick quoted whereas *f.name* is not. The call then fails at
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L422]

In both cases the root cause is the resolution bug described in PART-I.
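
A minimal illustration of the quoted/unquoted mismatch (the resolver below is a stand-in for the session's default case-insensitive resolver, not the actual Spark object):
{code:java}
// k is the map key exactly as the caller supplied it, still back-tick quoted;
// f.name comes from the schema, where names are stored without quotes.
val k = "`ColWith.Dot`"
val fieldName = "ColWith.Dot"

// A stand-in for the case-insensitive resolver: it compares the raw strings,
// so the quoted key can never equal the unquoted field name.
val columnEquals: (String, String) => Boolean = _.equalsIgnoreCase(_)
assert(!columnEquals(k, fieldName)) // the condition at L413 is always false
{code}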

*Solution*

Make the following change at
[https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268]:

{color:#ff0000}- val name = nameParts.head{color}
{color:#00875a}+ val name = nameParts.mkString(".") // join all parts using a dot{color}
val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT)))
{color:#ff0000}- (attributes, nameParts.tail){color}
{color:#00875a}+ (attributes, Seq.empty){color}

Returning {{Seq.empty}} instead of {{nameParts.tail}} reflects that the whole input has been consumed as the column name, so there are no remaining parts to treat as nested fields.

*Workaround*

We can change
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L396]
as follows.

While resolving the input columns, build a new map keyed by each resolved column's name, with the replacement value, as shown below. The idea is to use the resolved name instead of the input name when filling null values.
{code:java}
private def fillMap(values: Seq[(String, Any)]): DataFrame = {
  // `jl` is the alias already used in DataFrameNaFunctions.scala:
  //   import java.{lang => jl}

  // Build a map keyed by the *resolved* column name, validating each entry.
  var resolved: Map[String, Any] = Map()
  values.foreach { case (colName, replaceValue) =>
    // Check that the column name exists (df.resolve throws if it does not).
    val resolvedColumn = df.resolve(colName)

    // Check that the replacement value has a supported type.
    replaceValue match {
      case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long |
           _: jl.Boolean | _: String =>
        // This is good.
      case _ => throw new IllegalArgumentException(
        s"Unsupported value type ${replaceValue.getClass.getName} ($replaceValue).")
    }
    resolved += (resolvedColumn.name -> replaceValue)
  }

  val columnEquals = df.sparkSession.sessionState.analyzer.resolver
  val projections = df.schema.fields.map { f =>
    resolved.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
      v match {
        case v: jl.Float => fillCol[Float](f, v)
        case v: jl.Double => fillCol[Double](f, v)
        case v: jl.Long => fillCol[Long](f, v)
        case v: jl.Integer => fillCol[Integer](f, v)
        case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue())
        case v: String => fillCol[String](f, v)
      }
    }.getOrElse(df.col(f.name))
  }
  df.select(projections : _*)
}
{code}
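
With this workaround in place, the original repro should succeed (a sketch; not verified against a patched build):
{code:java}
// The back-tick-quoted key goes through df.resolve, so the internal map is
// keyed by the plain field name "ColWith.Dot" and matches the schema field.
df.na.fill(Map("`ColWith.Dot`" -> "na")).show()
// The row that previously had a null in "ColWith.Dot" now shows "na".
{code}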


> org.apache.spark.sql.DataFrameNaFunctions.fillMap(values: Seq[(String, Any)]) 
> fails for column name having a dot
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-34417
>                 URL: https://issues.apache.org/jira/browse/SPARK-34417
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.1
>         Environment: Spark version - 3.0.1
> OS - macOS 10.15.7
>            Reporter: Amandeep Sharma
>            Priority: Major
>


