Is this all of your writeStream?

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();

What happened to the checkpoint location? You should set one on the writeStream:

.option("checkpointLocation", checkpoint_path)

For example:

 checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt"
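
In your Java code that would be something like the sketch below (the path is just the example above; in practice point it at a durable location your job can always reach):

df.writeStream()
    // example path only; use your own durable checkpoint location
    .option("checkpointLocation", "file:///ssd/hduser/MDBatchBQ/chkpt")
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();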


ls -l  /ssd/hduser/MDBatchBQ/chkpt
total 24
-rw-r--r--. 1 hduser hadoop   45 Mar  1 09:27 metadata
drwxr-xr-x. 5 hduser hadoop 4096 Mar  1 09:27 .
drwxr-xr-x. 4 hduser hadoop 4096 Mar  1 10:31 ..
drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits

That way you can see what is going on.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom



On Thu, 27 Apr 2023 at 15:46, Abhishek Singla <abhisheksingla...@gmail.com>
wrote:

> Hi Team,
>
> I am using Spark Streaming to read from Kafka and write to S3.
>
> Version: 3.1.2
> Scala Version: 2.12
> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>
> Dataset<Row> df =
>     spark
>         .readStream()
>         .format("kafka")
>         .options(appConfig.getKafka().getConf())
>         .load()
>         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>
> df.writeStream()
>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>     .start()
>     .awaitTermination();
>
> kafka.conf = {
>                "kafka.bootstrap.servers": "localhost:9092",
>                "subscribe": "test-topic",
>                "minOffsetsPerTrigger": 10000000,
>                "maxOffsetsPerTrigger": 11000000,
>                "maxTriggerDelay": "15m",
>                "groupIdPrefix": "test",
>                "startingOffsets": "latest",
>                "includeHeaders": true,
>                "failOnDataLoss": false
>               }
>
> spark.conf = {
>                "spark.master": "spark://localhost:7077",
>                "spark.app.name": "app",
>                "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
>                "spark.sql.streaming.metricsEnabled": true
>              }
>
>
> But these configs do not seem to be working, as I can see Spark processing
> batches of 3k-15k records immediately one after another. Is there something
> I am missing?
>
> Ref:
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
> Regards,
> Abhishek Singla
>
