Re: Facing Error org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-
Hi Team,

Could someone provide some insights into this issue?

Regards,
Abhishek Singla

On Wed, Jan 17, 2024 at 11:45 PM Abhishek Singla <abhisheksingla...@gmail.com> wrote:

> Hi Team,
>
> Spark Version: 3.2.2
> Java Version: 1.8.0_211
> Scala Version: 2.12.15
> Cluster: Standalone
>
> I am using Spark Streaming to read from Kafka and write to S3. The job
> fails with the error below if no records are published to Kafka for a few
> days and then some records are published. Could someone help me identify
> the root cause of this job failure?
>
> 24/01/17 10:49:22 ERROR MicroBatchExecution: Query [id = 72ee1070-7e05-4999-8b55-2a99e216ec51, runId = 0919e548-9706-4757-be94-359848100070] terminated with error
> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-
>   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:462)
>   at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>   at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:1019)
>   at org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:816)
>   at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:204)
>   at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:182)
>   at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1369)
>   at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1305)
>   at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:102)
>   at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
>   at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
>   at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
>   at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>   at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
>   at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
>   at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
>   at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
>   at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
>   at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$addNewBatchByStream$2(HDFSMetadataLog.scala:173)
>   at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:171)
>   at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:116)
>   at org.apache.spark.sql.execution.streaming.OffsetSeqLog.add(OffsetSeqLog.scala:53)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$14(MicroBatchExecution.scala:442)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:440)
>   at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:627)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:380)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:210)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:…)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
>
> Dataset<Row> df = spark
>     .readStream()
>     .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
>     .options(appConfig.getKafka().getConf())
>     .load()
>     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>
> df.writeStream()
>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>     .option("checkpointLocation", appConfig.getChk().getPath())
>     .start()
>     .awaitTermination();
>
> Regards,
> Abhishek Singla
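The failing frames (S3ADataBlocks$DiskBlockFactory.create calling LocalDirAllocator.getLocalPathForWrite) are where the S3A connector stages upload blocks on local disk, so one thing worth ruling out is that the directories behind fs.s3a.buffer.dir (by default under hadoop.tmp.dir) are missing or full on the executors. Below is a minimal sketch of pinning those settings down from the Spark side; the path is a placeholder, and this is not a confirmed fix for this particular failure:

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
        .appName("s3a-buffer-config")
        // Directory where s3ablock-* temp files are staged; it must exist and
        // be writable on every executor. "/mnt/tmp/s3a" is a placeholder.
        .config("spark.hadoop.fs.s3a.buffer.dir", "/mnt/tmp/s3a")
        // Alternative: buffer upload blocks in memory rather than on local disk.
        // .config("spark.hadoop.fs.s3a.fast.upload.buffer", "bytebuffer")
        .getOrCreate();

Whether exhausted or unmounted buffer directories are actually the root cause here is not established by the trace alone; they are simply the component the exception points at.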
Re: config: minOffsetsPerTrigger not working
Thanks, Mich, for acknowledging. Yes, I am providing the checkpoint path; I omitted it from the code snippet here. I believe this is because we are on Spark 3.1.x; this config is only available in Spark 3.2.0 and later.

On Thu, Apr 27, 2023 at 9:26 PM Mich Talebzadeh wrote:

> Is this all of your writeStream?
>
> df.writeStream()
>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>     .start()
>     .awaitTermination();
>
> What happened to the checkpoint location?
>
>     option('checkpointLocation', checkpoint_path)
>
> Example:
>
>     checkpoint_path = "file:///ssd/hduser/MDBatchBQ/chkpt"
>
>     ls -l /ssd/hduser/MDBatchBQ/chkpt
>     total 24
>     -rw-r--r--. 1 hduser hadoop   45 Mar  1 09:27 metadata
>     drwxr-xr-x. 5 hduser hadoop 4096 Mar  1 09:27 .
>     drwxr-xr-x. 4 hduser hadoop 4096 Mar  1 10:31 ..
>     drwxr-xr-x. 3 hduser hadoop 4096 Apr 22 11:27 sources
>     drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 offsets
>     drwxr-xr-x. 2 hduser hadoop 4096 Apr 24 11:09 commits
>
> so you can see what is going on.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
> View my LinkedIn profile:
> https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising
> from such loss, damage or destruction.
>
> On Thu, 27 Apr 2023 at 15:46, Abhishek Singla wrote:
>
>> Hi Team,
>>
>> I am using Spark Streaming to read from Kafka and write to S3.
>>
>> Version: 3.1.2
>> Scala Version: 2.12
>> Spark Kafka connector: spark-sql-kafka-0-10_2.12
>>
>> Dataset<Row> df = spark
>>     .readStream()
>>     .format("kafka")
>>     .options(appConfig.getKafka().getConf())
>>     .load()
>>     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>>
>> df.writeStream()
>>     .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
>>     .start()
>>     .awaitTermination();
>>
>> kafka.conf = {
>>     "kafka.bootstrap.servers": "localhost:9092",
>>     "subscribe": "test-topic",
>>     "minOffsetsPerTrigger": 1000,
>>     "maxOffsetsPerTrigger": 1100,
>>     "maxTriggerDelay": "15m",
>>     "groupIdPrefix": "test",
>>     "startingOffsets": "latest",
>>     "includeHeaders": true,
>>     "failOnDataLoss": false
>> }
>>
>> spark.conf = {
>>     "spark.master": "spark://localhost:7077",
>>     "spark.app.name": "app",
>>     "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
>>     "spark.sql.streaming.metricsEnabled": true
>> }
>>
>> But these configs do not seem to be working, as I can see Spark
>> processing batches of 3k-15k records immediately one after another.
>> Is there something I am missing?
>>
>> Ref: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>>
>> Regards,
>> Abhishek Singla
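For reference, on Spark 3.2.0 and later the same source options are honored by the Kafka reader. A minimal sketch of the 3.2+ usage, reusing the placeholder broker and topic from the thread:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
        .appName("min-offsets-demo")
        .getOrCreate();

    // minOffsetsPerTrigger and maxTriggerDelay were introduced in Spark 3.2.0;
    // on 3.1.x the Kafka source simply ignores them.
    Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
        .option("subscribe", "test-topic")                    // placeholder
        .option("minOffsetsPerTrigger", 1000)  // wait for at least 1000 new offsets...
        .option("maxTriggerDelay", "15m")      // ...or 15 minutes, whichever comes first
        .option("maxOffsetsPerTrigger", 1100)  // upper bound per micro-batch
        .load();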
config: minOffsetsPerTrigger not working
Hi Team,

I am using Spark Streaming to read from Kafka and write to S3.

Version: 3.1.2
Scala Version: 2.12
Spark Kafka connector: spark-sql-kafka-0-10_2.12

Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .options(appConfig.getKafka().getConf())
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

df.writeStream()
    .foreachBatch(new KafkaS3PipelineImplementation(applicationId, appConfig))
    .start()
    .awaitTermination();

kafka.conf = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "test-topic",
    "minOffsetsPerTrigger": 1000,
    "maxOffsetsPerTrigger": 1100,
    "maxTriggerDelay": "15m",
    "groupIdPrefix": "test",
    "startingOffsets": "latest",
    "includeHeaders": true,
    "failOnDataLoss": false
}

spark.conf = {
    "spark.master": "spark://localhost:7077",
    "spark.app.name": "app",
    "spark.sql.streaming.kafka.useDeprecatedOffsetFetching": false,
    "spark.sql.streaming.metricsEnabled": true
}

But these configs do not seem to be working, as I can see Spark processing
batches of 3k-15k records immediately one after another. Is there something
I am missing?

Ref: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Regards,
Abhishek Singla
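KafkaS3PipelineImplementation itself is not shown anywhere in the thread. Since it is passed to foreachBatch, it presumably implements Spark's VoidFunction2<Dataset<Row>, Long>. A hypothetical skeleton of such a sink, in which AppConfig, getOutputPath, and the S3 path are all invented for illustration:

    import org.apache.spark.api.java.function.VoidFunction2;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Hypothetical reconstruction of the sink used above; the real class is
    // not shown in the thread.
    public class KafkaS3PipelineImplementation
        implements VoidFunction2<Dataset<Row>, Long> {

      private final String outputPath;

      public KafkaS3PipelineImplementation(String applicationId, AppConfig appConfig) {
        // Assumed accessor; e.g. "s3a://some-bucket/output" (placeholder).
        this.outputPath = appConfig.getOutputPath();
      }

      @Override
      public void call(Dataset<Row> batch, Long batchId) {
        // Persist each micro-batch to S3; partitioning by batchId keeps a
        // replayed batch from clobbering earlier output.
        batch.write().mode("append").json(outputPath + "/batch=" + batchId);
      }
    }

As Mich notes above, the writeStream side would also carry the checkpointLocation option so that offsets and commits survive restarts.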