Hi, I’m obviously new to Spark Structured Streaming, and I want to:

1.) Open one (a single) connection to an MQTT broker/topic that publishes JSON objects
2.) Transform the JSON into a wide table
3.) Run several different queries on that wide table

What I do: 

import java.sql.Timestamp
import org.apache.spark.sql.functions.{from_json, window}
import session.implicits._

val lines = session.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "t")
  .option("QoS", 1)
  .option("username", "u")
  .option("password", "p")
  .option("persistence", "memory")
  .load("ssl://mqtt ... :8883").as[(String, Timestamp)]

// 1st Sink
// keep the source timestamp next to the parsed JSON so it can be windowed later
val parsedData = lines.select($"timestamp", from_json('value, schema) as "data")
val parsedDataQuery = parsedData.writeStream
  .outputMode("append")
  .format("console")
  .start()  // <-------------- this opens one connection to Mqtt Broker

// 2nd Sink
val countPerMinute = parsedData
  .groupBy(window($"timestamp", "1 minute"))
  .count()
val query = countPerMinute.writeStream
  .outputMode("complete")
  .format("console")
  .start()  // <---------------- this opens another connection to Mqtt Broker

parsedDataQuery.awaitTermination()
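
Side note: since two queries are running, I guess I should really wait on both rather than only the first, e.g. (assuming the standard StreamingQueryManager API):

// block until any of the started queries terminates
session.streams.awaitAnyTermination()

That still doesn't address the duplicate connection, though.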

How can I keep just one connection to the MQTT broker and still run multiple analyses on 'parsedData'?
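
I also came across foreachBatch (Spark 2.4+). Would something like the sketch below keep a single MQTT connection by fanning one streaming query out to both outputs? I'm not sure about the semantics of the windowed count there, since each invocation only sees a single micro-batch:

import org.apache.spark.sql.DataFrame

// sketch only: reuses 'parsedData' from above and replaces the two separate sinks
val combinedQuery = parsedData.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.persist()                                // reuse the micro-batch for both analyses
    batch.show()                                   // output 1: the parsed wide table
    batch
      .groupBy(window($"timestamp", "1 minute"))   // output 2: counts per minute, but only
      .count()                                     //   within this micro-batch, not cumulative
      .show()                                      //   like the "complete" mode query above
    batch.unpersist()
  }
  .start()

combinedQuery.awaitTermination()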

Thanks in advance for your advice,
Jürgen
