[ https://issues.apache.org/jira/browse/SPARK-27798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yosuke Mori updated SPARK-27798:
--------------------------------
Description:

Steps to reproduce:
Create a local Dataset (at least two distinct rows) with a binary Avro field.
Use the {{from_avro}} function to deserialize the binary into another column.
Verify that the rows incorrectly all have the same value.

Here's a concrete example (using Spark 2.4.3). All it does is convert a list of TestPayload objects into binary using the defined Avro schema, then deserialize that binary using {{from_avro}} with the same schema:

{code:language=scala}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

import java.io.ByteArrayOutputStream

object TestApp extends App {
  // Payload container
  case class TestEvent(payload: Array[Byte])

  // Deserialized Payload
  case class TestPayload(message: String)

  // Schema for Payload
  val simpleSchema =
    """
      |{
      |"type": "record",
      |"name" : "Payload",
      |"fields" : [ {"name" : "message", "type" : [ "string", "null" ] } ]
      |}
    """.stripMargin

  // Convert TestPayload into avro binary
  def generateSimpleSchemaBinary(record: TestPayload, avsc: String): Array[Byte] = {
    val schema = new Schema.Parser().parse(avsc)
    val out = new ByteArrayOutputStream()
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    val rootRecord = new GenericRecordBuilder(schema).set("message", record.message).build()
    writer.write(rootRecord, encoder)
    encoder.flush()
    out.toByteArray
  }

  val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  List(
    TestPayload("one"),
    TestPayload("two"),
    TestPayload("three"),
    TestPayload("four")
  ).map(payload => TestEvent(generateSimpleSchemaBinary(payload, simpleSchema)))
    .toDS()
    .withColumn("deserializedPayload", from_avro(col("payload"), simpleSchema))
    .show(truncate = false)
}
{code}

And here is what this program outputs:

{noformat}
+----------------------+-------------------+
|payload               |deserializedPayload|
+----------------------+-------------------+
|[00 06 6F 6E 65]      |[four]             |
|[00 06 74 77 6F]      |[four]             |
|[00 0A 74 68 72 65 65]|[four]             |
|[00 08 66 6F 75 72]   |[four]             |
+----------------------+-------------------+
{noformat}

Here, we can see that the Avro binary is correctly generated: for example, {{[00 06 6F 6E 65]}} is union branch 0 ("string"), followed by the zigzag-encoded string length 3 ({{06}}) and the UTF-8 bytes of "one". The deserialized column, however, contains a copy of the last row in every row. I dug into the code a bit more, and it seems like the reuse of {{result}} in {{AvroDataToCatalyst}} is overwriting the decoded values of previous rows. I set a breakpoint in {{LocalRelation}}, and the elements of the {{data}} sequence all point to the same address in memory - so mutating the value through one reference mutates all of them. !Screen Shot 2019-05-21 at 2.39.27 PM.png!
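The claim that the encoded bytes themselves are fine can be double-checked outside of Spark by decoding them with plain Avro. The helper below is hypothetical (a decoder counterpart of {{generateSimpleSchemaBinary}} above, not part of the original repro); decoding each of the four payloads this way should yield the distinct messages "one" through "four", which points the corruption at the {{from_avro}} evaluation path rather than at the encoding:

{code:language=scala}
// Hypothetical helper, not part of the original repro: decode the Avro binary
// with plain Avro (no Spark involved) to confirm the encoded bytes are fine.
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

def decodeSimpleSchemaBinary(bytes: Array[Byte], avsc: String): String = {
  val schema = new Schema.Parser().parse(avsc)
  val reader = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  val record = reader.read(null, decoder)
  String.valueOf(record.get("message")) // "one", "two", "three", "four"
}
{code}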
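The {{result}}-reuse pattern can also be demonstrated in isolation. Here is a minimal, self-contained sketch - the names are made up and this is not the actual {{AvroDataToCatalyst}} source - of a deserializer that reuses one mutable buffer across calls, paired with a consumer that, like {{LocalRelation}} here, stores the returned references without copying:

{code:language=scala}
// Illustrative stand-in (hypothetical names) for an expression that reuses
// one mutable result buffer across eval() calls, as described above.
class ReusingDeserializer {
  private val result = new Array[String](1) // shared mutable buffer

  def eval(input: String): Array[String] = {
    result(0) = input // decode in place, overwriting the previous value
    result            // hand back the SAME object on every call
  }
}

val deser = new ReusingDeserializer
// Stand-in for LocalRelation keeping the evaluated rows without a copy:
val rows = List("one", "two", "three", "four").map(deser.eval)

// Every element aliases the same buffer, so this prints "four" four times,
// mirroring the show() output above.
rows.foreach(r => println(r(0)))
{code}

A defensive copy on return (for example {{result.clone()}} in this sketch) makes each call hand out an independent value, which suggests the direction a fix in {{AvroDataToCatalyst}} would take.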
> from_avro can modify variables in other rows
> --------------------------------------------
>
>                 Key: SPARK-27798
>                 URL: https://issues.apache.org/jira/browse/SPARK-27798
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Yosuke Mori
>            Priority: Major
>         Attachments: Screen Shot 2019-05-21 at 2.39.27 PM.png
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org