I am reading some data from Kafka and want to save it to Parquet on
HDFS with Structured Streaming.
The data from Kafka is in JSON format. I tried to convert it to a
Dataset<Row> with spark.read().json(), but I get the exception:
> Queries with streaming sources must be executed with
> writeStream.start()

Here is my code:
> Dataset<Row> df = spark.readStream().format("kafka")...
> Dataset<String> jsonDataset = df.selectExpr("CAST(value AS STRING)").map...
> Dataset<Row> rowDataset = spark.read().json(jsonDataset);
> rowDataset.writeStream().outputMode(OutputMode.Append()).partitionBy("appname").format("parquet").option("path",savePath).start().awaitTermination();

How can I solve this?
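A minimal sketch of the commonly suggested workaround follows. The exception most likely comes from spark.read().json(jsonDataset): that call starts a batch read which tries to run the streaming Dataset right away (for example to infer the schema), and Structured Streaming only allows streaming sources to run through writeStream().start(). Parsing the Kafka value column with functions.from_json and an explicit schema keeps the whole query streaming. Assumptions: the schema in buildSchema() (fields "appname" and "message"), the class name, and the command-line arguments are hypothetical placeholders; the question's elided .map... step is skipped. The file sink also needs a checkpoint location.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class KafkaJsonToParquet {

    // Hypothetical schema: replace these fields with the real layout of the JSON messages.
    static StructType buildSchema() {
        return new StructType()
                .add("appname", DataTypes.StringType)
                .add("message", DataTypes.StringType);
    }

    // Parses the Kafka "value" column with from_json instead of spark.read().json(),
    // so no batch job is triggered on the streaming Dataset.
    static Dataset<Row> parse(Dataset<Row> kafkaDf, StructType schema) {
        return kafkaDf
                .selectExpr("CAST(value AS STRING) AS json")
                .select(from_json(col("json"), schema).alias("data"))
                .select("data.*"); // flatten the parsed struct into top-level columns
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical arguments: bootstrap servers, topic, output path, checkpoint path.
        String servers = args[0];
        String topic = args[1];
        String savePath = args[2];
        String checkpointPath = args[3];

        SparkSession spark = SparkSession.builder().appName("KafkaJsonToParquet").getOrCreate();

        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", servers)
                .option("subscribe", topic)
                .load();

        parse(df, buildSchema())
                .writeStream()
                .outputMode(OutputMode.Append())
                .partitionBy("appname")
                .format("parquet")
                .option("path", savePath)
                // The file sink records its progress here; it is required for this query.
                .option("checkpointLocation", checkpointPath)
                .start()
                .awaitTermination();
    }
}
```

The trade-off is that from_json needs the schema up front, whereas spark.read().json() would infer it. If the schema is unknown, one option is to infer it once from a static sample with a batch read and then pass that StructType to from_json.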


Junfeng Chen

