Hi Pankaj,

What version of Spark are you using?

If you are on Spark 2.4+, there is a built-in function "to_json" which
converts a struct of your Dataset's columns into a JSON string, so you
don't need a custom UDF:
https://spark.apache.org/docs/2.4.0/api/sql/#to_json
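
A minimal sketch (the Dataset name "transformed", the output topic, and the
checkpoint path below are placeholders; substitute your own):

import org.apache.spark.sql.functions.{col, struct, to_json}

val query = transformed
  // pack all columns into one struct and serialize it as a JSON string
  .select(to_json(struct(transformed.columns.map(col): _*)).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_json") // hypothetical output topic
  .option("checkpointLocation", "/tmp/checkpoints/sms_history") // required by the Kafka sink
  .start()

The Kafka sink only needs a string (or binary) "value" column, plus an
optional "key", so the to_json(struct(...)) select replaces the UDF entirely.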

Akshay Bhardwaj
+91-97111-33849


On Wed, Mar 6, 2019 at 10:29 PM Pankaj Wahane <pankajwah...@live.com> wrote:

> Hi,
>
> I am using structured streaming for ETL.
>
> val data_stream = spark
>   .readStream // constantly expanding dataframe
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "sms_history")
>   .option("startingOffsets", "earliest") // begin from start of topic
>   .option("failOnDataLoss", "false")
>   .load()
>
> I transform this into a Dataset with the following schema.
>
> root
>  |-- accountId: long (nullable = true)
>  |-- countryId: long (nullable = true)
>  |-- credits: double (nullable = true)
>  |-- deliveryStatus: string (nullable = true)
>  |-- senderId: string (nullable = true)
>  |-- sentStatus: string (nullable = true)
>  |-- source: integer (nullable = true)
>  |-- createdOn: timestamp (nullable = true)
>  |-- send_success_credits: double (nullable = true)
>  |-- send_error_credits: double (nullable = true)
>  |-- delivered_credits: double (nullable = true)
>  |-- invalid_sd_credits: double (nullable = true)
>  |-- undelivered_credits: double (nullable = true)
>  |-- unknown_credits: double (nullable = true)
>
>
> Now I want to write this transformed stream to another Kafka topic. I have
> temporarily used a UDF that accepts all these columns as parameters and
> creates a JSON string, which I add as a "value" column for writing to Kafka.
>
> Is there an easier and cleaner way to do the same?
>
>
> Thanks,
> Pankaj
>
>