Hello All, I have data in a Kafka topic (data is published every 10 minutes), and I'm planning to read this data with Apache Spark Structured Streaming (in batch mode) and push it into MongoDB.
Please note: this will be scheduled using Composer/Airflow on GCP, which will create a Dataproc cluster, run the Spark code, and then delete the cluster.

Here is my current code:

```
# read from Kafka, extract the JSON payload, and write it to MongoDB
from pyspark.sql.functions import from_json, col

df_reader = spark.readStream.format('kafka') \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

df = df_reader.selectExpr("CAST(value AS STRING)")
df_data = df.select(from_json(col('value'), schema).alias('data')) \
    .select("data.*") \
    .filter(col('customer') == database)

# write to Mongo
df_data.write \
    .format("mongo") \
    .option("uri", mongoConnUri) \
    .option("database", database) \
    .option("collection", collection) \
    .mode("append") \
    .save()
```

Since this is run as a batch query every 10 minutes, how do I ensure that duplicate records are not read and pushed into MongoDB?

When I use readStream (every time the job is launched by Airflow), does it read all the data in the Kafka topic, or only from the point where it last read the data?

Please note: the mongo datasource does not support streaming queries, otherwise I could have used a checkpoint to enable this.

Please advise on the best way to achieve this. Thanks in advance!

Here is the Stack Overflow link: https://stackoverflow.com/questions/72723137/structuredstreaming-read-from-kafka-writing-data-into-mongo-every-10-minutes
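
For context, this is roughly what I was thinking of when I mentioned checkpoints above. It is only a sketch of my assumption: the `foreachBatch` approach, the `write_to_mongo` helper, and the GCS checkpoint path are my own guesses (reusing `df_data`, `mongoConnUri`, `database`, and `collection` from the code above), and I have not verified that this works with the mongo connector:

```
from pyspark.sql import DataFrame

# Sketch (untested assumption): resume from the Kafka offsets stored in the
# checkpoint on every scheduled run, and write each micro-batch to MongoDB
# using the batch connector inside foreachBatch.

def write_to_mongo(batch_df: DataFrame, batch_id: int) -> None:
    # Inside foreachBatch the DataFrame is a plain batch DataFrame,
    # so the non-streaming mongo sink can be used here.
    (batch_df.write
        .format("mongo")
        .option("uri", mongoConnUri)
        .option("database", database)
        .option("collection", collection)
        .mode("append")
        .save())

query = (df_data.writeStream
    .foreachBatch(write_to_mongo)
    # hypothetical GCS path; offsets tracked here would survive cluster deletion
    .option("checkpointLocation", "gs://<my-bucket>/checkpoints/kafka-to-mongo")
    # process whatever is new, then stop, so Airflow can tear down the cluster
    .trigger(once=True)
    .start())

query.awaitTermination()
```

If something like this is viable, my understanding is that the checkpoint (kept on GCS so it outlives the ephemeral Dataproc cluster) is what would prevent re-reading already-processed offsets, rather than `kafka.group.id` or `startingOffsets`, but please correct me if that is wrong.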