Is this all of your writeStream?
df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();
What happened to the checkpoint location?
You need to set one with option("checkpointLocation", checkpoint_path), otherwise Structured Streaming has nowhere to record which Kafka offsets it has already processed between triggers.
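A minimal sketch of how that fits into your writeStream (the s3a:// path is only a placeholder, substitute your own durable location such as an S3 bucket, HDFS or a local directory):

// same writeStream as in your code, with a checkpoint location added
df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .option("checkpointLocation", "s3a://your-bucket/checkpoints/kafka-to-s3")  // placeholder path
    .start()
    .awaitTermination();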
For example, here is the checkpoint directory from one of my own jobs:
checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt"
ls -al /ssd/hduser/MDBatchBQ/chkpt
total 24
-rw-r--r--. 1 hduser hadoop 45 Mar 1 09:27 metadata
drwxr-xr-x. 5 hduser hadoop 4096 Mar 1 09:27 .
drwxr-xr-x. 4 hduser hadoop 4096 Mar 1 10:31 ..
drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits
That way you can see what is going on.
HTH
Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom
On Thu, 27 Apr 2023 at 15:46, Abhishek Singla <[email protected]>
wrote:
> Hi Team,
>
> I am using Spark Streaming to read from Kafka and write to S3.
>
> Version: 3.1.2
> Scala Version: 2.12
> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>
> Dataset<Row> df =
>     spark
>         .readStream()
>         .format("kafka")
>         .options(appConfig.getKafka().getConf())
>         .load()
>         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>
> df.writeStream()
>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>     .start()
>     .awaitTermination();
>
> kafka.conf = {
> "kafka.bootstrap.servers": "localhost:9092",
> "subscribe": "test-topic",
> "minOffsetsPerTrigger": 10000000,
> "maxOffsetsPerTrigger": 11000000,
> "maxTriggerDelay": "15m",
> "groupIdPrefix": "test",
> "startingOffsets": "latest",
> "includeHeaders": true,
> "failOnDataLoss": false
> }
>
> spark.conf = {
> "spark.master": "spark://localhost:7077",
> "spark.app.name": "app",
> "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
> "spark.sql.streaming.metricsEnabled": true
> }
>
>
> But these configs do not seem to be working: I can see Spark processing
> batches of 3k-15k records one immediately after another. Is there something
> I am missing?
>
> Ref:
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
> Regards,
> Abhishek Singla