Re: Structured Streaming Checkpoint Error

2020-12-03 Thread German Schiavon
Thanks Jungtaek!

It makes sense. We are currently switching to an HDFS-compatible FS; I was
wondering how this change would impact the checkpoint, but after what you
said it is clearer now.



On Thu, 3 Dec 2020 at 00:23, Jungtaek Lim wrote:

> In theory it would work, but checkpointing would be very inefficient. If I
> understand correctly, it writes the content to a temp file on S3, and the
> rename then effectively reads that temp file back from S3 and writes its
> content to the final path on S3. Compared to checkpointing on HDFS, that is
> one unnecessary write and one unnecessary read. It probably warrants a
> custom implementation of the checkpoint manager for S3.
>
> Also, atomic rename still does not work on S3, and S3 does not support
> writing with overwrite=false. That means there is no barrier if concurrent
> streaming queries access the same checkpoint and mess it up. With a
> checkpoint on HDFS, the rename is atomic, so only one writer succeeds even
> in parallel, and the other query, having lost the race to write the
> checkpoint file, simply fails. That's a caveat you may want to keep in mind.



Re: Structured Streaming Checkpoint Error

2020-12-02 Thread Jungtaek Lim
In theory it would work, but checkpointing would be very inefficient. If I
understand correctly, it writes the content to a temp file on S3, and the
rename then effectively reads that temp file back from S3 and writes its
content to the final path on S3. Compared to checkpointing on HDFS, that is
one unnecessary write and one unnecessary read. It probably warrants a custom
implementation of the checkpoint manager for S3.
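
To make the extra round trip concrete, a rough Scala sketch of the
write-temp-then-rename commit that the rename-based checkpoint file manager
performs for each metadata file looks like this (the object and method names,
and the simplified API usage, are illustrative rather than Spark's actual
internals):

import java.util.EnumSet
import org.apache.hadoop.fs.{CreateFlag, FileContext, Path}

object CheckpointCommitSketch {
  // Illustrative only: commit a metadata file by writing a hidden
  // ".<name>.tmp" file and renaming it into place.
  def commitBatchFile(fc: FileContext, finalPath: Path, bytes: Array[Byte]): Unit = {
    val tmpPath = new Path(finalPath.getParent, s".${finalPath.getName}.tmp")
    val out = fc.create(tmpPath, EnumSet.of(CreateFlag.CREATE))
    try out.write(bytes) finally out.close()
    // On HDFS this rename is a cheap, atomic metadata operation.
    // On S3A it is a server-side copy of the object plus a delete of the
    // temp object, i.e. roughly one extra read and one extra write per commit.
    fc.rename(tmpPath, finalPath)
  }
}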

Also, atomic rename still does not work on S3, and S3 does not support
writing with overwrite=false. That means there is no barrier if concurrent
streaming queries access the same checkpoint and mess it up. With a
checkpoint on HDFS, the rename is atomic, so only one writer succeeds even in
parallel, and the other query, having lost the race to write the checkpoint
file, simply fails. That's a caveat you may want to keep in mind.
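
If someone did write such an S3-specific manager, recent Spark versions have
an internal option for plugging it in. The config key below is real, while
the class name is only a made-up placeholder, not an existing implementation:

// Hypothetical: only the config key is real, the class is a placeholder.
spark.conf.set(
  "spark.sql.streaming.checkpointFileManagerClass",
  "com.example.S3DirectCheckpointFileManager")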

On Wed, Dec 2, 2020 at 11:35 PM German Schiavon wrote:

> Hello!
>
> @Gabor Somogyi I wonder whether, now that S3 is *strongly consistent*, it
> would work fine.
>
>
> Regards!
>
> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/


Re: Structured Streaming Checkpoint Error

2020-12-02 Thread German Schiavon
Hello!

@Gabor Somogyi I wonder whether, now that S3 is *strongly consistent*, it
would work fine.


Regards!
https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/

On Thu, 17 Sep 2020 at 11:55, German Schiavon wrote:

> Hi Gabor,
>
> Makes sense, thanks a lot!
>


Re: Structured Streaming Checkpoint Error

2020-09-17 Thread German Schiavon
Hi Gabor,

Makes sense, thanks a lot!

On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi wrote:

> Hi,
>
> Structured Streaming simply does not work when the checkpoint location is
> on S3, due to its lack of strong read-after-write consistency.
> Please choose an HDFS-compliant filesystem and it will work like a charm.
>
> BR,
> G
>
>
> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon 
> wrote:
>
>> Hi!
>>
>> I have an Structured Streaming Application that reads from kafka,
>> performs some aggregations and writes in S3 in parquet format.
>>
>> Everything seems to work great except that from time to time I get a
>> checkpoint error, at the beginning I thought it was a random error but it
>> happened more than 3 times already in a few days
>>
>> Caused by: java.io.FileNotFoundException: No such file or directory:
>> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>
>>
>> Does this happen to anyone else?
>>
>> Thanks in advance.
>>
>> *This is the full error :*
>>
>> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
>> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
>> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>>
>> java.io.FileNotFoundException: No such file or directory:
>> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>>
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>>
>> at
>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>>
>> at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>>
>> at
>> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>>
>> at
>> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>>
>> at
>> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>>
>> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>>
>> at
>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>>
>> at
>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>>
>> at
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>>
>> at
>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>>
>> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>> at scala.Option.getOrElse(Option.scala:189)
>>
>


Re: Structured Streaming Checkpoint Error

2020-09-17 Thread Gabor Somogyi
Hi,

Structured Streaming simply does not work when the checkpoint location is on
S3, due to its lack of strong read-after-write consistency.
Please choose an HDFS-compliant filesystem and it will work like a charm.
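
For example, a minimal sketch (the paths are made up, and df stands for
whatever streaming DataFrame you already have) that keeps the output data on
S3 but moves only the checkpoint to HDFS:

val query = df.writeStream
  .format("parquet")
  .option("path", "s3a://my-bucket/output/")                      // data can stay on S3
  .option("checkpointLocation", "hdfs:///checkpoints/my-query/")  // checkpoint on an HDFS-compliant FS
  .start()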

BR,
G


On Wed, Sep 16, 2020 at 4:12 PM German Schiavon wrote:

> Hi!
>
> I have a Structured Streaming application that reads from Kafka, performs
> some aggregations and writes to S3 in Parquet format.
>
> Everything seems to work great, except that from time to time I get a
> checkpoint error. At first I thought it was a random error, but it has
> already happened more than three times in a few days:
>
> Caused by: java.io.FileNotFoundException: No such file or directory:
> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>
>
> Does this happen to anyone else?
>
> Thanks in advance.
>


Structured Streaming Checkpoint Error

2020-09-16 Thread German Schiavon
Hi!

I have a Structured Streaming application that reads from Kafka, performs
some aggregations and writes to S3 in Parquet format.
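
Roughly, the job has the following shape; the app name, broker, topic,
aggregation and output path in this sketch are assumed for illustration, and
only the query name and checkpoint path are taken from the error further
down:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("validation").getOrCreate()
import spark.implicits._

// Assumed Kafka source; broker and topic are placeholders.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS value", "timestamp")

// Assumed windowed aggregation (the real one may differ).
val aggregated = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "5 minutes"))
  .count()

aggregated.writeStream
  .queryName("segmentValidation")
  .format("parquet")
  .outputMode("append")
  .option("path", "s3a://xxx/output/")  // assumed output path
  // Checkpoint on S3, where the .tmp rename occasionally fails.
  .option("checkpointLocation", "s3a://xxx/xxx/validation-checkpoint/")
  .start()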

Everything seems to work great, except that from time to time I get a
checkpoint error. At first I thought it was a random error, but it has
already happened more than three times in a few days:

Caused by: java.io.FileNotFoundException: No such file or directory:
s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp


Does this happen to anyone else?

Thanks in advance.

*This is the full error:*

ERROR streaming.MicroBatchExecution: Query segmentValidation [id = 14edaddf-25bb-4259-b7a2-6107907f962f, runId = 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
java.io.FileNotFoundException: No such file or directory: s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
  at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
  at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
  at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
  at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
  at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
  at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
  at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
  at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
  at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  at scala.Option.getOrElse(Option.scala:189)