[ https://issues.apache.org/jira/browse/SPARK-19407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968221#comment-16968221 ]
Nick Hryhoriev edited comment on SPARK-19407 at 11/6/19 9:40 AM:
-----------------------------------------------------------------

I have the same issue with Spark 2.4.4 when I use Spark on YARN in client mode.

{code:java}
import java.util.UUID

import scala.concurrent.duration._

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Encoder, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

val sparkConf = SparkOnYarnAppController.sparkHadoopKeys(configuration)
  .foldLeft(new SparkConf()
    .setAppName(appName)
    .setIfMissing("spark.ui.enabled", "false")
    .setMaster("yarn")
    .setIfMissing("spark.hadoop.yarn.resourcemanager.hostname", s"hadoop-$cluster.com")
    .setIfMissing("spark.yarn.archive", s"hdfs:///sparkDistributions/$distribution.tgz")
    .setIfMissing("spark.dynamicAllocation.enabled", "false")
    .setIfMissing("spark.driver.memory", "1g")
    .setIfMissing("spark.driver.cores", "1")
    .setIfMissing("spark.executor.memory", "1g")
    .setIfMissing("spark.executor.instances", "1")
    .setIfMissing("spark.executor.cores", "1")
    .setIfMissing("spark.yarn.maxAppAttempts", "1")
  )((conf, hadoopProps) => conf.set(hadoopProps._1, hadoopProps._2))

val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

case class TestRecord(partition: Long, value: String)

object CommonTools {

  implicit class MockDataframe(session: SparkSession) {
    implicit val sqlContext: SQLContext = session.sqlContext

    // Build a streaming DataFrame backed by an in-memory source.
    def mockDataFrame[T: Encoder](mockData: Seq[T]): DataFrame = {
      val mockStream = MemoryStream[T]
      mockStream.addData(mockData)
      mockStream.toDF()
    }
  }

  implicit class StreamSinkToHadoopFileSystem(dataFrame: DataFrame) {
    def sinkToS3(s3path: String, format: String, checkpointDir: String, trigger: Trigger): StreamingQuery = {
      dataFrame.writeStream
        .queryName("Test-TooManyVersionPerRootPrefixInS3")
        .trigger(trigger)
        .option("checkpointLocation", checkpointDir)
        .format(format) // the original also called .format("parquet") first; this later call wins
        .partitionBy("partition")
        .option("path", s3path)
        .start()
    }
  }
}

import CommonTools._

val stream = sparkSession
  .mockDataFrame[TestRecord]((0 to 100).map(i => TestRecord(i, s"i-${UUID.randomUUID().toString}")))
  .sinkToS3(
    s3path = s"$outputDir/TooManyVersionPerRootPrefixInS3/",
    format = "parquet",
    checkpointDir = s"$checkpointDir/TooManyVersionPerRootPrefixInS3-checkpoint",
    trigger = Trigger.ProcessingTime(5.seconds)
  )
{code}

Exception:
{code:java}
2019-11-06 11:31:47 ERROR org.apache.spark.sql.execution.streaming.StreamMetadata:91 - Error writing stream metadata StreamMetadata(bc32a9a9-8328-406d-8b37-30770e10962b) to s3a://af-eu-west-1-stg-data-lake-orc-with-crr/mhr/TooManyVersionPerRootPrefixInS3-checkpoint/metadata
org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:400)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
	at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:475)
	at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:663)
	at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1177)
	at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:100)
	at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:605)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:703)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:699)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext.create(FileContext.java:699)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:311)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:318)
	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.apply(StreamExecution.scala:125)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.apply(StreamExecution.scala:123)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:123)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:48)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
Exception in thread "main" org.apache.hadoop.util.DiskChecker$DiskErrorException: No space available in any of the local directories.
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:400)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:461)
	at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:475)
	at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:66)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:663)
	at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1177)
	at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:100)
	at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:605)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:703)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:699)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext.create(FileContext.java:699)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:311)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:318)
	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.apply(StreamExecution.scala:125)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.apply(StreamExecution.scala:123)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:123)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:48)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
	at com.appsflyer.spark.s3.it.CommonTools$StreamSinkToHadoopFileSystem.sinkToS3(TooManyVersionPerRootPrefixInS3.scala:45)
	at com.appsflyer.spark.s3.it.TooManyVersionPerRootPrefixInS3$.main(TooManyVersionPerRootPrefixInS3.scala:80)
	at com.appsflyer.spark.s3.it.TooManyVersionPerRootPrefixInS3.main(TooManyVersionPerRootPrefixInS3.scala)
{code}
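A note on the DiskChecker error above: in this trace the S3A client is staging the upload through a local temp file (S3AFileSystem.createTmpFileForWrite via LocalDirAllocator), so "No space available in any of the local directories" usually means the local buffer path is full or unwritable, not S3 itself. A minimal sketch of pointing that buffer at a volume with free space, assuming the standard fs.s3a.buffer.dir setting applies to your Hadoop build; the path below is a hypothetical placeholder:

{code:java}
import org.apache.spark.sql.SparkSession

// Hedged sketch: redirect the S3A local upload buffer to a disk with
// free space. "/mnt/large-disk/s3a" is a made-up path for illustration.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.s3a.buffer.dir", "/mnt/large-disk/s3a")
  .getOrCreate()
{code}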
was (Author: hryhoriev.nick): I have the same issue with Spark 2.4.4 when I use Spark on YARN in client mode.

> defaultFS is used FileSystem.get instead of getting it from uri scheme
> ----------------------------------------------------------------------
>
> Key: SPARK-19407
> URL: https://issues.apache.org/jira/browse/SPARK-19407
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Reporter: Amit Assudani
> Assignee: Genmao Yu
> Priority: Major
> Labels: checkpoint, filesystem, starter, streaming
> Fix For: 2.1.1, 2.2.0
>
> Caused by: java.lang.IllegalArgumentException: Wrong FS: s3a://**************/checkpoint/7b2231a3-d845-4740-bfa3-681850e5987f/metadata, expected: file:///
> at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
> at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
> at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
> at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
> at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
> at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
> at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
> at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
> at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
> at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
>
> Can easily replicate on a Spark standalone cluster by providing a checkpoint location URI scheme other than "file://" and not overriding it in config.
>
> Workaround: --conf spark.hadoop.fs.defaultFS=s3a://somebucket, or set it in SparkConf or spark-defaults.conf.
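A minimal sketch of the workaround described above, for builds affected by this issue (fixed in 2.1.1/2.2.0); "s3a://somebucket" is the placeholder bucket name from the description:

{code:java}
import org.apache.spark.sql.SparkSession

// Hedged workaround sketch: make fs.defaultFS match the checkpoint URI
// scheme so stream metadata is read/written through the right FileSystem.
val spark = SparkSession.builder()
  .config("spark.hadoop.fs.defaultFS", "s3a://somebucket")
  .getOrCreate()
{code}

The same value can be passed as --conf spark.hadoop.fs.defaultFS=s3a://somebucket on spark-submit, or set in spark-defaults.conf.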