Hi,

I am using structured streaming for ETL.


val data_stream = spark
  .readStream // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from start of topic
  .option("failOnDataLoss", "false")
  .load()

I transform this into a Dataset with the following schema.

root
 |-- accountId: long (nullable = true)
 |-- countryId: long (nullable = true)
 |-- credits: double (nullable = true)
 |-- deliveryStatus: string (nullable = true)
 |-- senderId: string (nullable = true)
 |-- sentStatus: string (nullable = true)
 |-- source: integer (nullable = true)
 |-- createdOn: timestamp (nullable = true)
 |-- send_success_credits: double (nullable = true)
 |-- send_error_credits: double (nullable = true)
 |-- delivered_credits: double (nullable = true)
 |-- invalid_sd_credits: double (nullable = true)
 |-- undelivered_credits: double (nullable = true)
 |-- unknown_credits: double (nullable = true)


Now I want to write this transformed stream to another Kafka topic. As a
temporary solution, I have used a UDF that accepts all of these columns as
parameters and creates a JSON string, which I add as a "value" column for
writing to Kafka, roughly as in the sketch below.
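A minimal sketch of that approach (the UDF body, the output topic name, and the checkpoint path are placeholders, not my actual code):

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical UDF; the real one takes all of the columns above as parameters
// and builds the full JSON document.
val toJson = udf((accountId: Long, credits: Double, deliveryStatus: String) =>
  s"""{"accountId":$accountId,"credits":$credits,"deliveryStatus":"$deliveryStatus"}""")

transformedDs
  .withColumn("value", toJson(col("accountId"), col("credits"), col("deliveryStatus")))
  .select("value")                                         // Kafka sink expects a "value" column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_transformed")              // output topic (placeholder)
  .option("checkpointLocation", "/tmp/sms_etl_checkpoint") // placeholder path
  .start()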

Is there an easier and cleaner way to do the same?


Thanks,
Pankaj
