Re: Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-

2024-02-13 Thread Abhishek Singla
Hi Team,

Could someone provide some insights into this issue?

Regards,
Abhishek Singla

On Wed, Jan 17, 2024 at 11:45 PM Abhishek Singla <
abhisheksingla...@gmail.com> wrote:

> Hi Team,
>
> Version: 3.2.2
> Java Version: 1.8.0_211
> Scala Version: 2.12.15
> Cluster: Standalone
>
> I am using Spark Streaming to read from Kafka and write to S3. The job
> fails with the below error if no records are published to Kafka for a few
> days and then some records are published. Could someone help me identify
> the root cause of this job failure?
>
> 24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id = 
> 72ee1070-7e05-4999-8b55-2a99e216ec51, runId = 
> 0919e548-9706-4757-be94-359848100070] terminated with error
> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any 
> valid local directory for s3ablock-0001-
>   at 
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
>   at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>   at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
>   at 
> org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
>   at 
> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
>   at 
> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:182)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
>   at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
>   at 
> org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
>   at 
> org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
>   at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
>   at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
>   at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>   at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
>   at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
>   at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
>   at 
> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
>   at 
> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
>   at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>   at scala.Option.getOrElse(Option.scala:189)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
>   at 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
>   at 
> org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$14(MicroBatchExecution.scala:442)
>   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>   at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:440)
>   at 
> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
>   at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
>   at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeT

Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-

2024-01-17 Thread Abhishek Singla
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)


Dataset<Row> df =
    spark
        .readStream()
        .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
        .options(appConfig.getKafka().getConf())
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .option("checkpointLocation", appConfig.getChk().getPath())
    .start()
    .awaitTermination();
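
For context, the s3ablock-0001- name in the error comes from the local disk
buffer the S3A block output stream creates before uploading each block to S3;
the directory it is allocated from is controlled by fs.s3a.buffer.dir, which
defaults to a path under hadoop.tmp.dir (typically /tmp). Below is a minimal
sketch of pointing that buffer at a directory known to exist and have free
space on every driver and executor host; the path and app name are
illustrative assumptions, not values from this job.

import org.apache.spark.sql.SparkSession;

public class S3ABufferDirSketch {
  public static void main(String[] args) {
    // Sketch only: the buffer path and app name below are assumptions.
    SparkSession spark = SparkSession.builder()
        .appName("kafka-s3-pipeline")
        // Buffer S3A block uploads in a directory that is not purged by
        // tmp cleaners during long idle periods and has enough free space.
        .config("spark.hadoop.fs.s3a.buffer.dir", "/data/spark/s3a-buffer")
        // Alternative: buffer uploads in off-heap memory instead of local disk.
        // .config("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
        .getOrCreate();
  }
}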


Regards,
Abhishek Singla


Re: config: minOffsetsPerTrigger not working

2023-04-27 Thread Abhishek Singla
Thanks, Mich, for acknowledging.

Yes, I am providing the checkpoint path. I omitted it here in the code
snippet.

I believe this is because we are on Spark 3.1.x; this config is only
available in Spark 3.2.0 and later.
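
For context, minOffsetsPerTrigger and maxTriggerDelay were added to the Kafka
source in Spark 3.2.0, so on 3.1.x they have no effect while
maxOffsetsPerTrigger still applies. Below is a minimal sketch of how the
reader would look once the cluster is on 3.2.0 or later; the broker, topic,
and thresholds mirror the config quoted below and are illustrative, not a
tested setup.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MinOffsetsPerTriggerSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    Dataset<Row> df = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test-topic")
        // Honoured only on Spark 3.2.0+: wait until at least 1000 new offsets
        // are available, or 15 minutes have passed, before starting a batch.
        .option("minOffsetsPerTrigger", 1000)
        .option("maxTriggerDelay", "15m")
        // Also works on 3.1.x: cap the number of offsets per batch.
        .option("maxOffsetsPerTrigger", 1100)
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
  }
}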

On Thu, Apr 27, 2023 at 9:26 PM Mich Talebzadeh 
wrote:

> Is this all of your writeStream?
>
> df.writeStream()
> .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
> .start()
> .awaitTermination();
>
> What happened to the checkpoint location?
>
> option('checkpointLocation', checkpoint_path).
>
> example
>
>  checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt"
>
>
> ls -l  /ssd/hduser/MDBatchBQ/chkpt
> total 24
> -rw-r--r--. 1 hduser hadoop   45 Mar  1 09:27 metadata
> drwxr-xr-x. 5 hduser hadoop 4096 Mar  1 09:27 .
> drwxr-xr-x. 4 hduser hadoop 4096 Mar  1 10:31 ..
> drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
> drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
> drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits
>
> so you can see what is going on
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 27 Apr 2023 at 15:46, Abhishek Singla 
> wrote:
>
>> Hi Team,
>>
>> I am using Spark Streaming to read from Kafka and write to S3.
>>
>> Version: 3.1.2
>> Scala Version: 2.12
>> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>>
>> Dataset<Row> df =
>> spark
>> .readStream()
>> .format("kafka")
>> .options(appConfig.getKafka().getConf())
>> .load()
>> .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>>
>> df.writeStream()
>> .foreachBatch(new KafkaS3PipelineImplementation(applicationId, 
>> appConfig))
>> .start()
>> .awaitTermination();
>>
>> kafka.conf = {
>>"kafka.bootstrap.servers": "localhost:9092",
>>"subscribe": "test-topic",
>>"minOffsetsPerTrigger": 1000,
>>"maxOffsetsPerTrigger": 1100,
>>"maxTriggerDelay": "15m",
>>"groupIdPrefix": "test",
>>"startingOffsets": "latest",
>>"includeHeaders": true,
>>"failOnDataLoss": false
>>   }
>>
>> spark.conf = {
>>"spark.master": "spark://localhost:7077",
>>"spark.app.name": "app",
>>"spark.sql.streaming.kafka.useDeprecatedOffsetFetching": 
>> false,
>>"spark.sql.streaming.metricsEnabled": true
>>  }
>>
>>
>> But these configs do not seem to be working, as I can see Spark processing
>> batches of 3k-15k records immediately one after another. Is there something
>> I am missing?
>>
>> Ref:
>> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>>
>> Regards,
>> Abhishek Singla


config: minOffsetsPerTrigger not working

2023-04-27 Thread Abhishek Singla
Hi Team,

I am using Spark Streaming to read from Kafka and write to S3.

Version: 3.1.2
Scala Version: 2.12
Spark Kafka connector: spark-sql-kafka-0-10_2.12

Dataset<Row> df =
    spark
        .readStream()
        .format("kafka")
        .options(appConfig.getKafka().getConf())
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();

kafka.conf = {
  "kafka.bootstrap.servers": "localhost:9092",
  "subscribe": "test-topic",
  "minOffsetsPerTrigger": 1000,
  "maxOffsetsPerTrigger": 1100,
  "maxTriggerDelay": "15m",
  "groupIdPrefix": "test",
  "startingOffsets": "latest",
  "includeHeaders": true,
  "failOnDataLoss": false
}

spark.conf = {
  "spark.master": "spark://localhost:7077",
  "spark.app.name": "app",
  "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
  "spark.sql.streaming.metricsEnabled": true
}


But these configs do not seem to be working, as I can see Spark processing
batches of 3k-15k records immediately one after another. Is there something
I am missing?

Ref:
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
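
For completeness, the kafka.conf map shown above is what gets handed to the
reader through .options(...). Below is a minimal sketch of that wiring with
the map built inline; it simply mirrors the values above (note that the Java
options(Map) overload takes a Map<String, String>, so numeric and boolean
values are passed as strings).

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaOptionsSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Same options as kafka.conf above, flattened to strings.
    Map<String, String> kafkaConf = new HashMap<>();
    kafkaConf.put("kafka.bootstrap.servers", "localhost:9092");
    kafkaConf.put("subscribe", "test-topic");
    kafkaConf.put("minOffsetsPerTrigger", "1000");
    kafkaConf.put("maxOffsetsPerTrigger", "1100");
    kafkaConf.put("maxTriggerDelay", "15m");
    kafkaConf.put("groupIdPrefix", "test");
    kafkaConf.put("startingOffsets", "latest");
    kafkaConf.put("includeHeaders", "true");
    kafkaConf.put("failOnDataLoss", "false");

    Dataset<Row> df = spark.readStream()
        .format("kafka")
        .options(kafkaConf)
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
  }
}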

Regards,
Abhishek Singla