Re: Re: Flink Hive file compaction error
There is no fix for this yet; for now the only option is to rerun the job. This behavior exists mainly for exactly-once semantics: otherwise the data in the deleted file would simply be lost. If non-exactly-once semantics were acceptable, we could in principle let the Flink job keep running, but that is not implemented yet.

Best regards,
Yuxia

----- Original message -----
From: "迎风浪子" <576637...@qq.com.INVALID>
To: "user-zh"
Sent: Thursday, 24 Nov 2022, 17:55:00
Subject: Re: Flink Hive file compaction error

Has this problem been solved? We have run into the same issue and cannot keep the job running!

--- Original message ---
From: "yidan zhao"
Re: Flink Hive file compaction error
Hi, I ran into this problem too, though in my case it was because I deleted the files myself. My deletion logic targets only partitions already marked with a success file, and within such a partition it deletes the dot-prefixed files. The goal is to prevent leftover dot-prefixed files from piling up in some situations and exhausting the filesystem's inodes, so I run a scheduled cleanup task whose logic is restricted to partitions that have a success file. But now I have hit this same error, and it caused the job to restart.

Rui Li wrote on Wed, 11 Aug 2021, 20:57:
> There is currently no way to keep the job running; the only option is to rerun it.
>
> There are several separate questions here:
> 1. How was the file deleted? Two possibilities: Flink deleted it by mistake (needs a fix on the Flink side), or a process outside Flink deleted it.
> 2. A lost file means exactly-once semantics can no longer be guaranteed; is that acceptable?
> 3. If losing data is acceptable, how do we let the Flink job keep running (needs an improvement in Flink)?
>
> [earlier quoted messages and stack trace trimmed; the full trace is in the original message at the end of the thread]
>
> --
> Best regards!
> Rui Li
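The cleanup policy described above ("delete dot-prefixed files, but only in partitions that already carry a success marker") can be sketched roughly as follows. This is a hypothetical local-filesystem illustration, not the actual cleanup job; the marker name `_SUCCESS` and all paths are assumptions. Note the hazard it exposes: Flink's in-flight `.uncompacted-part-*` files are dot-prefixed too, so a partition that receives its success marker before compaction finishes loses exactly the files the CompactCoordinator still expects.

```python
from pathlib import Path

def clean_partition(partition: Path) -> list[Path]:
    """Delete hidden (dot-prefixed) files, but only in partitions that
    carry a _SUCCESS marker. Returns the paths that were deleted."""
    if not (partition / "_SUCCESS").exists():
        return []  # partition not marked finished yet: leave it alone
    deleted = []
    for f in partition.iterdir():
        # This pattern also matches .uncompacted-part-* files that the
        # compaction step may still need -- the root cause in this thread.
        if f.is_file() and f.name.startswith("."):
            f.unlink()
            deleted.append(f)
    return deleted
```

A safer variant would exclude names matching `.uncompacted-*` (or only delete dot-files older than some retention window), so that a success marker written before compaction completes cannot trigger the FileNotFoundException above.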
Re: Flink Hive file compaction error
There is currently no way to keep the job running; the only option is to rerun it.

There are several separate questions here:
1. How was the file deleted? Two possibilities: Flink deleted it by mistake (needs a fix on the Flink side), or a process outside Flink deleted it.
2. A lost file means exactly-once semantics can no longer be guaranteed; is that acceptable?
3. If losing data is acceptable, how do we let the Flink job keep running (needs an improvement in Flink)?

On Wed, Aug 11, 2021 at 7:52 PM 周瑞 wrote:
> Hello:
> The file indeed no longer exists. In this situation, what settings would let the job keep running?
>
> [earlier quoted messages and stack trace trimmed; the full trace is in the original message at the end of the thread]

--
Best regards!
Rui Li
Re: Flink Hive file compaction error
Hello:
The file indeed no longer exists. In this situation, what settings would let the job keep running?

--Original--
From: "Rui Li"
Re: Flink Hive file compaction error
Is the file really gone? Could it have been deleted by some other process? You can check the HDFS audit log to find out.

Flink's exactly-once semantics for file writes currently relies on HDFS's consistency guarantees; if data already written to HDFS is lost, that semantics is broken (though we could consider letting the job keep running in this case).

On Tue, Aug 10, 2021 at 7:45 PM 周瑞 wrote:
> Hello: while Flink was writing to Hive, one of the files pending compaction went missing during file compaction, so the Flink job keeps restarting. What could cause the file to be lost, and how can the Flink job be brought up normally in this situation?
>
> [quoted stack trace trimmed; the full trace is in the original message at the end of the thread]

--
Best regards!
Rui Li
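The audit-log check suggested above can be scripted. The sketch below assumes the common `FSNamesystem.audit` line layout (`allowed=... ugi=... ip=... cmd=... src=... dst=...`); the exact field layout can vary across Hadoop versions, so treat the regex as an assumption to adapt, not a guarantee.

```python
import re

# Assumed hdfs-audit.log layout (verify against your Hadoop version):
#   ... INFO FSNamesystem.audit: allowed=true ugi=hive (auth:SIMPLE)
#   ip=/10.0.0.1 cmd=delete src=/user/hive/... dst=null perm=null
AUDIT_RE = re.compile(r"cmd=(?P<cmd>\S+)\s+src=(?P<src>\S+)")

def find_destructive_ops(lines, path_fragment):
    """Scan audit-log lines for delete/rename operations whose src path
    contains path_fragment; return (cmd, src) pairs."""
    hits = []
    for line in lines:
        m = AUDIT_RE.search(line)
        if (m and m.group("cmd") in ("delete", "rename")
                and path_fragment in m.group("src")):
            hits.append((m.group("cmd"), m.group("src")))
    return hits
```

Running this over the NameNode's audit log with the missing partition path (e.g. `pt_dt=2021-8-10-72`) as the fragment shows which user (`ugi=`) and client (`ip=`) deleted the `.uncompacted-` file, which answers question 1 in the reply above: Flink itself, or an external process.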
Flink Hive file compaction error
Hello: while Flink was writing to Hive, one of the files pending compaction went missing during file compaction, so the Flink job keeps restarting. What could cause the file to be lost, and how can the Flink job be brought up normally in this situation?

2021-08-10 19:34:19
java.io.UncheckedIOException: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
    at org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173)
    at java.util.HashMap.forEach(HashMap.java:1288)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583)
    at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591)
    at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
    at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161)
    ... 19 more
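The shape of the failure in the trace above (a `getFileStatus` lambda invoked from `BinPacking.pack`) can be approximated as follows. This is a rough local-filesystem sketch, not Flink's actual code: the point is that file sizes are looked up eagerly while planning which files to compact together, so a single vanished `.uncompacted-` file aborts the whole compaction step and, on each restart, aborts it again.

```python
import os

def plan_compaction(paths, target_size):
    """Group files into compaction bins of roughly target_size bytes.
    Raises FileNotFoundError if any listed file has vanished."""
    # Eager size lookup, analogous to the getFileStatus call in the
    # trace: a missing file fails the whole plan, not just one bin.
    sized = [(p, os.path.getsize(p)) for p in paths]
    bins, current, total = [], [], 0
    for p, size in sized:  # simple first-fit packing by cumulative size
        if current and total + size > target_size:
            bins.append(current)
            current, total = [], 0
        current.append(p)
        total += size
    if current:
        bins.append(current)
    return bins
```

Because the plan is recomputed from the same checkpointed file list after every restart, the job loops on the identical FileNotFoundException, which is why the thread concludes the only current remedy is rerunning the job from scratch.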