Hi Boris,

I need to process the data present at the path.
That is why I am trying to create the dataframe.

Could you please provide an example of your solution?

Regards
Amit

On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak <boris.lit...@skf.com> wrote:

> Hi Amit,
>
>
>
> Why don’t you just map()/mapXXX() over kafkaDf with a mapping function
> that reads the paths?
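>
> Roughly something like the following untested sketch, assuming each
> Kafka value is just the S3 path and a SparkSession named spark is in
> scope (the file reads happen on the executors through the Hadoop
> FileSystem API, so no SparkSession is needed inside the mapping
> function):
>
> import java.net.URI
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import spark.implicits._
>
> // Dataset[String] of S3 paths, one per Kafka message
> val paths = kafkaDf.selectExpr("CAST(value AS STRING)").as[String]
>
> // read every file on the executors with plain Hadoop IO
> val jsonLines = paths.mapPartitions { it =>
>   val conf = new Configuration() // assumes S3 credentials are configured
>   it.flatMap { p =>
>     val fs = FileSystem.get(new URI(p), conf)
>     scala.io.Source.fromInputStream(fs.open(new Path(p))).getLines()
>   }
> }
> // then parse jsonLines with from_json(...) against your schema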
>
> Also, do you really have to read the JSON into an additional dataframe?
>
>
>
> Thanks, Boris
>
>
>
> From: Amit Joshi <mailtojoshia...@gmail.com>
> Sent: Monday, 18 January 2021 15:04
> To: spark-user <user@spark.apache.org>
> Subject: [Spark Structured Streaming] Processing the data path coming
> from kafka.
>
>
>
> Hi,
>
>
>
> I have a use case where the file paths of JSON records stored in S3
> arrive as Kafka messages. I have to process the data using Spark
> Structured Streaming.
>
>
>
> The design I have in mind is as follows:
>
> 1. In Spark Structured Streaming, read the Kafka message containing the
> data path.
>
> 2. Collect the message records on the driver. (Messages are small in size.)
>
> 3. Create the dataframe from the data location.
>
>
>
> // rough code; assumes the usual imports:
> // import org.apache.spark.sql.{DataFrame, Row}
> // import org.apache.spark.sql.types.StringType
> // import spark.implicits._
> kafkaDf.select($"value".cast(StringType).as("data_path"))
>   .writeStream
>   .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
>     // collect the (small) batch of path messages to the driver
>     val records = batchDf.collect()
>     // create a dataframe from each path and process it
>     records.foreach { (rec: Row) =>
>       println("records:###################### " + rec.toString)
>       val path = rec.getAs[String]("data_path")
>       val dfToProcess = spark.read.json(path)
>       ....
>     }
>   }
>   .start()
>
> I would like to know your views on whether this approach is fine,
> specifically whether there is some problem with creating the dataframe
> after calling collect().
>
> If there is a better approach, please let me know.
>
>
>
> Regards
>
> Amit Joshi
>
>
