Yosuke Mori created SPARK-27798:
-----------------------------------

             Summary: from_avro can modify variables in other rows
                 Key: SPARK-27798
                 URL: https://issues.apache.org/jira/browse/SPARK-27798
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.3
            Reporter: Yosuke Mori
Steps to reproduce:

# Create a local Dataset (at least two distinct rows) with a binary Avro field.
# Use the {{from_avro}} function to deserialize the binary into another column.
# Verify that the rows incorrectly have the same value.

Here's a concrete example (using Spark 2.4.3). All it does is convert a list of {{TestPayload}} objects into binary using the defined Avro schema, then try to deserialize them using {{from_avro}} with that same schema:

{noformat}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

import java.io.ByteArrayOutputStream

object TestApp extends App {
  // Payload container
  case class TestEvent(payload: Array[Byte])

  // Deserialized payload
  case class TestPayload(message: String)

  // Schema for the payload
  val simpleSchema =
    """
      |{
      |"type": "record",
      |"name" : "Payload",
      |"fields" : [ {"name" : "message", "type" : [ "string", "null" ] } ]
      |}
    """.stripMargin

  // Convert a TestPayload into Avro binary
  def generateSimpleSchemaBinary(record: TestPayload, avsc: String): Array[Byte] = {
    val schema = new Schema.Parser().parse(avsc)
    val out = new ByteArrayOutputStream()
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    val rootRecord = new GenericRecordBuilder(schema).set("message", record.message).build()
    writer.write(rootRecord, encoder)
    encoder.flush()
    out.toByteArray
  }

  val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

  import spark.implicits._

  List(
    TestPayload("one"),
    TestPayload("two"),
    TestPayload("three"),
    TestPayload("four")
  ).map(payload => TestEvent(generateSimpleSchemaBinary(payload, simpleSchema)))
    .toDS()
    .withColumn("deserializedPayload", from_avro(col("payload"), simpleSchema))
    .show(truncate = false)
}
{noformat}

And here is what this program outputs:

{noformat}
+----------------------+-------------------+
|payload               |deserializedPayload|
+----------------------+-------------------+
|[00 06 6F 6E 65]      |[four]             |
|[00 06 74 77 6F]      |[four]             |
|[00 0A 74 68 72 65 65]|[four]             |
|[00 08 66 6F 75 72]   |[four]             |
+----------------------+-------------------+
{noformat}

Here we can see that the Avro binary is generated correctly, but every deserialized value is a copy of the last row. I dug a bit further into the code, and it seems that the reuse of {{result}} in {{AvroDataToCatalyst}} overwrites the decoded values of previous rows. I set a breakpoint in {{LocalRelation}}, and the elements of the {{data}} sequence all seem to point to the same object in memory, so a mutation of one of them changes all of them.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
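The suspected mechanism (an expression that hands back the same mutable object for every row while the caller keeps references to each returned value) can be reproduced without Spark. Below is a minimal sketch under that assumption: {{MutableRow}}, {{ReusingDecoder}} and {{AliasingDemo}} are hypothetical stand-ins, not Spark or Avro classes, and the "decoding" is plain UTF-8 string conversion rather than Avro.

```scala
// Hypothetical stand-in for a mutable row buffer (NOT Spark's InternalRow).
final class MutableRow(var message: String) {
  def copy(): MutableRow = new MutableRow(message)
  override def toString: String = s"[$message]"
}

// Hypothetical decoder that, like the behaviour described above,
// writes every decoded value into one reused output object.
final class ReusingDecoder {
  private val result = new MutableRow(null)

  def decode(bytes: Array[Byte]): MutableRow = {
    result.message = new String(bytes, "UTF-8")
    result // the same instance is returned on every call
  }
}

object AliasingDemo {
  def main(args: Array[String]): Unit = {
    val inputs = Seq("one", "two", "three", "four").map(_.getBytes("UTF-8"))
    val decoder = new ReusingDecoder

    // Storing the returned references: all four elements are the same object,
    // so they all show the last decoded value.
    val aliased = inputs.map(decoder.decode)
    println(aliased.mkString(", ")) // [four], [four], [four], [four]

    // Copying each result before the next decode call keeps the values distinct.
    val copied = inputs.map(b => decoder.decode(b).copy())
    println(copied.mkString(", ")) // [one], [two], [three], [four]
  }
}
```

Copying each result before it is retained breaks the aliasing at the cost of one allocation per row; the sketch only illustrates the general pattern and does not show where Spark itself performs or skips such a copy.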