Hi Team,

I am using Spark Structured Streaming to read from Kafka and write to S3.

Spark Version: 3.1.2
Scala Version: 2.12
Spark Kafka connector: spark-sql-kafka-0-10_2.12

Dataset<Row> df =
    spark
        .readStream()
        .format("kafka")
        .options(appConfig.getKafka().getConf())
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();
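
For context, KafkaS3PipelineImplementation implements Spark's foreachBatch callback
(VoidFunction2<Dataset<Row>, Long>). A simplified sketch of its shape is below; the
real S3 path, output format, and error handling are omitted, and AppConfig is just a
placeholder name for our config holder:

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class KafkaS3PipelineImplementation
    implements VoidFunction2<Dataset<Row>, Long> {

  private final String applicationId;
  private final AppConfig appConfig;  // application config holder (placeholder type name)

  public KafkaS3PipelineImplementation(String applicationId, AppConfig appConfig) {
    this.applicationId = applicationId;
    this.appConfig = appConfig;
  }

  @Override
  public void call(Dataset<Row> batch, Long batchId) throws Exception {
    // Write each micro-batch to S3; bucket and prefix below are placeholders.
    batch.write()
        .mode("append")
        .parquet("s3a://my-bucket/" + applicationId + "/batch=" + batchId);
  }
}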

kafka.conf = {
               "kafka.bootstrap.servers": "localhost:9092",
               "subscribe": "test-topic",
               "minOffsetsPerTrigger": 10000000,
               "maxOffsetsPerTrigger": 11000000,
               "maxTriggerDelay": "15m",
               "groupIdPrefix": "test",
               "startingOffsets": "latest",
               "includeHeaders": true,
               "failOnDataLoss": false
              }
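
(appConfig.getKafka().getConf() simply exposes the block above as a Map<String, String>,
roughly equivalent to the hand-built map below; all option values end up as strings when
handed to the Kafka source.)

import java.util.HashMap;
import java.util.Map;

// Hand-built equivalent of appConfig.getKafka().getConf(), for illustration only.
Map<String, String> kafkaConf = new HashMap<>();
kafkaConf.put("kafka.bootstrap.servers", "localhost:9092");
kafkaConf.put("subscribe", "test-topic");
kafkaConf.put("minOffsetsPerTrigger", "10000000");
kafkaConf.put("maxOffsetsPerTrigger", "11000000");
kafkaConf.put("maxTriggerDelay", "15m");
kafkaConf.put("groupIdPrefix", "test");
kafkaConf.put("startingOffsets", "latest");
kafkaConf.put("includeHeaders", "true");
kafkaConf.put("failOnDataLoss", "false");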

spark.conf = {
               "spark.master": "spark://localhost:7077",
               "spark.app.name": "app",
               "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
               "spark.sql.streaming.metricsEnabled": true
             }
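
(The session itself is built from spark.conf in the usual way, roughly:)

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .master("spark://localhost:7077")
    .appName("app")
    .config("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")
    .config("spark.sql.streaming.metricsEnabled", "true")
    .getOrCreate();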


However, these configs do not seem to take effect: I can see Spark processing
batches of 3k-15k records immediately one after another, rather than waiting for
minOffsetsPerTrigger (or maxTriggerDelay) before triggering the next batch. Is
there something I am missing?

Ref:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Regards,
Abhishek Singla
