I've investigated a bit, and looks like it's not an issue of
mapGroupsWithState, but an issue of how UDT is handled in UnsafeRow. It
seems to miss handling UDT and the missing spot makes the internal code of
Spark corrupt the value. (So if I'm not mistaken, it's a correctness issue.)

I've filed an issue (sorry I missed you've already filed an issue) and
submitted a patch. https://issues.apache.org/jira/browse/SPARK-30993

It would be nice if you can try out my patch and see whether it fixes your
issue (I've already copied your code and made it pass, but would like to
double check). Thanks for reporting!

On Sat, Feb 29, 2020 at 11:26 AM Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

>
> Hi Tathagata.
>
> I tried making changes as you suggested:
>
> @SQLUserDefinedType(udt = classOf[JodaTimeUDT])
> class JodaTimeUDT extends UserDefinedType[DateTime] {
>   override def sqlType: DataType  = TimestampType
>
>   override def serialize(obj: DateTime): Long = {
>     obj.getMillis
>   }
>
>   def deserialize(datum: Any): DateTime = {
>     datum match {
>        case value: Long => new DateTime(value, DateTimeZone.UTC)
>     }
>   }
>
>   override def userClass: Class[DateTime] = classOf[DateTime]
>
>   private[spark] override def asNullable: JodaTimeUDT = this
> }
>
> object JodaTimeUDTRegister {
>   def register : Unit = {
>     UDTRegistration.register(classOf[DateTime].getName, 
> classOf[JodaTimeUDT].getName)
>   }
> }
>
>
> This did not resolve the problem.  The results remain the same:
>
>
> org.scalatest.exceptions.TestFailedException: 
> Array(FooWithDate(*2021-02-02T19:26:23.374Z*,Foo,1), 
> FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same 
> elements as Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), 
> FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
>
>
> I included a couple of other test cases to validate that the UDT works fine:
>
>
> "the joda time serializer" should "serialize and deserialize as expected" in {
>   val input = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
>   val serializer = new JodaTimeUDT()
>   val serialized = serializer.serialize(input)
>   val deserialized = serializer.deserialize(serialized)
>
>   deserialized should be(input)
> }
>
> it should "correctly implement dataframe serialization & deserialization in 
> data frames" in {
>   val date = new DateTime(2020,1,2,3,4,5,6, DateTimeZone.UTC)
>   val datePlusOne = new DateTime(2020,1,2,3,5,5,6, DateTimeZone.UTC)
>   val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
>   val sqlContext = session.sqlContext
>   import sqlContext.implicits._
>   val ds = input.toDF().as[FooWithDate]
>   val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date, 
> Minutes(1)), x.s, x.i + 1)).collect()
>   val expected = List(FooWithDate(datePlusOne, "Foo", 2), 
> FooWithDate(datePlusOne, "Foo", 4))
>
>   result should contain theSameElementsAs expected
> }
>
>
> Any other thoughts?
>
>
> On Fri, Feb 28, 2020 at 6:23 PM Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> Sounds like something to do with the serialization/deserialization, and
>> not related to mapGroupsWithState.
>>
>>
>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
>>
>> The docs says that
>> 1. this is deprecated and therefore should not be used
>> 2. you have to use the annotation `SQLUserDefinedType
>> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java>`
>> on the class definition. You dont seem to have done it, maybe thats the
>> reason?
>>
>> I would debug by printing the values in the serialize/deserialize
>> methods, and then passing it through the groupBy that is known to fail.
>>
>> TD
>>
>> On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey <bryan.jeff...@gmail.com>
>> wrote:
>>
>>> Tathagata,
>>>
>>> The difference is more than hours off. In this instance it's different
>>> by 4 years. In other instances it's different by tens of years (and other
>>> smaller durations).
>>>
>>> We've considered moving to storage as longs, but this makes code much
>>> less readable and harder to maintain. The udt serialization bug also causes
>>> issues outside of stateful streaming, as when executing a simple group by.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>> Get Outlook for Android <https://aka.ms/ghei36>
>>>
>>> ------------------------------
>>> *From:* Tathagata Das <tathagata.das1...@gmail.com>
>>> *Sent:* Friday, February 28, 2020 4:56:07 PM
>>> *To:* Bryan Jeffrey <bryan.jeff...@gmail.com>
>>> *Cc:* user <user@spark.apache.org>
>>> *Subject:* Re: Structured Streaming: mapGroupsWithState UDT
>>> serialization does not work
>>>
>>> You are deserializing by explicitly specifying UTC timezone, but when
>>> serializing you are not specifying it. Maybe that is reason?
>>>
>>> Also, if you can encode it using just long, then I recommend just saving
>>> the value as long and eliminating some of the serialization overheads.
>>> Spark will probably better optimize stuff if it sees it as a long rather
>>> than an opaque UDT.
>>>
>>> TD
>>>
>>> On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey <bryan.jeff...@gmail.com>
>>> wrote:
>>>
>>> Hello.
>>>
>>> I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with
>>> mapGroupsWithState, and was wondering if anyone had insight.  We use Joda
>>> time in a number of data structures, and so we've generated a custom
>>> serializer for Joda.  This works well in most dataset/dataframe structured
>>> streaming operations. However, when running mapGroupsWithState we observed
>>> that incorrect dates were being returned from a state.
>>>
>>> I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in
>>> an effort to assist tracking of related information.
>>>
>>> Simple example:
>>> 1. Input A has a date D
>>> 2. Input A updates state in mapGroupsWithState. Date present in state is
>>> D
>>> 3. Input A is added again.  Input A has correct date D, but existing
>>> state now has invalid date
>>>
>>> Here is a simple repro:
>>>
>>> Joda Time UDT:
>>>
>>> private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
>>>   override def sqlType: DataType  = LongType
>>>   override def serialize(obj: DateTime): Long = obj.getMillis
>>>   def deserialize(datum: Any): DateTime = datum match { case value: Long => 
>>> new DateTime(value, DateTimeZone.UTC) }
>>>   override def userClass: Class[DateTime] = classOf[DateTime]
>>>   private[spark] override def asNullable: JodaTimeUDT = this
>>> }
>>>
>>> object JodaTimeUDTRegister {
>>>   def register : Unit = { 
>>> UDTRegistration.register(classOf[DateTime].getName, 
>>> classOf[JodaTimeUDT].getName)  }
>>> }
>>>
>>>
>>> Test Leveraging Joda UDT:
>>>
>>> case class FooWithDate(date: DateTime, s: String, i: Int)
>>>
>>> @RunWith(classOf[JUnitRunner])
>>> class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with 
>>> BeforeAndAfterAll {
>>>   val application = this.getClass.getName
>>>   var session: SparkSession = _
>>>
>>>   override def beforeAll(): Unit = {
>>>     System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
>>>     val sparkConf = new SparkConf()
>>>       .set("spark.driver.allowMultipleContexts", "true")
>>>       .set("spark.testing", "true")
>>>       .set("spark.memory.fraction", "1")
>>>       .set("spark.ui.enabled", "false")
>>>       .set("spark.streaming.gracefulStopTimeout", "1000")
>>>       .setAppName(application).setMaster("local[*]")
>>>
>>>
>>>     session = SparkSession.builder().config(sparkConf).getOrCreate()
>>>     session.sparkContext.setCheckpointDir("/")
>>>     JodaTimeUDTRegister.register
>>>   }
>>>
>>>   override def afterAll(): Unit = {
>>>     session.stop()
>>>   }
>>>
>>>   it should "work correctly for a streaming input with stateful 
>>> transformation" in {
>>>     val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
>>>     val sqlContext = session.sqlContext
>>>     import sqlContext.implicits._
>>>
>>>     val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 
>>> 3), FooWithDate(date, "Foo", 3))
>>>     val streamInput: MemoryStream[FooWithDate] = new 
>>> MemoryStream[FooWithDate](42, session.sqlContext)
>>>     streamInput.addData(input)
>>>     val ds: Dataset[FooWithDate] = streamInput.toDS()
>>>
>>>     val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], 
>>> GroupState[FooWithDate]) => FooWithDate = TestJodaTimeUdt.updateFooState
>>>     val result: Dataset[FooWithDate] = ds
>>>       .groupByKey(x => x.i)
>>>       
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)
>>>     val writeTo = s"random_table_name"
>>>
>>>     
>>> result.writeStream.outputMode(OutputMode.Update).format("memory").queryName(writeTo).trigger(Trigger.Once()).start().awaitTermination()
>>>     val combinedResults: Array[FooWithDate] = session.sql(sqlText = 
>>> s"select * from $writeTo").as[FooWithDate].collect()
>>>     val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, 
>>> "FooFoo", 6))
>>>     combinedResults should contain theSameElementsAs(expected)
>>>   }
>>> }
>>>
>>> object TestJodaTimeUdt {
>>>   def updateFooState(id: Int, inputs: Iterator[FooWithDate], state: 
>>> GroupState[FooWithDate]): FooWithDate = {
>>>     if (state.hasTimedOut) {
>>>       state.remove()
>>>       state.getOption.get
>>>     } else {
>>>       val inputsSeq: Seq[FooWithDate] = inputs.toSeq
>>>       val startingState = state.getOption.getOrElse(inputsSeq.head)
>>>       val toProcess = if (state.getOption.isDefined) inputsSeq else 
>>> inputsSeq.tail
>>>       val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)
>>>
>>>       state.update(updatedFoo)
>>>       state.setTimeoutDuration("1 minute")
>>>       updatedFoo
>>>     }
>>>   }
>>>
>>>   def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate = 
>>> FooWithDate(b.date, a.s + b.s, a.i + b.i)
>>> }
>>>
>>>
>>> The test output shows the invalid date:
>>>
>>> org.scalatest.exceptions.TestFailedException:
>>> Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1),
>>> FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same
>>> elements as
>>> Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1),
>>> FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
>>>
>>> Is this something folks have encountered before?
>>>
>>> Thank you,
>>>
>>> Bryan Jeffrey
>>>
>>>
>>>
>>>
>>>
>>>
>>>

Reply via email to