Re: Re: Flink Hive file compaction error
There is no fix for this yet; re-running the job is currently the only option. The behavior exists mainly for exactly-once semantics: otherwise the data of the deleted file would simply be lost. If non-exactly-once semantics were acceptable, we could let Flink keep running, but that is not implemented yet. Best regards, Yuxia - Original Message - From: "迎风浪子" <576637...@qq.com.INVALID> To: "user-zh" Sent: Thursday, Nov 24, 2022 5:55:00 PM Subject: Re: Flink Hive file compaction error Has this problem been solved? We have run into it as well and cannot keep the job running! ---Original--- From: "yidan zhao"
flink: joining a stream against a Hive table, Table API or SQL?
hi, the data comes from an MQ and from Hive, and the MQ stream needs to be joined with the Hive table. Should I use the Flink Table API or Flink SQL, and should the join be a SQL join or a connect?
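For reference, a minimal sketch of the SQL route for this kind of stream-to-Hive join: a processing-time temporal (lookup) join of an MQ-backed table against the Hive table. The table names, columns, and the proc_time attribute are assumptions; recent Flink versions document this under Hive lookup joins.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HiveLookupJoinSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Assumed prerequisites (not shown): a HiveCatalog registered so that
        // dim_user resolves to the Hive table, and an MQ-backed "orders" table
        // whose DDL declares `proc_time AS PROCTIME()`.
        tEnv.executeSql(
            "SELECT o.order_id, u.user_name "
                + "FROM orders AS o "
                + "JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u "
                + "ON o.user_id = u.user_id");
    }
}

The DataStream connect route works too, but you then have to load and refresh the Hive data yourself; the SQL lookup join delegates that to the connector (cache refresh is controlled by table options such as 'lookup.join.cache.ttl').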
Re: flink hive table creation: owner is empty
tks. ---- Original ---- From: "user-zh" You can take a look at this PR: https://github.com/apache/flink/pull/16745 Best regards, Yuxia - Original Message - From: "kcz" <573693...@qq.com.INVALID> To: "user-zh"
Re: flink hive table creation: owner is empty
You can take a look at this PR: https://github.com/apache/flink/pull/16745 Best regards, Yuxia - Original Message - From: "kcz" <573693...@qq.com.INVALID> To: "user-zh" Sent: Monday, July 18, 2022 6:13:14 PM Subject: flink hive table creation: owner is empty flink-1.14.4 hive-3.1.0 When creating a Hive table with Flink and writing data into it, the table's owner in Hive is empty; it is not set from the configured Kerberos user. Please help with how to set the owner, or point me to the relevant code. I really need this feature.
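For anyone tracing this, a hedged sketch (not the PR's actual code) of where such a value would come from: the login user that Hadoop security reports for the process, which is what an owner field would normally be populated with.

import org.apache.hadoop.security.UserGroupInformation;

public class OwnerProbe {
    public static void main(String[] args) throws Exception {
        // The Kerberos (or OS) login user of the current process; a fix along
        // the lines of the PR above would attach a value like this when
        // HiveCatalog creates the table.
        String owner = UserGroupInformation.getCurrentUser().getShortUserName();
        System.out.println("owner candidate: " + owner);
    }
}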
flink hive table creation: owner is empty
flink-1.14.4 hive-3.1.0 When creating a Hive table with Flink and writing data into it, the table's owner in Hive is empty; it is not set from the configured Kerberos user. Please help with how to set the owner, or point me to the relevant code. I really need this feature.
Re: Flink Hive file compaction error
hi, I ran into this problem too, though in my case it was because I deleted the files myself. My cleanup logic picks partitions marked by a success file and deletes the dot-prefixed files inside them. The goal is to keep useless dot-prefixed files from piling up in partitions and exploding the filesystem's inode count, so a scheduled task does the cleanup, and it only touches partitions that already carry a success file. But now I have hit this problem as well, and it caused the job to restart. Rui Li wrote on Wed, Aug 11, 2021 at 8:57 PM: > There is currently no way to let the job keep running; re-running it is the only option > > There are a few distinct problems here: > 1. How was the file deleted? Two possibilities: mistakenly deleted by Flink (needs a fix on Flink's side), or deleted by a process outside Flink > 2. A lost file means exactly-once semantics can no longer be guaranteed; is that acceptable? > 3. If losing data is acceptable, how can the Flink job keep running (needs an improvement in Flink) > > On Wed, Aug 11, 2021 at 7:52 PM 周瑞 wrote: > > > Hello: > > The file indeed no longer exists. Is there any setting that lets the job keep running in this situation? > > > > > > --Original-- > > From: "Rui Li" > Date: Wed, Aug 11, 2021 07:49 PM > > To: "user-zh" > > > Subject: Re: Flink Hive file compaction error > > > > > > > > Is the file really gone? Could another process have deleted it? You can check HDFS's audit log to find out. > > > > Currently the exactly-once semantics of Flink's file writing relies on HDFS's consistency guarantee; if data already written to HDFS is lost, that semantics is broken (though we could consider letting the job keep running in this case). > > > > On Tue, Aug 10, 2021 at 7:45 PM 周瑞 > > > Hello: When Flink > > > > > writes to Hive, a file awaiting compaction went missing during compaction, which keeps the Flink job restarting endlessly. What can cause the file to go missing, and how can the Flink job be brought up normally in this situation? > > 2021-08-10 19:34:19 java.io.UncheckedIOException: > > java.io.FileNotFoundException: File does not exist: > > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163) > > at > > org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38) > > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173) > > at > > java.util.HashMap.forEach(HashMap.java:1288) > > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169) > > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151) > > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141) > > at > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) > > at org.apache.flink.streaming.runtime.io > > .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > > at > > org.apache.flink.streaming.runtime.io > > .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > > at org.apache.flink.streaming. 
> > runtime.io > > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > > at > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) > > at > > > > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > > at > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) > > at > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) > > at > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) > > at > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) > > at > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > > at > > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > > at > > java.lang.Thread.run(Thread.java:748) Caused by: > > java.io.FileNotFoundException: File does not exist: > > > > > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > > at > > > > > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583) > > at > > > > > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576) > > at > > > > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > > at > > > > > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591) > > at > > > > > org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) > > at > > > > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161) > > ... 19 more > > > > > > > > -- > > Best regards! > > Rui Li > > > > -- > Best regards! > Rui Li >
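A hedged sketch of a cleanup that avoids the failure described above: only touch partitions that carry the success marker, and even there leave ".uncompacted-" files alone, since the compact operator still needs them. The "_SUCCESS" marker name and path layout are assumptions to check against your sink's configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SafePartitionCleanup {
    public static void main(String[] args) throws Exception {
        Path partition = new Path(args[0]); // e.g. hdfs://.../pt_dt=2021-8-10-72
        FileSystem fs = FileSystem.get(partition.toUri(), new Configuration());
        if (!fs.exists(new Path(partition, "_SUCCESS"))) {
            return; // partition not committed yet: touch nothing
        }
        for (FileStatus f : fs.listStatus(partition)) {
            String name = f.getPath().getName();
            // Delete hidden leftovers, but never pending compaction inputs.
            if (name.startsWith(".") && !name.startsWith(".uncompacted-")) {
                fs.delete(f.getPath(), false);
            }
        }
    }
}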
flink hive dialect: how to escape a single quote in instr?
hi! With Flink 1.13.1 running Hive SQL: col1 is a string such as ab'cd, and I want instr to find the single quote '. I tried instr(col1, '\'') instr(col1, ) instr(col1, '\''') instr(col1, '\\''') but none of them works.
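A hedged sketch of the two escaping styles usually expected here, against a hypothetical table t with column col1: Flink's default dialect follows the SQL standard (double the quote), while the Hive dialect also accepts double-quoted string literals.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class InstrQuoteSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
        // Default dialect: a literal single quote is written by doubling it.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("SELECT INSTR(col1, '''') FROM t");
        // Hive dialect (requires the Hive catalog/module to be set up):
        // put the quote inside a double-quoted literal instead.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql("SELECT instr(col1, \"'\") FROM t");
    }
}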
Flink Hive files: checkpoints cannot complete when the upstream Kafka volume is large
Hello, with Flink writing to Hive, when the upstream Kafka volume is very large the checkpoint never manages to complete and fails after 5 minutes. How can this problem be solved?
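The usual first steps are to relieve the backpressure (more sink parallelism, a smaller rolling policy) and to give checkpoints more room. A hedged sketch of the relevant knobs; the concrete values are assumptions, and unaligned checkpoints need Flink 1.11+.

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60s
        CheckpointConfig cc = env.getCheckpointConfig();
        cc.setCheckpointTimeout(30 * 60_000);     // allow much more than 5 minutes
        cc.setMinPauseBetweenCheckpoints(30_000); // let the job drain in between
        cc.enableUnalignedCheckpoints();          // barriers can overtake the backlog
    }
}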
Re: Flink Hive file compaction error
There is currently no way to let the job keep running; re-running it is the only option. There are a few distinct problems here: 1. How was the file deleted? Two possibilities: mistakenly deleted by Flink (needs a fix on Flink's side), or deleted by a process outside Flink 2. A lost file means exactly-once semantics can no longer be guaranteed; is that acceptable? 3. If losing data is acceptable, how can the Flink job keep running (needs an improvement in Flink) On Wed, Aug 11, 2021 at 7:52 PM 周瑞 wrote: > Hello: > The file indeed no longer exists. Is there any setting that lets the job keep running in this situation? > > > --Original-- > From: "Rui Li" Date: Wed, Aug 11, 2021 07:49 PM > To: "user-zh" > Subject: Re: Flink Hive file compaction error > > > Is the file really gone? Could another process have deleted it? You can check HDFS's audit log to find out. > > Currently the exactly-once semantics of Flink's file writing relies on HDFS's consistency guarantee; if data already written to HDFS is lost, that semantics is broken (though we could consider letting the job keep running in this case). > > On Tue, Aug 10, 2021 at 7:45 PM 周瑞 > Hello: When Flink > > writes to Hive, a file awaiting compaction went missing during compaction, which keeps the Flink job restarting endlessly. What can cause the file to go missing, and how can the Flink job be brought up normally? > 2021-08-10 19:34:19 java.io.UncheckedIOException: > java.io.FileNotFoundException: File does not exist: > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163) > at > org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173) > at > java.util.HashMap.forEach(HashMap.java:1288) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) > at org.apache.flink.streaming.runtime.io > .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > at > org.apache.flink.streaming.runtime.io > .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > at org.apache.flink.streaming. 
> runtime.io > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) > at > > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) > at > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) > at > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at > java.lang.Thread.run(Thread.java:748) Caused by: > java.io.FileNotFoundException: File does not exist: > > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > at > > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583) > at > > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576) > at > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591) > at > > org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) > at > > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161) > ... 19 more > > > > -- > Best regards! > Rui Li -- Best regards! Rui Li
Re: Flink Hive file compaction error
Hello: The file indeed no longer exists. Is there any setting that lets the job keep running in this situation? --Original-- From: "Rui Li"
Re: Flink Hive file compaction error
Is the file really gone? Could another process have deleted it? You can check HDFS's audit log to find out. Currently the exactly-once semantics of Flink's file writing relies on HDFS's consistency guarantee; if data already written to HDFS is lost, that semantics is broken (though we could consider letting the job keep running in this case). On Tue, Aug 10, 2021 at 7:45 PM 周瑞 wrote: > Hello: When Flink > writes to Hive, a file awaiting compaction went missing during compaction, which keeps the Flink job restarting endlessly. What can cause the file to go missing, and how can the Flink job be brought up normally? > 2021-08-10 19:34:19 java.io.UncheckedIOException: > java.io.FileNotFoundException: File does not exist: > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163) > at > org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173) > at java.util.HashMap.forEach(HashMap.java:1288) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > at > java.lang.Thread.run(Thread.java:748) Caused by: > java.io.FileNotFoundException: File does not exist: > hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 > at > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583) > at > org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591) > at > org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) > at > org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161) > ... 19 more -- Best regards! Rui Li
Flink Hive: no data found when querying after the write
Hello: Flink has already written the data to Hive, but an exception in between meant the files here were never compacted, so queries on the Hive table return nothing. After the Flink job was restarted, no more data is written to this partition, so the partition's data will never be compacted. How should this kind of failure be handled, and is there a way to repair it manually?
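There is no official repair command for this case that I know of, so the following is only a heavily hedged sketch of one manual approach: make the hidden ".uncompacted-" files visible to Hive and register the partition. The path and file naming follow the layout seen elsewhere in these threads; verify against your own directory listing first, and take a backup.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ManualCompactRepair {
    public static void main(String[] args) throws Exception {
        Path partition = new Path(
            "hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72");
        FileSystem fs = FileSystem.get(partition.toUri(), new Configuration());
        for (FileStatus f : fs.listStatus(partition)) {
            String name = f.getPath().getName();
            if (name.startsWith(".uncompacted-")) {
                // Strip the leading dot so Hive no longer treats the file as hidden.
                fs.rename(f.getPath(), new Path(partition, name.substring(1)));
            }
        }
        // Then register the partition in the metastore, e.g. from Hive:
        //   ALTER TABLE offer_69 ADD IF NOT EXISTS PARTITION (pt_dt='2021-8-10-72');
    }
}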
Flink Hive file compaction error
Hello: When Flink writes to Hive, a file awaiting compaction went missing during file compaction, which keeps the Flink job restarting endlessly. What can cause the file to go missing, and how can the Flink job be brought up normally in this situation? 2021-08-10 19:34:19 java.io.UncheckedIOException: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163) at org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38) at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173) at java.util.HashMap.forEach(HashMap.java:1288) at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169) at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151) at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3 at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583) at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591) at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85) at org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161) ... 19 more
Re: flink hive batch job reports FileNotFoundException
hi: Hello, may I ask whether you ever solved this problem? We have run into it as well, though our environment is standalone. -- Sent from: http://apache-flink.147419.n8.nabble.com/
flink hive batch job reports FileNotFoundException
Hi everyone, When my batch Table job writes to Hive, a FileNotFound error occurs: the .staging file cannot be found. The version is 1.12.1, deployed as standalone. The error is as follows: Caused by: java.lang.Exception: Failed to finalize execution on master ... 33 more Caused by: org.apache.flink.table.api.TableException: Exception in finalizeGlobal at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:94) at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:148) at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1368) ... 32 more Caused by: java.io.FileNotFoundException: File hdfs://nameservice1/user/hive/warehouse/flinkdb.db/ods_csc_zcdrpf_test_utf8/.staging_1622171066985 does not exist. at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:986) at org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:122) at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1046) at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1043) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1053) at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) at org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:137) at org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:93) at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:92) ... 34 more
flink hive lookup join fails with a snappy compression error
java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method) at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63) at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195) at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181) at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037) at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923) at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872) at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1886) at org.apache.hadoop.mapred.SequenceFileRecordReader.<init>(SequenceFileRecordReader.java:49) at org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:64) at org.apache.flink.connectors.hive.read.HiveMapredSplitReader.<init>(HiveMapredSplitReader.java:113) at org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:162) at org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86) at org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:128) at org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:103) at LookupFunction$74.flatMap(Unknown Source) at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82) at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$71.processElement(Unknown Source) How can this snappy problem be solved?
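This error usually means the JVM running the TaskManagers cannot load the Hadoop native library (libhadoop with snappy support). A hedged probe to run with the same classpath and JVM options as the Flink processes; if the native library is not loaded, point java.library.path at the Hadoop native directory, for example via env.java.opts in flink-conf.yaml with -Djava.library.path=$HADOOP_HOME/lib/native (the exact path is an assumption for your cluster).

import org.apache.hadoop.util.NativeCodeLoader;

public class SnappyProbe {
    public static void main(String[] args) {
        System.out.println("java.library.path = " + System.getProperty("java.library.path"));
        if (NativeCodeLoader.isNativeCodeLoaded()) {
            // Same native call that fails in the stack trace above.
            System.out.println("snappy supported: " + NativeCodeLoader.buildSupportsSnappy());
        } else {
            System.out.println("native hadoop not loaded; check java.library.path");
        }
    }
}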
Re: Re: flink hive streaming: no data found when querying
Mind the time zone: the SQL layer interprets the long value as UTC by default On Thu, Oct 29, 2020 at 12:12 PM hdxg1101300...@163.com < hdxg1101300...@163.com> wrote: > After I set sink.partition-commit.trigger to process-time I can see the data; > but after I later made the source emit watermarks it still did not work; > val dataStream = streamEnv.addSource(new MySource) > > .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]() > .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] { > override def extractTimestamp(element: UserInfo, recordTimestamp: > Long): Long = element.getTs.getTime > })) > The generated UserInfo's ts is a timestamp, so the watermark is extracted from it > > > > hdxg1101300...@163.com > > From: Jingsong Li > Sent: 2020-10-28 16:29 > To: user-zh > Subject: Re: flink hive streaming: no data found when querying > Hi, > > Your source does not look like it emits watermarks, so: > > consider making the source emit proper watermarks, or use the default value process-time of 'sink.partition-commit.trigger'. > > Best, > Jingsong > > On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com < > hdxg1101300...@163.com> wrote: > > > Hello: > > I am currently using Flink 1.11.2 with Hive 1.1.0; > > using flink hive streaming and following the example to write data to hive, > > I can see files already produced under the target directory, but hive queries return no data; > > it looks like the partition info was never committed to the hive metastore; the website says this feature is implemented, but it does not work for me > > my code is below > > object StreamMain { > > def main(args: Array[String]): Unit = { > > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment > > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > > streamEnv.setParallelism(3) > > > > val tableEnvSettings = EnvironmentSettings.newInstance() > > .useBlinkPlanner() > > .inStreamingMode() > > .build() > > > > val tableEnv = StreamTableEnvironment.create(streamEnv, > tableEnvSettings) > > > > > > tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > CheckpointingMode.EXACTLY_ONCE) > > > > > > tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > Duration.ofSeconds(20)) > > > > val dataStream = streamEnv.addSource(new MySource) > > > > val catalogName = "my_catalog" > > val catalog = new HiveCatalog( > > catalogName, // catalog name > > "yutest",// default database > > > > "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\", // Hive > config (hive-site.xml) directory > > "1.1.0" // Hive version > > ) > > tableEnv.registerCatalog(catalogName, catalog) > > tableEnv.useCatalog(catalogName) > > > > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) > > tableEnv.useDatabase("yutest") > > > > > > tableEnv.createTemporaryView("users", dataStream) > > tableEnv.executeSql("DROP TABLE IF EXISTS fs_table ") > > // skip this if the table already exists in Hive > > val hiveSql = """CREATE external TABLE fs_table ( > > user_id STRING, > > order_amount DOUBLE > > ) > > partitioned by( > > dt string, > > h string, > > m string) stored as parquet > > TBLPROPERTIES ( > > > > 'partition.time-extractor.timestamp-pattern'='$dt > $h:$m:00', > > 'sink.partition-commit.delay'='0s', > > 'sink.partition-commit.trigger'='partition-time', > > > > > 'sink.partition-commit.policy.kind'='metastore,success-file' > > )""".stripMargin > > tableEnv.executeSql(hiveSql) > > > > > > val insertSql = "insert into fs_table SELECT userId, amount, " + " > DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') > FROM users" > > tableEnv.executeSql(insertSql) > > } > > } > > public class MySource implements SourceFunction<UserInfo> { > > private volatile boolean run = true; > > String userids[] = { > > > > "4760858d-2bec-483c-a535-291de04b2247", > "67088699-d4f4-43f2-913c-481bff8a2dc5", > > > > "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", > "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b", > > > > 
"aabbaa50-72f4-495c-b3a1-70383ee9d6a4", > "3218bbb9-5874-4d37-a82d-3e35e52d1702", > > > > "3ebfb9602ac07779||3ebfe9612a007979
Re: Re: flink hive streaming: no data found when querying
After I set sink.partition-commit.trigger to process-time I can see the data; but after I later made the source emit watermarks it still did not work; val dataStream = streamEnv.addSource(new MySource) .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]() .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] { override def extractTimestamp(element: UserInfo, recordTimestamp: Long): Long = element.getTs.getTime })) The generated UserInfo's ts is a timestamp, so the watermark is extracted from it hdxg1101300...@163.com From: Jingsong Li Sent: 2020-10-28 16:29 To: user-zh Subject: Re: flink hive streaming: no data found when querying Hi, Your source does not look like it emits watermarks, so: consider making the source emit proper watermarks, or use the default value process-time of 'sink.partition-commit.trigger'. Best, Jingsong On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com < hdxg1101300...@163.com> wrote: > Hello: > I am currently using Flink 1.11.2 with Hive 1.1.0; > using flink hive streaming and following the example to write data to hive, > I can see files already produced under the target directory, but hive queries return no data; > it looks like the partition info was never committed to the hive metastore; the website says this feature is implemented, but it does not work for me > my code is below > object StreamMain { > def main(args: Array[String]): Unit = { > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > streamEnv.setParallelism(3) > > val tableEnvSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build() > > val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings) > > > tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > CheckpointingMode.EXACTLY_ONCE) > > > tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > Duration.ofSeconds(20)) > > val dataStream = streamEnv.addSource(new MySource) > > val catalogName = "my_catalog" > val catalog = new HiveCatalog( > catalogName, // catalog name > "yutest",// default database > > "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\", // Hive > config (hive-site.xml) directory > "1.1.0" // Hive version > ) > tableEnv.registerCatalog(catalogName, catalog) > tableEnv.useCatalog(catalogName) > > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) > tableEnv.useDatabase("yutest") > > > tableEnv.createTemporaryView("users", dataStream) > tableEnv.executeSql("DROP TABLE IF EXISTS fs_table ") > // skip this if the table already exists in Hive > val hiveSql = """CREATE external TABLE fs_table ( > user_id STRING, > order_amount DOUBLE > ) > partitioned by( > dt string, > h string, > m string) stored as parquet > TBLPROPERTIES ( > > 'partition.time-extractor.timestamp-pattern'='$dt > $h:$m:00', > 'sink.partition-commit.delay'='0s', > 'sink.partition-commit.trigger'='partition-time', > > > 'sink.partition-commit.policy.kind'='metastore,success-file' > )""".stripMargin > tableEnv.executeSql(hiveSql) > > > val insertSql = "insert into fs_table SELECT userId, amount, " + " > DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') > FROM users" > tableEnv.executeSql(insertSql) > } > } > public class MySource implements SourceFunction<UserInfo> { > private volatile boolean run = true; > String userids[] = { > > "4760858d-2bec-483c-a535-291de04b2247", > "67088699-d4f4-43f2-913c-481bff8a2dc5", > > "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", > "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b", > > "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", > "3218bbb9-5874-4d37-a82d-3e35e52d1702", > > "3ebfb9602ac07779||3ebfe9612a007979", > "aec20d52-c2eb-4436-b121-c29ad4097f6c", > > "e7e896cd939685d7||e7e8e6c1930689d7", > "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee" > }; > > @Override > > public void run(SourceFunction.SourceContext<UserInfo> sourceContext) 
> throws Exception { > > while (run) { > > String userid = userids[(int) (Math.random() * (userids.length - > 1))]; > UserInfo userInfo = new UserInfo(); > userInfo.setUserId(userid); > userInfo.setAmount(Math.random() * 100); >
Re: flink hive streaming: no data found when querying
Hi, Your source does not look like it emits watermarks, so: consider making the source emit proper watermarks, or use the default value process-time of 'sink.partition-commit.trigger'. Best, Jingsong On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com < hdxg1101300...@163.com> wrote: > Hello: > I am currently using Flink 1.11.2 with Hive 1.1.0; > using flink hive streaming and following the example to write data to hive, > I can see files already produced under the target directory, but hive queries return no data; > it looks like the partition info was never committed to the hive metastore; the website says this feature is implemented, but it does not work for me > my code is below > object StreamMain { > def main(args: Array[String]): Unit = { > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > streamEnv.setParallelism(3) > > val tableEnvSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build() > > val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings) > > > tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, > CheckpointingMode.EXACTLY_ONCE) > > > tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, > Duration.ofSeconds(20)) > > val dataStream = streamEnv.addSource(new MySource) > > val catalogName = "my_catalog" > val catalog = new HiveCatalog( > catalogName, // catalog name > "yutest",// default database > > "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\", // Hive > config (hive-site.xml) directory > "1.1.0" // Hive version > ) > tableEnv.registerCatalog(catalogName, catalog) > tableEnv.useCatalog(catalogName) > > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) > tableEnv.useDatabase("yutest") > > > tableEnv.createTemporaryView("users", dataStream) > tableEnv.executeSql("DROP TABLE IF EXISTS fs_table ") > // skip this if the table already exists in Hive > val hiveSql = """CREATE external TABLE fs_table ( > user_id STRING, > order_amount DOUBLE > ) > partitioned by( > dt string, > h string, > m string) stored as parquet > TBLPROPERTIES ( > > 'partition.time-extractor.timestamp-pattern'='$dt > $h:$m:00', > 'sink.partition-commit.delay'='0s', > 'sink.partition-commit.trigger'='partition-time', > > > 'sink.partition-commit.policy.kind'='metastore,success-file' > )""".stripMargin > tableEnv.executeSql(hiveSql) > > > val insertSql = "insert into fs_table SELECT userId, amount, " + " > DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') > FROM users" > tableEnv.executeSql(insertSql) > } > } > public class MySource implements SourceFunction<UserInfo> { > private volatile boolean run = true; > String userids[] = { > > "4760858d-2bec-483c-a535-291de04b2247", > "67088699-d4f4-43f2-913c-481bff8a2dc5", > > "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", > "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b", > > "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", > "3218bbb9-5874-4d37-a82d-3e35e52d1702", > > "3ebfb9602ac07779||3ebfe9612a007979", > "aec20d52-c2eb-4436-b121-c29ad4097f6c", > > "e7e896cd939685d7||e7e8e6c1930689d7", > "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee" > }; > > @Override > > public void run(SourceFunction.SourceContext<UserInfo> sourceContext) > throws Exception { > > while (run) { > > String userid = userids[(int) (Math.random() * (userids.length - > 1))]; > UserInfo userInfo = new UserInfo(); > userInfo.setUserId(userid); > userInfo.setAmount(Math.random() * 100); > userInfo.setTs(new Timestamp(System.currentTimeMillis())); > sourceContext.collect(userInfo); > Thread.sleep(100); > } > } > > @Override > public void cancel() { > run = false; > } > } > public class UserInfo implements Serializable { > private String userId; > private Double amount; > private Timestamp ts; > > public String 
getUserId() { > return userId; > } > > public void setUserId(String userId) { > this.userId = userId; > } > > public Double getAmount() { > return amount; > } > > public void setAmount(Double amount) { > this.amount = amount; > } > > public Timestamp getTs() { > return ts; > } > > public void setTs(Timestamp ts) { > this.ts = ts; > } > } > > hive (yutest)> > > > > show partitions fs_table; > OK > partition > Time taken: 20.214 seconds > > -- > hdxg1101300...@163.com > -- Best, Jingsong Lee
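A hedged sketch of Jingsong's first suggestion applied to this job: give the source an explicit bounded-out-of-orderness watermark strategy so that the partition-time commit trigger can fire. MySource and UserInfo are the classes from this thread (assumed typed as SourceFunction<UserInfo>); the 5-second bound is an assumption.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkFixSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<UserInfo> users = env
            .addSource(new MySource())
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((u, recordTs) -> u.getTs().getTime()));
        // Register as a view and run the INSERT INTO fs_table from the original job.
        users.print();
        env.execute("watermark-fix");
    }
}

Note also the time-zone remark in this thread: DATE_FORMAT on a UTC-based timestamp can put records into a different partition than local wall-clock time suggests, which delays the partition commit further.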
flink hive streaming: no data found when querying
Hello: I am currently using Flink 1.11.2 with Hive 1.1.0; using flink hive streaming and following the example to write data to hive, I can see files already produced under the target directory, but hive queries return no data; it looks like the partition info was never committed to the hive metastore; the website says this feature is implemented, but it does not work for me. My code is below object StreamMain { def main(args: Array[String]): Unit = { val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) streamEnv.setParallelism(3) val tableEnvSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings) tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE) tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20)) val dataStream = streamEnv.addSource(new MySource) val catalogName = "my_catalog" val catalog = new HiveCatalog( catalogName, // catalog name "yutest",// default database "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\", // Hive config (hive-site.xml) directory "1.1.0" // Hive version ) tableEnv.registerCatalog(catalogName, catalog) tableEnv.useCatalog(catalogName) tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) tableEnv.useDatabase("yutest") tableEnv.createTemporaryView("users", dataStream) tableEnv.executeSql("DROP TABLE IF EXISTS fs_table ") // skip this if the table already exists in Hive val hiveSql = """CREATE external TABLE fs_table ( user_id STRING, order_amount DOUBLE ) partitioned by( dt string, h string, m string) stored as parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00', 'sink.partition-commit.delay'='0s', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.policy.kind'='metastore,success-file' )""".stripMargin tableEnv.executeSql(hiveSql) val insertSql = "insert into fs_table SELECT userId, amount, " + " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users" tableEnv.executeSql(insertSql) } } public class MySource implements SourceFunction<UserInfo> { private volatile boolean run = true; String userids[] = { "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5", "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b", "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702", "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c", "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee" }; @Override public void run(SourceFunction.SourceContext<UserInfo> sourceContext) throws Exception { while (run) { String userid = userids[(int) (Math.random() * (userids.length - 1))]; UserInfo userInfo = new UserInfo(); userInfo.setUserId(userid); userInfo.setAmount(Math.random() * 100); userInfo.setTs(new Timestamp(System.currentTimeMillis())); sourceContext.collect(userInfo); Thread.sleep(100); } } @Override public void cancel() { run = false; } } public class UserInfo implements Serializable { private String userId; private Double amount; private Timestamp ts; public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public Double getAmount() { return amount; } public void setAmount(Double amount) { this.amount = amount; } public Timestamp getTs() { return ts; } public void setTs(Timestamp ts) { this.ts = ts; } } hive (yutest)> > > show partitions fs_table; OK partition Time taken: 20.214 seconds hdxg1101300...@163.com
Re: flink hive batch job reports FileNotFoundException
Hello, what does the job's logic roughly look like? I will give it a try. On Thu, Sep 17, 2020 at 10:00 PM godfrey he wrote: > cc @Rui Li > > 李佳宸 wrote on Mon, Sep 14, 2020 at 5:11 PM: > >> Hi all, when my batch Table job writes to Hive, a FileNotFoundException occurs: the .staging file cannot be found. >> The version is 1.11.1 >> Caused by: java.io.FileNotFoundException: File >> >> hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144 >> does not exist. >> at >> >> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120) >> ~[hadoop-client-api-3.1.3.jar:?] >> at >> >> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157) >> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0] >> at >> >> org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140) >> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] >> at >> >> org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98) >> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] >> at >> >> org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95) >> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] >> at >> >> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132) >> ~[flink-dist_2.11-1.11.1.jar:1.11.1] >> at >> >> org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286) >> ~[flink-dist_2.11-1.11.1.jar:1.11.1] >> >> This problem does not occur in standalone mode; in per-job mode on YARN some jobs hit it >> > -- Best regards! Rui Li
Re: flink hive batch job reports FileNotFoundException
cc @Rui Li 李佳宸 wrote on Mon, Sep 14, 2020 at 5:11 PM: > Hi all, when my batch Table job writes to Hive, a FileNotFoundException occurs: the .staging file cannot be found. > The version is 1.11.1 > Caused by: java.io.FileNotFoundException: File > > hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144 > does not exist. > at > > org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120) > ~[hadoop-client-api-3.1.3.jar:?] > at > > org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157) > ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0] > at > > org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95) > ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > > This problem does not occur in standalone mode; in per-job mode on YARN some jobs hit it >
flink hive batch job reports FileNotFoundException
Hi all, when my batch Table job writes to Hive, a FileNotFoundException occurs: the .staging file cannot be found. The version is 1.11.1 Caused by: java.io.FileNotFoundException: File hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144 does not exist. at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053) ~[hadoop-client-api-3.1.3.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131) ~[hadoop-client-api-3.1.3.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113) ~[hadoop-client-api-3.1.3.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110) ~[hadoop-client-api-3.1.3.jar:?] at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-client-api-3.1.3.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120) ~[hadoop-client-api-3.1.3.jar:?] at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157) ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95) ~[flink-table-blink_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286) ~[flink-dist_2.11-1.11.1.jar:1.11.1] This problem does not occur in standalone mode; in per-job mode on YARN some jobs hit it
Flink reading a Hive table to generate a Savepoint
With Flink 1.10, I read a Hive table and want to generate a Savepoint from it; how do I get from the Table to the DataSet that the State Processor API needs? EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); Table table = tableEnv.sqlQuery("select * from test001"); BootstrapTransformation
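The message is truncated in the archive, but assuming the goal is bootstrapping a savepoint from Hive data with the State Processor API, here is a hedged sketch against the Flink 1.10 DataSet-based API. Note that the Blink planner's batch mode cannot hand a Table back as a DataSet; the old planner's BatchTableEnvironment#toDataSet is the usual bridge, which fromElements stands in for below. The uid, path, and bootstrap function are assumptions.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.StateBootstrapFunction;

public class HiveToSavepointSketch {
    static class NoOpBootstrap extends StateBootstrapFunction<String> {
        @Override
        public void processElement(String value, Context ctx) {
            // write operator state derived from `value` here
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Stand-in for the rows read from the Hive table.
        DataSet<String> rows = env.fromElements("a", "b");
        BootstrapTransformation<String> transformation = OperatorTransformation
            .bootstrapWith(rows)
            .transform(new NoOpBootstrap());
        Savepoint.create(new MemoryStateBackend(), 128)
            .withOperator("my-operator-uid", transformation)
            .write("hdfs:///savepoints/bootstrap");
        env.execute("bootstrap");
    }
}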
Re: Flink writing to Hive in real time
Take a look at https://github.com/hiliuxg/flink-orc-sink : it writes ORC files to HDFS with exactly-once semantics. ---- Original ---- From: "jingwen jingwen" https://issues.apache.org/jira/browse/FLINK-14249 Best, Jingsong Lee -- From:
Re: [PROGRESS UPDATE] [DISCUSS] Flink-Hive Integration and Catalogs
Thanks, Shaoxuan! I sent a Chinese version to user-zh at the same time yesterday. From the feedback we have received so far, supporting multiple older Hive versions is definitely one of our focuses next. *More feedback is welcome from our community!* On Tue, Mar 19, 2019 at 8:44 PM Shaoxuan Wang wrote: > Hi Bowen, > Thanks for driving this. I am CCing this email/survey to user-zh@ > flink.apache.org as well. > I heard there is a lot of interest in Flink-Hive from the field. One of > the biggest requests Hive users have raised is "the support of > out-of-date Hive versions". A large number of users are still working on > clusters with CDH/HDP installed with old Hive versions, say 1.2.1/2.1.1. We > need to ensure the support of these Hive versions when planning the work on > Flink-Hive integration. > > *@all. "We want to get your feedback on Flink-Hive integration." * > > Regards, > Shaoxuan > > On Wed, Mar 20, 2019 at 7:16 AM Bowen Li wrote: > >> Hi Flink users and devs, >> >> We want to get your feedback on integrating Flink with Hive. >> >> Background: At Flink Forward in Beijing last December, the community >> announced the initiative to integrate Flink and Hive. At the Feb 21 Seattle >> Flink Meetup <https://www.meetup.com/seattle-flink/events/258723322/>, >> we presented Integrating Flink with Hive >> <https://www.slideshare.net/BowenLi9/integrating-flink-with-hive-xuefu-zhang-and-bowen-li-seattle-flink-meetup-feb-2019> >> with a live demo to the local community and got a great response. As of mid March, >> we have internally finished building Flink's brand-new catalog >> infrastructure, metadata integration with Hive, and the most common cases of >> Flink reading/writing against Hive, and will start to submit more design >> docs/FLIPs and contribute code back to the community. The reason for doing it >> internally first and then in the community is to ensure our proposed solutions >> are fully validated and tested, gain hands-on experience, and not miss >> anything in design. You are very welcome to join this effort, from >> design/code review to development and testing. >> >> *The most important thing we believe you, our Flink users/devs, can help >> with RIGHT NOW is to share your Hive use cases and give us feedback for this >> project. As we start to go deeper on specific areas of integration, your >> feedback and suggestions will help us refine our backlog and >> prioritize our work, and you can get the features you want sooner!* Just >> for example, if most users are mainly only reading Hive data, then we can >> prioritize tuning read performance over implementing write capability. >> A quick review of what we've finished building internally and are ready to >> contribute back to the community: >> >>- Flink/Hive Metadata Integration >> - Unified, pluggable catalog infra that manages meta-objects, >> including catalogs, databases, tables, views, functions, partitions, >> table/partition stats >> - Three catalog impls - an in-memory catalog, HiveCatalog for >> embracing the Hive ecosystem, GenericHiveMetastoreCatalog for persisting >> Flink's streaming/batch metadata in the Hive metastore >> - Hierarchical metadata reference as >> catalog_name.database_name.table_name in SQL and Table API >> - Unified function catalog based on the new catalog infra, also >> supporting Hive simple UDFs >>- Flink/Hive Data Integration >> - Hive data connector that reads partitioned/non-partitioned Hive >> tables, and supports partition pruning, both Hive simple and complex >> data types, and basic write >>- More powerful SQL Client fully integrated with the above features >>and more Hive-compatible SQL syntax for a better end-to-end SQL experience >> >> *Given the above, we want to learn from you: How do you use Hive >> currently? How can we solve your pain points? What features do you expect >> from Flink-Hive integration? Those can be details like:* >> >>- *Which Hive version are you using? Do you plan to upgrade Hive?* >>- *Are you planning to switch Hive engine? What timeline are you >>looking at? Until what capabilities Flink has will you consider using >> Flink with Hive?* >>- *What's your motivation to try Flink-Hive? Maintain only one data >>processing system across your teams for simplicity and maintainability? >>Better performance of Flink over Hive itself?* >>- *What are your Hive use cases? How large is your Hive data size? Do >>you mainly do reading, or both reading and writing?*
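As an illustration of the hierarchical reference described above, a hedged sketch with hypothetical names (the four-argument HiveCatalog constructor matches the one used elsewhere in this list's threads):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CatalogReferenceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.registerCatalog("myhive",
            new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.3.4"));
        // Fully qualified catalog_name.database_name.table_name reference:
        tEnv.sqlQuery("SELECT * FROM myhive.mydb.orders");
    }
}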
Re: [PROGRESS UPDATE] [DISCUSS] Flink-Hive Integration and Catalogs
Hi Bowen, Thanks for driving this. I am CCing this email/survey to user-zh@ flink.apache.org as well. I heard there is a lot of interest in Flink-Hive from the field. One of the biggest requests Hive users have raised is "the support of out-of-date Hive versions". A large number of users are still working on clusters with CDH/HDP installed with old Hive versions, say 1.2.1/2.1.1. We need to ensure the support of these Hive versions when planning the work on Flink-Hive integration. *@all. "We want to get your feedback on Flink-Hive integration." * Regards, Shaoxuan On Wed, Mar 20, 2019 at 7:16 AM Bowen Li wrote: > Hi Flink users and devs, > > We want to get your feedback on integrating Flink with Hive. > > Background: At Flink Forward in Beijing last December, the community > announced the initiative to integrate Flink and Hive. At the Feb 21 Seattle > Flink Meetup <https://www.meetup.com/seattle-flink/events/258723322/>, we > presented Integrating Flink with Hive > <https://www.slideshare.net/BowenLi9/integrating-flink-with-hive-xuefu-zhang-and-bowen-li-seattle-flink-meetup-feb-2019> > with a live demo to the local community and got a great response. As of mid March, > we have internally finished building Flink's brand-new catalog > infrastructure, metadata integration with Hive, and the most common cases of > Flink reading/writing against Hive, and will start to submit more design > docs/FLIPs and contribute code back to the community. The reason for doing it > internally first and then in the community is to ensure our proposed solutions > are fully validated and tested, gain hands-on experience, and not miss > anything in design. You are very welcome to join this effort, from > design/code review to development and testing. > > *The most important thing we believe you, our Flink users/devs, can help > with RIGHT NOW is to share your Hive use cases and give us feedback for this > project. As we start to go deeper on specific areas of integration, your > feedback and suggestions will help us refine our backlog and > prioritize our work, and you can get the features you want sooner!* Just > for example, if most users are mainly only reading Hive data, then we can > prioritize tuning read performance over implementing write capability. > A quick review of what we've finished building internally and are ready to > contribute back to the community: > >- Flink/Hive Metadata Integration > - Unified, pluggable catalog infra that manages meta-objects, > including catalogs, databases, tables, views, functions, partitions, > table/partition stats > - Three catalog impls - an in-memory catalog, HiveCatalog for > embracing the Hive ecosystem, GenericHiveMetastoreCatalog for persisting > Flink's streaming/batch metadata in the Hive metastore > - Hierarchical metadata reference as > catalog_name.database_name.table_name in SQL and Table API > - Unified function catalog based on the new catalog infra, also supporting > Hive simple UDFs >- Flink/Hive Data Integration > - Hive data connector that reads partitioned/non-partitioned Hive > tables, and supports partition pruning, both Hive simple and complex > data types, and basic write >- More powerful SQL Client fully integrated with the above features >and more Hive-compatible SQL syntax for a better end-to-end SQL experience > > *Given the above, we want to learn from you: How do you use Hive > currently? How can we solve your pain points? What features do you expect > from Flink-Hive integration? Those can be details like:* > >- *Which Hive version are you using? 
Do you plan to upgrade Hive?* >- *Are you planning to switch Hive engine? What timeline are you >looking at? Until what capabilities Flink has will you consider using Flink >with Hive?* >- *What's your motivation to try Flink-Hive? Maintain only one data >processing system across your teams for simplicity and maintainability? >Better performance of Flink over Hive itself?* >- *What are your Hive use cases? How large is your Hive data size? Do >you mainly do reading, or both reading and writing?* >- *How many Hive user-defined functions do you have? Are they mostly >UDF, GenericUDF, UDTF, or UDAF?* >- any questions or suggestions you have? or as simple as how you feel >about the project > > Again, your input will be really valuable to us, and we hope, with all of > us working together, the project can benefit our end users. Please feel > free to either reply to this thread or just to me. I'm also working on > creating a questionnaire to better gather your feedback; watch the > mailing list in the next couple of days. > > Thanks, > Bowen