Marc Aurel Fritz created FLINK-36421:
----------------------------------------
Summary: Missing fsync in FsCheckpointStreamFactory
Key: FLINK-36421
URL: https://issues.apache.org/jira/browse/FLINK-36421
Project: Flink
Issue Type: Bug
Components: FileSystems, Runtime / Checkpointing
Affects Versions: 1.20.0, 1.19.0, 1.18.0, 1.17.0
Reporter: Marc Aurel Fritz
Attachments: fsync-fs-stream-factory.diff
With Flink 1.20 we observed another checkpoint corruption bug. This is similar
to FLINK-35217, but affects only files written by the taskmanager (the ones
with random names as described
[here|https://github.com/Planet-X/flink/blob/0d6e25a9738d9d4ee94de94e1437f92611b50758/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java#L47]).
After a system crash, the files written by the taskmanager may be corrupted
(file size of 0 bytes) if the changes in the file-system cache have not yet
been written to disk. The "_metadata" file written by the jobmanager is always
fine because it is properly fsynced.
Investigation revealed that an "fsync" call is missing again, this time in
"FsCheckpointStreamFactory". There the "OutputStream" is closed without
calling "fsync", so the file is not durably persisted on disk before the
checkpoint is completed. (As previously established in FLINK-35217, calling
"fsync" is necessary because simply closing the stream gives no guarantees
about persistence.)
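To illustrate the difference in plain Java, here is a minimal sketch that is
not taken from the Flink code base. The class "DurableWrite" and the method
"writeDurably" are hypothetical names used only for this example. It relies on
"FileDescriptor.sync()", which the JDK documents as forcing system buffers to
synchronize with the underlying device, and calls it before the stream is
closed:

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical example class, not part of Flink.
class DurableWrite {

    /** Writes the data and forces it to the storage device before closing. */
    static void writeDurably(Path path, byte[] data) throws IOException {
        try (FileOutputStream out = new FileOutputStream(path.toFile())) {
            out.write(data);
            // Push any user-space buffers down to the operating system.
            out.flush();
            // Ask the OS to write its cached pages for this file to the device.
            // Without this call, close() alone gives no durability guarantee.
            out.getFD().sync();
        } // try-with-resources closes the stream here, after the sync
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("state-", ".bin");
        writeDurably(file, new byte[] {1, 2, 3});
        System.out.println("bytes written: " + Files.size(file));
        Files.delete(file);
    }
}
```

Dropping the "getFD().sync()" line would correspond to the "openat" / "close"
sequence without an "fsync" in between, as reported for the taskmanager.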
"strace" on the taskmanager's process confirms this behavior:
# The checkpoint chk-1217's directory is created at "mkdir"
# The checkpoint chk-1217's non-inline state is written by the taskmanager at
"openat", filename is "0507881e-8877-40b0-82d6-3d7dead64ccc". Note that there's
no "fsync" before "close".
# The checkpoint chk-1217 is finished, its "_metadata" is written and synced
properly
# The old checkpoint chk-1216 is deleted at "unlink"
The new checkpoint chk-1217 now references an unsynced file that can be
corrupted by, e.g., a power loss. This leaves no working checkpoint, because
the old checkpoint has already been deleted.
For durable persistence, an "fsync" call is missing before "close" in step 2.
Full "strace" log:
{code:java}
[pid 947250] 08:22:58
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
0x7f68414c5b50) = -1 ENOENT (No such file or directory)
[pid 947250] 08:22:58
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
0x7f68414c5b50) = -1 ENOENT (No such file or directory)
[pid 947250] 08:22:58
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502",
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947250] 08:22:58
mkdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0777)
= 0
[pid 1303248] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc",
0x7f56f08d5610) = -1 ENOENT (No such file or directory)
[pid 1303248] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 1303248] 08:22:59 openat(AT_FDCWD,
"/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc",
O_WRONLY|O_CREAT|O_TRUNC, 0666) = 199
[pid 1303248] 08:22:59 fstat(199, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
[pid 1303248] 08:22:59 close(199) = 0
[pid 947310] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata",
0x7f683fb378b0) = -1 ENOENT (No such file or directory)
[pid 947310] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata",
0x7f683fb37730) = -1 ENOENT (No such file or directory)
[pid 947310] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
0x7f683fb37730) = -1 ENOENT (No such file or directory)
[pid 947310] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947310] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217",
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947310] 08:22:59 openat(AT_FDCWD,
"/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
O_WRONLY|O_CREAT|O_EXCL, 0666) = 148
[pid 947310] 08:22:59 fsync(148) = 0
[pid 947310] 08:22:59 close(148) = 0
[pid 947310] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
{st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
[pid 947310] 08:22:59
rename("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0",
"/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata") = 0
[pid 947310] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe",
<unfinished ...>
[pid 947250] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata",
<unfinished ...>
[pid 947310] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=54409,
...}) = 0
[pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=46265,
...}) = 0
[pid 947310] 08:22:59
unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe"
<unfinished ...>
[pid 947250] 08:22:59
unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata"
<unfinished ...>
[pid 947310] 08:22:59 <... unlink resumed>) = 0
[pid 947250] 08:22:59 <... unlink resumed>) = 0
[pid 947250] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947250] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",
{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947250] 08:22:59 openat(AT_FDCWD,
"/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",
O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 148
[pid 947250] 08:22:59 newfstatat(148, "", {st_mode=S_IFDIR|0755, st_size=4096,
...}, AT_EMPTY_PATH) = 0
[pid 947250] 08:22:59 close(148) = 0
[pid 947250] 08:22:59
stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",
<unfinished ...>
[pid 947201] 08:22:59 <... stat resumed>0x7f56f2069a20) = -1 ENOENT (No such
file or directory)
[pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFDIR|0755, st_size=4096,
...}) = 0
[pid 947250] 08:22:59 openat(AT_FDCWD,
"/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216",
O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY <unfinished ...>
[pid 947250] 08:22:59 <... openat resumed>) = 148
[pid 947250] 08:22:59 newfstatat(148, "", <unfinished ...>
[pid 947250] 08:22:59 <... newfstatat resumed>{st_mode=S_IFDIR|0755,
st_size=4096, ...}, AT_EMPTY_PATH) = 0
[pid 947250] 08:22:59 close(148) = 0
[pid 947250] 08:22:59
unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = -1
EISDIR (Is a directory)
[pid 947250] 08:22:59
rmdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = 0
{code}
Calling "sync()" when closing the stream in
"FsCheckpointStreamFactory::closeAndGetHandle" fixes the problem by syncing the
serialized state files before returning their reference to the jobmanager. The
following commit fixes this:
[https://github.com/Planet-X/flink/commit/0d6e25a9738d9d4ee94de94e1437f92611b50758]
The diff is also attached.
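The intended call order can be sketched as follows. This is not the code from
the linked commit: "SyncableStream", "closeAndGetHandle" as a static helper,
and the returned path string are hypothetical stand-ins, so that the example
runs without Flink on the classpath. It only shows that "sync" has to happen
after "flush" and before "close", and that no handle is returned if the sync
fails:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the call order, not Flink's actual implementation.
class SyncBeforeCloseSketch {

    /** Stand-in for a file-system output stream that supports sync(). */
    interface SyncableStream {
        void flush() throws IOException;
        void sync() throws IOException;
        void close() throws IOException;
    }

    /** Returns the (stand-in) handle only after the data has been synced. */
    static String closeAndGetHandle(SyncableStream out, String path) throws IOException {
        try {
            out.flush(); // hand buffered bytes to the OS
            out.sync();  // force them to disk; an exception here aborts the handle
        } finally {
            out.close(); // always release the file descriptor
        }
        return path;
    }

    /** Records the order in which the stream methods are invoked. */
    static List<String> recordCalls() throws IOException {
        List<String> calls = new ArrayList<>();
        SyncableStream recorder = new SyncableStream() {
            public void flush() { calls.add("flush"); }
            public void sync() { calls.add("sync"); }
            public void close() { calls.add("close"); }
        };
        closeAndGetHandle(recorder, "chk-1/state-file");
        return calls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(recordCalls());
    }
}
```

Only once "closeAndGetHandle" has returned would the reference be reported to
the jobmanager, so a completed checkpoint never points at an unsynced file.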
"strace" confirms that "fsync" is now called before the taskmanager's state
file is closed; see the ninth system call in the log, "fsync(268)":
{code:java}
[pid 108807] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263",
0x7f2c167fc890) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263",
0x7f2c167fc890) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc",
{st_mode=S_IFDIR|0755, st_size=44, ...}) = 0
[pid 108807] 13:14:59
mkdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0777) = 0
[pid 110456] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222",
0x7f7d56efbe90) = -1 ENOENT (No such file or directory)
[pid 110456] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263",
{st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 110456] 13:14:59 openat(AT_FDCWD,
"/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222",
O_WRONLY|O_CREAT|O_TRUNC, 0666) = 268
[pid 110456] 13:14:59 fstat(268, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
[pid 110456] 13:14:59 fsync(268) = 0
[pid 110456] 13:14:59 close(268) = 0
[pid 108807] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata",
0x7f2c167fc710) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata",
<unfinished ...>
[pid 108807] 13:14:59 <... stat resumed>0x7f2c167fc670) = -1 ENOENT (No such
file or directory)
[pid 108807] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
0x7f2c167fc670) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263",
{st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
[pid 108807] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263",
{st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
[pid 108807] 13:14:59 openat(AT_FDCWD,
"/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
[pid 108807] 13:14:59 fsync(168) = 0
[pid 108807] 13:14:59 close(168) = 0
[pid 108807] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
{st_mode=S_IFREG|0644, st_size=21416, ...}) = 0
[pid 108807] 13:14:59
rename("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396",
"/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata") = 0
[pid 108823] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata",
{st_mode=S_IFREG|0644, st_size=36684, ...}) = 0
[pid 108823] 13:14:59
unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata")
= 0
[pid 108823] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262",
{st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 108823] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262",
{st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 108823] 13:14:59 openat(AT_FDCWD,
"/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262",
O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
[pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0,
...}, AT_EMPTY_PATH) = 0
[pid 108823] 13:14:59 close(168) = 0
[pid 108823] 13:14:59
stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262",
{st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 108823] 13:14:59 openat(AT_FDCWD,
"/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262",
O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
[pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0,
...}, AT_EMPTY_PATH) = 0
[pid 108823] 13:14:59 close(168) = 0
[pid 108823] 13:14:59
unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = -1
EISDIR (Is a directory)
[pid 108823] 13:14:59
rmdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = 0
{code}
Presumably only checkpoints with larger state sizes are affected, because small
state is inlined into the "_metadata" file, which has been properly persisted
since Flink 1.19.1 due to FLINK-35217.