Perfect. I'll give this a shot and report back.
________________________________
From: Tathagata Das <tathagata.das1...@gmail.com>
Sent: Friday, February 28, 2020 6:23:07 PM
To: Bryan Jeffrey <bryan.jeff...@gmail.com>
Cc: user <user@spark.apache.org>
Subject: Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

Sounds like something to do with the serialization/deserialization, and not related to mapGroupsWithState.

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala

The docs say that
1. this is deprecated and therefore should not be used, and
2. you have to use the annotation `SQLUserDefinedType` <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java> on the class definition. You don't seem to have done that; maybe that's the reason?

I would debug by printing the values in the serialize/deserialize methods, and then passing the data through the groupBy that is known to fail.
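For example, an untested sketch of that instrumentation, using just the serialize/deserialize from your repro below with prints added:

    override def serialize(obj: DateTime): Long = {
      val millis = obj.getMillis
      println(s"serialize: $obj -> $millis")    // value handed to Spark
      millis
    }

    def deserialize(datum: Any): DateTime = datum match {
      case value: Long =>
        val dt = new DateTime(value, DateTimeZone.UTC)
        println(s"deserialize: $value -> $dt")  // value handed back by Spark
        dt
    }

If the millis printed on the serialize side match the millis arriving on the deserialize side, the corruption is happening somewhere between the two calls; if they differ, you have narrowed it down to the UDT path.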
TD

On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:

Tathagata,

The difference is more than hours off. In this instance it differs by 4 years; in other instances it differs by tens of years (and other, smaller durations). We've considered moving to storing the dates as longs, but this makes the code much less readable and harder to maintain. The UDT serialization bug also causes issues outside of stateful streaming, for example when executing a simple group by.

Regards,

Bryan Jeffrey

________________________________
From: Tathagata Das <tathagata.das1...@gmail.com>
Sent: Friday, February 28, 2020 4:56:07 PM
To: Bryan Jeffrey <bryan.jeff...@gmail.com>
Cc: user <user@spark.apache.org>
Subject: Re: Structured Streaming: mapGroupsWithState UDT serialization does not work

You are deserializing with an explicitly specified UTC timezone, but when serializing you are not specifying one. Maybe that is the reason?

Also, if you can encode the value using just a long, then I recommend saving it as a long and eliminating some of the serialization overhead. Spark will probably optimize things better if it sees a long rather than an opaque UDT.
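A rough sketch of what I mean (the case class and helper names here are made up; convert only at the edges of the pipeline):

    // State and shuffle data carry epoch millis, so Spark sees a plain
    // LongType column instead of an opaque UDT.
    case class FooWithMillis(dateMillis: Long, s: String, i: Int)

    def fromJoda(f: FooWithDate): FooWithMillis =
      FooWithMillis(f.date.getMillis, f.s, f.i)

    def toJoda(f: FooWithMillis): FooWithDate =
      FooWithDate(new DateTime(f.dateMillis, DateTimeZone.UTC), f.s, f.i)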
TD

On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:

Hello.

I'm running Scala 2.11 w/ Spark 2.3.0. I've encountered a problem with mapGroupsWithState, and was wondering if anyone had insight. We use Joda time in a number of data structures, and so we've generated a custom serializer for Joda. This works well in most Dataset/DataFrame structured streaming operations. However, when running mapGroupsWithState we observed that incorrect dates were being returned from state. I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in an effort to assist tracking of related information.

Simple example:
1. Input A has a date D
2. Input A updates state in mapGroupsWithState. The date present in state is D
3. Input A is added again. Input A has the correct date D, but the existing state now has an invalid date

Here is a simple repro.

Joda Time UDT:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
    import org.apache.spark.sql.types.{DataType, LongType, UDTRegistration, UserDefinedType}
    import org.joda.time.{DateTime, DateTimeZone}
    import org.junit.runner.RunWith
    import org.scalamock.scalatest.MockFactory
    import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
    import org.scalatest.junit.JUnitRunner

    private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
      override def sqlType: DataType = LongType
      override def serialize(obj: DateTime): Long = obj.getMillis

      def deserialize(datum: Any): DateTime = datum match {
        case value: Long => new DateTime(value, DateTimeZone.UTC)
      }

      override def userClass: Class[DateTime] = classOf[DateTime]
      private[spark] override def asNullable: JodaTimeUDT = this
    }

    object JodaTimeUDTRegister {
      def register: Unit = {
        UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
      }
    }

Test leveraging the Joda UDT:

    case class FooWithDate(date: DateTime, s: String, i: Int)

    @RunWith(classOf[JUnitRunner])
    class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with BeforeAndAfterAll {
      val application = this.getClass.getName
      var session: SparkSession = _

      override def beforeAll(): Unit = {
        System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
        val sparkConf = new SparkConf()
          .set("spark.driver.allowMultipleContexts", "true")
          .set("spark.testing", "true")
          .set("spark.memory.fraction", "1")
          .set("spark.ui.enabled", "false")
          .set("spark.streaming.gracefulStopTimeout", "1000")
          .setAppName(application)
          .setMaster("local[*]")
        session = SparkSession.builder().config(sparkConf).getOrCreate()
        session.sparkContext.setCheckpointDir("/")
        JodaTimeUDTRegister.register
      }

      override def afterAll(): Unit = {
        session.stop()
      }

      it should "work correctly for a streaming input with stateful transformation" in {
        val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
        val sqlContext = session.sqlContext
        import sqlContext.implicits._

        val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3), FooWithDate(date, "Foo", 3))
        val streamInput: MemoryStream[FooWithDate] = new MemoryStream[FooWithDate](42, session.sqlContext)
        streamInput.addData(input)
        val ds: Dataset[FooWithDate] = streamInput.toDS()

        val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], GroupState[FooWithDate]) => FooWithDate =
          TestJodaTimeUdt.updateFooState
        val result: Dataset[FooWithDate] = ds
          .groupByKey(x => x.i)
          .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)

        val writeTo = s"random_table_name"
        result.writeStream
          .outputMode(OutputMode.Update)
          .format("memory")
          .queryName(writeTo)
          .trigger(Trigger.Once())
          .start()
          .awaitTermination()

        val combinedResults: Array[FooWithDate] = session.sql(sqlText = s"select * from $writeTo").as[FooWithDate].collect()
        val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, "FooFoo", 6))
        combinedResults should contain theSameElementsAs(expected)
      }
    }

    object TestJodaTimeUdt {
      def updateFooState(id: Int, inputs: Iterator[FooWithDate], state: GroupState[FooWithDate]): FooWithDate = {
        if (state.hasTimedOut) {
          state.remove()
          state.getOption.get
        } else {
          val inputsSeq: Seq[FooWithDate] = inputs.toSeq
          val startingState = state.getOption.getOrElse(inputsSeq.head)
          val toProcess = if (state.getOption.isDefined) inputsSeq else inputsSeq.tail
          val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)

          state.update(updatedFoo)
          state.setTimeoutDuration("1 minute")
          updatedFoo
        }
      }

      def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate =
        FooWithDate(b.date, a.s + b.s, a.i + b.i)
    }

The test output shows the invalid date:

    org.scalatest.exceptions.TestFailedException: Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1), FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same elements as Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1), FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))

Is this something folks have encountered before?

Thank you,

Bryan Jeffrey