[ https://issues.apache.org/jira/browse/SPARK-27798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16851966#comment-16851966 ]
Gengliang Wang edited comment on SPARK-27798 at 5/30/19 3:07 PM:
-----------------------------------------------------------------

Turning off the rule "ConvertToLocalRelation" should work around the problem:

spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")

was (Author: gengliang.wang):
Turning off the rule "ConvertToLocalRelation" should fix the problem:

spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")

> from_avro can modify variables in other rows in local mode
> ----------------------------------------------------------
>
>                 Key: SPARK-27798
>                 URL: https://issues.apache.org/jira/browse/SPARK-27798
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Yosuke Mori
>            Priority: Blocker
>              Labels: correctness
>         Attachments: Screen Shot 2019-05-21 at 2.39.27 PM.png
>
>
> Steps to reproduce:
> Create a local Dataset (at least two distinct rows) with a binary Avro field.
> Use the {{from_avro}} function to deserialize the binary into another column.
> Observe that all of the rows incorrectly have the same value.
> Here's a concrete example (using Spark 2.4.3).
> It converts a list of TestPayload objects into binary using the defined
> Avro schema, then tries to deserialize them using {{from_avro}} with that
> same schema:
> {code:java}
> import org.apache.avro.Schema
> import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
> import org.apache.avro.io.EncoderFactory
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.avro.from_avro
> import org.apache.spark.sql.functions.col
> import java.io.ByteArrayOutputStream
>
> object TestApp extends App {
>   // Payload container
>   case class TestEvent(payload: Array[Byte])
>
>   // Deserialized Payload
>   case class TestPayload(message: String)
>
>   // Schema for Payload
>   val simpleSchema =
>     """
>       |{
>       |"type": "record",
>       |"name" : "Payload",
>       |"fields" : [ {"name" : "message", "type" : [ "string", "null" ] } ]
>       |}
>     """.stripMargin
>
>   // Convert TestPayload into avro binary
>   def generateSimpleSchemaBinary(record: TestPayload, avsc: String): Array[Byte] = {
>     val schema = new Schema.Parser().parse(avsc)
>     val out = new ByteArrayOutputStream()
>     val writer = new GenericDatumWriter[GenericRecord](schema)
>     val encoder = EncoderFactory.get().binaryEncoder(out, null)
>     val rootRecord = new GenericRecordBuilder(schema).set("message", record.message).build()
>     writer.write(rootRecord, encoder)
>     encoder.flush()
>     out.toByteArray
>   }
>
>   val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
>   import spark.implicits._
>
>   // Build a local Dataset of four distinct Avro payloads and decode each one
>   List(
>     TestPayload("one"),
>     TestPayload("two"),
>     TestPayload("three"),
>     TestPayload("four")
>   ).map(payload => TestEvent(generateSimpleSchemaBinary(payload, simpleSchema)))
>     .toDS()
>     .withColumn("deserializedPayload", from_avro(col("payload"), simpleSchema))
>     .show(truncate = false)
> }
> {code}
> And here is what this program outputs:
> {noformat}
> +----------------------+-------------------+
> |payload               |deserializedPayload|
> +----------------------+-------------------+
> |[00 06 6F 6E 65]      |[four]             |
> |[00 06 74 77 6F]      |[four]             |
> |[00 0A 74 68 72 65 65]|[four]             |
> |[00 08 66 6F 75 72]   |[four]             |
> +----------------------+-------------------+
> {noformat}
> Here, we can see that the Avro binary is correctly generated, but every
> deserialized value is a copy of the last row. I have not yet verified
> whether this is an issue in cluster mode as well.
>
> I dug into the code a bit more, and it seems that the reuse of {{result}}
> in {{AvroDataToCatalyst}} is overwriting the decoded values of previous
> rows. I set a breakpoint in {{LocalRelation}}, and the entries of the
> {{data}} sequence all seem to point to the same address in memory, so a
> mutation through one reference changes all of them.
> !Screen Shot 2019-05-21 at 2.39.27 PM.png!
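
The aliasing described in the report can be illustrated without Spark. The following is a minimal sketch, assuming a decoder that writes every value into one shared mutable object and returns that same object each time. The names MutableRow, ReusingDecoder and AliasingDemo are hypothetical and exist only for this illustration; this is not the actual AvroDataToCatalyst or LocalRelation code.

```scala
// Hypothetical stand-in for a mutable row; not a Spark class.
final class MutableRow(var message: String) {
  def copy(): MutableRow = new MutableRow(message)
}

// Hypothetical decoder that reuses a single result object across calls,
// mirroring the reuse of `result` described in the report.
final class ReusingDecoder {
  private val result = new MutableRow(null)

  def decode(input: String): MutableRow = {
    result.message = input // overwrites whatever the previous call stored
    result                 // always returns the same reference
  }
}

object AliasingDemo {
  val inputs: List[String] = List("one", "two", "three", "four")

  // Eagerly collects the decoder's return values without copying:
  // the list ends up holding four references to one object.
  def collectWithoutCopy(): List[String] = {
    val decoder = new ReusingDecoder
    val rows = inputs.map(decoder.decode)
    rows.map(_.message)
  }

  // Takes a snapshot of each row before the decoder is called again.
  def collectWithCopy(): List[String] = {
    val decoder = new ReusingDecoder
    val rows = inputs.map(in => decoder.decode(in).copy())
    rows.map(_.message)
  }

  def main(args: Array[String]): Unit = {
    println(collectWithoutCopy()) // List(four, four, four, four)
    println(collectWithCopy())    // List(one, two, three, four)
  }
}
```

Under these assumptions, the uncopied variant reproduces the symptom in the output table above: every entry shows the last decoded value. Copying each row before it is stored is one way to avoid the aliasing in this sketch; whether that matches the eventual fix in Spark is not stated in this thread.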