Repository: spark
Updated Branches:
  refs/heads/master 79f3babcc -> 24e8c27df
[SPARK-25819][SQL] Support parse mode option for the function `from_avro`

## What changes were proposed in this pull request?

Currently, the function `from_avro` throws an exception on reading corrupt records. In practice, data corruption can happen for various reasons. It would be good to support `PERMISSIVE` mode and allow the function `from_avro` to process the whole input file/stream, which is consistent with `from_json` and `from_csv`. There is no obvious downside to supporting `PERMISSIVE` mode.

Unlike `from_csv` and `from_json`, the default parse mode is `FAILFAST`, for the following reasons:
1. Since Avro is a structured data format, input data can usually be parsed with a given schema. In such cases, exposing problems in the input data to users is better than hiding them.
2. For `PERMISSIVE` mode, we have to force the data schema to be fully nullable. This seems quite unnecessary for Avro. Keeping the non-null schema might enable more performance optimizations in Spark.
3. To be consistent with the behavior in Spark 2.4.

## How was this patch tested?

Unit test

Manually previewed the generated HTML for the Avro data source doc:
![image](https://user-images.githubusercontent.com/1097932/47510100-02558880-d8aa-11e8-9d57-a43daee4c6b9.png)

Closes #22814 from gengliangwang/improve_from_avro.
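The `FAILFAST`/`PERMISSIVE` behavior described above can be illustrated without Spark. What follows is a minimal plain-Scala sketch, not the code in this commit. `Mode`, `FailFast`, `Permissive`, and `FromAvroModeSketch` are hypothetical names. The sketch makes three assumptions:

* The mode is read from an options map whose keys are matched case-insensitively.
* `FAILFAST` is the default.
* Any value other than `FAILFAST` or `PERMISSIVE` is rejected, mirroring the "Acceptable modes" check that `from_avro()` performs.

```scala
import java.util.Locale
import scala.util.control.NonFatal

// Hypothetical, simplified stand-ins for Spark's FailFastMode / PermissiveMode.
sealed abstract class Mode(val name: String)
case object FailFast extends Mode("FAILFAST")
case object Permissive extends Mode("PERMISSIVE")

object FromAvroModeSketch {
  // Resolve the "mode" option: the key is matched case-insensitively, the value
  // is upper-cased, and FAILFAST is the default when the option is absent.
  // Any other value (e.g. DROPMALFORMED) is rejected.
  def resolveMode(options: Map[String, String]): Mode = {
    val raw = options.collectFirst { case (k, v) if k.equalsIgnoreCase("mode") => v }
    raw.map(_.toUpperCase(Locale.ROOT)) match {
      case None | Some("FAILFAST") => FailFast
      case Some("PERMISSIVE") => Permissive
      case Some(other) =>
        throw new IllegalArgumentException(
          s"from_avro() doesn't support the $other mode. " +
            s"Acceptable modes are ${Permissive.name} and ${FailFast.name}.")
    }
  }

  // Run `parse` on one record. On a non-fatal error, PERMISSIVE returns
  // `nullResult`, while FAILFAST rethrows with the original exception as cause.
  def decode[T](bytes: Array[Byte], mode: Mode, nullResult: T)(parse: Array[Byte] => T): T =
    try parse(bytes) catch {
      case NonFatal(e) => mode match {
        case Permissive => nullResult
        case FailFast =>
          throw new RuntimeException(
            s"Malformed records are detected in record parsing. Current parse mode: ${mode.name}.", e)
      }
    }
}
```

In this sketch, a corrupt record either becomes `nullResult` or surfaces as an exception. Attaching `e` itself as the cause, rather than `e.getCause`, keeps the original decoding error in the stack trace even when that error has no cause of its own.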
Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24e8c27d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24e8c27d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24e8c27d

Branch: refs/heads/master
Commit: 24e8c27dfe31e6e0a53c89e6ddc36327e537931b
Parents: 79f3bab
Author: Gengliang Wang <gengliang.w...@databricks.com>
Authored: Fri Oct 26 11:39:38 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Fri Oct 26 11:39:38 2018 +0800

----------------------------------------------------------------------
 docs/sql-data-sources-avro.md                   | 18 +++-
 .../spark/sql/avro/AvroDataToCatalyst.scala     | 90 +++++++++++++++++---
 .../org/apache/spark/sql/avro/AvroOptions.scala | 16 +++-
 .../org/apache/spark/sql/avro/package.scala     | 28 +++++-
 .../avro/AvroCatalystDataConversionSuite.scala  | 58 +++++++++++--
 .../spark/sql/avro/AvroFunctionsSuite.scala     | 36 +++++++-
 6 files changed, 219 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/docs/sql-data-sources-avro.md
----------------------------------------------------------------------
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index d3b81f0..bfe641d 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -142,7 +142,10 @@ StreamingQuery query = output
 
 ## Data Source Option
 
-Data source options of Avro can be set using the `.option` method on `DataFrameReader` or `DataFrameWriter`.
+Data source options of Avro can be set via:
+ * the `.option` method on `DataFrameReader` or `DataFrameWriter`.
+ * the `options` parameter in function `from_avro`.
+
 <table class="table">
   <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr>
   <tr>
@@ -177,6 +180,19 @@ Data source options of Avro can be set using the `.option` method on `DataFrameR
   Currently supported codecs are <code>uncompressed</code>, <code>snappy</code>, <code>deflate</code>, <code>bzip2</code> and <code>xz</code>.<br> If the option is not set, the configuration <code>spark.sql.avro.compression.codec</code> config is taken into account.</td>
     <td>write</td>
   </tr>
+  <tr>
+    <td><code>mode</code></td>
+    <td>FAILFAST</td>
+    <td>The <code>mode</code> option allows to specify parse mode for function <code>from_avro</code>.<br>
+      Currently supported modes are:
+      <ul>
+        <li><code>FAILFAST</code>: Throws an exception on processing corrupted record.</li>
+        <li><code>PERMISSIVE</code>: Corrupt records are processed as null result. Therefore, the
+        data schema is forced to be fully nullable, which might be different from the one user provided.</li>
+      </ul>
+    </td>
+    <td>function <code>from_avro</code></td>
+  </tr>
 </table>
 
 ## Configuration


http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index 915769f..43d3f6e 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -17,20 +17,37 @@
 
 package org.apache.spark.sql.avro
 
+import scala.util.control.NonFatal
+
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericDatumReader
 import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
 
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
-import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
+import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.types._
 
-case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
+case class AvroDataToCatalyst(
+    child: Expression,
+    jsonFormatSchema: String,
+    options: Map[String, String])
   extends UnaryExpression with ExpectsInputTypes {
 
   override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
 
-  override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType
+  override lazy val dataType: DataType = {
+    val dt = SchemaConverters.toSqlType(avroSchema).dataType
+    parseMode match {
+      // With PermissiveMode, the output Catalyst row might contain columns of null values for
+      // corrupt records, even if some of the columns are not nullable in the user-provided schema.
+      // Therefore we force the schema to be all nullable here.
+      case PermissiveMode => dt.asNullable
+      case _ => dt
+    }
+  }
 
   override def nullable: Boolean = true
 
@@ -44,24 +61,75 @@ case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
 
   @transient private var result: Any = _
 
+  @transient private lazy val parseMode: ParseMode = {
+    val mode = AvroOptions(options).parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw new AnalysisException(unacceptableModeMessage(mode.name))
+    }
+    mode
+  }
+
+  private def unacceptableModeMessage(name: String): String = {
+    s"from_avro() doesn't support the $name mode. " +
+      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
+  }
+
+  @transient private lazy val nullResultRow: Any = dataType match {
+    case st: StructType =>
+      val resultRow = new SpecificInternalRow(st.map(_.dataType))
+      for(i <- 0 until st.length) {
+        resultRow.setNullAt(i)
+      }
+      resultRow
+
+    case _ =>
+      null
+  }
+
+
   override def nullSafeEval(input: Any): Any = {
     val binary = input.asInstanceOf[Array[Byte]]
-    decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder)
-    result = reader.read(result, decoder)
-    deserializer.deserialize(result)
+    try {
+      decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder)
+      result = reader.read(result, decoder)
+      deserializer.deserialize(result)
+    } catch {
+      // There could be multiple possible exceptions here, e.g. java.io.IOException,
+      // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc.
+      // To make it simple, catch all the exceptions here.
+      case NonFatal(e) => parseMode match {
+        case PermissiveMode => nullResultRow
+        case FailFastMode =>
+          throw new SparkException("Malformed records are detected in record parsing. " +
+            s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
+            "result, try setting the option 'mode' as 'PERMISSIVE'.", e.getCause)
+        case _ =>
+          throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+      }
+    }
   }
 
   override def simpleString: String = {
-    s"from_avro(${child.sql}, ${dataType.simpleString})"
+    s"from_avro(${child.sql}, ${dataType.simpleString}, ${options.toString()})"
   }
 
   override def sql: String = {
-    s"from_avro(${child.sql}, ${dataType.catalogString})"
+    s"from_avro(${child.sql}, ${dataType.catalogString}, ${options.toString()})"
   }
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val expr = ctx.addReferenceObj("this", this)
-    defineCodeGen(ctx, ev, input =>
-      s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)")
+    nullSafeCodeGen(ctx, ev, eval => {
+      val result = ctx.freshName("result")
+      val dt = CodeGenerator.boxedType(dataType)
+      s"""
+        $dt $result = ($dt) $expr.nullSafeEval($eval);
+        if ($result == null) {
+          ${ev.isNull} = true;
+        } else {
+          ${ev.value} = $result;
+        }
+      """
+    })
   }
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 67f5634..fec17bf 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.avro
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -79,4 +80,17 @@ class AvroOptions(
   val compression: String = {
     parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
   }
+
+  val parseMode: ParseMode =
+    parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+}
+
+object AvroOptions {
+  def apply(parameters: Map[String, String]): AvroOptions = {
+    val hadoopConf = SparkSession
+      .getActiveSession
+      .map(_.sessionState.newHadoopConf())
+      .getOrElse(new Configuration())
+    new AvroOptions(CaseInsensitiveMap(parameters), hadoopConf)
+  }
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
index 97f9427..dee8575 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.annotation.Experimental
 
 package object avro {
+
   /**
    * Converts a binary column of avro format into its corresponding catalyst value. The specified
    * schema must match the read data, otherwise the behavior is undefined: it may fail or return
@@ -31,8 +34,29 @@ package object avro {
    * @since 2.4.0
    */
   @Experimental
-  def from_avro(data: Column, jsonFormatSchema: String): Column = {
-    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema))
+  def from_avro(
+      data: Column,
+      jsonFormatSchema: String): Column = {
+    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, Map.empty))
+  }
+
+  /**
+   * Converts a binary column of avro format into its corresponding catalyst value. The specified
+   * schema must match the read data, otherwise the behavior is undefined: it may fail or return
+   * arbitrary result.
+   *
+   * @param data the binary column.
+   * @param jsonFormatSchema the avro schema in JSON string format.
+   * @param options options to control how the Avro record is parsed.
+   *
+   * @since 3.0.0
+   */
+  @Experimental
+  def from_avro(
+      data: Column,
+      jsonFormatSchema: String,
+      options: java.util.Map[String, String]): Column = {
+    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, options.asScala.toMap))
   }
 
   /**


http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index 8334cca..80dd4c5 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -17,14 +17,19 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.avro.Schema
+
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.RandomDataGenerator
+import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
+import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
-class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper {
+class AvroCatalystDataConversionSuite extends SparkFunSuite
+  with SharedSQLContext
+  with ExpressionEvalHelper {
 
   private def roundTripTest(data: Literal): Unit = {
     val avroType = SchemaConverters.toAvroType(data.dataType, data.nullable)
@@ -33,14 +38,26 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH
 
   private def checkResult(data: Literal, schema: String, expected: Any): Unit = {
     checkEvaluation(
-      AvroDataToCatalyst(CatalystDataToAvro(data), schema),
+      AvroDataToCatalyst(CatalystDataToAvro(data), schema, Map.empty),
       prepareExpectedResult(expected))
   }
 
-  private def assertFail(data: Literal, schema: String): Unit = {
-    intercept[java.io.EOFException] {
-      AvroDataToCatalyst(CatalystDataToAvro(data), schema).eval()
+  protected def checkUnsupportedRead(data: Literal, schema: String): Unit = {
+    val binary = CatalystDataToAvro(data)
+    intercept[Exception] {
+      AvroDataToCatalyst(binary, schema, Map("mode" -> "FAILFAST")).eval()
+    }
+
+    val expected = {
+      val avroSchema = new Schema.Parser().parse(schema)
+      SchemaConverters.toSqlType(avroSchema).dataType match {
+        case st: StructType => Row.fromSeq((0 until st.length).map(_ => null))
+        case _ => null
+      }
     }
+
+    checkEvaluation(AvroDataToCatalyst(binary, schema, Map("mode" -> "PERMISSIVE")),
+      expected)
   }
 
   private val testingTypes = Seq(
@@ -121,7 +138,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH
     """.stripMargin
 
     // When read int as string, avro reader is not able to parse the binary and fail.
-    assertFail(data, avroTypeJson)
+    checkUnsupportedRead(data, avroTypeJson)
   }
 
   test("read string as int") {
@@ -151,7 +168,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH
 
     // When read float data as double, avro reader fails(trying to read 8 bytes while the data have
     // only 4 bytes).
-    assertFail(data, avroTypeJson)
+    checkUnsupportedRead(data, avroTypeJson)
   }
 
   test("read double as float") {
@@ -167,4 +184,29 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH
     // avro reader reads the first 4 bytes of a double as a float, the result is totally undefined.
     checkResult(data, avroTypeJson, 5.848603E35f)
   }
+
+  test("Handle unsupported input of record type") {
+    val actualSchema = StructType(Seq(
+      StructField("col_0", StringType, false),
+      StructField("col_1", ShortType, false),
+      StructField("col_2", DecimalType(8, 4), false),
+      StructField("col_3", BooleanType, true),
+      StructField("col_4", DecimalType(38, 38), false)))
+
+    val expectedSchema = StructType(Seq(
+      StructField("col_0", BinaryType, false),
+      StructField("col_1", DoubleType, false),
+      StructField("col_2", DecimalType(18, 4), false),
+      StructField("col_3", StringType, true),
+      StructField("col_4", DecimalType(38, 38), false)))
+
+    val seed = scala.util.Random.nextLong()
+    withClue(s"create random record with seed $seed") {
+      val data = RandomDataGenerator.randomRow(new scala.util.Random(seed), actualSchema)
+      val converter = CatalystTypeConverters.createToCatalystConverter(actualSchema)
+      val input = Literal.create(converter(data), actualSchema)
+      val avroSchema = SchemaConverters.toAvroType(expectedSchema).toString
+      checkUnsupportedRead(input, avroSchema)
+    }
+  }
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index 90a4cd6..46a37d8 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.sql.avro
 
-import org.apache.avro.Schema
+import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.functions.struct
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 
-class AvroFunctionsSuite extends QueryTest with SharedSQLContext {
+class AvroFunctionsSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   import testImplicits._
 
   test("roundtrip in to_avro and from_avro - int and string") {
@@ -61,6 +62,33 @@ class AvroFunctionsSuite extends QueryTest with SharedSQLContext {
     checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df)
   }
 
+  test("handle invalid input in from_avro") {
+    val count = 10
+    val df = spark.range(count).select(struct('id, 'id.as("id2")).as("struct"))
+    val avroStructDF = df.select(to_avro('struct).as("avro"))
+    val avroTypeStruct = s"""
+      |{
+      |  "type": "record",
+      |  "name": "struct",
+      |  "fields": [
+      |    {"name": "col1", "type": "long"},
+      |    {"name": "col2", "type": "double"}
+      |  ]
+      |}
+    """.stripMargin
+
+    intercept[SparkException] {
+      avroStructDF.select(
+        from_avro('avro, avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect()
+    }
+
+    // For PERMISSIVE mode, the result should be row of null columns.
+    val expected = (0 until count).map(_ => Row(Row(null, null)))
+    checkAnswer(
+      avroStructDF.select(from_avro('avro, avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)),
+      expected)
+  }
+
   test("roundtrip in to_avro and from_avro - array with null") {
     val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array")
     val avroTypeArrStruct = s"""
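Reason 2 in the commit message and the `dataType` override in `AvroDataToCatalyst` both come down to one rule. In `PERMISSIVE` mode a corrupt record yields a row of nulls, so every column must be allowed to be null. Below is a minimal sketch of that rule. It uses a hypothetical miniature type model (`SqlType`, `Field`, `StructT`, `ArrayT`, `NullabilitySketch`) in place of Catalyst's `DataType`, and covers only structs and arrays.

```scala
// Hypothetical mini type model standing in for Catalyst's DataType hierarchy.
sealed trait SqlType
case object LongT extends SqlType
case object DoubleT extends SqlType
final case class Field(name: String, dataType: SqlType, nullable: Boolean)
final case class StructT(fields: Seq[Field]) extends SqlType
final case class ArrayT(elementType: SqlType, containsNull: Boolean) extends SqlType

object NullabilitySketch {
  // Recursively mark every struct field and array element as nullable.
  def asNullable(t: SqlType): SqlType = t match {
    case StructT(fields) =>
      StructT(fields.map(f => f.copy(dataType = asNullable(f.dataType), nullable = true)))
    case ArrayT(elem, _) => ArrayT(asNullable(elem), containsNull = true)
    case primitive => primitive
  }

  // PERMISSIVE must relax the schema, because a corrupt record becomes a
  // row of nulls; FAILFAST keeps the nullability from the Avro schema.
  def outputType(t: SqlType, permissive: Boolean): SqlType =
    if (permissive) asNullable(t) else t

  // Value produced for a corrupt record: one null per top-level field for a
  // struct, otherwise a plain null.
  def nullResult(t: SqlType): Any = t match {
    case StructT(fields) => Seq.fill(fields.length)(null)
    case _ => null
  }
}
```

Keeping `FAILFAST` as the default means the relaxed schema is only paid for when a user explicitly opts into `PERMISSIVE`.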