Perfect. I'll give this a shot and report back.

From: Tathagata Das
Sent: Friday, February 28, 2020 6:23:07 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Structured Streaming: mapGroupsWithState UDT serialization does 
not work

Sounds like something to do with the serialization/deserialization, and not 
related to mapGroupsWithState.

The docs say that
1. this is deprecated and therefore should not be used
2. you have to use the annotation 
 on the class definition. You don't seem to have done it; maybe that's the reason?

I would debug by printing the values in the serialize/deserialize methods, and 
then passing them through the groupBy that is known to fail.
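
A minimal sketch of that kind of debugging, assuming Joda-Time is on the 
classpath. RoundTripDebug is a hypothetical helper, not part of the original 
repro; it mirrors the UDT's serialize/deserialize logic outside of Spark and 
prints each value that passes through:

```scala
import org.joda.time.{DateTime, DateTimeZone}

// Hypothetical helper (not from the original repro): same conversion logic
// as the UDT, with a println at each step.
object RoundTripDebug {
  def serialize(obj: DateTime): Long = {
    val millis = obj.getMillis
    println(s"serialize:   $obj -> $millis")
    millis
  }

  def deserialize(datum: Any): DateTime = datum match {
    case value: Long =>
      val result = new DateTime(value, DateTimeZone.UTC)
      println(s"deserialize: $value -> $result")
      result
    case other =>
      // Report unexpected input instead of failing with a bare MatchError.
      throw new IllegalArgumentException(s"Unexpected datum: $other")
  }

  def main(args: Array[String]): Unit = {
    val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
    val restored = deserialize(serialize(date))
    println(s"round trip preserved the instant: ${restored.isEqual(date)}")
  }
}
```

If the same println calls are placed in the real UDT methods, the output from 
the failing groupBy can be compared against this standalone run.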


On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey wrote:

The difference is more than hours off. In this instance it's different by 4 
years. In other instances it's different by tens of years (and other smaller 

We've considered moving to storage as longs, but this makes code much less 
readable and harder to maintain. The UDT serialization bug also causes issues 
outside of stateful streaming, such as when executing a simple group by.


Bryan Jeffrey

From: Tathagata Das 
Sent: Friday, February 28, 2020 4:56:07 PM
To: Bryan Jeffrey
Cc: user
Subject: Re: Structured Streaming: mapGroupsWithState UDT serialization does 
not work

You are deserializing by explicitly specifying the UTC timezone, but when 
serializing you are not specifying it. Maybe that is the reason?
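
One way to check the time-zone theory outside of Spark is sketched below, 
assuming Joda-Time is available. The object name and the zone ID are arbitrary 
choices for illustration. In Joda, getMillis returns milliseconds since the 
epoch regardless of the object's zone, so the instant itself survives the 
round trip; what changes is the zone attached to the restored DateTime, which 
matters because DateTime.equals compares the chronology and zone as well as 
the instant, while isEqual compares only the instant:

```scala
import org.joda.time.{DateTime, DateTimeZone}

// Hypothetical standalone check, not part of the original repro.
object ZoneRoundTrip {
  // Any non-UTC zone works here; this ID is only an example.
  val zone: DateTimeZone = DateTimeZone.forID("America/New_York")
  val local: DateTime = new DateTime(2020, 1, 2, 3, 4, 5, 6, zone)

  // Same steps as the UDT: serialize via getMillis, deserialize into UTC.
  val millis: Long = local.getMillis
  val restored: DateTime = new DateTime(millis, DateTimeZone.UTC)

  def main(args: Array[String]): Unit = {
    println(s"same instant (isEqual): ${local.isEqual(restored)}")
    println(s"same value (equals):    ${local == restored}")
  }
}
```

A zone mismatch of this kind would explain a failed equality check, but on its 
own it would not shift the underlying instant.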

Also, if you can encode it using just a long, then I recommend saving the 
value as a long and eliminating some of the serialization overhead. Spark will 
probably optimize better if it sees it as a long rather than an opaque 
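
A rough sketch of what storing the value as a long could look like, assuming 
Joda-Time is available. FooWithMillis, dateMillis, and fromDateTime are 
hypothetical names introduced here for illustration, and this has not been 
verified against Spark's encoders:

```scala
import org.joda.time.{DateTime, DateTimeZone}

// Hypothetical variant of FooWithDate that stores epoch milliseconds directly,
// so the dataset only ever sees a plain Long column.
case class FooWithMillis(dateMillis: Long, s: String, i: Int) {
  // Convenience accessor so calling code can still work with a DateTime.
  def date: DateTime = new DateTime(dateMillis, DateTimeZone.UTC)
}

object FooWithMillis {
  // Builds an instance from a DateTime, keeping only its instant.
  def fromDateTime(date: DateTime, s: String, i: Int): FooWithMillis =
    FooWithMillis(date.getMillis, s, i)
}
```

The accessor keeps most call sites readable, at the cost of always reading the 
value back in UTC.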


On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey wrote:

I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with 
mapGroupsWithState, and was wondering if anyone had insight.  We use Joda time 
in a number of data structures, and so we've generated a custom serializer for 
Joda.  This works well in most dataset/dataframe structured streaming 
operations. However, when running mapGroupsWithState we observed that incorrect 
dates were being returned from a state.

I created a bug here: in an 
effort to assist tracking of related information.

Simple example:
1. Input A has a date D
2. Input A updates state in mapGroupsWithState. Date present in state is D
3. Input A is added again.  Input A has correct date D, but existing state now 
has invalid date

Here is a simple repro:

Joda Time UDT:

private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
  override def sqlType: DataType = LongType
  // Stored as milliseconds since the epoch.
  override def serialize(obj: DateTime): Long = obj.getMillis
  // Always rebuilt in UTC, regardless of the zone of the original value.
  def deserialize(datum: Any): DateTime = datum match {
    case value: Long => new DateTime(value, DateTimeZone.UTC)
  }
  override def userClass: Class[DateTime] = classOf[DateTime]
  private[spark] override def asNullable: JodaTimeUDT = this
}

object JodaTimeUDTRegister {
  def register: Unit = {
    UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
  }
}

Test Leveraging Joda UDT:

case class FooWithDate(date: DateTime, s: String, i: Int)

class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with 
BeforeAndAfterAll {
  val application = this.getClass.getName
  var session: SparkSession = _

  override def beforeAll(): Unit = {
    System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
    val sparkConf = new SparkConf()
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.testing", "true")
      .set("spark.memory.fraction", "1")
      .set("spark.ui.enabled", "false")
      .set("spark.streaming.gracefulStopTimeout", "1000")

    session = SparkSession.builder().config(sparkConf).getOrCreate()

  override def afterAll(): Unit = {

  it should "work correctly for a streaming input with stateful transformation" in {
    val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
    val sqlContext = session.sqlContext
    import sqlContext.implicits._

    val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3), 
FooWithDate(date, "Foo", 3))
    val streamInput: MemoryStream[FooWithDate] = new 
MemoryStream[FooWithDate](42, session.sqlContext)
    val ds: Dataset[FooWithDate] = streamInput.toDS()

    val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], 
GroupState[FooWithDate]) => FooWithDate = TestJodaTimeUdt.updateFooState
    val result: Dataset[FooWithDate] = ds
      .groupByKey(x => x.i)
    val writeTo = s"random_table_name"

    val combinedResults: Array[FooWithDate] =
      session.sql(sqlText = s"select * from $writeTo").as[FooWithDate].collect()
    val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, 
"FooFoo", 6))
    combinedResults should contain theSameElementsAs(expected)

object TestJodaTimeUdt {
  def updateFooState(id: Int, inputs: Iterator[FooWithDate], state: 
GroupState[FooWithDate]): FooWithDate = {
    if (state.hasTimedOut) {
    } else {
      val inputsSeq: Seq[FooWithDate] = inputs.toSeq
      val startingState = state.getOption.getOrElse(inputsSeq.head)
      val toProcess = if (state.getOption.isDefined) inputsSeq else 
      val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)

      state.setTimeoutDuration("1 minute")

  def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate = 
FooWithDate(, a.s + b.s, a.i + b.i)

The test output shows the invalid date:

FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same 
elements as
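
To put a number on the discrepancy, the date the test constructs and the date 
reported in the failure message can be compared directly. This is a small 
sketch assuming Joda-Time is available; DateDelta is a hypothetical name, and 
the two values are copied from the test and the output above:

```scala
import org.joda.time.{DateTime, DateTimeZone, Duration}

// Hypothetical helper, not part of the original repro.
object DateDelta {
  // The date the test constructs.
  val expected: DateTime = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
  // The date reported in the failing test output.
  val observed: DateTime = DateTime.parse("2021-02-02T19:26:23.374Z")
  val delta: Duration = new Duration(expected, observed)

  def main(args: Array[String]): Unit = {
    println(s"expected millis: ${expected.getMillis}")
    println(s"observed millis: ${observed.getMillis}")
    println(s"difference: ${delta.getMillis} ms (about ${delta.getStandardDays} days)")
  }
}
```

Comparing the raw millisecond values may make it easier to spot whether the 
stored long was altered or merely interpreted differently.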

Is this something folks have encountered before?

Thank you,

Bryan Jeffrey
