[jira] [Assigned] (SPARK-27798) from_avro can modify variables in other rows in local mode

2019-06-05 Thread Apache Spark (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-27798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-27798:


Assignee: (was: Apache Spark)

> from_avro can modify variables in other rows in local mode
> ----------------------------------------------------------
>
>                 Key: SPARK-27798
>                 URL: https://issues.apache.org/jira/browse/SPARK-27798
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.3
>            Reporter: Yosuke Mori
>            Priority: Blocker
>              Labels: correctness
>         Attachments: Screen Shot 2019-05-21 at 2.39.27 PM.png
>
>
> Steps to reproduce:
> Create a local Dataset (with at least two distinct rows) containing a binary Avro field.
> Use the {{from_avro}} function to deserialize the binary into another column.
> Observe that all of the rows incorrectly have the same deserialized value.
> Here's a concrete example (using Spark 2.4.3). All it does is convert a list
> of TestPayload objects into binary using the defined Avro schema, then try
> to deserialize the binary with {{from_avro}} using that same schema:
> {code:scala}
> import org.apache.avro.Schema
> import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
> import org.apache.avro.io.EncoderFactory
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.avro.from_avro
> import org.apache.spark.sql.functions.col
> import java.io.ByteArrayOutputStream
>
> object TestApp extends App {
>   // Payload container
>   case class TestEvent(payload: Array[Byte])
>   // Deserialized payload
>   case class TestPayload(message: String)
>   // Schema for the payload
>   val simpleSchema =
>     """
>       |{
>       |"type": "record",
>       |"name" : "Payload",
>       |"fields" : [ {"name" : "message", "type" : [ "string", "null" ] } ]
>       |}
>     """.stripMargin
>
>   // Convert a TestPayload into Avro binary
>   def generateSimpleSchemaBinary(record: TestPayload, avsc: String): Array[Byte] = {
>     val schema = new Schema.Parser().parse(avsc)
>     val out = new ByteArrayOutputStream()
>     val writer = new GenericDatumWriter[GenericRecord](schema)
>     val encoder = EncoderFactory.get().binaryEncoder(out, null)
>     val rootRecord = new GenericRecordBuilder(schema).set("message", record.message).build()
>     writer.write(rootRecord, encoder)
>     encoder.flush()
>     out.toByteArray
>   }
>
>   val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
>   import spark.implicits._
>
>   List(
>     TestPayload("one"),
>     TestPayload("two"),
>     TestPayload("three"),
>     TestPayload("four")
>   ).map(payload => TestEvent(generateSimpleSchemaBinary(payload, simpleSchema)))
>     .toDS()
>     .withColumn("deserializedPayload", from_avro(col("payload"), simpleSchema))
>     .show(truncate = false)
> }
> {code}
> And here is what this program outputs:
> {noformat}
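> +----------------------+-------------------+
> |payload               |deserializedPayload|
> +----------------------+-------------------+
> |[00 06 6F 6E 65]      |[four]             |
> |[00 06 74 77 6F]      |[four]             |
> |[00 0A 74 68 72 65 65]|[four]             |
> |[00 08 66 6F 75 72]   |[four]             |
> +----------------------+-------------------+
> {noformat}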
> Here, we can see that the Avro binary is generated correctly, but every
> deserialized value is a copy of the last row's. I have not yet verified whether
> this also happens in cluster mode.
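The claim that the `payload` bytes are correct can be checked by hand against the Avro binary encoding. For this single-field record, each row is a union branch index (a zigzag varint, where `00` means branch 0, the `string` branch), then the string length (another zigzag varint, e.g. `06` decodes to 3), then the UTF-8 bytes. The following is a minimal Scala sketch with no Spark or Avro dependency; `AvroPayloadCheck` and its helpers are hypothetical names, and it only handles the `["string", "null"]` field from the report's schema.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper: hand-decodes the payload bytes shown in the report,
// assuming a record with one field of type ["string", "null"].
object AvroPayloadCheck {
  // Parse a space-separated hex string such as "00 06 6F 6E 65" into bytes.
  def hexToBytes(hex: String): Array[Byte] =
    hex.trim.split("\\s+").map(h => Integer.parseInt(h, 16).toByte)

  // Read one zigzag-encoded variable-length long starting at `pos`.
  // Returns the decoded value and the position just after it.
  def readLong(bytes: Array[Byte], pos: Int): (Long, Int) = {
    var raw = 0L
    var shift = 0
    var i = pos
    var more = true
    while (more) {
      val b = bytes(i) & 0xff
      raw |= (b & 0x7f).toLong << shift // low 7 bits carry data
      shift += 7
      i += 1
      more = (b & 0x80) != 0 // high bit set means another byte follows
    }
    // Undo the zigzag mapping: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    ((raw >>> 1) ^ -(raw & 1), i)
  }

  // Decode the "message" field: union branch index, then (for the string
  // branch) a length-prefixed UTF-8 string. Records add no extra framing.
  def decodeMessage(bytes: Array[Byte]): Option[String] = {
    val (branch, afterBranch) = readLong(bytes, 0)
    if (branch == 1) None // branch 1 is "null" in ["string", "null"]
    else {
      val (len, start) = readLong(bytes, afterBranch)
      Some(new String(bytes, start, len.toInt, StandardCharsets.UTF_8))
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq("00 06 6F 6E 65", "00 06 74 77 6F", "00 0A 74 68 72 65 65", "00 08 66 6F 75 72")
    rows.foreach(hex => println(s"$hex -> ${decodeMessage(hexToBytes(hex))}"))
  }
}
```

The four payloads decode to `one`, `two`, `three`, and `four` in order, which is consistent with the report's observation that only the deserialized column is wrong.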
>  
> I dug a bit deeper into the code, and it seems that the reuse of {{result}}
> in {{AvroDataToCatalyst}} overwrites the decoded values of previous rows.
> I set a breakpoint in {{LocalRelation}}, and the entries of the {{data}}
> sequence all seem to point to the same address in memory, so a mutation of
> one entry mutates all of them.
> !Screen Shot 2019-05-21 at 2.39.27 PM.png!
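The suspected root cause is a general aliasing hazard. A decoder may write every output into one reused mutable object. If the caller then stores the returned reference instead of a copy, all stored entries end up showing the last decoded value. The following is a minimal, Spark-independent Scala sketch of that pattern. `MutableRow`, `ReusingDecoder`, and `AliasingDemo` are hypothetical names chosen for illustration. They are not Spark's actual `AvroDataToCatalyst` or `LocalRelation` code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a mutable row buffer that a decoder reuses.
final class MutableRow(var message: String) {
  def copy(): MutableRow = new MutableRow(message)
}

// Hypothetical decoder that writes every result into one shared buffer
// instead of allocating a new row per input.
final class ReusingDecoder {
  private val result = new MutableRow(null)
  def decode(input: String): MutableRow = {
    result.message = input // overwrite the shared buffer in place
    result                 // return the same object on every call
  }
}

object AliasingDemo {
  // Stores decoder output as-is: every element is the same object,
  // so every element reports whatever was decoded last.
  def collectWithoutCopy(inputs: Seq[String]): Seq[String] = {
    val decoder = new ReusingDecoder
    val rows = ArrayBuffer.empty[MutableRow]
    inputs.foreach(in => rows += decoder.decode(in))
    rows.map(_.message).toList
  }

  // Copies each result before storing it, so later decodes cannot alter it.
  def collectWithCopy(inputs: Seq[String]): Seq[String] = {
    val decoder = new ReusingDecoder
    val rows = ArrayBuffer.empty[MutableRow]
    inputs.foreach(in => rows += decoder.decode(in).copy())
    rows.map(_.message).toList
  }

  def main(args: Array[String]): Unit = {
    val inputs = Seq("one", "two", "three", "four")
    println(collectWithoutCopy(inputs)) // List(four, four, four, four)
    println(collectWithCopy(inputs))    // List(one, two, three, four)
  }
}
```

In this sketch, the decoder stays allocation-free, and any caller that retains results is responsible for copying them. Whether Spark's eventual fix for this issue works the same way is not stated in this report.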



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27798) from_avro can modify variables in other rows in local mode

2019-06-05 Thread Apache Spark (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-27798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-27798:


Assignee: Apache Spark



