I have written this simple piece of code to try out streaming aggregation in Spark 2.4.
The job keeps running but never returns any result. If I remove the groupBy and
the count aggregation, it does return three columns: JobType, Timestamp, and TS.

Would really appreciate any help.

       val edgeDF = spark
         .readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "...")
         .option("subscribe", "edgemw")
         .load()

       val edgeSelectDF = edgeDF
         .select(
           get_json_object($"value".cast("string"), "$.after.JOBTYPE").alias("JobType"),
           get_json_object($"value".cast("string"), "$.current_ts").alias("Timestamp"))
         .select($"JobType", $"Timestamp",
           from_utc_timestamp($"Timestamp", "UTC").alias("TS"))
         .groupBy("JobType")
         .count()


       val query = edgeSelectDF
              .writeStream
              .format("console")
              .outputMode(OutputMode.Complete())
              .trigger(Trigger.ProcessingTime(5.second))
              .start()
              .awaitTermination()
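
To isolate the problem, here is a minimal sketch of the same Complete-mode
groupBy/count against Spark's built-in rate source, so no Kafka or JSON parsing
is involved (the variable names and the "bucket" grouping key are assumptions
for illustration only). If this version prints counts to the console every 5
seconds while the Kafka version stays silent, the aggregation itself is fine
and the issue is more likely in the Kafka input or the JSON extraction:

       // Minimal sketch (assumed names): Complete-mode streaming count
       // using the built-in rate source instead of Kafka.
       import org.apache.spark.sql.functions._
       import org.apache.spark.sql.streaming.{OutputMode, Trigger}
       import scala.concurrent.duration._

       val rateDF = spark
         .readStream
         .format("rate")                    // emits columns: timestamp, value
         .option("rowsPerSecond", "10")
         .load()

       val countsDF = rateDF
         .withColumn("bucket", ($"value" % 3).cast("string"))  // synthetic key
         .groupBy("bucket")
         .count()

       countsDF
         .writeStream
         .format("console")
         .outputMode(OutputMode.Complete())
         .trigger(Trigger.ProcessingTime(5.seconds))
         .start()
         .awaitTermination()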

Thanks & regards,
Ritesh