*Yes, my code is shown below (I also posted it in another mail):*
    * input
  val logs = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", BROKER_SERVER)
    .option("subscribe", TOPIC)
    .option("startingOffsets", "latest") // the Kafka source option is "startingOffsets"
    .load()

    * process
  import spark.implicits._ // provides the String encoder used by .as[String]
  val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]

  val events = logValues // each log is parsed into (date, uuid) with parseFunction (below)

  val results = events
    .withWatermark("date", "1 day")
    .dropDuplicates("uuid", "date")

    * output
  import org.apache.spark.sql.streaming.Trigger
  val query = results
    .writeStream
    .format("console")
    .option("truncate", "false")
    .trigger(Trigger.ProcessingTime("1 seconds"))
    .start()
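
To make the state behaviour of the deduplication step concrete: with a watermark, `dropDuplicates` keeps every distinct (`uuid`, `date`) key in state until the watermark moves past its `date`. The sketch below is a simplified, single-process model of that, not Spark's implementation; `DedupModel`, `Deduplicator` and `processBatch` are hypothetical names, and Spark's exact timing of watermark updates may differ.

```scala
import scala.collection.mutable

// Simplified model of dropDuplicates("uuid", "date") after
// withWatermark("date", "1 day"). NOT Spark's implementation; it only shows
// which keys stay in state and when they become evictable.
object DedupModel {
  val DelaySeconds: Long = 24L * 60 * 60 // the "1 day" watermark delay, in seconds

  final case class Event(date: Long, uuid: String)

  final class Deduplicator {
    // (uuid, date) keys still held as deduplication state
    private val seen = mutable.Set.empty[(String, Long)]
    private var maxEventTime: Option[Long] = None

    // Watermark = max event time seen in earlier batches minus the delay
    def watermark: Long = maxEventTime.map(_ - DelaySeconds).getOrElse(Long.MinValue)

    def stateSize: Int = seen.size

    /** Processes one micro-batch and returns the rows that would be emitted. */
    def processBatch(batch: Seq[Event]): Seq[Event] = {
      val wm = watermark // the watermark is fixed for the whole batch
      // Drop rows at or behind the watermark, then keep only the first row per key
      val emitted = batch.filter(e => e.date > wm && seen.add((e.uuid, e.date)))
      // Evict keys at or behind the watermark: duplicates of them are no longer accepted
      seen --= seen.filter { case (_, date) => date <= wm }
      // Advance the watermark for the next batch
      if (batch.nonEmpty) {
        val batchMax = batch.map(_.date).max
        maxEventTime = Some(maxEventTime.fold(batchMax)(prev => math.max(prev, batchMax)))
      }
      emitted
    }
  }
}
```

In this model, with day-sized `date` values and a 1-day delay, a day's keys are only evicted after a later day's events have pushed the watermark past it, so the state holds roughly the distinct `uuid`s of the last two days.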


*And I use Play JSON to parse the input logs from Kafka. The parse function is:*

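  import play.api.libs.json.Json

  // Returns (date, uuid); `time` is assumed to be epoch seconds
  def parseFunction(str: String): (Long, String) = {
    val json = Json.parse(str)
    val timestamp = (json \ "time").as[Long]
    // Start of the UTC day, shifted back 8 hours, in epoch seconds
    val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
    val uuid = (json \ "uuid").as[String] // .as[String] avoids the JSON quotes that toString() keeps
    (date, uuid)
  }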
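
To sanity-check the `date` arithmetic, here is a small standalone sketch. It assumes `time` is epoch seconds and that the `- 8` is meant as a UTC+8 offset; neither is stated above, and `DateBucket` / `localMidnightUtc8` are hypothetical names. `bucket` copies the formula, and `localMidnightUtc8` computes local midnight in UTC+8 with `java.time` for comparison.

```scala
import java.time.{Instant, ZoneOffset}

object DateBucket {
  // Assumption: the "- 8" in parseFunction refers to a fixed UTC+8 offset
  private val Utc8: ZoneOffset = ZoneOffset.ofHours(8)

  // The formula from parseFunction: start of the UTC day minus 8 hours (epoch seconds)
  def bucket(timestamp: Long): Long = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60

  // Hypothetical reference: epoch second of local midnight in UTC+8
  def localMidnightUtc8(timestamp: Long): Long =
    Instant.ofEpochSecond(timestamp)
      .atOffset(Utc8)
      .toLocalDate
      .atStartOfDay()
      .toEpochSecond(Utc8)
}
```

Under those assumptions the two agree for events before 16:00 UTC, but from 16:00 UTC onward (already the next day in UTC+8) the formula still returns the previous local day: `bucket(61200)` is `-28800`, while `localMidnightUtc8(61200)` is `57600`.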

And the Java heap usage looks like this (I've increased the executor memory to 15g):

[image: image.png]
