Hi Boris,

I need to process the data present at the path; that is why I am trying to create the dataframe.
Could you please provide an example of your solution?

Regards,
Amit

On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak <boris.lit...@skf.com> wrote:

> Hi Amit,
>
> Why won't you just map()/mapXXX() the kafkaDf with a mapping function
> that reads the paths?
>
> Also, do you really have to read the JSON into an additional dataframe?
>
> Thanks, Boris
>
> *From:* Amit Joshi <mailtojoshia...@gmail.com>
> *Sent:* Monday, 18 January 2021 15:04
> *To:* spark-user <user@spark.apache.org>
> *Subject:* [Spark Structured Streaming] Processing the data path coming
> from kafka.
>
> Hi,
>
> I have a use case where the file paths of JSON records stored in S3
> arrive as messages in Kafka. I have to process the data using Spark
> Structured Streaming.
>
> The design I have in mind is as follows:
>
> 1. In Spark Structured Streaming, read the Kafka message containing the
> data path.
> 2. Collect the message records in the driver. (Messages are small in size.)
> 3. Create the dataframe from the data location.
>
> kafkaDf.select($"value".cast(StringType))
>   .writeStream
>   .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
>     // rough code
>     // collect to driver
>     val records = batchDf.collect()
>     // create a dataframe per record and process it
>     records.foreach { (rec: Row) =>
>       println("records:######################" + rec.toString())
>       val path = rec.getAs[String]("data_path")
>       val dfToProcess = spark.read.json(path)
>       ....
>     }
>   }
>
> I would like to know your views on whether this approach is fine,
> specifically whether there is any problem with creating the dataframe
> after calling collect.
>
> If there is a better approach, please let me know.
>
> Regards
> Amit Joshi
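
---

For reference, here is a minimal, self-contained sketch of the foreachBatch design described in the thread. The broker address, topic name, output path, and checkpoint location are placeholders. The key point is that the function passed to foreachBatch runs on the driver, so calling batchDf.collect() and then spark.read.json(path) there is legal (unlike inside map() on the executors).

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StringType

object DataPathStreamJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-data-path-stream")
      .getOrCreate()
    import spark.implicits._

    // Each Kafka message value is assumed to be a single S3 path.
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "data-paths")                // placeholder topic
      .load()

    kafkaDf.select($"value".cast(StringType).as("data_path"))
      .writeStream
      .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
        // foreachBatch runs on the driver, so collect() and spark.read
        // are both safe here for small batches of path strings.
        batchDf.collect().foreach { (rec: Row) =>
          val path = rec.getAs[String]("data_path")
          val dfToProcess = spark.read.json(path) // batch read of one file
          // Placeholder for the real processing and sink:
          dfToProcess.write.mode("append").parquet("s3a://bucket/output/")
        }
      }
      .option("checkpointLocation", "s3a://bucket/checkpoints/paths/") // placeholder
      .start()
      .awaitTermination()
  }
}

Collecting is fine as long as each micro-batch holds only small path strings. One caveat: foreachBatch gives at-least-once guarantees, so a failure mid-batch can cause some paths to be read and processed again on restart.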
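And a sketch of the direction Boris suggests: instead of collecting to the driver, read each file's contents inside a typed mapPartitions on the executors. spark.read cannot be used there, so this goes through the Hadoop FileSystem API directly. It reuses spark, kafkaDf, and the imports from the sketch above, and assumes each JSON file fits in memory and that the executors pick up S3 credentials from their environment, since the fresh Configuration below does not carry the driver's settings.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// One raw JSON string per input path, produced on the executors.
val fileContents = kafkaDf
  .select($"value".cast(StringType).as("data_path"))
  .as[String]
  .mapPartitions { paths =>
    // Build the Hadoop config on the executor; Configuration is not serializable.
    val conf = new Configuration()
    paths.map { p =>
      val fs = FileSystem.get(new URI(p), conf)
      val in = fs.open(new Path(p))
      try Source.fromInputStream(in).mkString finally in.close()
    }
  }

// The raw strings could then be parsed with from_json given a known schema;
// here they are just echoed to the console for illustration.
fileContents.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/chk-data-paths") // placeholder
  .start()
  .awaitTermination()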