Hello All,

I have data in a Kafka topic (data is published every 10 minutes), and I'm planning
to read it with Apache Spark Structured Streaming (in batch mode) and push it
into MongoDB.

Please note: this will be scheduled using Composer/Airflow on GCP, which will
create a Dataproc cluster, run the Spark code, and then delete the cluster.

Here is my current code:

```

# read from Kafka, extract JSON, and write to MongoDB
from pyspark.sql.functions import from_json, col

df_reader = spark.readStream.format('kafka')\
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("kafka.security.protocol","SSL") \
    .option("kafka.ssl.truststore.location",ssl_truststore_location) \
    .option("kafka.ssl.truststore.password",ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location)\
    .option("kafka.ssl.keystore.password", ssl_keystore_password)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "earliest") \
    .load()

df = df_reader.selectExpr("CAST(value AS STRING)")
df_data = df.select(from_json(col('value'), schema).alias('data')) \
    .select("data.*") \
    .filter(col('customer') == database)

# write to Mongo
df_data.write\
    .format("mongo") \
    .option("uri", mongoConnUri) \
    .option("database", database) \
    .option("collection", collection) \
    .mode("append") \
    .save()


```

Since this is run as a batch query every 10 minutes, how do I ensure that
duplicate records are not read and pushed into MongoDB? When I use readStream
(each time the job is launched from Airflow), does it read all the data in the
Kafka topic, or only from the point where it last stopped reading?
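
For reference, my understanding is that a plain batch read of the same topic
would look like the sketch below (reusing the connection variables from the
code above; the offset values shown are just the batch defaults), and that such
a read has no memory of what previous runs consumed:

```
# batch (non-streaming) read of the Kafka topic - a sketch only
df_batch = spark.read.format('kafka') \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
# note: each run would re-read from "earliest" unless the job itself tracks
# the consumed offsets and passes them back in as a startingOffsets JSON
```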

Please note: the mongo data source does not support streaming queries,
otherwise I could have used a checkpoint to handle this.
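
One option I am considering is to keep readStream but route the write through
foreachBatch with a one-shot trigger, so the Kafka offsets are tracked in the
checkpoint while the actual MongoDB write stays a batch write. A minimal
sketch (the checkpoint location is a placeholder):

```
# streaming read + batch write to Mongo inside foreachBatch - a sketch only
def write_to_mongo(batch_df, batch_id):
    batch_df.write \
        .format("mongo") \
        .option("uri", mongoConnUri) \
        .option("database", database) \
        .option("collection", collection) \
        .mode("append") \
        .save()

query = df_data.writeStream \
    .foreachBatch(write_to_mongo) \
    .option("checkpointLocation", "gs://<bucket>/checkpoints/" + topic) \
    .trigger(once=True) \
    .start()
query.awaitTermination()
```

With trigger(once=True) the query would process whatever is new since the last
checkpointed offsets and then stop, which seems to fit the every-10-minutes
Airflow schedule - does that sound right?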

Please advise on the best way to achieve this.

tia!


Here is the Stack Overflow link:

https://stackoverflow.com/questions/72723137/structuredstreaming-read-from-kafka-writing-data-into-mongo-every-10-minutes
