Hi, I'm obviously new to Spark Structured Streaming, and I want to:

1. Open one (a single) connection to an MQTT broker/topic that emits JSON objects
2. Transform the JSON into a wide table
3. Run several different queries on that wide table
What I do:

```scala
val lines = session.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "t")
  .option("QoS", 1)
  .option("username", "u")
  .option("password", "p")
  .option("persistence", "memory")
  .load("ssl://mqtt ... :8883")
  .as[(String, Timestamp)]

// 1st sink
val parsedData = lines.select(from_json('value, schema))

val parsedDataQuery = parsedData.writeStream
  .outputMode("append")
  .format("console")
  .start() // <-- this opens one connection to the MQTT broker

// 2nd sink
val countPerMinute = parsedData
  .groupBy(window($"timestamp", "1 minute"))
  .count()

val query = countPerMinute.writeStream
  .outputMode("complete")
  .format("console")
  .start() // <-- this opens another connection to the MQTT broker

parsedDataQuery.awaitTermination()
```

How can I open only one connection to the MQTT server and still run multiple analyses on `parsedData`?

Thanks in advance for your advice,
Jürgen
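For context, this is the kind of single-query approach I've been wondering about. It's only a sketch (untested), assuming Spark 2.4+ where `writeStream.foreachBatch` is available: one streaming query means one source, hence one MQTT connection, and both analyses run on each micro-batch inside the callback. I realize the per-minute count would then only cover rows within each micro-batch rather than a true streaming window, so I'm not sure this is the right direction:

```scala
// Sketch only (assumes Spark 2.4+ and the same `parsedData` as above):
// a single streaming query, so only one connection to the MQTT broker.
val singleQuery = parsedData.writeStream
  .outputMode("append")
  .foreachBatch { (batch: org.apache.spark.sql.DataFrame, batchId: Long) =>
    batch.persist() // cache the micro-batch so both analyses reuse it

    // Analysis 1: the parsed wide table
    batch.show()

    // Analysis 2: counts per minute (only within this micro-batch!)
    batch
      .groupBy(window($"timestamp", "1 minute"))
      .count()
      .show()

    batch.unpersist()
  }
  .start()

singleQuery.awaitTermination()
```

Is something like this the idiomatic way to fan one source out to several analyses, or is there a way to share one MQTT connection between multiple independent `writeStream` queries?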