Is this all of your writeStream?

    df.writeStream()
      .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
      .start()
      .awaitTermination();
What happened to the checkpoint location?

    option('checkpointLocation', checkpoint_path)

For example, with checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt":

    ls -l /ssd/hduser/MDBatchBQ/chkpt
    total 24
    -rw-r--r--. 1 hduser hadoop   45 Mar  1 09:27 metadata
    drwxr-xr-x. 5 hduser hadoop 4096 Mar  1 09:27 .
    drwxr-xr-x. 4 hduser hadoop 4096 Mar  1 10:31 ..
    drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
    drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
    drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits

so you can see what is going on.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London, United Kingdom

view my LinkedIn profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh


On Thu, 27 Apr 2023 at 15:46, Abhishek Singla <abhisheksingla...@gmail.com> wrote:

> Hi Team,
>
> I am using Spark Streaming to read from Kafka and write to S3.
> Version: 3.1.2
> Scala Version: 2.12
> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>
>     Dataset<Row> df =
>         spark
>             .readStream()
>             .format("kafka")
>             .options(appConfig.getKafka().getConf())
>             .load()
>             .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>
>     df.writeStream()
>       .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>       .start()
>       .awaitTermination();
>
> kafka.conf = {
>     "kafka.bootstrap.servers": "localhost:9092",
>     "subscribe": "test-topic",
>     "minOffsetsPerTrigger": 10000000,
>     "maxOffsetsPerTrigger": 11000000,
>     "maxTriggerDelay": "15m",
>     "groupIdPrefix": "test",
>     "startingOffsets": "latest",
>     "includeHeaders": true,
>     "failOnDataLoss": false
> }
>
> spark.conf = {
>     "spark.master": "spark://localhost:7077",
>     "spark.app.name": "app",
>     "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
>     "spark.sql.streaming.metricsEnabled": true
> }
>
> But these configs do not seem to be working, as I can see Spark processing
> batches of 3k-15k records immediately one after another. Is there something
> I am missing?
>
> Ref: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
> Regards,
> Abhishek Singla
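As a side note on the quoted question: the way `minOffsetsPerTrigger`, `maxOffsetsPerTrigger` and `maxTriggerDelay` interact, as described in the Kafka integration guide, can be sketched in plain Java. This is an illustration of the documented semantics only, not Spark's actual scheduler code, and the class and method names are made up for the sketch:

```java
// Sketch of the documented trigger semantics for the Kafka source:
// a micro-batch fires once at least minOffsetsPerTrigger offsets are
// available, OR once maxTriggerDelay has elapsed since the last trigger;
// maxOffsetsPerTrigger then caps how many offsets the batch consumes.
public class TriggerPolicySketch {

    // Decide whether a new micro-batch should fire.
    static boolean shouldTrigger(long availableOffsets,
                                 long minOffsetsPerTrigger,
                                 long millisSinceLastTrigger,
                                 long maxTriggerDelayMillis) {
        return availableOffsets >= minOffsetsPerTrigger
                || millisSinceLastTrigger >= maxTriggerDelayMillis;
    }

    // Cap the batch size at maxOffsetsPerTrigger.
    static long batchSize(long availableOffsets, long maxOffsetsPerTrigger) {
        return Math.min(availableOffsets, maxOffsetsPerTrigger);
    }

    public static void main(String[] args) {
        long min = 10_000_000L, max = 11_000_000L, maxDelay = 15 * 60 * 1000L;

        // Only 15k offsets available and the delay has not expired: no batch yet.
        System.out.println(shouldTrigger(15_000, min, 60_000, maxDelay));      // false

        // Delay expired: a (small) batch fires even below the minimum.
        System.out.println(shouldTrigger(15_000, min, maxDelay, maxDelay));    // true

        // Plenty of data: the batch fires and is capped at maxOffsetsPerTrigger.
        System.out.println(shouldTrigger(12_000_000L, min, 60_000, maxDelay)); // true
        System.out.println(batchSize(12_000_000L, max));                       // 11000000
    }
}
```

One thing worth double-checking against the linked integration guide: if I read its version history correctly, `minOffsetsPerTrigger` and `maxTriggerDelay` only appear from Spark 3.2 onwards, while the job in this thread runs 3.1.2, in which case those options would be silently ignored and only `maxOffsetsPerTrigger` would take effect.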