Thanks, Mich, for acknowledging.

Yes, I am providing the checkpoint path; I just omitted it from the code
snippet here.

I believe this is due to the Spark version (3.1.x); these configs are only
available in Spark 3.2.0 and above.
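
For reference, this is roughly where those options would sit once the job is
on Spark 3.2.0 or later (a sketch only; the explicit .option() calls mirror
the kafka.conf values below, which in the real job come from appConfig):

Dataset<Row> df =
    spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test-topic")
        // minOffsetsPerTrigger and maxTriggerDelay are only honoured by the
        // Kafka source from Spark 3.2.0 onwards; on 3.1.x they have no effect
        .option("minOffsetsPerTrigger", 10000000L)
        .option("maxOffsetsPerTrigger", 11000000L)
        .option("maxTriggerDelay", "15m")
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");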

On Thu, Apr 27, 2023 at 9:26 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Is this all of your writeStream?
>
> df.writeStream()
>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>     .start()
>     .awaitTermination();
>
> What happened to the checkpoint location?
>
> .option('checkpointLocation', checkpoint_path)
>
> example
>
>  checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt"
>
>
> ls -l  /ssd/hduser/MDBatchBQ/chkpt
> total 24
> -rw-r--r--. 1 hduser hadoop   45 Mar  1 09:27 metadata
> drwxr-xr-x. 5 hduser hadoop 4096 Mar  1 09:27 .
> drwxr-xr-x. 4 hduser hadoop 4096 Mar  1 10:31 ..
> drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
> drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
> drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits
>
> so you can see what is going on
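>
> In the Java API from your snippet, that would be along these lines (just a
> sketch, using the example checkpoint path above):
>
> df.writeStream()
>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>     .option("checkpointLocation", "file:///ssd/hduser/MDBatchBQ/chkpt")  // example path
>     .start()
>     .awaitTermination();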
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 27 Apr 2023 at 15:46, Abhishek Singla <abhisheksingla...@gmail.com>
> wrote:
>
>> Hi Team,
>>
>> I am using Spark Streaming to read from Kafka and write to S3.
>>
>> Spark Version: 3.1.2
>> Scala Version: 2.12
>> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>>
>> Dataset<Row> df =
>>     spark
>>         .readStream()
>>         .format("kafka")
>>         .options(appConfig.getKafka().getConf())
>>         .load()
>>         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>>
>> df.writeStream()
>>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>>     .start()
>>     .awaitTermination();
>>
>> kafka.conf = {
>>                "kafka.bootstrap.servers": "localhost:9092",
>>                "subscribe": "test-topic",
>>                "minOffsetsPerTrigger": 10000000,
>>                "maxOffsetsPerTrigger": 11000000,
>>                "maxTriggerDelay": "15m",
>>                "groupIdPrefix": "test",
>>                "startingOffsets": "latest",
>>                "includeHeaders": true,
>>                "failOnDataLoss": false
>>               }
>>
>> spark.conf = {
>>                "spark.master": "spark://localhost:7077",
>>                "spark.app.name": "app",
>>                "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
>>                "spark.sql.streaming.metricsEnabled": true
>>              }
>>
>>
>> But these configs do not seem to take effect: I can see Spark processing
>> batches of 3k-15k records one immediately after another. Is there something
>> I am missing?
>>
>> Ref:
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>>
>> Regards,
>> Abhishek Singla
>>
