Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22814#discussion_r228062545
  
    --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala ---
    @@ -44,24 +59,74 @@ case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
     
       @transient private var result: Any = _
     
    +  @transient private lazy val parseMode: ParseMode = {
    +    val mode = AvroOptions(options).parseMode
    +    if (mode != PermissiveMode && mode != FailFastMode) {
    +      throw new AnalysisException(unacceptableModeMessage(mode.name))
    +    }
    +    mode
    +  }
    +
    +  private def unacceptableModeMessage(name: String): String = {
    +    s"from_avro() doesn't support the $name mode. " +
    +      s"Acceptable modes are ${PermissiveMode.name} and 
${FailFastMode.name}."
    +  }
    +
    +  @transient private lazy val nullResultRow: Any = dataType match {
    +      case st: StructType =>
    +        val resultRow = new SpecificInternalRow(st.map(_.dataType))
    +        for(i <- 0 until st.length) {
    +          resultRow.setNullAt(i)
    +        }
    +        resultRow
    +
    +      case _ =>
    +        null
    +    }
    +
    +
       override def nullSafeEval(input: Any): Any = {
         val binary = input.asInstanceOf[Array[Byte]]
    -    decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, 
decoder)
    -    result = reader.read(result, decoder)
    -    deserializer.deserialize(result)
    +    try {
    +      decoder = DecoderFactory.get().binaryDecoder(binary, 0, 
binary.length, decoder)
    +      result = reader.read(result, decoder)
    +      deserializer.deserialize(result)
    +    } catch {
    +      // There could be multiple possible exceptions here, e.g. 
java.io.IOException,
    +      // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc.
    +      // To make it simple, catch all the exceptions here.
    +      case e: Exception => parseMode match {
    +        case PermissiveMode => nullResultRow
    +        case FailFastMode =>
    +          throw new SparkException("Malformed records are detected in 
record parsing. " +
    +            s"Parse Mode: ${FailFastMode.name}.", e.getCause)
    +        case _ =>
    +          throw new 
AnalysisException(unacceptableModeMessage(parseMode.name))
    +      }
    +    }
       }
     
       override def simpleString: String = {
    -    s"from_avro(${child.sql}, ${dataType.simpleString})"
    +    s"from_avro(${child.sql}, ${dataType.simpleString}, 
${options.toString()})"
       }
     
       override def sql: String = {
    -    s"from_avro(${child.sql}, ${dataType.catalogString})"
    +    s"from_avro(${child.sql}, ${dataType.catalogString}, 
${options.toString()})"
       }
     
       override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
         val expr = ctx.addReferenceObj("this", this)
    -    defineCodeGen(ctx, ev, input =>
    -      s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)")
    +    nullSafeCodeGen(ctx, ev, eval => {
    +      val result = ctx.freshName("tempResult")
    +      s"""
    +        ${CodeGenerator.boxedType(dataType)} $result =
    --- End diff --
    
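    nit: maybe extract the boxed type into a local val, so it is computed once and reused for both the declaration and the cast:
    ```
    val dt = CodeGenerator.boxedType(dataType)
    ....
    $dt $result = ($dt) ...
    ```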


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to