Re: Structured Streaming Checkpoint Error
Thanks Jungtaek! That makes sense. We are currently moving to an HDFS-compatible filesystem, and I was wondering how the change would affect the checkpoint; after what you said it is much clearer now.

On Thu, 3 Dec 2020 at 00:23, Jungtaek Lim wrote:
> [...]
Re: Structured Streaming Checkpoint Error
In theory it would work, but checkpointing becomes very inefficient. If I understand correctly, it writes the content to a temp file on S3, and the subsequent rename actually reads the temp file back from S3 and writes its content to the final path on S3. Compared to checkpointing on HDFS, that is one unnecessary write and one unnecessary read. It probably warrants a custom implementation of the checkpoint manager for S3.

Also, atomic rename still does not work on S3, and S3 does not support writing with overwrite=false. That means there is no barrier if concurrent streaming queries access the same checkpoint and step on each other. With a checkpoint on HDFS the rename is atomic, so only one writer succeeds even in parallel; the query that loses the race to write the checkpoint file simply fails. That's a caveat you may want to keep in mind.

On Wed, Dec 2, 2020 at 11:35 PM German Schiavon wrote:
> [...]
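The rename-based commit Jungtaek describes can be sketched roughly as follows. This is an editorial illustration, not Spark's actual CheckpointFileManager code: it uses the local filesystem as a stand-in for HDFS, and `commit_batch` is a hypothetical name. The point is that on an HDFS-like filesystem the final rename-without-overwrite is atomic, so only one of two concurrent writers wins; S3's copy-based rename provides neither the atomicity nor the efficiency.

```python
import os
import tempfile

def commit_batch(checkpoint_dir, batch_id, content):
    # Write to a hidden temp file first, then atomically publish it
    # under the final batch name. On HDFS-like filesystems this final
    # step is a cheap metadata operation that fails if the destination
    # already exists, so only one concurrent writer can win.
    final_path = os.path.join(checkpoint_dir, str(batch_id))
    fd, tmp_path = tempfile.mkstemp(dir=checkpoint_dir, prefix=f".{batch_id}.")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(content)
        try:
            # os.link raises FileExistsError if final_path exists,
            # emulating rename-with-overwrite=false semantics.
            os.link(tmp_path, final_path)
            return True
        except FileExistsError:
            # Another query already committed this batch; back off.
            return False
    finally:
        os.unlink(tmp_path)
```

On S3 there is no equivalent primitive: a "rename" is a server-side copy followed by a delete, which is why the temp-file dance above costs an extra write and read there, and why two concurrent queries can silently overwrite each other's checkpoint files.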
Re: Structured Streaming Checkpoint Error
Hello!

@Gabor Somogyi I wonder whether, now that S3 is *strongly consistent*, it would work fine.

Regards!

https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/

On Thu, 17 Sep 2020 at 11:55, German Schiavon wrote:
> [...]
Re: Structured Streaming Checkpoint Error
Hi Gabor,

Makes sense, thanks a lot!

On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi wrote:
> [...]
Re: Structured Streaming Checkpoint Error
Hi,

Structured Streaming simply does not work when the checkpoint location is on S3, due to S3's lack of read-after-write consistency.
Please choose an HDFS-compliant filesystem and it will work like a charm.

BR,
G

On Wed, Sep 16, 2020 at 4:12 PM German Schiavon wrote:
> [...]
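Gabor's suggestion amounts to keeping the output data on S3 while the checkpoint metadata lives on an HDFS-compliant store. A minimal PySpark sketch of that split (paths, bucket, and query are hypothetical; assumes an existing streaming DataFrame `df`):

```python
# Sketch only: assumes an active SparkSession and a streaming DataFrame `df`.
# The Parquet output can stay on S3; only the checkpoint metadata, which
# relies on atomic rename, is placed on an HDFS-compliant filesystem.
query = (
    df.writeStream
      .format("parquet")
      .option("path", "s3a://my-bucket/output/")                      # data on S3 (hypothetical bucket)
      .option("checkpointLocation", "hdfs:///checkpoints/my-query")   # metadata on HDFS
      .start()
)
```
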
Structured Streaming Checkpoint Error
Hi!

I have a Structured Streaming application that reads from Kafka, performs some aggregations and writes to S3 in Parquet format.

Everything seems to work great, except that from time to time I get a checkpoint error. At the beginning I thought it was a random error, but it has happened more than three times already in a few days:

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp

Does this happen to anyone else?

Thanks in advance.

*This is the full error:*

ERROR streaming.MicroBatchExecution: Query segmentValidation [id = 14edaddf-25bb-4259-b7a2-6107907f962f, runId = 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
java.io.FileNotFoundException: No such file or directory: s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
	at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
	at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
	at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
	at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)