DiskChecker$DiskErrorException: Could not find any valid local directory
for s3ablock-0001-

out of space?

tir. 13. feb. 2024 kl. 21:24 skrev Abhishek Singla <
abhisheksingla...@gmail.com>:

> Hi Team,
>
> Could someone provide some insights into this issue?
>
> Regards,
> Abhishek Singla
>
> On Wed, Jan 17, 2024 at 11:45 PM Abhishek Singla <
> abhisheksingla...@gmail.com> wrote:
>
>> Hi Team,
>>
>> Version: 3.2.2
>> Java Version: 1.8.0_211
>> Scala Version: 2.12.15
>> Cluster: Standalone
>>
>> I am using Spark Streaming to read from Kafka and write to S3. The job
>> fails with below error if there are no records published to Kafka for a few
>> days and then there are some records published. Could someone help me in
>> identifying the root cause of this job failure.
>>
>> 24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id = 
>> 72ee1070-7e05-4999-8b55-2a99e216ec51, runId = 
>> 0919e548-9706-4757-be94-359848100070] terminated with error
>> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any 
>> valid local directory for s3ablock-0001-
>>      at 
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
>>      at 
>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>>      at 
>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>>      at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
>>      at 
>> org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
>>      at 
>> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
>>      at 
>> org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:182)
>>      at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
>>      at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
>>      at 
>> org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
>>      at 
>> org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
>>      at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
>>      at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
>>      at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>>      at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
>>      at 
>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
>>      at 
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
>>      at 
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
>>      at 
>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
>>      at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
>>      at 
>> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>      at scala.Option.getOrElse(Option.scala:189)
>>      at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
>>      at 
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
>>      at 
>> org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$14(MicroBatchExecution.scala:442)
>>      at 
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>      at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>      at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>      at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:440)
>>      at 
>> scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
>>      at 
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>      at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>>      at 
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>>      at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>>      at 
>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>>      at 
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>>      at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>>      at 
>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>>      at 
>> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>>      at 
>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
>>
>>
>> Dataset<Row> df =
>>     spark
>>         .readStream()
>>         .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
>>         .options(appConfig.getKafka().getConf())
>>         .load()
>>         .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>>
>> df.writeStream()
>>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, 
>> appConfig))
>>     .option("checkpointLocation", appConfig.getChk().getPath())
>>     .start()
>>     .awaitTermination();
>>
>>
>> Regards,
>> Abhishek Singla
>>
>>
>>
>>
>>
>>

-- 
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297

Reply via email to