I am reading data from Kafka and want to save it as Parquet on HDFS using Structured Streaming. The data from Kafka is in JSON format, so I tried to convert it to a Dataset<Row> with spark.read().json(). However, I get this exception:
>
> Queries with streaming sources must be executed with writeStream.start()
Here is my code:
>
> Dataset<Row> df = spark.readStream().format("kafka")...
> Dataset<String> jsonDataset = df.selectExpr("CAST(value AS STRING)").map...
> Dataset<Row> rowDataset = spark.read().json(jsonDataset);
>
> rowDataset.writeStream().outputMode(OutputMode.Append()).partitionBy("appname").format("parquet").option("path",savePath).start().awaitTermination();

How can I solve this? Thanks!

Regards,
Junfeng Chen
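
A likely cause is that spark.read().json(...) is a batch API: it scans its input to infer a schema, which means running the streaming Dataset as a batch job, and that is what the exception complains about. One common way around this is to stay in the streaming API and parse the value column with from_json and an explicit schema. The following is only a minimal sketch under stated assumptions, not a confirmed fix for this exact job: the broker address, the topic name "events", the "message" field, and the checkpoint path argument are hypothetical; only "appname" and savePath come from the code above.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class KafkaJsonToParquet {

    // Hypothetical schema: "appname" is taken from the partitionBy call in the
    // question; "message" is an assumed placeholder for the remaining fields.
    static StructType schema() {
        return new StructType()
            .add("appname", DataTypes.StringType)
            .add("message", DataTypes.StringType);
    }

    public static void main(String[] args) throws Exception {
        String savePath = args[0];        // output directory on HDFS
        String checkpointPath = args[1];  // assumed: a separate checkpoint directory

        SparkSession spark = SparkSession.builder()
            .appName("KafkaJsonToParquet")
            .getOrCreate();

        // Streaming source; broker and topic are assumptions for illustration.
        Dataset<Row> df = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "events")
            .load();

        // Parse the JSON string with a fixed schema instead of spark.read().json(),
        // so no batch schema inference runs on the streaming Dataset.
        Dataset<Row> rowDataset = df
            .selectExpr("CAST(value AS STRING) AS json")
            .select(from_json(col("json"), schema()).as("data"))
            .select("data.*");

        // The file sink needs a checkpoint location to track committed output.
        rowDataset.writeStream()
            .outputMode(OutputMode.Append())
            .partitionBy("appname")
            .format("parquet")
            .option("path", savePath)
            .option("checkpointLocation", checkpointPath)
            .start()
            .awaitTermination();
    }
}
```

The trade-off is that the schema has to be declared up front. If it is not known in advance, one option is to infer it once from a static sample of the data with spark.read().json(...) in a separate batch step and reuse the resulting StructType here.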