Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22814#discussion_r228062545 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala --- @@ -44,24 +59,74 @@ case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String) @transient private var result: Any = _ + @transient private lazy val parseMode: ParseMode = { + val mode = AvroOptions(options).parseMode + if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(unacceptableModeMessage(mode.name)) + } + mode + } + + private def unacceptableModeMessage(name: String): String = { + s"from_avro() doesn't support the $name mode. " + + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}." + } + + @transient private lazy val nullResultRow: Any = dataType match { + case st: StructType => + val resultRow = new SpecificInternalRow(st.map(_.dataType)) + for(i <- 0 until st.length) { + resultRow.setNullAt(i) + } + resultRow + + case _ => + null + } + + override def nullSafeEval(input: Any): Any = { val binary = input.asInstanceOf[Array[Byte]] - decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder) - result = reader.read(result, decoder) - deserializer.deserialize(result) + try { + decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder) + result = reader.read(result, decoder) + deserializer.deserialize(result) + } catch { + // There could be multiple possible exceptions here, e.g. java.io.IOException, + // AvroRuntimeException, ArrayIndexOutOfBoundsException, etc. + // To make it simple, catch all the exceptions here. + case e: Exception => parseMode match { + case PermissiveMode => nullResultRow + case FailFastMode => + throw new SparkException("Malformed records are detected in record parsing. 
" + + s"Parse Mode: ${FailFastMode.name}.", e.getCause) + case _ => + throw new AnalysisException(unacceptableModeMessage(parseMode.name)) + } + } } override def simpleString: String = { - s"from_avro(${child.sql}, ${dataType.simpleString})" + s"from_avro(${child.sql}, ${dataType.simpleString}, ${options.toString()})" } override def sql: String = { - s"from_avro(${child.sql}, ${dataType.catalogString})" + s"from_avro(${child.sql}, ${dataType.catalogString}, ${options.toString()})" } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val expr = ctx.addReferenceObj("this", this) - defineCodeGen(ctx, ev, input => - s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)") + nullSafeCodeGen(ctx, ev, eval => { + val result = ctx.freshName("tempResult") + s""" + ${CodeGenerator.boxedType(dataType)} $result = --- End diff -- nit: maybe ``` val dt = CodeGenerator.boxedType(dataType) .... dt $result = ($dt) ... ```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org