Thanks, Mich, for acknowledging. Yes, I am providing the checkpoint path; I just omitted it from the code snippet here.
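For completeness, the write side with the checkpoint option wired in looks roughly like this (a sketch; the checkpointPath value is illustrative, not our real location):

// Sketch of our writeStream call with the checkpoint location included.
// The path below is illustrative; in practice it points at durable storage.
String checkpointPath = "s3a://my-bucket/checkpoints/kafka-s3-pipeline"; // illustrative

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .option("checkpointLocation", checkpointPath)
    .start()
    .awaitTermination();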
I believe the actual issue is the Spark version: we are on 3.1.2, and minOffsetsPerTrigger and maxTriggerDelay are only available in Spark 3.2.0 and above (a quick runtime guard for this is sketched after the quoted thread below).

On Thu, Apr 27, 2023 at 9:26 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Is this all of your writeStream?
>
> df.writeStream()
>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>     .start()
>     .awaitTermination();
>
> What happened to the checkpoint location?
>
> option('checkpointLocation', checkpoint_path)
>
> For example:
>
> checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt"
>
> ls -l /ssd/hduser/MDBatchBQ/chkpt
> total 24
> -rw-r--r--. 1 hduser hadoop   45 Mar  1 09:27 metadata
> drwxr-xr-x. 5 hduser hadoop 4096 Mar  1 09:27 .
> drwxr-xr-x. 4 hduser hadoop 4096 Mar  1 10:31 ..
> drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
> drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
> drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits
>
> so you can see what is going on.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
> On Thu, 27 Apr 2023 at 15:46, Abhishek Singla <abhisheksingla...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I am using Spark Structured Streaming to read from Kafka and write to S3.
>>
>> Spark version: 3.1.2
>> Scala version: 2.12
>> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>>
>> Dataset<Row> df = spark
>>     .readStream()
>>     .format("kafka")
>>     .options(appConfig.getKafka().getConf())
>>     .load()
>>     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>>
>> df.writeStream()
>>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>>     .start()
>>     .awaitTermination();
>>
>> kafka.conf = {
>>     "kafka.bootstrap.servers": "localhost:9092",
>>     "subscribe": "test-topic",
>>     "minOffsetsPerTrigger": 10000000,
>>     "maxOffsetsPerTrigger": 11000000,
>>     "maxTriggerDelay": "15m",
>>     "groupIdPrefix": "test",
>>     "startingOffsets": "latest",
>>     "includeHeaders": true,
>>     "failOnDataLoss": false
>> }
>>
>> spark.conf = {
>>     "spark.master": "spark://localhost:7077",
>>     "spark.app.name": "app",
>>     "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
>>     "spark.sql.streaming.metricsEnabled": true
>> }
>>
>> But these configs do not seem to be working, as I can see Spark processing
>> batches of 3k-15k records immediately one after another. Is there something
>> I am missing?
>>
>> Ref:
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>>
>> Regards,
>> Abhishek Singla
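P.S. For anyone else who lands here: a small runtime guard like the sketch below (the version parsing is deliberately simplistic and the message is illustrative) makes the Spark 3.2.0+ requirement explicit, instead of the options being quietly ignored as we observed:

// Sketch: fail fast when the running Spark predates 3.2.0, where
// minOffsetsPerTrigger and maxTriggerDelay were introduced.
String version = spark.version();        // e.g. "3.1.2"
String[] parts = version.split("\\.");
int major = Integer.parseInt(parts[0]);
int minor = Integer.parseInt(parts[1]);
if (major < 3 || (major == 3 && minor < 2)) {
    throw new IllegalStateException(
        "minOffsetsPerTrigger/maxTriggerDelay require Spark 3.2.0+, running " + version);
}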