Repository: spark
Updated Branches:
  refs/heads/master 79f3babcc -> 24e8c27df


[SPARK-25819][SQL] Support parse mode option for the function `from_avro`

## What changes were proposed in this pull request?

Currently, the function `from_avro` throws an exception on reading corrupt 
records. In practice, there can be many causes of data corruption. It would be 
good to support `PERMISSIVE` mode and allow the function `from_avro` to process 
all the input files/streams, consistent with `from_json` and `from_csv`. 
There is no obvious downside to supporting `PERMISSIVE` mode.

Unlike `from_csv` and `from_json`, the default parse mode is `FAILFAST`, 
for the following reasons:
1. Since Avro is a structured data format, input data can usually be 
parsed with the given schema. In such cases, exposing problems in the input 
data to users is better than hiding them.
2. For `PERMISSIVE` mode, we have to force the data schema to be fully 
nullable. This seems quite unnecessary for Avro. Preserving a non-null schema 
might enable more performance optimizations in Spark.
3. To be consistent with the behavior in Spark 2.4.
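
For illustration, a minimal usage sketch of the new option (hedged: `df` is a hypothetical DataFrame holding a binary column `avro`, and `avroSchema` is an Avro schema JSON string; neither is part of this patch):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

// FAILFAST is the default: a corrupt record fails the query with a SparkException.
// PERMISSIVE instead turns each corrupt record into a row of all-null columns.
val parsed = df.select(
  from_avro(col("avro"), avroSchema, Map("mode" -> "PERMISSIVE").asJava).as("value"))
```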

## How was this patch tested?

Unit test

Manually previewed the generated HTML for the Avro data source doc:

![image](https://user-images.githubusercontent.com/1097932/47510100-02558880-d8aa-11e8-9d57-a43daee4c6b9.png)

Closes #22814 from gengliangwang/improve_from_avro.

Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24e8c27d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24e8c27d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24e8c27d

Branch: refs/heads/master
Commit: 24e8c27dfe31e6e0a53c89e6ddc36327e537931b
Parents: 79f3bab
Author: Gengliang Wang <gengliang.w...@databricks.com>
Authored: Fri Oct 26 11:39:38 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Fri Oct 26 11:39:38 2018 +0800

----------------------------------------------------------------------
 docs/sql-data-sources-avro.md                   | 18 +++-
 .../spark/sql/avro/AvroDataToCatalyst.scala     | 90 +++++++++++++++++---
 .../org/apache/spark/sql/avro/AvroOptions.scala | 16 +++-
 .../org/apache/spark/sql/avro/package.scala     | 28 +++++-
 .../avro/AvroCatalystDataConversionSuite.scala  | 58 +++++++++++--
 .../spark/sql/avro/AvroFunctionsSuite.scala     | 36 +++++++-
 6 files changed, 219 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/docs/sql-data-sources-avro.md
----------------------------------------------------------------------
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index d3b81f0..bfe641d 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -142,7 +142,10 @@ StreamingQuery query = output
 
 ## Data Source Option
 
-Data source options of Avro can be set using the `.option` method on `DataFrameReader` or `DataFrameWriter`.
+Data source options of Avro can be set via:
+ * the `.option` method on `DataFrameReader` or `DataFrameWriter`.
+ * the `options` parameter in function `from_avro`.
+
 <table class="table">
  <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr>
   <tr>
@@ -177,6 +180,19 @@ Data source options of Avro can be set using the `.option` method on `DataFrameR
  Currently supported codecs are <code>uncompressed</code>, <code>snappy</code>, <code>deflate</code>, <code>bzip2</code> and <code>xz</code>.<br> If the option is not set, the configuration <code>spark.sql.avro.compression.codec</code> config is taken into account.</td>
     <td>write</td>
   </tr>
+  <tr>
+    <td><code>mode</code></td>
+    <td>FAILFAST</td>
+    <td>The <code>mode</code> option allows specifying the parse mode for the function <code>from_avro</code>.<br>
+      Currently supported modes are:
+      <ul>
+        <li><code>FAILFAST</code>: Throws an exception on processing a corrupted record.</li>
+        <li><code>PERMISSIVE</code>: Corrupt records are processed as a null result. Therefore, the
+        data schema is forced to be fully nullable, which might be different from the one the user provided.</li>
+      </ul>
+      </ul>
+    </td>
+    <td>function <code>from_avro</code></td>
+  </tr>
 </table>
 
 ## Configuration

http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index 915769f..43d3f6e 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -17,20 +17,37 @@
 
 package org.apache.spark.sql.avro
 
+import scala.util.control.NonFatal
+
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericDatumReader
 import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
 
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
-import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
+import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.types._
 
-case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
+case class AvroDataToCatalyst(
+    child: Expression,
+    jsonFormatSchema: String,
+    options: Map[String, String])
   extends UnaryExpression with ExpectsInputTypes {
 
   override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
 
-  override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType
+  override lazy val dataType: DataType = {
+    val dt = SchemaConverters.toSqlType(avroSchema).dataType
+    parseMode match {
+      // With PermissiveMode, the output Catalyst row might contain columns of null values for
+      // corrupt records, even if some of the columns are not nullable in the user-provided schema.
+      // Therefore we force the schema to be all nullable here.
+      case PermissiveMode => dt.asNullable
+      case _ => dt
+    }
+  }
 
   override def nullable: Boolean = true
 
@@ -44,24 +61,75 @@ case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
 
   @transient private var result: Any = _
 
+  @transient private lazy val parseMode: ParseMode = {
+    val mode = AvroOptions(options).parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw new AnalysisException(unacceptableModeMessage(mode.name))
+    }
+    mode
+  }
+
+  private def unacceptableModeMessage(name: String): String = {
+    s"from_avro() doesn't support the $name mode. " +
+      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
+  }
+
+  @transient private lazy val nullResultRow: Any = dataType match {
+    case st: StructType =>
+      val resultRow = new SpecificInternalRow(st.map(_.dataType))
+      for (i <- 0 until st.length) {
+        resultRow.setNullAt(i)
+      }
+      resultRow
+
+    case _ =>
+      null
+  }
+
   override def nullSafeEval(input: Any): Any = {
     val binary = input.asInstanceOf[Array[Byte]]
-    decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder)
-    result = reader.read(result, decoder)
-    deserializer.deserialize(result)
+    try {
+      decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder)
+      result = reader.read(result, decoder)
+      deserializer.deserialize(result)
+    } catch {
+      // There could be multiple possible exceptions here, e.g. java.io.IOException,
+      // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc.
+      // To make it simple, catch all the exceptions here.
+      case NonFatal(e) => parseMode match {
+        case PermissiveMode => nullResultRow
+        case FailFastMode =>
+          throw new SparkException("Malformed records are detected in record parsing. " +
+            s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
+            "result, try setting the option 'mode' as 'PERMISSIVE'.", e.getCause)
+        case _ =>
+          throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+      }
+    }
   }
 
   override def simpleString: String = {
-    s"from_avro(${child.sql}, ${dataType.simpleString})"
+    s"from_avro(${child.sql}, ${dataType.simpleString}, ${options.toString()})"
   }
 
   override def sql: String = {
-    s"from_avro(${child.sql}, ${dataType.catalogString})"
+    s"from_avro(${child.sql}, ${dataType.catalogString}, ${options.toString()})"
   }
 
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val expr = ctx.addReferenceObj("this", this)
-    defineCodeGen(ctx, ev, input =>
-      s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)")
+    nullSafeCodeGen(ctx, ev, eval => {
+      val result = ctx.freshName("result")
+      val dt = CodeGenerator.boxedType(dataType)
+      s"""
+        $dt $result = ($dt) $expr.nullSafeEval($eval);
+        if ($result == null) {
+          ${ev.isNull} = true;
+        } else {
+          ${ev.value} = $result;
+        }
+      """
+    })
   }
 }
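
For reference, a standalone sketch (an illustration under assumed inputs, not part of the patch) of the fully-null fallback row that `nullResultRow` above builds for a struct result type:

```scala
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types._

// Allocate a mutable row matching the struct's field types, then null out
// every column, mirroring the nullResultRow logic in the diff above.
val st = StructType(Seq(StructField("col1", LongType), StructField("col2", DoubleType)))
val resultRow = new SpecificInternalRow(st.map(_.dataType))
(0 until st.length).foreach(resultRow.setNullAt)
```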

http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 67f5634..fec17bf 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.avro
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, FailFastMode, ParseMode}
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -79,4 +80,17 @@ class AvroOptions(
   val compression: String = {
     parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
   }
+
+  val parseMode: ParseMode =
+    parameters.get("mode").map(ParseMode.fromString).getOrElse(FailFastMode)
+}
+
+object AvroOptions {
+  def apply(parameters: Map[String, String]): AvroOptions = {
+    val hadoopConf = SparkSession
+      .getActiveSession
+      .map(_.sessionState.newHadoopConf())
+      .getOrElse(new Configuration())
+    new AvroOptions(CaseInsensitiveMap(parameters), hadoopConf)
+  }
 }
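
A small sketch of how the new option resolves (hedged: assumes it runs where `AvroOptions` is visible, e.g. inside the `org.apache.spark.sql.avro` package; option keys are case-insensitive thanks to `CaseInsensitiveMap`):

```scala
import org.apache.spark.sql.catalyst.util.{FailFastMode, PermissiveMode}

// Without a "mode" entry, the default is FAILFAST.
assert(AvroOptions(Map.empty[String, String]).parseMode == FailFastMode)
// The key is matched case-insensitively; the value goes through ParseMode.fromString.
assert(AvroOptions(Map("MODE" -> "PERMISSIVE")).parseMode == PermissiveMode)
```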

http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
index 97f9427..dee8575 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.annotation.Experimental
 
 package object avro {
+
   /**
   * Converts a binary column of avro format into its corresponding catalyst value. The specified
   * schema must match the read data, otherwise the behavior is undefined: it may fail or return
@@ -31,8 +34,29 @@ package object avro {
    * @since 2.4.0
    */
   @Experimental
-  def from_avro(data: Column, jsonFormatSchema: String): Column = {
-    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema))
+  def from_avro(
+      data: Column,
+      jsonFormatSchema: String): Column = {
+    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, Map.empty))
+  }
+
+  /**
+   * Converts a binary column of avro format into its corresponding catalyst value. The specified
+   * schema must match the read data, otherwise the behavior is undefined: it may fail or return
+   * arbitrary result.
+   *
+   * @param data the binary column.
+   * @param jsonFormatSchema the avro schema in JSON string format.
+   * @param options options to control how the Avro record is parsed.
+   *
+   * @since 3.0.0
+   */
+  @Experimental
+  def from_avro(
+      data: Column,
+      jsonFormatSchema: String,
+      options: java.util.Map[String, String]): Column = {
+    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema, options.asScala.toMap))
   }
 
   /**

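A usage note on the two overloads above: the new three-argument variant takes a `java.util.Map` so it is directly callable from Java, while Scala callers convert with `.asJava`. A hedged sketch (hypothetical `df` with a binary column `avro` and a schema string `schemaJson`):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

// Two-argument form: no options, so the FAILFAST default applies.
df.select(from_avro(col("avro"), schemaJson))
// Three-argument form: options are passed as a java.util.Map.
df.select(from_avro(col("avro"), schemaJson, Map("mode" -> "FAILFAST").asJava))
```
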
http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index 8334cca..80dd4c5 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -17,14 +17,19 @@
 
 package org.apache.spark.sql.avro
 
+import org.apache.avro.Schema
+
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.RandomDataGenerator
+import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
+import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
-class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper {
+class AvroCatalystDataConversionSuite extends SparkFunSuite
+  with SharedSQLContext
+  with ExpressionEvalHelper {
 
   private def roundTripTest(data: Literal): Unit = {
     val avroType = SchemaConverters.toAvroType(data.dataType, data.nullable)
@@ -33,14 +38,26 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH
 
   private def checkResult(data: Literal, schema: String, expected: Any): Unit = {
     checkEvaluation(
-      AvroDataToCatalyst(CatalystDataToAvro(data), schema),
+      AvroDataToCatalyst(CatalystDataToAvro(data), schema, Map.empty),
       prepareExpectedResult(expected))
   }
 
-  private def assertFail(data: Literal, schema: String): Unit = {
-    intercept[java.io.EOFException] {
-      AvroDataToCatalyst(CatalystDataToAvro(data), schema).eval()
+  protected def checkUnsupportedRead(data: Literal, schema: String): Unit = {
+    val binary = CatalystDataToAvro(data)
+    intercept[Exception] {
+      AvroDataToCatalyst(binary, schema, Map("mode" -> "FAILFAST")).eval()
+    }
+
+    val expected = {
+      val avroSchema = new Schema.Parser().parse(schema)
+      SchemaConverters.toSqlType(avroSchema).dataType match {
+        case st: StructType => Row.fromSeq((0 until st.length).map(_ => null))
+        case _ => null
+      }
     }
+
+    checkEvaluation(AvroDataToCatalyst(binary, schema, Map("mode" -> "PERMISSIVE")),
+      expected)
   }
 
   private val testingTypes = Seq(
@@ -121,7 +138,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH
        """.stripMargin
 
    // When read int as string, avro reader is not able to parse the binary and fail.
-    assertFail(data, avroTypeJson)
+    checkUnsupportedRead(data, avroTypeJson)
   }
 
   test("read string as int") {
@@ -151,7 +168,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH
 
    // When read float data as double, avro reader fails(trying to read 8 bytes while the data have
     // only 4 bytes).
-    assertFail(data, avroTypeJson)
+    checkUnsupportedRead(data, avroTypeJson)
   }
 
   test("read double as float") {
@@ -167,4 +184,29 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalH
    // avro reader reads the first 4 bytes of a double as a float, the result is totally undefined.
     checkResult(data, avroTypeJson, 5.848603E35f)
   }
+
+  test("Handle unsupported input of record type") {
+    val actualSchema = StructType(Seq(
+      StructField("col_0", StringType, false),
+      StructField("col_1", ShortType, false),
+      StructField("col_2", DecimalType(8, 4), false),
+      StructField("col_3", BooleanType, true),
+      StructField("col_4", DecimalType(38, 38), false)))
+
+    val expectedSchema = StructType(Seq(
+      StructField("col_0", BinaryType, false),
+      StructField("col_1", DoubleType, false),
+      StructField("col_2", DecimalType(18, 4), false),
+      StructField("col_3", StringType, true),
+      StructField("col_4", DecimalType(38, 38), false)))
+
+    val seed = scala.util.Random.nextLong()
+    withClue(s"create random record with seed $seed") {
+      val data = RandomDataGenerator.randomRow(new scala.util.Random(seed), actualSchema)
+      val converter = CatalystTypeConverters.createToCatalystConverter(actualSchema)
+      val input = Literal.create(converter(data), actualSchema)
+      val avroSchema = SchemaConverters.toAvroType(expectedSchema).toString
+      checkUnsupportedRead(input, avroSchema)
+    }
+  }
 }
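
The new `checkUnsupportedRead` helper exercises a write-with-one-schema, read-with-another mismatch. A hedged sketch of the same idea at the expression level (standalone, reusing classes from this module; the literal and schema here are illustrative, not from the patch):

```scala
import org.apache.spark.sql.avro.{AvroDataToCatalyst, CatalystDataToAvro}
import org.apache.spark.sql.catalyst.expressions.Literal

// Serialize an int literal, then try to read the bytes back as an Avro string:
// FAILFAST throws a SparkException; PERMISSIVE evaluates to null, since the
// result type is not a struct and nullResultRow falls back to plain null.
val binary = CatalystDataToAvro(Literal(1))
val stringSchema = """{"type": "string"}"""
AvroDataToCatalyst(binary, stringSchema, Map("mode" -> "PERMISSIVE")).eval()  // null
```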

http://git-wip-us.apache.org/repos/asf/spark/blob/24e8c27d/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
index 90a4cd6..46a37d8 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.sql.avro
 
-import org.apache.avro.Schema
+import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.functions.struct
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 
-class AvroFunctionsSuite extends QueryTest with SharedSQLContext {
+class AvroFunctionsSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   import testImplicits._
 
   test("roundtrip in to_avro and from_avro - int and string") {
@@ -61,6 +62,33 @@ class AvroFunctionsSuite extends QueryTest with SharedSQLContext {
     checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df)
   }
 
+  test("handle invalid input in from_avro") {
+    val count = 10
+    val df = spark.range(count).select(struct('id, 'id.as("id2")).as("struct"))
+    val avroStructDF = df.select(to_avro('struct).as("avro"))
+    val avroTypeStruct = s"""
+      |{
+      |  "type": "record",
+      |  "name": "struct",
+      |  "fields": [
+      |    {"name": "col1", "type": "long"},
+      |    {"name": "col2", "type": "double"}
+      |  ]
+      |}
+    """.stripMargin
+
+    intercept[SparkException] {
+      avroStructDF.select(
+        from_avro('avro, avroTypeStruct, Map("mode" -> "FAILFAST").asJava)).collect()
+    }
+
+    // For PERMISSIVE mode, each corrupt record is parsed into a row of null columns.
+    val expected = (0 until count).map(_ => Row(Row(null, null)))
+    checkAnswer(
+      avroStructDF.select(from_avro('avro, avroTypeStruct, Map("mode" -> "PERMISSIVE").asJava)),
+      expected)
+  }
+
   test("roundtrip in to_avro and from_avro - array with null") {
    val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array")
     val avroTypeArrStruct = s"""

