thanks,
I'll try to reproduce it in a test myself...

maciek

On 12/05/2016 18:39, Ufuk Celebi wrote:
The issue is here: https://issues.apache.org/jira/browse/FLINK-3902

(My "explanation" before dosn't make sense actually and I don't see a
reason why this should be related to having many state handles.)

On Thu, May 12, 2016 at 3:54 PM, Ufuk Celebi <u...@apache.org> wrote:
Hey Maciek,

thanks for reporting this. Having files linger around looks like a bug to me.

The idea behind having the recursive flag set to false in the
AbstractFileStateHandle.discardState() call is that the
FileStateHandle is actually just a single file and not a directory.
The second call trying to delete the parent directory only succeeds
when all other files in that directory have been deleted as well. I
think this is what sometimes fails with many state handles. For
RocksDB there is only a single state handle, which works well.
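
To illustrate, the discard logic roughly has the shape of the sketch below
(simplified, hypothetical names; not the actual Flink source, and I'm using
Hadoop's FileSystem API directly rather than Flink's own FileSystem
abstraction):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class FileStateDiscardSketch {

        // Deletes the single state file, then makes a best-effort attempt to
        // remove the enclosing checkpoint directory.
        static void discardState(FileSystem fs, Path stateFile) {
            try {
                // The handle points to a single file, so recursive = false suffices.
                fs.delete(stateFile, false);
            } catch (Exception e) {
                // log and continue
            }
            try {
                // Only succeeds once every other state file in the checkpoint
                // directory has been deleted as well; otherwise HDFS rejects it.
                fs.delete(stateFile.getParent(), false);
            } catch (Exception ignored) {
                // directory still non-empty; the last discarded handle removes it
            }
        }
    }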

I will open an issue for this and try to reproduce it reliably and then fix it.

– Ufuk


On Thu, May 12, 2016 at 10:28 AM, Maciek Próchniak <m...@touk.pl> wrote:
Hi,

we have a streaming job with quite a large state (a few GB); we're using the
FsStateBackend and storing checkpoints in HDFS.
What we observe is that very often old checkpoints are not discarded properly.
In the Hadoop logs I can see:

2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: blk_1084791727_11053122 10.10.113.10:50010
2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 10.10.113.9:49233 Call#12337 Retry#0
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non empty': Directory is not empty
        at org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)

While on the Flink side (jobmanager log) we don't see any problems:
2016-05-10 12:20:22,636 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 62 @ 1462875622636
2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 62 (in 9843 ms)
2016-05-10 12:20:52,637 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 63 @ 1462875652637
2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 63 (in 13909 ms)
2016-05-10 12:21:22,636 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 64 @ 1462875682636

I see in the code that delete operations in Flink are done with the recursive
flag set to false, but I'm not sure why the directory contents are not being
deleted beforehand (a minimal HDFS-level check is sketched below).
When we were using the RocksDB backend we didn't encounter such a situation.
We're using Flink 1.0.1 and HDFS 2.7.2.
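
To double-check the HDFS behaviour itself, I think a minimal snippet along
these lines (assuming Hadoop's FileSystem API; the path is made up for
illustration) should hit the same error as in the namenode log above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class NonRecursiveDeleteRepro {
        public static void main(String[] args) throws Exception {
            // Assumes fs.defaultFS points at the HDFS cluster.
            FileSystem fs = FileSystem.get(new Configuration());

            Path dir = new Path("/tmp/chk-repro");          // made-up path
            fs.mkdirs(dir);
            fs.create(new Path(dir, "state-file")).close(); // leave one file behind

            // Non-recursive delete of a non-empty directory: the namenode rejects
            // this with PathIsNotEmptyDirectoryException (an IOException on the client).
            fs.delete(dir, false);
        }
    }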

Does anybody have any idea why this could be happening?

thanks,
maciek



