Hello All,
I have data in a Kafka topic (data is published every 10 minutes), and I'm planning
to read this data using Apache Spark Structured Streaming (in batch mode) and push
it into MongoDB.
Please note: this will be scheduled using Composer/Airflow on GCP, which
will create a Dataproc cluster, run the Spark code, and then delete the
cluster.
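For context, here is a rough sketch of the Airflow DAG I have in mind. The project, region, cluster name, schedule, and GCS paths below are all placeholders, not the actual setup:
```
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.utils.trigger_rule import TriggerRule

# Placeholder values - the real project/region/cluster/GCS paths differ.
PROJECT_ID = "my-project"
REGION = "us-central1"
CLUSTER_NAME = "kafka-mongo-cluster"

with DAG(
    dag_id="kafka_to_mongo_every_10_min",
    start_date=datetime(2022, 6, 1),
    schedule_interval="*/10 * * * *",  # every 10 minutes
    catchup=False,
) as dag:
    # Spin up a short-lived Dataproc cluster for this run.
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        cluster_config={"worker_config": {"num_instances": 2}},
    )

    # Submit the PySpark job that reads Kafka and writes to Mongo.
    run_spark = DataprocSubmitJobOperator(
        task_id="run_spark",
        project_id=PROJECT_ID,
        region=REGION,
        job={
            "placement": {"cluster_name": CLUSTER_NAME},
            "pyspark_job": {"main_python_file_uri": "gs://my-bucket/kafka_to_mongo.py"},
        },
    )

    # Tear the cluster down even if the Spark job fails.
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    create_cluster >> run_spark >> delete_cluster
```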
Here is my current code:
```
# Read from Kafka, extract the JSON payload, and write to MongoDB.
from pyspark.sql.functions import col, from_json

# Streaming reader against the SSL-secured Kafka topic.
df_reader = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

# The Kafka value column is binary; cast it to a string before parsing.
df = df_reader.selectExpr("CAST(value AS STRING)")

# Parse the JSON, flatten the struct, and keep only rows for this customer/database.
df_data = df.select(from_json(col("value"), schema).alias("data")) \
    .select("data.*") \
    .filter(col("customer") == database)

# Write to MongoDB.
df_data.write \
    .format("mongo") \
    .option("uri", mongoConnUri) \
    .option("database", database) \
    .option("collection", collection) \
    .mode("append") \
    .save()
```
Since this runs as a batch query every 10 minutes, how do I ensure that
duplicate records are not read and pushed into MongoDB? When I use
readStream (every time the job is launched by Airflow), does it read
all the data in the Kafka topic, or only from the point where it last read?
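One approach I'm considering (just a sketch, not tested) is to switch to a plain batch read and track the offsets myself: persist the last consumed per-partition offsets somewhere durable (GCS, a Mongo "state" collection, ...) and pass them back in via startingOffsets / endingOffsets on the next run. load_saved_offsets() / save_offsets() below are hypothetical helpers, and the offsets JSON is only an illustration:
```
# Sketch: a plain batch read from Kafka with an explicit offset range, so each
# 10-minute run only consumes records the previous run has not already written.
# load_saved_offsets() / save_offsets() are hypothetical helpers that would
# persist per-partition offsets somewhere durable.

starting_offsets = load_saved_offsets() or "earliest"  # e.g. '{"my-topic":{"0":1234,"1":5678}}'

df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", starting_offsets) \
    .option("endingOffsets", "latest") \
    .load()  # plus the same kafka.security.protocol / SSL options as above

# ... same JSON parsing and Mongo write as in the code above ...

# Only after the Mongo write succeeds, persist the next starting offsets
# (largest consumed offset per partition, plus one).
save_offsets(df.groupBy("partition").agg({"offset": "max"}).collect())
```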
Please note: the mongo data source does not support streaming queries; otherwise I
could have used a checkpoint to enable this.
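The workaround I was thinking about (again, only a sketch): keep readStream, but use writeStream with foreachBatch so the Mongo write itself stays a batch write, and point checkpointLocation at a GCS path so the offsets survive between cluster runs. The gs:// path below is a placeholder:
```
# Sketch: keep the streaming Kafka reader, but do the Mongo write inside
# foreachBatch. The checkpoint (on GCS, since the Dataproc cluster is deleted
# after every run) stores the Kafka offsets, so the next run resumes where the
# previous one stopped. The checkpoint path is a placeholder.

def write_to_mongo(batch_df, batch_id):
    # batch_df is an ordinary (non-streaming) DataFrame, so the batch
    # mongo writer can be used here.
    batch_df.write \
        .format("mongo") \
        .option("uri", mongoConnUri) \
        .option("database", database) \
        .option("collection", collection) \
        .mode("append") \
        .save()

query = df_data.writeStream \
    .foreachBatch(write_to_mongo) \
    .option("checkpointLocation", "gs://my-bucket/checkpoints/kafka_to_mongo") \
    .trigger(once=True) \
    .start()

query.awaitTermination()  # the job exits once all available offsets are processed
```
With trigger(once=True), the query processes whatever is available and then stops, which seems to fit the run-every-10-minutes-from-Airflow model, and the checkpoint should prevent re-reading records that were already written.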
Please advise on the best way to achieve this.
Thanks in advance!
Here is the Stack Overflow link:
https://stackoverflow.com/questions/72723137/structuredstreaming-read-from-kafka-writing-data-into-mongo-every-10-minutes