Hi Swetha,

I had the same requirement: reading JSON from Kafka and writing it back in Parquet format. I used a workaround:
1. Infer the schema using the batch API by reading the first few rows.
2. Start streaming using the schema inferred in step 1.

*Limitation*: This will not work if your schema changes on the fly for later records; you will have to restart the streaming query.

*Sample Code:*

    // start the stream
    def start = {
      // check for and read the latest Kafka offset from the checkpoint, if it exists
      val startingOffset: String = getKafkaOffset(offsetDirHdfsPath)

      // batch: infer the schema from Kafka once, at startup
      val batchDf = spark.read.format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topic)
        // .option("startingOffsets", """{"tweets":{"0":600,"1":-2,"2":-2,"3":-2}}""") // -1: latest, -2: earliest
        .option("startingOffsets", startingOffset)
        .load()
        .limit(2)
        .select($"value".as[String])

      val batchJson = spark.read.json(batchDf)
      batchJson.printSchema() // print to inspect the inferred schema
      val schema = batchJson.schema

      // streaming: create a datastream from the Kafka topic
      val inputDf = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topic)
        // .option("startingOffsets", """{"tweets":{"0":600,"1":-2,"2":-2,"3":-2}}""") // -1: latest, -2: earliest
        .option("startingOffsets", startingOffset)
        .load()

      // cast the binary Kafka value to a string, then expand each message
      // into columns by applying the inferred schema
      val ds = inputDf.selectExpr("CAST(value AS STRING)")
      val dataSet = ds.select(from_json($"value", schema).as("data")).select("data.*")

      val uploadToS3 = dataSet
        .writeStream
        .format("parquet")
        .option("path", outputPath)
        .option("checkpointLocation", checkpointDir)
        .start()
    }

Regards,
Chandan

On Thu, Jul 5, 2018 at 12:38 PM SRK <swethakasire...@gmail.com> wrote:
> Hi,
>
> Is there a way that Automatic Json Schema inference can be done using
> Structured Streaming? I do not want to supply a predefined schema and bind
> it.
>
> With Spark Kafka Direct I could do spark.read.json(). I see that this is
> not supported in Structured Streaming.
>
> Thanks!
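PS: one way to soften the restart limitation is to persist the inferred schema next to the checkpoint. Spark's StructType serializes to JSON via its json method, and DataType.fromJson restores it, so the batch inference step only has to run on the very first start. A minimal sketch (not from the workaround above; the helper name and schemaPath parameter are illustrative, and a local file is used in place of HDFS):

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical helper: reuse a previously saved schema if one exists,
// otherwise run the (expensive) inference and save its result.
def loadOrInferSchema(schemaPath: String)(infer: => StructType): StructType = {
  val p = Paths.get(schemaPath)
  if (Files.exists(p)) {
    // DataType.fromJson round-trips the JSON produced by StructType#json
    DataType.fromJson(new String(Files.readAllBytes(p))).asInstanceOf[StructType]
  } else {
    val schema = infer // e.g. spark.read.json(batchDf).schema from the batch step
    Files.write(p, schema.json.getBytes)
    schema
  }
}
```

On the first run this infers and writes the schema; later runs read the saved JSON, so the streaming query can be restarted without re-running the batch read against Kafka.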
--
Chandan Prakash