[ 
https://issues.apache.org/jira/browse/SPARK-22460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Saniya Tech updated SPARK-22460:
--------------------------------
    Description: 
We are trying to serialize Timestamp fields to Avro using the spark-avro 
connector. The Timestamp fields are correctly serialized as long values 
(milliseconds since the Epoch), and I verified that the data is read back 
correctly from the Avro files. The problem appears when we encode the Dataset 
as a case class: the long value is then interpreted as seconds since the Epoch 
instead of milliseconds. As can be seen below, this shifts the timestamp tens 
of thousands of years into the future.
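
The size of the shift can be checked without Spark. The sketch below is only an illustration: the object name MillisVsSeconds is made up, and the literal assumes that the 2017-11-06 20:06:19.419 value in the test output of this report was printed in UTC. It reads the same long once as milliseconds and once as seconds:

```scala
import java.time.{Instant, ZoneOffset}

object MillisVsSeconds {
  // Epoch milliseconds for 2017-11-06 20:06:19.419, assuming a UTC session.
  val millis: Long = 1509998779419L

  // Intended reading: the long counts milliseconds since the Epoch.
  val asMillis: Instant = Instant.ofEpochMilli(millis)

  // Faulty reading: the same long taken as seconds since the Epoch.
  val asSeconds: Instant = Instant.ofEpochSecond(millis)

  def main(args: Array[String]): Unit = {
    println(asMillis)                                  // 2017-11-06T20:06:19.419Z
    println(asSeconds.atZone(ZoneOffset.UTC).getYear)  // 49819
  }
}
```

Under that assumption the faulty reading lands in the year 49819, which is consistent with a factor-of-1000 unit mix-up rather than corrupted data.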

Code used to reproduce the issue:

{code:java}
import java.sql.Timestamp

import com.databricks.spark.avro._
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

case class TestRecord(name: String, modified: Timestamp)

import spark.implicits._
val data = Seq(
  TestRecord("One", new Timestamp(System.currentTimeMillis()))
)

// Serialize:
val parameters = Map(
  "recordName" -> "TestRecord",
  "recordNamespace" -> "com.example.domain"
)
val path = "s3a://some-bucket/output/"
val ds = spark.createDataset(data)
ds.write
  .options(parameters)
  .mode(SaveMode.Overwrite)
  .avro(path)

// De-serialize
val output = spark.read.avro(path).as[TestRecord]
{code}

Output from the test:
{code:java}
scala> data.head
res4: TestRecord = TestRecord(One,2017-11-06 20:06:19.419)

scala> output.collect().head
res5: TestRecord = TestRecord(One,49819-12-16 17:23:39.0)
{code}
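
A possible workaround, sketched under assumptions and not verified against 2.1.1: keep the field as a plain long when decoding and build the Timestamp explicitly. RawRecord, TimestampWorkaround and toTestRecord are hypothetical names introduced for this illustration; TestRecord is repeated only so that the block is self-contained.

```scala
import java.sql.Timestamp

object TimestampWorkaround {
  // Hypothetical mirror of the Avro schema: `modified` stays a plain long.
  case class RawRecord(name: String, modified: Long)

  // Same shape as the TestRecord used in this report.
  case class TestRecord(name: String, modified: Timestamp)

  // java.sql.Timestamp's constructor takes epoch milliseconds,
  // so the stored value needs no scaling.
  def toTestRecord(raw: RawRecord): TestRecord =
    TestRecord(raw.name, new Timestamp(raw.modified))
}
```

In a spark-shell session like the one used for the test, this would presumably be applied as `spark.read.avro(path).as[RawRecord].map(r => toTestRecord(r))`, with `spark.implicits._` in scope. It avoids the long-to-timestamp cast, which appears to be where the value is read as seconds.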



  was:
We are trying to serialize Timestamp fields to Avro using spark-avro connector. 
I can see the Timestamp fields are getting correctly serialized as long 
(milliseconds since Epoch). I verified that the data is correctly read back 
from the Avro files. It is when we encode the Dataset as a case class that 
timestamp field is incorrectly converted to as long value as seconds since 
Epoch. As can be seen below, this shifts the timestamp many years in the future.

Code used to reproduce the issue:

{code:java}
import java.sql.Timestamp

import com.databricks.spark.avro._
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

case class TestRecord(name: String, modified: Timestamp)

import spark.implicits._
val data = Seq(
  TestRecord("One", new Timestamp(System.currentTimeMillis()))
)

// Serialize:
val parameters = Map("recordName" -> "TestRecord", "recordNamespace" -> 
"com.example.domain")
val path = s"s3a://some-bucket/output/"
val ds = spark.createDataset(data)
ds.write
  .options(parameters)
  .mode(SaveMode.Overwrite)
  .avro(path)
//

// De-serialize
val output = spark.read.avro(path).as[TestRecord]
{code}

Output from the test:
{code:java}
scala> data.head
res4: TestRecord = TestRecord(One,2017-11-06 20:06:19.419)

scala> output.collect().head
res5: TestRecord = TestRecord(One,49819-12-16 17:23:39.0)
{code}




> Spark De-serialization of Timestamp field is Incorrect
> ------------------------------------------------------
>
>                 Key: SPARK-22460
>                 URL: https://issues.apache.org/jira/browse/SPARK-22460
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.1.1
>            Reporter: Saniya Tech


