Hi Tathagata. I tried making changes as you suggested:
@SQLUserDefinedType(udt = classOf[JodaTimeUDT])
class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = TimestampType

  override def serialize(obj: DateTime): Long = obj.getMillis

  def deserialize(datum: Any): DateTime = datum match {
    case value: Long => new DateTime(value, DateTimeZone.UTC)
  }

  override def userClass: Class[DateTime] = classOf[DateTime]

  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register: Unit = {
    UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
  }
}

This did not resolve the problem. The results remain the same:

org.scalatest.exceptions.TestFailedException:
Array(FooWithDate(*2021-02-02T19:26:23.374Z*,Foo,1), FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6))
did not contain the same elements as
Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))

I included a couple of other test cases to validate that the UDT itself works fine:

"the joda time serializer" should "serialize and deserialize as expected" in {
  val input = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
  val serializer = new JodaTimeUDT()
  val serialized = serializer.serialize(input)
  val deserialized = serializer.deserialize(serialized)

  deserialized should be(input)
}

it should "correctly implement serialization & deserialization in data frames" in {
  val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
  val datePlusOne = new DateTime(2020, 1, 2, 3, 5, 5, 6, DateTimeZone.UTC)
  val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3))
  val sqlContext = session.sqlContext
  import sqlContext.implicits._

  val ds = input.toDF().as[FooWithDate]
  val result = ds.map(x => FooWithDate(DateUtils.addInterval(x.date, Minutes(1)), x.s, x.i + 1)).collect()
  val expected = List(FooWithDate(datePlusOne, "Foo", 2), FooWithDate(datePlusOne, "Foo", 4))

  result should contain theSameElementsAs expected
}

Any other thoughts?
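For reference, the long-based encoding discussed below is roughly what we have in mind (just a sketch; FooWithMillis and its helpers are hypothetical names, not code we run):

import org.joda.time.{DateTime, DateTimeZone}

// Sketch: store epoch millis directly so Spark only ever sees a Long column,
// and convert to/from Joda DateTime at the application boundary.
case class FooWithMillis(dateMillis: Long, s: String, i: Int) {
  def toJoda: FooWithDate =
    FooWithDate(new DateTime(dateMillis, DateTimeZone.UTC), s, i)
}

object FooWithMillis {
  def fromJoda(f: FooWithDate): FooWithMillis =
    FooWithMillis(f.date.getMillis, f.s, f.i)
}

This would keep the UDT out of the state round-trip entirely, at the cost of the readability concerns mentioned below.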
On Fri, Feb 28, 2020 at 6:23 PM Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Sounds like something to do with the serialization/deserialization, and
> not related to mapGroupsWithState.
>
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
>
> The docs say that
> 1. this is deprecated and therefore should not be used
> 2. you have to use the annotation `SQLUserDefinedType
> <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java>`
> on the class definition. You don't seem to have done that; maybe that's the
> reason?
>
> I would debug by printing the values in the serialize/deserialize methods,
> and then passing them through the groupBy that is known to fail.
>
> TD
>
> On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>
>> Tathagata,
>>
>> The difference is more than hours off. In this instance it's different by
>> 4 years. In other instances it's different by tens of years (and other
>> smaller durations).
>>
>> We've considered moving to storage as longs, but this makes the code much
>> less readable and harder to maintain. The UDT serialization bug also causes
>> issues outside of stateful streaming, such as when executing a simple group by.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>> ------------------------------
>> *From:* Tathagata Das <tathagata.das1...@gmail.com>
>> *Sent:* Friday, February 28, 2020 4:56:07 PM
>> *To:* Bryan Jeffrey <bryan.jeff...@gmail.com>
>> *Cc:* user <user@spark.apache.org>
>> *Subject:* Re: Structured Streaming: mapGroupsWithState UDT serialization does not work
>>
>> You are deserializing by explicitly specifying the UTC timezone, but when
>> serializing you are not specifying it. Maybe that is the reason?
>>
>> Also, if you can encode it using just a long, then I recommend saving the
>> value as a long and eliminating some of the serialization overhead. Spark
>> will probably optimize better if it sees a long rather than an opaque UDT.
>>
>> TD
>>
>> On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>>
>> Hello.
>>
>> I'm running Scala 2.11 with Spark 2.3.0. I've encountered a problem with
>> mapGroupsWithState, and was wondering if anyone had insight. We use Joda
>> time in a number of data structures, and so we've generated a custom
>> serializer for Joda. This works well in most dataset/dataframe structured
>> streaming operations. However, when running mapGroupsWithState we observed
>> that incorrect dates were being returned from state.
>>
>> I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in
>> an effort to assist tracking of related information.
>>
>> Simple example:
>> 1. Input A has a date D
>> 2. Input A updates state in mapGroupsWithState. The date present in state is D
>> 3. Input A is added again. Input A has the correct date D, but the
>> existing state now has an invalid date
>>
>> Here is a simple repro:
>>
>> Joda Time UDT:
>>
>> private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
>>   override def sqlType: DataType = LongType
>>
>>   override def serialize(obj: DateTime): Long = obj.getMillis
>>
>>   def deserialize(datum: Any): DateTime = datum match {
>>     case value: Long => new DateTime(value, DateTimeZone.UTC)
>>   }
>>
>>   override def userClass: Class[DateTime] = classOf[DateTime]
>>
>>   private[spark] override def asNullable: JodaTimeUDT = this
>> }
>>
>> object JodaTimeUDTRegister {
>>   def register: Unit = {
>>     UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
>>   }
>> }
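A minimal sketch of the print-based debugging TD suggests above (a hypothetical subclass; it assumes the subclass can see JodaTimeUDT, e.g. the non-private variant at the top of this thread):

import org.joda.time.DateTime

// Hypothetical wrapper that logs the raw values crossing the
// serialize/deserialize boundary, to show where the date drifts.
class LoggingJodaTimeUDT extends JodaTimeUDT {
  override def serialize(obj: DateTime): Long = {
    val millis = super.serialize(obj)
    println(s"serialize:   $obj -> $millis")
    millis
  }

  override def deserialize(datum: Any): DateTime = {
    val result = super.deserialize(datum)
    println(s"deserialize: $datum -> $result")
    result
  }
}

Registering this class in place of JodaTimeUDT via UDTRegistration.register and re-running the failing query should show whether the millis are already wrong when Spark reads them back from state.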
>> Test Leveraging Joda UDT:
>>
>> case class FooWithDate(date: DateTime, s: String, i: Int)
>>
>> @RunWith(classOf[JUnitRunner])
>> class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with BeforeAndAfterAll {
>>   val application = this.getClass.getName
>>   var session: SparkSession = _
>>
>>   override def beforeAll(): Unit = {
>>     System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
>>     val sparkConf = new SparkConf()
>>       .set("spark.driver.allowMultipleContexts", "true")
>>       .set("spark.testing", "true")
>>       .set("spark.memory.fraction", "1")
>>       .set("spark.ui.enabled", "false")
>>       .set("spark.streaming.gracefulStopTimeout", "1000")
>>       .setAppName(application).setMaster("local[*]")
>>
>>     session = SparkSession.builder().config(sparkConf).getOrCreate()
>>     session.sparkContext.setCheckpointDir("/")
>>     JodaTimeUDTRegister.register
>>   }
>>
>>   override def afterAll(): Unit = {
>>     session.stop()
>>   }
>>
>>   it should "work correctly for a streaming input with stateful transformation" in {
>>     val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
>>     val sqlContext = session.sqlContext
>>     import sqlContext.implicits._
>>
>>     val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3), FooWithDate(date, "Foo", 3))
>>     val streamInput: MemoryStream[FooWithDate] = new MemoryStream[FooWithDate](42, session.sqlContext)
>>     streamInput.addData(input)
>>     val ds: Dataset[FooWithDate] = streamInput.toDS()
>>
>>     val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], GroupState[FooWithDate]) => FooWithDate = TestJodaTimeUdt.updateFooState
>>     val result: Dataset[FooWithDate] = ds
>>       .groupByKey(x => x.i)
>>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)
>>     val writeTo = "random_table_name"
>>
>>     result.writeStream.outputMode(OutputMode.Update).format("memory").queryName(writeTo).trigger(Trigger.Once()).start().awaitTermination()
>>     val combinedResults: Array[FooWithDate] = session.sql(sqlText = s"select * from $writeTo").as[FooWithDate].collect()
>>     val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, "FooFoo", 6))
>>
>>     combinedResults should contain theSameElementsAs expected
>>   }
>> }
>>
>> object TestJodaTimeUdt {
>>   def updateFooState(id: Int, inputs: Iterator[FooWithDate], state: GroupState[FooWithDate]): FooWithDate = {
>>     if (state.hasTimedOut) {
>>       state.remove()
>>       state.getOption.get
>>     } else {
>>       val inputsSeq: Seq[FooWithDate] = inputs.toSeq
>>       val startingState = state.getOption.getOrElse(inputsSeq.head)
>>       val toProcess = if (state.getOption.isDefined) inputsSeq else inputsSeq.tail
>>       val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)
>>
>>       state.update(updatedFoo)
>>       state.setTimeoutDuration("1 minute")
>>       updatedFoo
>>     }
>>   }
>>
>>   def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate =
>>     FooWithDate(b.date, a.s + b.s, a.i + b.i)
>> }
>>
>> The test output shows the invalid date:
>>
>> org.scalatest.exceptions.TestFailedException:
>> Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1), FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6))
>> did not contain the same elements as
>> Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
>>
>> Is this something folks have encountered before?
>>
>> Thank you,
>>
>> Bryan Jeffrey
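Since the problem reportedly also appears outside stateful streaming (e.g. in a simple group by), a batch-only repro along these lines might help isolate it from mapGroupsWithState (a sketch reusing FooWithDate and concatFoo from the repro above; not part of the original thread):

it should "round-trip Joda dates through a batch groupByKey" in {
  val sqlContext = session.sqlContext
  import sqlContext.implicits._

  val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
  val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Bar", 1))

  // No streaming state involved: any date drift here points at the UDT
  // serialization itself rather than at mapGroupsWithState.
  val result = input.toDS()
    .groupByKey(_.i)
    .mapGroups((_, rows) => rows.reduce(TestJodaTimeUdt.concatFoo))
    .collect()

  result should contain theSameElementsAs Array(FooWithDate(date, "FooBar", 2))
}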