Hi Timo,

The LAST_VALUE function is used in a query that groups by id, so it simply
keeps the latest row of data for each primary key. I was inspired by this answer:
https://stackoverflow.com/questions/48554999/apache-flink-how-to-enable-upsert-mode-for-dynamic-tables

Its implementation is also very simple:

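// Imports assumed from the usual Flink packages; adjust to your Flink version.
import java.lang.{String => JString}

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.functions.AggregateFunction

// Accumulator: holds the latest non-empty value seen for a key.
@SerialVersionUID(3L)
class Middle2 extends Serializable {
  var mid: String = "none"
}

class StringLastValueFunc extends AggregateFunction[JString, Middle2] {

  override def createAccumulator(): Middle2 = new Middle2

  // Overwrite the stored value unless the input is null or empty.
  def accumulate(acc: Middle2, iValue: String): Unit = {
    if (iValue != null && iValue.nonEmpty) {
      acc.mid = iValue
    }
  }

  // Emit the most recently stored value ("none" if nothing was stored yet).
  override def getValue(acc: Middle2): JString = acc.mid

  override def getResultType: TypeInformation[JString] = Types.STRING
}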
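
To make the semantics concrete, here is a minimal plain-Scala sketch (no Flink
dependency) of what the grouped query effectively computes: rows are processed
in arrival order and the last non-null, non-empty value is kept per id. The
names Row, LastValueSketch and lastValuePerKey and the sample data are made up
for illustration; this only mirrors the accumulate logic above and is not how
Flink executes the aggregation internally.

```scala
// Hypothetical row type, for illustration only.
final case class Row(id: String, value: String)

object LastValueSketch {
  // Mirrors accumulate(): null and empty values never overwrite the stored one.
  def accumulate(acc: String, value: String): String =
    if (value != null && value.nonEmpty) value else acc

  // Fold rows in arrival order; "none" is the accumulator's initial value.
  def lastValuePerKey(rows: Seq[Row]): Map[String, String] =
    rows.foldLeft(Map.empty[String, String]) { (state, row) =>
      val current = state.getOrElse(row.id, "none")
      state.updated(row.id, accumulate(current, row.value))
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row("a", "v1"), Row("b", ""), Row("a", null), Row("a", "v2"))
    println(lastValuePerKey(rows))
  }
}
```

With this sample input, id "a" ends up with "v2" (the null in between is
ignored) and id "b" stays at the initial "none" because its only value is empty.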


I don't think I should set a state expiration time, because the data for
each primary key can change at any time.

I am using the RocksDB state backend with incremental checkpointing enabled.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
