Hi,
This is first time I am trying structured streaming with Kafka. I have
simple code to read from Kafka and display it on the console. Message is in
JSON format. However, when I run my code nothin after below line gets
printed.

17/07/21 13:43:41 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
17/07/21 13:43:41 INFO StreamExecution: Starting new streaming query.
17/07/21 13:43:42 INFO AbstractCoordinator: Discovered coordinator XXX:9092
(id: 2147483647 <(214)%20748-3647> rack: null) for group
spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0.
17/07/21 13:43:42 INFO AbstractCoordinator: Marking the coordinatorXXX:9092
(id: 2147483647 <(214)%20748-3647> rack: null) dead for group
spark-kafka-source-085b8fda-7c01-435d-99db-b67a94dafa3f-1814340906-driver-0


Code is -

       Dataset<Row> kafkaStream = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers",
config.getString("kafka.host") + ":" + config.getString("kafka.port"))
                .option("subscribe", "test")
                .load();
        //kafkaStream.printSchema();
        //JSON ::: {"id":1,"name":"MySelf"}
        StructType schema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("id", DataTypes.IntegerType, false),
                DataTypes.createStructField("name",
DataTypes.StringType, false)});

        Dataset<KafkaMessage> streamingSelectDF =
kafkaStream.selectExpr("CAST(value AS STRING) as message")
                .select(functions.from_json(functions.col("message"),
schema).as("json"))
                .select("json.*")
                .as(Encoders.bean(KafkaMessage.class));
        streamingSelectDF.createOrReplaceTempView("MyView");
        Dataset<Row> streamData = spark.sql("SELECT count(*) from MyView");


        StreamingQuery streamingQuery = streamData.writeStream()
                .format("console")
                .outputMode("complete")
                .trigger(Trigger.ProcessingTime("10 seconds")).start();
        try {
            streamingQuery.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }


Regards,

Leena

Reply via email to