Thanks Jungtaek! That makes sense. We are currently switching to an HDFS-compatible FS; I was wondering how this change would impact the checkpoint, but after what you said it is clearer now.
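For anyone following along, a minimal sketch of what the query looks like once the checkpoint lives on an HDFS-compatible filesystem while the data still lands on S3 (brokers, topic, paths and the watermark/window settings below are placeholders, not our actual job; assumes an existing SparkSession named spark):

    import org.apache.spark.sql.functions._

    // Kafka source (placeholder broker and topic)
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()

    // Windowed count with a watermark so the file sink can run in append mode
    val counts = events
      .select(col("timestamp"), expr("CAST(value AS STRING) AS value"))
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window(col("timestamp"), "5 minutes"), col("value"))
      .count()

    // Parquet output stays on S3; the checkpoint goes to HDFS
    counts.writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", "s3a://bucket/output/")
      .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/validation")
      .start()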
On Thu, 3 Dec 2020 at 00:23, Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

> In theory it would work, but checkpointing would be very inefficient. If
> I understand correctly, it writes the content to a temp file on S3, and the
> rename then actually reads the temp file back from S3 and writes its
> content to the final path on S3. Compared to checkpointing on HDFS, that is
> one unnecessary write and one unnecessary read. It probably warrants a
> custom implementation of the checkpoint manager for S3.
>
> Also, atomic rename still doesn't work on S3, and S3 doesn't support write
> with overwrite=false. That means there's no barrier if concurrent streaming
> queries access the same checkpoint and mess it up. With the checkpoint on
> HDFS, the rename is atomic: even in parallel only one writer succeeds, and
> the other query, having lost the race to write the checkpoint file, simply
> fails. That's a caveat you may want to keep in mind.
>
> On Wed, Dec 2, 2020 at 11:35 PM German Schiavon <gschiavonsp...@gmail.com>
> wrote:
>
>> Hello!
>>
>> @Gabor Somogyi <gabor.g.somo...@gmail.com> I wonder whether, now that S3
>> is *strongly consistent*, it would work fine.
>>
>> Regards!
>>
>> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
>>
>> On Thu, 17 Sep 2020 at 11:55, German Schiavon <gschiavonsp...@gmail.com>
>> wrote:
>>
>>> Hi Gabor,
>>>
>>> Makes sense, thanks a lot!
>>>
>>> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Structured Streaming simply does not work when the checkpoint location
>>>> is on S3, due to its lack of read-after-write consistency.
>>>> Please choose an HDFS-compliant filesystem and it will work like a
>>>> charm.
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <
>>>> gschiavonsp...@gmail.com> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> I have a Structured Streaming application that reads from Kafka,
>>>>> performs some aggregations, and writes to S3 in Parquet format.
>>>>>
>>>>> Everything seems to work great except that from time to time I get a
>>>>> checkpoint error. At the beginning I thought it was a random error, but it
>>>>> has happened more than 3 times already in a few days:
>>>>>
>>>>> Caused by: java.io.FileNotFoundException: No such file or directory:
>>>>> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>>
>>>>> Does this happen to anyone else?
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> *This is the full error:*
>>>>>
>>>>> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
>>>>> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
>>>>> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>>>>>
>>>>> java.io.FileNotFoundException: No such file or directory:
>>>>> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>>>
>>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>>>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>>>>> at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>>>>> at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>>>>> at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>>>>> at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>>>>> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>>>>> at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>>>>> at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>>>>> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>>>>> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>>>>> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>>>> at scala.Option.getOrElse(Option.scala:189)
>>>>
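If anyone does go down the route Jungtaek mentions of a custom checkpoint manager for S3, Spark can be pointed at one via the spark.sql.streaming.checkpointFileManagerClass setting. A minimal sketch of the wiring only; the class name below is hypothetical, there is no such implementation shipped with Spark, and it would have to implement org.apache.spark.sql.execution.streaming.CheckpointFileManager itself:

    import org.apache.spark.sql.SparkSession

    // Sketch: plug a custom CheckpointFileManager into Structured Streaming.
    // "com.example.S3CheckpointFileManager" is a hypothetical class that
    // would commit checkpoint files directly (e.g. a single PUT of the final
    // object) instead of the write-temp-then-rename path seen in the trace.
    val spark = SparkSession.builder()
      .appName("segmentValidation")
      .config("spark.sql.streaming.checkpointFileManagerClass",
              "com.example.S3CheckpointFileManager")   // hypothetical implementation
      .getOrCreate()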