Hello All,
I have a PySpark DataFrame that I need to write to a Kafka topic.

Structure of the DF is :

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = false)
 |    |-- end: timestamp (nullable = false)
 |-- processedAlarmCnt: integer (nullable = false)
 |-- totalAlarmCnt: integer (nullable = false)

Currently, I'm looping over the rows, adding the data to a dict (hashmap),
and then using KafkaProducer to push each message into the Kafka topic.

This does not seem very efficient, since I'm looping over each row and
building an intermediate dict as extra space as well.
What is the best way to design/code this? (I've sketched one possible
alternative after the current code below.)

Current Code :

import json
from datetime import datetime

def writeCountToKafka(df):
    if df.count() > 0:
        hm = {}
        df_pandas = df.toPandas()
        for _, row in df_pandas.iterrows():
            # Convert the window struct into a pair of epoch timestamps
            hm["window"] = [datetime.timestamp(row["window"]["start"]),
                            datetime.timestamp(row["window"]["end"])]
            # Cast to plain int so json.dumps can serialize the pandas/numpy values
            hm["processedAlarmCnt"] = int(row["processedAlarmCnt"])
            hm["totalAlarmCnt"] = int(row["totalAlarmCnt"])

            # Python Kafka producer: one message per row
            kafka_producer.send(topic_count, json.dumps(hm).encode('utf-8'))
            kafka_producer.flush()

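Is the built-in Kafka sink from the spark-sql-kafka connector the better route here?
Roughly something like the sketch below is what I had in mind as an alternative
(the broker address and topic name are placeholders, and to_json would encode the
window timestamps as strings rather than the epoch seconds I produce above):

from pyspark.sql.functions import to_json, struct

(df.select(to_json(struct("window", "processedAlarmCnt", "totalAlarmCnt")).alias("value"))
   .write
   .format("kafka")
   .option("kafka.bootstrap.servers", "localhost:9092")   # placeholder broker list
   .option("topic", "alarm_counts")                        # placeholder topic name
   .save())

(For a streaming DataFrame, I assume the same select could feed
writeStream.format("kafka") with a checkpointLocation instead.)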

More details are in this Stack Overflow question:

https://stackoverflow.com/questions/71166560/structured-streaming-writing-dataframe-into-kafka-row-by-row-dataframe-has-a

Thanks in advance!
