Hi Swetha,
I also had the same requirement: reading JSON from Kafka and writing it
back out in Parquet format.
I used a workaround:

   1. Infer the schema using the batch API by reading the first few rows
   2. Start streaming using the schema inferred in step 1

*Limitation*: This will not work if your schema changes on the fly for later
records; you will have to restart the streaming query with a freshly
inferred schema.
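
One way to at least catch such drift (a minimal sketch on my part, not
something from the job below): from_json returns null for records that no
longer parse against the supplied schema, so you can keep the raw value next
to the parsed struct and route the nulls to a dead-letter sink instead of
losing them silently. This reuses the inputDf and schema values from the
sample code below.

    //sketch: spot records that stopped matching the inferred schema
    val parsed = inputDf
        .selectExpr("CAST(value AS STRING) AS raw")
        .select($"raw", from_json($"raw", schema).as("data"))
    val malformed  = parsed.filter($"data".isNull)     //schema drifted or invalid JSON
    val wellFormed = parsed.filter($"data".isNotNull).select("data.*")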


*Sample Code:*
 //imports needed by the snippet below; assumes a SparkSession named spark is in scope
 import org.apache.spark.sql.functions.from_json
 import spark.implicits._ //for $"..." and .as[String]

 //start the stream
    def start = {
        //check and get the latest Kafka offset from the checkpoint if it
        //exists (a sketch of this helper follows after the snippet)
        val startingOffset: String = getKafkaOffset(offsetDirHdfsPath)

        //batch: infer the schema from Kafka once, at startup
        val batchDf = spark.read.format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topic)
            //.option("startingOffsets", """{"tweets":{"0":600,"1":-2,"2":-2,"3":-2}}""") //-1: latest, -2: earliest
            .option("startingOffsets", startingOffset)
            .load().limit(2).select($"value".as[String])
        val batchJson = spark.read.json(batchDf)
        batchJson.printSchema() //print to inspect the inferred schema
        val schema = batchJson.schema

        //streaming: create a datastream from the Kafka topic
        val inputDf = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topic)
            //.option("startingOffsets", """{"tweets":{"0":600,"1":-2,"2":-2,"3":-2}}""") //-1: latest, -2: earliest
            .option("startingOffsets", startingOffset)
            .load()

        //convert the stream into a dataset of typed rows by applying the
        //inferred schema to each JSON value
        val ds = inputDf.selectExpr("CAST(value AS STRING)")
        val dataSet = ds.select(from_json($"value", schema).as("data")).select("data.*")
        val uploadToS3 = dataSet
            .writeStream
            .format("parquet")
            .option("path", outputPath)
            .option("checkpointLocation", checkpointDir)
            .start()
    }
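
For completeness, getKafkaOffset above just hands back a startingOffsets
value read from HDFS. Roughly, a minimal sketch of what it might look like
(the offsets.json file name and the fallback to "earliest" here are
illustrative assumptions, not the exact code I run):

    import java.nio.charset.StandardCharsets
    import org.apache.hadoop.fs.{FileSystem, Path}

    //hypothetical sketch: read a previously saved startingOffsets JSON
    //string from HDFS; fall back to "earliest" if none was written yet
    def getKafkaOffset(offsetDirHdfsPath: String): String = {
        val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
        val offsetFile = new Path(offsetDirHdfsPath, "offsets.json") //assumed file name
        if (fs.exists(offsetFile)) {
            val in = fs.open(offsetFile)
            try scala.io.Source.fromInputStream(in, StandardCharsets.UTF_8.name()).mkString.trim
            finally in.close()
        } else "earliest"
    }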

Regards,
Chandan

On Thu, Jul 5, 2018 at 12:38 PM SRK <swethakasire...@gmail.com> wrote:

> Hi,
>
> Is there a way that automatic JSON schema inference can be done using
> Structured Streaming? I do not want to supply a predefined schema and bind
> to it.
>
> With Spark Kafka Direct I could do spark.read.json(). I see that this is
> not supported in Structured Streaming.
>
>
> Thanks!

-- 
Chandan Prakash
