Hi Ufuk,

It seems I messed it up a bit :)
I cannot comment on jira, since it's temporarily locked...

1. org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non empty': Directory is not empty
- this seems to be expected behaviour, given what AbstractFileStateHandle.discardState() does:

// send a call to delete the checkpoint directory containing the file. This will
// fail (and be ignored) when some files still exist
try {
   getFileSystem().delete(filePath.getParent(), false);
} catch (IOException ignored) {}

- so this is working as expected, although it causes a lot of garbage in hdfs logs...
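
To illustrate the behaviour described in point 1, here is a minimal sketch on a local temp directory. It uses java.nio.file instead of the Hadoop FileSystem API, and the class, method and file names (NonRecursiveDeleteSketch, discard, handle-a, handle-b) are made up for the example; it is only an analogy, not the Flink code. The point is that a non-recursive delete of the parent directory fails quietly while sibling files still exist and only succeeds for the last file removed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class NonRecursiveDeleteSketch {

    /**
     * Deletes one state file, then attempts a non-recursive delete of its
     * parent directory. Returns true only if the directory was removed.
     */
    static boolean discard(Path stateFile) throws IOException {
        Files.deleteIfExists(stateFile);
        try {
            // Files.delete refuses to remove a directory that still has entries.
            Files.delete(stateFile.getParent());
            return true;
        } catch (IOException ignored) {
            // Same idea as the quoted snippet: the failure is swallowed.
            return false;
        }
    }

    /** Runs the scenario with two hypothetical state files in one checkpoint directory. */
    static boolean[] demo() {
        try {
            Path chk = Files.createTempDirectory("chk-demo");
            Path a = Files.createFile(chk.resolve("handle-a"));
            Path b = Files.createFile(chk.resolve("handle-b"));
            boolean afterFirst = discard(a);  // handle-b still exists
            boolean afterSecond = discard(b); // directory is empty now
            return new boolean[] {afterFirst, afterSecond};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println(result[0]); // false
        System.out.println(result[1]); // true
    }
}
```

On HDFS every one of those failed attempts is logged on the namenode as PathIsNotEmptyDirectoryException, which would explain the noise in the logs.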

2. The problem with checkpoints not being discarded seems to be related to periods when we have no traffic (at night).
At that point many checkpoints "expire before completing":
2016-05-13 00:00:10,585 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 199 @ 1463090410585
2016-05-13 00:10:10,585 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 199 expired before completing.
2016-05-13 00:25:14,650 [flink-akka.actor.default-dispatcher-280300] WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received late message for now expired checkpoint attempt 199

When checkpoints do manage to complete, they take very long to do so:
2016-05-13 00:25:19,071 [flink-akka.actor.default-dispatcher-280176] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 201 (in 308472 ms)

- this happens when no new messages arrive (we have a simple pipeline: kafka->keyBy->custom state aggregation->kafka, with the EventTime time characteristic). I think I messed something up with event time and watermark generation - I'll have to check it. With RocksDB I took checkpoints at much larger intervals, which is probably why I hadn't noticed the disk getting full.
OTOH - shouldn't expired checkpoints be cleaned up automatically?
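
For reference, the timing of checkpoint 199 can be worked out from the three log timestamps above. The sketch below only does that arithmetic; the class and method names (CheckpointTimingSketch, between) are made up for the example. The gap between trigger and expiry is exactly ten minutes, which would be consistent with a ten-minute checkpoint timeout (an assumption, I haven't checked the configured value), and the late acknowledgement arrives about fifteen minutes after that. Checkpoint 201, at 308472 ms, took a bit over five minutes.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class CheckpointTimingSketch {

    // Timestamp layout used in the jobmanager log lines quoted above.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Returns the time elapsed between two log timestamps. */
    static Duration between(String from, String to) {
        return Duration.between(
                LocalDateTime.parse(from, LOG_TS),
                LocalDateTime.parse(to, LOG_TS));
    }

    public static void main(String[] args) {
        String triggered = "2016-05-13 00:00:10,585";
        String expired = "2016-05-13 00:10:10,585";
        String lateMessage = "2016-05-13 00:25:14,650";

        // Trigger -> "expired before completing"
        System.out.println(between(triggered, expired).toMillis());     // 600000
        // Trigger -> "Received late message"
        System.out.println(between(triggered, lateMessage).toMillis()); // 1504065
    }
}
```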


Sorry for the confusion and thanks for the help

thanks,
maciek


On 12/05/2016 21:28, Maciek Próchniak wrote:
thanks,
I'll try to reproduce it in some test by myself...

maciek

On 12/05/2016 18:39, Ufuk Celebi wrote:
The issue is here: https://issues.apache.org/jira/browse/FLINK-3902

(My "explanation" before doesn't actually make sense and I don't see a
reason why this should be related to having many state handles.)

On Thu, May 12, 2016 at 3:54 PM, Ufuk Celebi <u...@apache.org> wrote:
Hey Maciek,

thanks for reporting this. Having files linger around looks like a bug to me.

The idea behind having the recursive flag set to false in the
AbstractFileStateHandle.discardState() call is that the
FileStateHandle is actually just a single file and not a directory.
The second call trying to delete the parent directory only succeeds
when all other files in that directory have been deleted as well. I
think this is what sometimes fails with many state handles. For
RocksDB there is only a single state handle, which works well.

I will open an issue for this and try to reproduce it reliably and then fix it.

– Ufuk


On Thu, May 12, 2016 at 10:28 AM, Maciek Próchniak <m...@touk.pl> wrote:
Hi,

we have a streaming job with quite a large state (a few GB), we're using
FSStateBackend and we're storing checkpoints in HDFS.
What we observe is that very often old checkpoints are not discarded properly.
In hadoop logs I can see:

2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: blk_1084791727_11053122 10.10.113.10:50010
2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 10.10.113.9:49233 Call#12337 Retry#0
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non empty': Directory is not empty
         at org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
         at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)

Meanwhile, on the Flink side (jobmanager log) we don't see any problems:
2016-05-10 12:20:22,636 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 62 @ 1462875622636
2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 62 (in 9843 ms)
2016-05-10 12:20:52,637 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 63 @ 1462875652637
2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 63 (in 13909 ms)
2016-05-10 12:21:22,636 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 64 @ 1462875682636

I see in the code that delete operations in Flink are done with the recursive flag set to false - but I'm not sure why the directory contents are not being deleted first.
When we were using the RocksDB backend we didn't encounter this situation.
We're using Flink 1.0.1 and HDFS 2.7.2.

Does anybody have any idea why this could be happening?

thanks,
maciek





