I am trying to test the water mark concept in structured streaming using the
below program

 import java.sql.Timestamp
    import org.apache.spark.sql.functions.{col, expr}
    import org.apache.spark.sql.streaming.Trigger

    val lines_stream = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", "vm1:21005,vm2:21005").
      option("subscribe", "s1").
      load().
      select('value.cast("String") as "key",
('value.cast("String")).cast("long").cast
      ("timestamp") as "timeStampValue").
      select("key", "timeStampValue").
      withWatermark("timeStampValue", "10 seconds ")
    
    val query = lines_stream.
      writeStream.
      option("truncate", "false").
      outputMode("append").
      format("console").
      trigger(Trigger.ProcessingTime(3000)).
      start()
    query.awaitTermination()


//Corresponding output

scala>     query.awaitTermination()
-------------------------------------------                                     
Batch: 0
-------------------------------------------
+---+--------------+
|key|timeStampValue|
+---+--------------+
+---+--------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------------------+
|key       |timeStampValue     |
+----------+-------------------+
|1539844822|2018-10-18 14:40:22|
+----------+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+-------------------+
|key       |timeStampValue     |
+----------+-------------------+
|1539844842|2018-10-18 14:40:42|
+----------+-------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+----------+-------------------+
|key       |timeStampValue     |
+----------+-------------------+
|1539844862|2018-10-18 14:41:02|
+----------+-------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+----------+-------------------+
|key       |timeStampValue     |
+----------+-------------------+
|1539844882|2018-10-18 14:41:22|
+----------+-------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+----------+-------------------+
|key       |timeStampValue     |
+----------+-------------------+
|1539844852|2018-10-18 14:40:52|* // As per watermark this event should be
discarded but it didnt*
+----------+-------------------+

Note:Below are the values I sent from kafka-producer
1539844822
1539844842
1539844862
1539844882
1539844852

Is this correct way to test the water mark scenarios ?

Regards
Sandeep Katta





--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

Reply via email to