[ https://issues.apache.org/jira/browse/SPARK-22460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Saniya Tech updated SPARK-22460:
--------------------------------

    Description:

We are trying to serialize Timestamp fields to Avro using the spark-avro connector. The Timestamp fields are serialized correctly as longs (milliseconds since the Epoch), and the raw data reads back correctly from the Avro files. It is when we encode the Dataset as a case class that the timestamp field goes wrong: its long value, written as milliseconds, is interpreted as seconds since the Epoch. As can be seen below, this shifts the timestamp many thousands of years into the future.

Code used to reproduce the issue:

{code:java}
import java.sql.Timestamp

import com.databricks.spark.avro._
import org.apache.spark.sql.SaveMode

case class TestRecord(name: String, modified: Timestamp)

import spark.implicits._

val data = Seq(
  TestRecord("One", new Timestamp(System.currentTimeMillis()))
)

// Serialize:
val parameters = Map("recordName" -> "TestRecord", "recordNamespace" -> "com.example.domain")
val path = "s3a://some-bucket/output/"
val ds = spark.createDataset(data)
ds.write
  .options(parameters)
  .mode(SaveMode.Overwrite)
  .avro(path)

// De-serialize:
val output = spark.read.avro(path).as[TestRecord]
{code}

Output from the test:

{code:java}
scala> data.head
res4: TestRecord = TestRecord(One,2017-11-06 20:06:19.419)

scala> output.collect().head
res5: TestRecord = TestRecord(One,49819-12-16 17:23:39.0)
{code}
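The magnitude of the shift is consistent with a milliseconds value being fed into a seconds-based conversion: the epoch-milliseconds for 2017-11-06 land near the year 49819 when treated as epoch-seconds. Below is a minimal sketch that reproduces the arithmetic outside Spark, plus a possible workaround; the workaround's division by 1000 and use of {{withColumn}} are assumptions on my part, not verified against spark-avro:

{code:java}
import java.sql.Timestamp

// Approximate epoch-milliseconds for 2017-11-06 20:06:19.419 UTC,
// matching the value written by the repro above.
val millis = 1509998779419L

// Re-interpreting that same number as *seconds* since the Epoch
// reproduces the shift (Timestamp's constructor takes milliseconds):
val misread = new Timestamp(millis * 1000L)
println(misread) // ~49819-12-16, matching the bad output above

// Possible workaround (assumption, untested): rescale the raw long to
// seconds before casting, so Spark's seconds-based cast yields the
// intended instant:
// import org.apache.spark.sql.functions.col
// val fixed = spark.read.avro(path)
//   .withColumn("modified", (col("modified") / 1000).cast("timestamp"))
//   .as[TestRecord]
{code}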
> Spark De-serialization of Timestamp field is Incorrect
> ------------------------------------------------------
>
>                 Key: SPARK-22460
>                 URL: https://issues.apache.org/jira/browse/SPARK-22460
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.1.1
>            Reporter: Saniya Tech
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org