Hi Dawid, I'm working with Max on the project.
Our code for the TimestampAndWatermarkAssigner is:
```
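import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class TimestampAndWatermarkAssigner(val maxLateness: Long)
    extends AssignerWithPeriodicWatermarks[Row] {

  // Event timestamp is taken from the record itself.
  override def extractTimestamp(element: Row, previousElementTimestamp: Long): Long = {
    element.minTime
  }

  // Watermark trails the current wall-clock time by maxLateness milliseconds.
  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis() - maxLateness)
  }
}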
```

Here, Row is a class representing the JSON object coming in from Kafka;
it includes the timestamp (minTime).
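
For illustration only, a stripped-down stand-in for Row could look like the sketch below. Only minTime comes from the code above; the payload field, the RowSketch object, and the sample values are hypothetical. The two helper functions mirror the assigner's arithmetic without any Flink dependency, with the clock passed in as a parameter.

```scala
// Hypothetical stand-in for Row: only minTime appears in the assigner above.
final case class Row(minTime: Long, payload: String)

object RowSketch {
  // Mirrors extractTimestamp: the event timestamp is read straight from the record.
  def extractTimestamp(element: Row): Long = element.minTime

  // Mirrors getCurrentWatermark, with the clock value passed in explicitly
  // (the real assigner calls System.currentTimeMillis()).
  def currentWatermark(nowMillis: Long, maxLateness: Long): Long =
    nowMillis - maxLateness
}
```

With nowMillis = 5000 and maxLateness = 2000, currentWatermark returns 3000 whatever minTime values the records carry, since the watermark here is computed from the clock only.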

Thanks,
-Ethan


