Hi Tathagata.

I tried making changes as you suggested:

@SQLUserDefinedType(udt = classOf[JodaTimeUDT])
class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType  = TimestampType

  override def serialize(obj: DateTime): Long = {
    obj.getMillis
  }

  def deserialize(datum: Any): DateTime = {
    datum match {
       case value: Long => new DateTime(value, DateTimeZone.UTC)
    }
  }

  override def userClass: Class[DateTime] = classOf[DateTime]

  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register : Unit = {
    UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
  }
}
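
One thing worth checking in the UDT above: it now declares sqlType as TimestampType while serialize still returns DateTime.getMillis. Spark's TimestampType is stored internally as microseconds since the epoch, so the units would not line up if the value were ever read as a timestamp. This probably does not explain the failure here, since the same wrong date showed up with LongType as well. The sketch below only illustrates the size of such a mismatch; it uses java.time instead of Joda so it runs without extra dependencies, and TimestampUnits is a hypothetical helper, not part of Spark or of the test suite.

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit

// Hypothetical helper for illustration only.
object TimestampUnits {
  // Epoch milliseconds, the same unit that DateTime.getMillis returns.
  val millis: Long = Instant.parse("2020-01-02T03:04:05.006Z").toEpochMilli

  // The same instant expressed in microseconds, the unit TimestampType uses internally.
  val micros: Long = TimeUnit.MILLISECONDS.toMicros(millis)

  // Interprets a raw Long as microseconds since the epoch.
  def fromMicros(us: Long): Instant = Instant.EPOCH.plus(us, ChronoUnit.MICROS)
}
```

TimestampUnits.fromMicros(TimestampUnits.micros) gives back the original instant, while TimestampUnits.fromMicros(TimestampUnits.millis) lands in January 1970. That is a much larger error than the one in the failing test, which is another hint that the unit mismatch is not the cause.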


This did not resolve the problem.  The results remain the same:


org.scalatest.exceptions.TestFailedException:
Array(FooWithDate(*2021-02-02T19:26:23.374Z*,Foo,1),
FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the
same elements as Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1),
FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))


I included a couple of other test cases to validate that the UDT works fine:


"the joda time serializer" should "serialize and deserialize as expected" in {
  val input = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
  val serializer = new JodaTimeUDT()
  val serialized = serializer.serialize(input)
  val deserialized = serializer.deserialize(serialized)

  deserialized should be(input)
}
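
On the earlier time zone question: epoch milliseconds do not depend on the zone of the value being serialized, so not specifying UTC in serialize should not shift the stored instant. What can differ is equality after the round trip, because Joda's DateTime.equals also compares the chronology and zone. The sketch below shows the same behaviour with java.time's ZonedDateTime as a stand-in for Joda (an assumption made so the example needs no extra dependencies); ZoneRoundTrip and its methods are hypothetical names.

```scala
import java.time.{Instant, ZoneId, ZoneOffset, ZonedDateTime}

// Hypothetical stand-in for the UDT's serialize/deserialize pair.
object ZoneRoundTrip {
  // 2020-01-02T03:04:05.006Z (6 ms expressed as nanoseconds).
  val utc: ZonedDateTime = ZonedDateTime.of(2020, 1, 2, 3, 4, 5, 6000000, ZoneOffset.UTC)

  // The same instant viewed in another zone.
  val newYork: ZonedDateTime = utc.withZoneSameInstant(ZoneId.of("America/New_York"))

  // Store only the instant, as epoch milliseconds.
  def serialize(d: ZonedDateTime): Long = d.toInstant.toEpochMilli

  // Always rebuild the value in UTC, like the UDT's deserialize.
  def deserialize(ms: Long): ZonedDateTime = Instant.ofEpochMilli(ms).atZone(ZoneOffset.UTC)
}
```

Both values serialize to the same Long. After the round trip the UTC value compares equal with equals, while the New York value only compares equal by instant (isEqual), not by equals. Either way the instant is preserved, which matches the observation that the error here is far larger than a zone offset.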

it should "correctly implement dataframe serialization & deserialization in data frames" in {
  val date = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
  val datePlusOne = new DateTime(2020,1,2,3,5,5,6, DateTimeZone.UTC)
  val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
  val sqlContext = session.sqlContext
  import sqlContext.implicits._
  val ds = input.toDF().as[FooWithDate]
  val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date, Minutes(1)), x.s, x.i + 1)).collect()
  val expected = List(FooWithDate(datePlusOne, "Foo", 2), FooWithDate(datePlusOne, "Foo", 4))

  result should contain theSameElementsAs expected
}
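
For the suggestion quoted below about printing the values inside serialize and deserialize, a small tracing wrapper might be enough. This is only a sketch: Traced is a hypothetical helper, not a Spark API, and println output is only easy to find when running with a local master, as these tests do.

```scala
// Hypothetical helper: wraps a function so that each call prints its input and output.
object Traced {
  def apply[A, B](label: String)(f: A => B): A => B = { a =>
    val b = f(a)                  // run the wrapped function
    println(s"$label: $a -> $b")  // log the value that went in and the value that came out
    b
  }
}
```

Inside the UDT this could be used as, for example, Traced("serialize")((d: DateTime) => d.getMillis)(obj), so that the Long written by serialize can be compared with the Long later handed to deserialize in the mapGroupsWithState path.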


Any other thoughts?


On Fri, Feb 28, 2020 at 6:23 PM Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Sounds like something to do with the serialization/deserialization, and
> not related to mapGroupsWithState.
>
>
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
>
> The docs say that
> 1. this is deprecated and therefore should not be used
> 2. you have to use the annotation `SQLUserDefinedType
> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java>`
> on the class definition. You don't seem to have done that; maybe that's the
> reason?
>
> I would debug by printing the values in the serialize/deserialize methods,
> and then passing it through the groupBy that is known to fail.
>
> TD
>
> On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
>> Tathagata,
>>
>> The difference is more than hours off. In this instance it's different by
>> 4 years. In other instances it's different by tens of years (and other
>> smaller durations).
>>
>> We've considered moving to storage as longs, but this makes code much
>> less readable and harder to maintain. The udt serialization bug also causes
>> issues outside of stateful streaming, as when executing a simple group by.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>> ------------------------------
>> *From:* Tathagata Das <tathagata.das1...@gmail.com>
>> *Sent:* Friday, February 28, 2020 4:56:07 PM
>> *To:* Bryan Jeffrey <bryan.jeff...@gmail.com>
>> *Cc:* user <user@spark.apache.org>
>> *Subject:* Re: Structured Streaming: mapGroupsWithState UDT
>> serialization does not work
>>
>> You are deserializing by explicitly specifying the UTC timezone, but when
>> serializing you are not specifying it. Maybe that is the reason?
>>
>> Also, if you can encode it using just long, then I recommend just saving
>> the value as long and eliminating some of the serialization overheads.
>> Spark will probably better optimize stuff if it sees it as a long rather
>> than an opaque UDT.
>>
>> TD
>>
>> On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey <bryan.jeff...@gmail.com>
>> wrote:
>>
>> Hello.
>>
>> I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with
>> mapGroupsWithState, and was wondering if anyone had insight.  We use Joda
>> time in a number of data structures, and so we've generated a custom
>> serializer for Joda.  This works well in most dataset/dataframe structured
>> streaming operations. However, when running mapGroupsWithState we observed
>> that incorrect dates were being returned from a state.
>>
>> I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in
>> an effort to assist tracking of related information.
>>
>> Simple example:
>> 1. Input A has a date D
>> 2. Input A updates state in mapGroupsWithState. Date present in state is D
>> 3. Input A is added again.  Input A has correct date D, but existing
>> state now has invalid date
>>
>> Here is a simple repro:
>>
>> Joda Time UDT:
>>
>> private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
>>   override def sqlType: DataType  = LongType
>>   override def serialize(obj: DateTime): Long = obj.getMillis
>>   def deserialize(datum: Any): DateTime = datum match { case value: Long => new DateTime(value, DateTimeZone.UTC) }
>>   override def userClass: Class[DateTime] = classOf[DateTime]
>>   private[spark] override def asNullable: JodaTimeUDT = this
>> }
>>
>> object JodaTimeUDTRegister {
>>   def register : Unit = { UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName) }
>> }
>>
>>
>> Test Leveraging Joda UDT:
>>
>> case class FooWithDate(date: DateTime, s: String, i: Int)
>>
>> @RunWith(classOf[JUnitRunner])
>> class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with BeforeAndAfterAll {
>>   val application = this.getClass.getName
>>   var session: SparkSession = _
>>
>>   override def beforeAll(): Unit = {
>>     System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
>>     val sparkConf = new SparkConf()
>>       .set("spark.driver.allowMultipleContexts", "true")
>>       .set("spark.testing", "true")
>>       .set("spark.memory.fraction", "1")
>>       .set("spark.ui.enabled", "false")
>>       .set("spark.streaming.gracefulStopTimeout", "1000")
>>       .setAppName(application).setMaster("local[*]")
>>
>>
>>     session = SparkSession.builder().config(sparkConf).getOrCreate()
>>     session.sparkContext.setCheckpointDir("/")
>>     JodaTimeUDTRegister.register
>>   }
>>
>>   override def afterAll(): Unit = {
>>     session.stop()
>>   }
>>
>>   it should "work correctly for a streaming input with stateful transformation" in {
>>     val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
>>     val sqlContext = session.sqlContext
>>     import sqlContext.implicits._
>>
>>     val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3), FooWithDate(date, "Foo", 3))
>>     val streamInput: MemoryStream[FooWithDate] = new MemoryStream[FooWithDate](42, session.sqlContext)
>>     streamInput.addData(input)
>>     val ds: Dataset[FooWithDate] = streamInput.toDS()
>>
>>     val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], GroupState[FooWithDate]) => FooWithDate = TestJodaTimeUdt.updateFooState
>>     val result: Dataset[FooWithDate] = ds
>>       .groupByKey(x => x.i)
>>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)
>>     val writeTo = s"random_table_name"
>>
>>     result.writeStream.outputMode(OutputMode.Update).format("memory").queryName(writeTo).trigger(Trigger.Once()).start().awaitTermination()
>>     val combinedResults: Array[FooWithDate] = session.sql(sqlText = s"select * from $writeTo").as[FooWithDate].collect()
>>     val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, "FooFoo", 6))
>>     combinedResults should contain theSameElementsAs(expected)
>>   }
>> }
>>
>> object TestJodaTimeUdt {
>>   def updateFooState(id: Int, inputs: Iterator[FooWithDate], state: GroupState[FooWithDate]): FooWithDate = {
>>     if (state.hasTimedOut) {
>>       // Read the state before removing it; getOption is empty after remove()
>>       val last = state.get
>>       state.remove()
>>       last
>>     } else {
>>       val inputsSeq: Seq[FooWithDate] = inputs.toSeq
>>       val startingState = state.getOption.getOrElse(inputsSeq.head)
>>       val toProcess = if (state.getOption.isDefined) inputsSeq else inputsSeq.tail
>>       val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)
>>
>>       state.update(updatedFoo)
>>       state.setTimeoutDuration("1 minute")
>>       updatedFoo
>>     }
>>   }
>>
>>   def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate = FooWithDate(b.date, a.s + b.s, a.i + b.i)
>> }
>>
>>
>> The test output shows the invalid date:
>>
>> org.scalatest.exceptions.TestFailedException:
>> Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1),
>> FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same
>> elements as
>> Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1),
>> FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
>>
>> Is this something folks have encountered before?
>>
>> Thank you,
>>
>> Bryan Jeffrey