[ https://issues.apache.org/jira/browse/SPARK-34417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Amandeep Sharma updated SPARK-34417: ------------------------------------ Description: Code to reproduce the issue: {code:java} import org.apache.spark.sql.SparkSession object ColumnNameWithDot { def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("Simple Application") .config("spark.master", "local").getOrCreate() spark.sparkContext.setLogLevel("OFF") import spark.implicits._ val df = Seq(("abc", 23), ("def", 44), (null, 0)).toDF("ColWith.Dot", "Col") df.na.fill(Map("`ColWith.Dot`" -> "na")) .show() } } {code} *Analysis* *------------------------------PART-I-----------------------------------* I debugged the Spark code. The failure is due to a bug in the spark-catalyst code at [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268] The function in question resolves the column, as described in its code comments, in the following order until a match is found. * Consider pattern dbName.tableName.columnName * Consider tableName.columnName * Consider everything as columnName But the implementation considers only the first name part for the resolution in the third step. It should join all parts using a dot (.). *------------------------------PART-II-----------------------------------* If we don’t quote the column name with back-ticks, then it fails at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L400] If it is quoted, the condition at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L413] becomes false because *k* holds the back-tick-quoted name whereas *f.name* is unquoted. Then it fails at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L422] It fails for the reason described in PART-I. 
*Solution* Change [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268] as follows: {color:#ff0000}-val name = nameParts.head{color} {color:#00875a}+ val name = nameParts.mkString(".") // join all part using .{color} val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT))) {color:#ff0000}- (attributes, nameParts.tail){color} {color:#00875a}+ (attributes, Seq.empty){color} *{color:#172b4d}Workaround{color}* {color:#172b4d}We can make a change in [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L396]{color} {color:#172b4d}While resolving the input columns, build a new map from each resolved column's name to its replacement value, as below.{color} {color:#172b4d}The idea is to use the resolved name instead of the input name while filling null values.{color} {code:java} private def fillMap(values: Seq[(String, Any)]): DataFrame = { // Error handling var resolved: Map[String, Any] = Map() values.foreach { case (colName, replaceValue) => // Check column name exists val resolvedColumn = df.resolve(colName) // Check data type replaceValue match { case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: jl.Boolean | _: String => // This is good case _ => throw new IllegalArgumentException( s"Unsupported value type ${replaceValue.getClass.getName} ($replaceValue).") } resolved += (resolvedColumn.name -> replaceValue) } val columnEquals = df.sparkSession.sessionState.analyzer.resolver val projections = df.schema.fields.map { f => resolved.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) => v match { case v: jl.Float => fillCol[Float](f, v) case v: jl.Double => fillCol[Double](f, v) case v: jl.Long => fillCol[Long](f, v) case v: jl.Integer => fillCol[Integer](f, v) case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue()) case v: String => fillCol[String](f, v) } 
}.getOrElse(df.col(f.name)) } df.select(projections : _*) } {code} was: Code to reproduce the issue: {code:java} import org.apache.spark.sql.SparkSession object ColumnNameWithDot { def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("Simple Application") .config("spark.master", "local").getOrCreate() spark.sparkContext.setLogLevel("OFF") import spark.implicits._ val df = Seq(("abc", 23), ("def", 44), (null, 0)).toDF("ColWith.Dot", "Col") df.na.fill(Map("`ColWith.Dot`" -> "na")) .show() } } {code} *Analysis* *------------------------------PART-I-----------------------------------* Debugged the spark code. It is due to a bug in the spark-catalyst code at [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268] Function in question resolves the column per the code-comments in the following order until a match is found. * Consider pattern dbName.tableName.columnName * Consider tableName.columnName * Consider everything as columnName But implementation considers only the first part for the resolution in the third step. It should join all parts using dot(.). *------------------------------PART-II-----------------------------------* If we don’t use column name with back-tick them it fails at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L400] If it is quoted, the condition at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L413] becomes false as *k* has value quoted with back-tick whereas *f.name* is not. Then it fails at [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L422] It is failing due to the reason mentioned in the PART-I. 
*Solution* Make changes in [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268] as below: {color:#ff0000}-val name = nameParts.head{color} {color:#00875a}+ val name = nameParts.mkString(".") // join all part using .{color} val attributes = collectMatches(name, direct.get(name.toLowerCase(Locale.ROOT))) {color:#ff0000}- (attributes, nameParts.tail){color} {color:#00875a}+ (attributes, Seq.empty){color} *{color:#172b4d}Workaround{color}* {color:#172b4d}We can make change in [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L396]{color} {color:#172b4d}While we are resolving the input columns, create a new map with the name of the resolved column and replace value as below.{color} {color:#172b4d}Idea is to use resolved named instead of input name while filling null values.{color} {code:java} private def fillMap(values: Seq[(String, Any)]): DataFrame = { // Error handling var resolved: Map[String, Any] = Map() values.foreach { case (colName, replaceValue) => // Check column name exists val resolvedColumn = df.resolve(colName) // Check data type replaceValue match { case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: jl.Boolean | _: String => // This is good case _ => throw new IllegalArgumentException( s"Unsupported value type ${replaceValue.getClass.getName} ($replaceValue).") } resolved += (resolvedColumn.name -> replaceValue) } val columnEquals = df.sparkSession.sessionState.analyzer.resolver val projections = df.schema.fields.map { f => resolved.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) => v match { case v: jl.Float => fillCol[Float](f, v) case v: jl.Double => fillCol[Double](f, v) case v: jl.Long => fillCol[Long](f, v) case v: jl.Integer => fillCol[Integer](f, v) case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue()) case v: String => fillCol[String](f, v) } 
}.getOrElse(df.col(f.name)) } df.select(projections : _*) } {code} > org.apache.spark.sql.DataFrameNaFunctions.fillMap(values: Seq[(String, Any)]) > fails for column name having a dot > ---------------------------------------------------------------------------------------------------------------- > > Key: SPARK-34417 > URL: https://issues.apache.org/jira/browse/SPARK-34417 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 3.0.1 > Environment: Spark version - 3.0.1 > OS - macOS 10.15.7 > Reporter: Amandeep Sharma > Priority: Major > > Code to reproduce the issue: > {code:java} > import org.apache.spark.sql.SparkSession > object ColumnNameWithDot { > def main(args: Array[String]): Unit = { > val spark = SparkSession.builder.appName("Simple Application") > .config("spark.master", "local").getOrCreate() > spark.sparkContext.setLogLevel("OFF") > import spark.implicits._ > val df = Seq(("abc", 23), ("def", 44), (null, 0)).toDF("ColWith.Dot", > "Col") > df.na.fill(Map("`ColWith.Dot`" -> "na")) > .show() > } > } > {code} > *Analysis* > *------------------------------PART-I-----------------------------------* > Debugged the spark code. It is due to a bug in the spark-catalyst code at > [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268] > Function in question resolves the column as per the code-comments in the > following order until a match is found. > * Consider pattern dbName.tableName.columnName > * Consider tableName.columnName > * Consider everything as columnName > But implementation considers only the first part for the resolution in the > third step. It should join all parts using dot(.). 
> *------------------------------PART-II-----------------------------------* > If we don’t use column name with back-tick them it fails at > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L400] > If it is quoted, the condition at > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L413] > becomes false as *k* has value quoted with back-tick whereas *f.name* is > not. Then it fails at > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L422] > It is failing due to the reason mentioned in the PART-I. > *Solution* > Make changes in > [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L266%23L268] > as below: > {color:#ff0000}-val name = nameParts.head{color} > {color:#00875a}+ val name = nameParts.mkString(".") // join all part using > .{color} > val attributes = collectMatches(name, > direct.get(name.toLowerCase(Locale.ROOT))) > {color:#ff0000}- (attributes, nameParts.tail){color} > {color:#00875a}+ (attributes, Seq.empty){color} > *{color:#172b4d}Workaround{color}* > {color:#172b4d}We can make change in > [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala#L396]{color} > {color:#172b4d}While we are resolving the input columns, create a new map > with the name of the resolved column and replace value as below.{color} > {color:#172b4d}Idea is to use resolved named instead of input name while > filling null values.{color} > {code:java} > private def fillMap(values: Seq[(String, Any)]): DataFrame = { > // Error handling > var resolved: Map[String, Any] = Map() > values.foreach { case (colName, replaceValue) => > // Check column name exists > val resolvedColumn = df.resolve(colName) > // Check data type > replaceValue match { > 
case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: > jl.Boolean | _: String => > // This is good > case _ => throw new IllegalArgumentException( > s"Unsupported value type ${replaceValue.getClass.getName} > ($replaceValue).") > } > resolved += (resolvedColumn.name -> replaceValue) > } > val columnEquals = df.sparkSession.sessionState.analyzer.resolver > val projections = df.schema.fields.map { f => > resolved.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, > v) => > v match { > case v: jl.Float => fillCol[Float](f, v) > case v: jl.Double => fillCol[Double](f, v) > case v: jl.Long => fillCol[Long](f, v) > case v: jl.Integer => fillCol[Integer](f, v) > case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue()) > case v: String => fillCol[String](f, v) > } > }.getOrElse(df.col(f.name)) > } > df.select(projections : _*) > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org