Hello Users,

I am using Spark 3.0.1 Structured Streaming with PySpark.

My use case:
I receive a large number of records in Kafka (essentially metadata carrying
the location of the actual data). I have to take that metadata from Kafka
and apply some processing.
The processing involves reading the actual data location from the metadata,
fetching the actual data, and applying some operations to it.

What I have tried:

def process_events(event):
    # event.value holds the metadata string from the Kafka message
    fetch_actual_data()
    # many more steps

def fetch_actual_data():
    # applying operations on the actual data
    pass

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_URL) \
    .option("subscribe", KAFKA_TOPICS) \
    .option("startingOffsets", START_OFFSET) \
    .load() \
    .selectExpr("CAST(value AS STRING)")


query = df.writeStream \
    .foreach(process_events) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()


My Queries:

1. Will this foreach run across different executor processes in parallel?
My understanding is that, generally in Spark, foreach runs on a single
executor.

2. I receive a very high volume of records from Kafka, and the code above
runs process_events once per message. If I switch to foreachBatch, will that
be more efficient? (A sketch of what I mean is below.)
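
For reference, here is roughly what the foreachBatch variant would look like
(process_batch is just a placeholder name):

def process_batch(batch_df, batch_id):
    # batch_df is a plain DataFrame holding one whole micro-batch;
    # work on it is distributed across executors like any DataFrame job
    batch_df.foreach(process_events)

query = df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()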


Kind Regards,
Sachit Murarka
