Repository: spark Updated Branches: refs/heads/master 66c4b79af -> 769aa0f1d
[SPARK-19695][SQL] Throw an exception if a `columnNameOfCorruptRecord` field violates requirements in json formats ## What changes were proposed in this pull request? This pr comes from #16928 and fixed a json behaviour along with the CSV one. ## How was this patch tested? Added tests in `JsonSuite`. Author: Takeshi Yamamuro <yamam...@apache.org> Closes #17023 from maropu/SPARK-19695. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/769aa0f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/769aa0f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/769aa0f1 Branch: refs/heads/master Commit: 769aa0f1d22d3c6d4c7871468344d82c8dc36260 Parents: 66c4b79 Author: Takeshi Yamamuro <yamam...@apache.org> Authored: Wed Feb 22 21:39:20 2017 -0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Feb 22 21:39:20 2017 -0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/json/JacksonParser.scala | 5 +++- .../org/apache/spark/sql/DataFrameReader.scala | 11 ++++++- .../datasources/json/JsonFileFormat.scala | 13 ++++++-- .../execution/datasources/json/JsonSuite.scala | 31 ++++++++++++++++++++ 4 files changed, 56 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/769aa0f1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 9950959..9b80c0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -58,7 +58,10 @@ class JacksonParser( private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) - corruptFieldIndex.foreach(idx => require(schema(idx).dataType == StringType)) + corruptFieldIndex.foreach { corrFieldIndex => + require(schema(corrFieldIndex).dataType == StringType) + require(schema(corrFieldIndex).nullable) + } @transient private[this] var isWarningPrinted: Boolean = false http://git-wip-us.apache.org/repos/asf/spark/blob/769aa0f1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 4c1341e..2be2276 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.jdbc._ import org.apache.spark.sql.execution.datasources.json.JsonInferSchema -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String /** @@ -365,6 +365,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { createParser) } + // Check a field requirement for corrupt records here to throw an exception in a driver side + schema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex => + val f = schema(corruptFieldIndex) + if (f.dataType != StringType || !f.nullable) { + throw new AnalysisException( + "The field for corrupt records must be string type and nullable") + } + } + val parsed = jsonDataset.rdd.mapPartitions { iter => val parser = new JacksonParser(schema, parsedOptions) iter.flatMap(parser.parse(_, createParser, UTF8String.fromString)) http://git-wip-us.apache.org/repos/asf/spark/blob/769aa0f1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 2cbf4ea..902fee5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -22,13 +22,13 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions} import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { @@ -102,6 +102,15 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { sparkSession.sessionState.conf.sessionLocalTimeZone, sparkSession.sessionState.conf.columnNameOfCorruptRecord) + // Check a field requirement for corrupt records here to throw an exception in a driver side + dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { corruptFieldIndex => + val f = dataSchema(corruptFieldIndex) + if (f.dataType != StringType || !f.nullable) { + throw new AnalysisException( + "The field for corrupt records must be string type and nullable") + } + } + (file: PartitionedFile) => { val parser = new JacksonParser(requiredSchema, parsedOptions) JsonDataSource(parsedOptions).readFile( http://git-wip-us.apache.org/repos/asf/spark/blob/769aa0f1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 05aa2ab..0e01be2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1944,4 +1944,35 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode")) } } + + test("Throw an exception if a `columnNameOfCorruptRecord` field violates requirements") { + val columnNameOfCorruptRecord = "_unparsed" + val schema = StructType( + StructField(columnNameOfCorruptRecord, IntegerType, true) :: + StructField("a", StringType, true) :: + StructField("b", StringType, true) :: + StructField("c", StringType, true) :: Nil) + val errMsg = intercept[AnalysisException] { + spark.read + .option("mode", "PERMISSIVE") + .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) + .schema(schema) + .json(corruptRecords) + }.getMessage + assert(errMsg.startsWith("The field for corrupt records must be string type and nullable")) + + withTempPath { dir => + val path = dir.getCanonicalPath + corruptRecords.toDF("value").write.text(path) + val errMsg = intercept[AnalysisException] { + spark.read + .option("mode", "PERMISSIVE") + .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) + .schema(schema) + .json(path) + .collect + }.getMessage + assert(errMsg.startsWith("The field for corrupt records must be string type and nullable")) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org