Hi,

This is the first time I am trying Structured Streaming with Kafka. I have simple code that reads from Kafka and displays the messages on the console. The messages are in JSON format. However, when I run my code, nothing gets printed after the lines below:
17/07/21 13:43:41 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
17/07/21 13:43:41 INFO StreamExecution: Starting new streaming query.
17/07/21 13:43:42 INFO AbstractCoordinator: Discovered coordinator XXX:9092 (id: 2147483647 rack: null) for group spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0.
17/07/21 13:43:42 INFO AbstractCoordinator: Marking the coordinator XXX:9092 (id: 2147483647 rack: null) dead for group spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0

Code is -

    Dataset<Row> kafkaStream = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", config.getString("kafka.host") + ":" + config.getString("kafka.port"))
            .option("subscribe", "test")
            .load();

    //kafkaStream.printSchema();
    //JSON ::: {"id":1,"name":"MySelf"}

    StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("id", DataTypes.IntegerType, false),
            DataTypes.createStructField("name", DataTypes.StringType, false)});

    Dataset<KafkaMessage> streamingSelectDF = kafkaStream.selectExpr("CAST(value AS STRING) as message")
            .select(functions.from_json(functions.col("message"), schema).as("json"))
            .select("json.*")
            .as(Encoders.bean(KafkaMessage.class));

    streamingSelectDF.createOrReplaceTempView("MyView");
    Dataset<Row> streamData = spark.sql("SELECT count(*) from MyView");

    StreamingQuery streamingQuery = streamData.writeStream()
            .format("console")
            .outputMode("complete")
            .trigger(Trigger.ProcessingTime("10 seconds")).start();

    try {
        streamingQuery.awaitTermination();
    } catch (StreamingQueryException e) {
        e.printStackTrace();
    }

Regards,
Leena