Repository: spark Updated Branches: refs/heads/master da25c86d6 -> 7bca62f79
[SPARK-6607][SQL] Check invalid characters for Parquet schema and show error messages. '(' and ')' are special characters used in Parquet schemas for type annotations. When we run an aggregation query, we obtain attribute names such as "MAX(a)". If we directly store the generated DataFrame as a Parquet file, reading and parsing the stored schema string later fails. Several methods can be adopted to solve this. This PR takes the simplest one: it checks the attribute names before generating the Parquet schema from these attributes and, if any of them contains a special character, shows an error message suggesting that the user rename it with an alias (an earlier revision of this PR replaced the special characters in the attribute names instead). Another possible method might be modifying all aggregation expression names from "func(column)" to "func[column]". Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #5263 from viirya/parquet_aggregation_name and squashes the following commits: 2d70542 [Liang-Chi Hsieh] Address comment. 463dff4 [Liang-Chi Hsieh] Instead of replacing special chars, showing error message to user to suggest using Alias. 1de001d [Liang-Chi Hsieh] Replace special characters '(' and ')' of Parquet schema.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bca62f7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bca62f7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bca62f7 Branch: refs/heads/master Commit: 7bca62f79056e592cf07b49d8b8d04c59dea25fc Parents: da25c86 Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Sun Apr 5 00:20:43 2015 +0800 Committer: Cheng Lian <l...@databricks.com> Committed: Sun Apr 5 00:20:43 2015 +0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/parquet/ParquetTypes.scala | 14 ++++++++++++++ .../org/apache/spark/sql/hive/parquetSuites.scala | 16 ++++++++++++++++ 2 files changed, 30 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7bca62f7/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index da668f0..60e1bec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -390,6 +390,7 @@ private[parquet] object ParquetTypesConverter extends Logging { def convertFromAttributes(attributes: Seq[Attribute], toThriftSchemaNames: Boolean = false): MessageType = { + checkSpecialCharacters(attributes) val fields = attributes.map( attribute => fromDataType(attribute.dataType, attribute.name, attribute.nullable, @@ -404,7 +405,20 @@ private[parquet] object ParquetTypesConverter extends Logging { } } + private def checkSpecialCharacters(schema: Seq[Attribute]) = { + // ,;{}()\n\t= and space character are special characters in Parquet schema + schema.map(_.name).foreach { name => + 
if (name.matches(".*[ ,;{}()\n\t=].*")) { + sys.error( + s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\n\t=". + |Please use alias to rename it. + """.stripMargin.split("\n").mkString(" ")) + } + } + } + def convertToString(schema: Seq[Attribute]): String = { + checkSpecialCharacters(schema) StructType.fromAttributes(schema).json } http://git-wip-us.apache.org/repos/asf/spark/blob/7bca62f7/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 1319c81..5f71e1b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -688,6 +688,22 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { sql("DROP TABLE alwaysNullable") } + + test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") { + val tempDir = Utils.createTempDir() + val filePath = new File(tempDir, "testParquet").getCanonicalPath + val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath + + val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str") + val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int") + intercept[RuntimeException](df2.saveAsParquetFile(filePath)) + + val df3 = df2.toDF("str", "max_int") + df3.saveAsParquetFile(filePath2) + val df4 = parquetFile(filePath2) + checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil) + assert(df4.columns === Array("str", "max_int")) + } } class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org