Marc Aurel Fritz created FLINK-36421:
----------------------------------------

             Summary: Missing fsync in FsCheckpointStreamFactory
                 Key: FLINK-36421
                 URL: https://issues.apache.org/jira/browse/FLINK-36421
             Project: Flink
          Issue Type: Bug
          Components: FileSystems, Runtime / Checkpointing
    Affects Versions: 1.20.0, 1.19.0, 1.18.0, 1.17.0
            Reporter: Marc Aurel Fritz
         Attachments: fsync-fs-stream-factory.diff

With Flink 1.20 we observed another checkpoint corruption bug. This is similar 
to FLINK-35217, but affects only files written by the taskmanager (the ones 
with random names as described 
[here|https://github.com/Planet-X/flink/blob/0d6e25a9738d9d4ee94de94e1437f92611b50758/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java#L47]).

After a system crash, the files written by the taskmanager may be corrupted 
(file size of 0 bytes) if the changes in the file-system cache haven't been 
written to disk. The "_metadata" file written by the jobmanager is always fine 
because it's properly fsynced.
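
To check a state store for the symptom described above, a small scan for zero-byte files can help. The following is only a sketch: the class and method names are hypothetical, and a size of 0 bytes only matches the corruption pattern observed here (a non-empty file could still be incomplete).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink.
class EmptyStateFileScan {

    /** Returns all regular files of size 0 below the given directory, sorted by path. */
    static List<Path> findEmptyFiles(Path checkpointDir) throws IOException {
        try (Stream<Path> files = Files.walk(checkpointDir)) {
            return files
                    .filter(Files::isRegularFile)
                    .filter(EmptyStateFileScan::isEmpty)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    private static boolean isEmpty(Path file) {
        try {
            return Files.size(file) == 0;
        } catch (IOException e) {
            // Report unreadable files as suspicious instead of skipping them silently.
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: EmptyStateFileScan <checkpoint-dir>");
            return;
        }
        // Example argument: a "chk-N" directory inside the job's state store.
        for (Path p : findEmptyFiles(Paths.get(args[0]))) {
            System.out.println("empty file: " + p);
        }
    }
}
```

Running it against a "chk-N" directory after a crash lists the files that would need closer inspection before restoring from that checkpoint.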

Investigation revealed that an "fsync" call is missing, this time in 
"FsCheckpointStreamFactory". In this case the "OutputStream" is closed without 
calling "fsync", so the file is not durably persisted on disk before the 
checkpoint is completed. (As previously established in FLINK-35217, calling 
"fsync" is necessary because simply closing the stream does not guarantee 
persistence.)
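
The difference between closing and syncing can be illustrated with plain JDK I/O. This is a minimal sketch, not the code from the attached diff: the class and method names are hypothetical, and it assumes a local file where "FileDescriptor.sync()" is expected to result in an "fsync" call on Linux.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical example class, not part of Flink.
class DurableWriteSketch {

    /** Writes the data and forces it to the storage device before the stream is closed. */
    static void writeDurably(Path file, byte[] data) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file.toFile())) {
            out.write(data);
            // No-op for FileOutputStream itself, but required if a buffered wrapper is used.
            out.flush();
            // Blocks until the OS reports the file contents as written to the device.
            // Without this call, close() may leave the data only in the page cache.
            out.getFD().sync();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("state-", ".bin");
        writeDurably(file, "example state".getBytes(StandardCharsets.UTF_8));
        System.out.println("wrote " + Files.size(file) + " bytes to " + file);
        Files.delete(file);
    }
}
```

The important part is the ordering: the sync happens before the stream is closed and before any reference to the file is handed to another component.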

"strace" on the taskmanager's process confirms this behavior:
 # The checkpoint chk-1217's directory is created at "mkdir"
 # The checkpoint chk-1217's non-inline state is written by the taskmanager at 
"openat", filename is "0507881e-8877-40b0-82d6-3d7dead64ccc". Note that there's 
no "fsync" before "close".
 # The checkpoint chk-1217 is finished, its "_metadata" is written and synced 
properly
 # The old checkpoint chk-1216 is deleted at "unlink"
The new checkpoint chk-1217 now references an unsynced file that can get 
corrupted on, e.g., power loss. This means there is no working checkpoint left, 
as the old checkpoint was deleted.

For durable persistence, an "fsync" call is needed before "close" in step 2.
Full "strace" log:
{code:java}
[pid 947250] 08:22:58 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
0x7f68414c5b50) = -1 ENOENT (No such file or directory)
[pid 947250] 08:22:58 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
0x7f68414c5b50) = -1 ENOENT (No such file or directory)
[pid 947250] 08:22:58 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502", 
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947250] 08:22:58 
mkdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0777) 
= 0
[pid 1303248] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc",
 0x7f56f08d5610) = -1 ENOENT (No such file or directory)
[pid 1303248] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 1303248] 08:22:59 openat(AT_FDCWD, 
"/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc",
 O_WRONLY|O_CREAT|O_TRUNC, 0666) = 199
[pid 1303248] 08:22:59 fstat(199, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
[pid 1303248] 08:22:59 close(199)       = 0
[pid 947310] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata",
 0x7f683fb378b0) = -1 ENOENT (No such file or directory)
[pid 947310] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata",
 0x7f683fb37730) = -1 ENOENT (No such file or directory)
[pid 947310] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
 0x7f683fb37730) = -1 ENOENT (No such file or directory)
[pid 947310] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947310] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947310] 08:22:59 openat(AT_FDCWD, 
"/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
 O_WRONLY|O_CREAT|O_EXCL, 0666) = 148
[pid 947310] 08:22:59 fsync(148)        = 0
[pid 947310] 08:22:59 close(148)        = 0
[pid 947310] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
 {st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
[pid 947310] 08:22:59 
rename("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
 "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata") = 0
[pid 947310] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe",
  <unfinished ...>
[pid 947250] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata",
  <unfinished ...>
[pid 947310] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=54409, 
...}) = 0
[pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=46265, 
...}) = 0
[pid 947310] 08:22:59 
unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe"
 <unfinished ...>
[pid 947250] 08:22:59 
unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata"
 <unfinished ...>
[pid 947310] 08:22:59 <... unlink resumed>) = 0
[pid 947250] 08:22:59 <... unlink resumed>) = 0
[pid 947250] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947250] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947250] 08:22:59 openat(AT_FDCWD, 
"/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 148
[pid 947250] 08:22:59 newfstatat(148, "", {st_mode=S_IFDIR|0755, st_size=4096, 
...}, AT_EMPTY_PATH) = 0
[pid 947250] 08:22:59 close(148)        = 0
[pid 947250] 08:22:59 
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",  
<unfinished ...>
[pid 947201] 08:22:59 <... stat resumed>0x7f56f2069a20) = -1 ENOENT (No such 
file or directory)
[pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFDIR|0755, st_size=4096, 
...}) = 0
[pid 947250] 08:22:59 openat(AT_FDCWD, 
"/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", 
O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY <unfinished ...>
[pid 947250] 08:22:59 <... openat resumed>) = 148
[pid 947250] 08:22:59 newfstatat(148, "",  <unfinished ...>
[pid 947250] 08:22:59 <... newfstatat resumed>{st_mode=S_IFDIR|0755, 
st_size=4096, ...}, AT_EMPTY_PATH) = 0
[pid 947250] 08:22:59 close(148)        = 0
[pid 947250] 08:22:59 
unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = -1 
EISDIR (Is a directory)
[pid 947250] 08:22:59 
rmdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = 0
{code}
Calling "sync()" when closing the stream in 
"FsCheckpointStreamFactory::closeAndGetHandle" fixes the problem by syncing the 
serialized state files before returning their reference to the jobmanager. The 
following commit fixes this: 
[https://github.com/Planet-X/flink/commit/0d6e25a9738d9d4ee94de94e1437f92611b50758]
Diff is also attached.

"strace" confirms that "fsync" is now called before the taskmanager's state 
file is closed; see the "fsync(268)" call, the ninth syscall in the log:
{code:java}
[pid 108807] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
0x7f2c167fc890) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
0x7f2c167fc890) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc", 
{st_mode=S_IFDIR|0755, st_size=44, ...}) = 0
[pid 108807] 13:14:59 
mkdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0777) = 0
[pid 110456] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222",
 0x7f7d56efbe90) = -1 ENOENT (No such file or directory)
[pid 110456] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
{st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 110456] 13:14:59 openat(AT_FDCWD, 
"/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222",
 O_WRONLY|O_CREAT|O_TRUNC, 0666) = 268
[pid 110456] 13:14:59 fstat(268, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
[pid 110456] 13:14:59 fsync(268)        = 0
[pid 110456] 13:14:59 close(268)        = 0
[pid 108807] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata",
 0x7f2c167fc710) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata",
  <unfinished ...>
[pid 108807] 13:14:59 <... stat resumed>0x7f2c167fc670) = -1 ENOENT (No such 
file or directory)
[pid 108807] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
 0x7f2c167fc670) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
{st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
[pid 108807] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 
{st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
[pid 108807] 13:14:59 openat(AT_FDCWD, 
"/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
 O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
[pid 108807] 13:14:59 fsync(168)        = 0
[pid 108807] 13:14:59 close(168)        = 0
[pid 108807] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
 {st_mode=S_IFREG|0644, st_size=21416, ...}) = 0
[pid 108807] 13:14:59 
rename("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
 "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata") = 0
[pid 108823] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata",
 {st_mode=S_IFREG|0644, st_size=36684, ...}) = 0
[pid 108823] 13:14:59 
unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata")
 = 0
[pid 108823] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
{st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 108823] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
{st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 108823] 13:14:59 openat(AT_FDCWD, 
"/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
[pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
...}, AT_EMPTY_PATH) = 0
[pid 108823] 13:14:59 close(168)        = 0
[pid 108823] 13:14:59 
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
{st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 108823] 13:14:59 openat(AT_FDCWD, 
"/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", 
O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
[pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, 
...}, AT_EMPTY_PATH) = 0
[pid 108823] 13:14:59 close(168)        = 0
[pid 108823] 13:14:59 
unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = -1 
EISDIR (Is a directory)
[pid 108823] 13:14:59 
rmdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = 0
{code}
Presumably only checkpoints with larger state sizes are affected, as small state 
is inlined into the "_metadata" file, which has been properly persisted since 
Flink 1.19.1 due to FLINK-35217.
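
The presumed split between inlined and file-backed state can be pictured as a simple size comparison. The sketch below is a simplified illustration, not Flink's actual implementation: the class, enum, and method names are hypothetical, and the threshold used in "main" is an assumed example value.

```java
// Hypothetical illustration, not part of Flink.
class InlineThresholdSketch {

    enum Placement { INLINED_IN_METADATA, SEPARATE_FILE }

    /** State at or below the threshold is inlined; larger state goes to its own file. */
    static Placement placementFor(long stateSizeBytes, long thresholdBytes) {
        return stateSizeBytes <= thresholdBytes
                ? Placement.INLINED_IN_METADATA
                : Placement.SEPARATE_FILE;
    }

    public static void main(String[] args) {
        long assumedThreshold = 20 * 1024; // assumed example value, in bytes
        System.out.println(placementFor(512, assumedThreshold));       // INLINED_IN_METADATA
        System.out.println(placementFor(5_000_000, assumedThreshold)); // SEPARATE_FILE
    }
}
```

Under this assumption, only state that ends up in a separate file would depend on the missing "fsync" described in this issue.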



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
