Hi Team, I am using Spark Streaming to read from Kafka and write to S3.
Spark Version: 3.1.2
Scala Version: 2.12
Spark Kafka connector: spark-sql-kafka-0-10_2.12

Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .options(appConfig.getKafka().getConf())
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();

kafka.conf = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "test-topic",
    "minOffsetsPerTrigger": 10000000,
    "maxOffsetsPerTrigger": 11000000,
    "maxTriggerDelay": "15m",
    "groupIdPrefix": "test",
    "startingOffsets": "latest",
    "includeHeaders": true,
    "failOnDataLoss": false
}

spark.conf = {
    "spark.master": "spark://localhost:7077",
    "spark.app.name": "app",
    "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
    "spark.sql.streaming.metricsEnabled": true
}

However, these options do not seem to take effect: Spark keeps firing micro-batches of only about 3k-15k records one immediately after another, instead of waiting until at least minOffsetsPerTrigger (10M) offsets are available or maxTriggerDelay (15m) expires. Is there something I am missing?

Ref: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Regards,
Abhishek Singla
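P.S. For context, the foreachBatch sink referenced above does essentially the following. This is a simplified sketch only: the S3 bucket, path layout, output format, and the AppConfig usage shown here are placeholders, not the real values.

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class KafkaS3PipelineImplementation implements VoidFunction2<Dataset<Row>, Long> {

    private final String applicationId;
    private final AppConfig appConfig; // same config holder used in the driver code above

    public KafkaS3PipelineImplementation(String applicationId, AppConfig appConfig) {
        this.applicationId = applicationId;
        this.appConfig = appConfig;
    }

    @Override
    public void call(Dataset<Row> batch, Long batchId) {
        // Each micro-batch is appended to S3 under a per-batch prefix.
        // Bucket name and directory layout below are placeholders.
        batch.write()
             .mode(SaveMode.Append)
             .parquet("s3a://my-bucket/" + applicationId + "/batch_id=" + batchId);
    }
}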