Re: Reply: Flink Hive file compaction error

2022-11-24 Posted by yuxia
There is no way around this at the moment; the only option is to rerun the job. This is mainly to preserve exactly-once semantics; otherwise the data in the deleted file would be lost.

If non-exactly-once semantics are acceptable, we could in principle let Flink keep running, but that is not implemented yet.

Best regards,
Yuxia

- Original Message -
From: "迎风浪子" <576637...@qq.com.INVALID>
To: "user-zh" 
Sent: Thursday, November 24, 2022, 5:55:00 PM
Subject: Reply: Flink Hive file compaction error

Has this issue been resolved? We have run into the same problem and cannot get the job to keep running.



---Original Message---
From: "yidan zhao"

flink: joining MQ data with a Hive table

2022-09-02 Posted by ??????
hi
We have data coming from an MQ that needs to be joined with a Hive table. Should this be done with the Flink Table API or with Flink SQL, and is a join or a connect the right choice?









Reply: flink hive table creation owner is empty

2022-07-18 Posted by kcz
Thanks.




------ Original Message ------
From:
   "user-zh"

https://github.com/apache/flink/pull/16745

Best regards,
Yuxia

- Original Message -
From: "kcz" <573693...@qq.com.INVALID
To: "user-zh" 

Re: flink hive table creation owner is empty

2022-07-18 Posted by yuxia
You can take a look at this PR: https://github.com/apache/flink/pull/16745

Best regards,
Yuxia

- Original Message -
From: "kcz" <573693...@qq.com.INVALID>
To: "user-zh" 
Sent: Monday, July 18, 2022, 6:13:14 PM
Subject: flink hive table creation owner is empty

flink-1.14.4
hive-3.1.0
When a Hive table is created through Flink and data is written into Hive, the table's owner in Hive is empty; it is not set from the user configured for Kerberos.
Could you help with how to set the owner, or point to roughly where the relevant code lives? I really need this feature.
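
For reference, a minimal sketch of how the owner of an existing Hive table could be patched through the Hive Metastore client. This only illustrates the general idea and is not necessarily what the linked PR does; the database/table names are placeholders, and hive-site.xml is assumed to be on the classpath:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.security.UserGroupInformation;

public class SetTableOwner {
    public static void main(String[] args) throws Exception {
        // Resolve the current Kerberos (or OS) user; this is the user we want as owner.
        String owner = UserGroupInformation.getCurrentUser().getShortUserName();

        // HiveConf picks up hive-site.xml from the classpath and locates the metastore.
        HiveConf conf = new HiveConf();
        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
        try {
            // "mydb" and "fs_table" are placeholders for the database/table created by Flink.
            Table table = client.getTable("mydb", "fs_table");
            table.setOwner(owner);
            client.alter_table("mydb", "fs_table", table);
        } finally {
            client.close();
        }
    }
}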


flink hive table creation owner is empty

2022-07-18 Posted by kcz
flink-1.14.4
hive-3.1.0
When a Hive table is created through Flink and data is written into Hive, the table's owner in Hive is empty; it is not set from the user configured for Kerberos.
Could you help with how to set the owner, or point to roughly where the relevant code lives?

Re: Flink Hive file compaction error

2021-11-24 Posted by yidan zhao
Hi, I ran into this problem too, although in my case it was because I deleted the files myself. My cleanup logic targets only partitions that already carry a success marker file, and inside such a partition it deletes the dot-prefixed files.
The goal is to avoid cases where useless dot-prefixed files pile up in partitions and blow up the file system's inode count, so a scheduled job does the cleanup, and the cleanup applies only to partitions that have the success file.
But now I find that I run into this problem as well, and it caused the job to restart.
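
For context, a minimal sketch of the kind of cleanup job described above, assuming the success marker is the default _SUCCESS file; the table path is a placeholder and the logic is only illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HiddenFileCleaner {
    public static void main(String[] args) throws Exception {
        // Placeholder table location; in practice this would come from the scheduler's config.
        Path tableDir = new Path("hdfs://mycluster/user/hive/warehouse/test.db/offer_69");
        FileSystem fs = FileSystem.get(tableDir.toUri(), new Configuration());

        for (FileStatus partition : fs.listStatus(tableDir)) {
            if (!partition.isDirectory()) {
                continue;
            }
            // Only touch partitions that already carry the success marker,
            // i.e. partitions that have been committed.
            if (!fs.exists(new Path(partition.getPath(), "_SUCCESS"))) {
                continue;
            }
            for (FileStatus file : fs.listStatus(partition.getPath())) {
                String name = file.getPath().getName();
                // Delete leftover dot-prefixed files to save inodes.
                if (name.startsWith(".")) {
                    fs.delete(file.getPath(), false);
                }
            }
        }
        fs.close();
    }
}

As the rest of this thread shows, deleting .uncompacted-* files that the compaction coordinator still references is exactly what triggers the FileNotFoundException below, so a job like this has to be handled with care.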

Rui Li wrote on Wednesday, August 11, 2021 at 8:57 PM:

> There is currently no way to let the job keep running; the only option is to rerun it.
>
> There are a few separate questions here:
> 1. How was the file deleted? Two possibilities: it was deleted by Flink by mistake (which needs a fix on the Flink side), or by a process outside Flink.
> 2. A lost file means exactly-once semantics can no longer be guaranteed; is that acceptable?
> 3. If losing data is acceptable, how can the Flink job be allowed to keep running (this needs an improvement in Flink)?
>
> On Wed, Aug 11, 2021 at 7:52 PM 周瑞  wrote:
>
> > Hello:
> >  The file is indeed gone. In this situation, what can be configured so that the job keeps running?
> > 
> > 
> > --Original--
> > From: "Rui Li" > Date: Wed, Aug 11, 2021 07:49 PM
> > To: "user-zh" >
> > Subject: Re: Flink Hive file compaction error
> >
> > 
> >
> > Is the file really gone? Was it perhaps deleted by another process? The HDFS audit log can help determine that.
> >
> > Right now the exactly-once semantics of Flink's file writes rely on HDFS's consistency
> > guarantees; if data already written to HDFS is lost, that guarantee is broken (though we could consider letting the job keep running in this case).
> >
> > On Tue, Aug 10, 2021 at 7:45 PM 周瑞  >
> >  Hello: When Flink writes to Hive, one of the files waiting to be compacted went missing
> > during compaction, and the Flink program keeps restarting. What could cause the file to go
> > missing, and how can the Flink program be started normally in this situation?
> >  2021-08-10 19:34:19 java.io.UncheckedIOException:
> >  java.io.FileNotFoundException: File does not exist:
> > 
> >
> hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
> >  at
> > 
> >
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
> >  at
> > 
> > org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38)
> >  at
> > 
> >
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173)
> >  at
> >
> java.util.HashMap.forEach(HashMap.java:1288)
> > at
> > 
> >
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169)
> >  at
> > 
> >
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151)
> >  at
> > 
> >
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141)
> >  at
> > 
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
> >  at org.apache.flink.streaming.runtime.io
> >
> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> >  at
> > org.apache.flink.streaming.runtime.io
> >
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> >  at org.apache.flink.streaming.
> > runtime.io
> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> >  at
> > 
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> >  at
> > 
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> >  at
> > 
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> >  at
> > 
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> >  at
> > 
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> >  at
> > 
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> >  at
> > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> >  at
> >
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> > at
> >  java.lang.Thread.run(Thread.java:748) Caused by:
> >  java.io.FileNotFoundException: File does not exist:
> > 
> >
> hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
> >  at
> > 
> >
> org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583)
> >  at
> > 
> >
> org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
> >  at
> > 
> >
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> >  at
> > 
> >
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591)
> >  at
> > 
> >
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
> >  at
> > 
> >
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161)
> >  ... 19 more
> >
> >
> >
> > --
> > Best regards!
> > Rui Li
>
>
>
> --
> Best regards!
> Rui Li
>


flink hive dialect: how to escape a single quote in instr

2021-09-01 Posted by Asahi Lee
hi!  Using flink 1.13.1 with Hive SQL: column col1 is of type string and contains the value ab'cd. I want to use instr to find the position of the single quote ', but none of the following variants work:
   instr(col1, '\'')
   instr(col1, )
   instr(col1, '\''')
   instr(col1, '\\''')
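
A minimal sketch of one way this is commonly handled: inside a SQL string literal a single quote is escaped by doubling it, so '''' denotes the one-character string containing '. The sketch below uses Flink's default SQL dialect (the thread uses the Hive dialect, where escaping may behave differently), and the view name is just an illustration:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class InstrQuoteExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Hypothetical one-row table; 'ab''cd' is the literal for the string ab'cd.
        tEnv.executeSql("CREATE TEMPORARY VIEW t AS SELECT 'ab''cd' AS col1");

        // '''' is the literal for a single quote; expected result is 3 (1-based position).
        tEnv.executeSql("SELECT INSTR(col1, '''') FROM t").print();
    }
}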


Flink Hive files: checkpoints cannot complete when the upstream Kafka volume is very large

2021-08-11 Posted by 周瑞
Hello, with Flink writing to Hive, when the upstream Kafka data volume is very large, checkpoints never complete and fail with an error after 5 minutes. How can this problem be solved?
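
For reference, a minimal sketch of the checkpoint settings that are usually adjusted in this situation (longer timeout, unaligned checkpoints, no overlapping checkpoints); the values are placeholders, not a recommendation from the thread:

import java.time.Duration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 20 seconds, as in the examples later in this digest.
        env.enableCheckpointing(20_000L);

        CheckpointConfig cfg = env.getCheckpointConfig();
        // Raise the timeout if checkpoints are expiring under heavy Kafka backpressure
        // (placeholder value).
        cfg.setCheckpointTimeout(Duration.ofMinutes(30).toMillis());
        // Unaligned checkpoints let barriers overtake buffered data under backpressure
        // (available from Flink 1.11 on, with some restrictions).
        cfg.enableUnalignedCheckpoints();
        // Avoid piling up overlapping checkpoints.
        cfg.setMaxConcurrentCheckpoints(1);
        cfg.setMinPauseBetweenCheckpoints(10_000L);

        // ... build and execute the actual Hive streaming job here ...
        // env.execute("hive streaming sink");
    }
}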

Re: Flink Hive file compaction error

2021-08-11 Posted by Rui Li
There is currently no way to let the job keep running; the only option is to rerun it.

There are a few separate questions here:
1. How was the file deleted? Two possibilities: it was deleted by Flink by mistake (which needs a fix on the Flink side), or by a process outside Flink.
2. A lost file means exactly-once semantics can no longer be guaranteed; is that acceptable?
3. If losing data is acceptable, how can the Flink job be allowed to keep running (this needs an improvement in Flink)?

On Wed, Aug 11, 2021 at 7:52 PM 周瑞  wrote:

> Hello:
>  The file is indeed gone. In this situation, what can be configured so that the job keeps running?
> 
> 
> --Original--
> From: "Rui Li" Date: Wed, Aug 11, 2021 07:49 PM
> To: "user-zh"
> Subject: Re: Flink Hive file compaction error
>
> 
>
> Is the file really gone? Was it perhaps deleted by another process? The HDFS audit log can help determine that.
>
> Right now the exactly-once semantics of Flink's file writes rely on HDFS's consistency
> guarantees; if data already written to HDFS is lost, that guarantee is broken (though we could consider letting the job keep running in this case).
>
> On Tue, Aug 10, 2021 at 7:45 PM 周瑞 
>  Hello: When Flink writes to Hive, one of the files waiting to be compacted went missing
> during compaction, and the Flink program keeps restarting. What could cause the file to go
> missing, and how can the Flink program be started normally in this situation?
>  2021-08-10 19:34:19 java.io.UncheckedIOException:
>  java.io.FileNotFoundException: File does not exist:
> 
> hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
>  at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
>  at
> 
> org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38)
>  at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173)
>  at
> java.util.HashMap.forEach(HashMap.java:1288)
> at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169)
>  at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151)
>  at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>  at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>  at
> org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  at org.apache.flink.streaming.
> runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>  at
> 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> at
>  java.lang.Thread.run(Thread.java:748) Caused by:
>  java.io.FileNotFoundException: File does not exist:
> 
> hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
>  at
> 
> org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583)
>  at
> 
> org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
>  at
> 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>  at
> 
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591)
>  at
> 
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
>  at
> 
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161)
>  ... 19 more
>
>
>
> --
> Best regards!
> Rui Li



-- 
Best regards!
Rui Li


Re: Flink Hive file compaction error

2021-08-11 Posted by 周瑞
Hello:
 The file is indeed gone. In this situation, what can be configured so that the job keeps running?


--Original--
From: "Rui Li"

Re: Flink Hive file compaction error

2021-08-11 Posted by Rui Li
Is the file really gone? Was it perhaps deleted by another process? The HDFS audit log can help determine that.

Right now the exactly-once semantics of Flink's file writes rely on HDFS's consistency
guarantees; if data already written to HDFS is lost, that guarantee is broken (though we could consider letting the job keep running in this case).

On Tue, Aug 10, 2021 at 7:45 PM 周瑞  wrote:

> Hello: When Flink writes to Hive, one of the files waiting to be compacted went missing during
> compaction, and the Flink program keeps restarting. What could cause the file to go missing, and how can the Flink program be started normally in this situation?
> 2021-08-10 19:34:19 java.io.UncheckedIOException:
> java.io.FileNotFoundException: File does not exist:
> hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
>  at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
>at
> org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38)
>   at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173)
>at java.util.HashMap.forEach(HashMap.java:1288) at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169)
> at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151)
>   at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
> at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>   at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at
> java.lang.Thread.run(Thread.java:748) Caused by:
> java.io.FileNotFoundException: File does not exist:
> hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
>at
> org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591)
> at
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
>   at
> org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161)
>... 19 more



-- 
Best regards!
Rui Li


Flink Hive: no data visible in queries after data is written

2021-08-11 Posted by 周瑞
Hello: When Flink writes to Hive, the data was actually written, but an exception occurred in the middle, so the files here were never compacted and the Hive table query returns no data. After the Flink job is restarted, no more data is written to that partition, so the data in that partition can never be compacted. How should this kind of failure be handled? Is there a way to repair it manually?
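
A minimal sketch of one possible manual repair, assuming the leftover .uncompacted-* files are complete and the partition simply never got committed: rename the hidden files so Hive can read them (skipping compaction), then register the partition in the metastore. The paths and partition spec are placeholders, and whether this is safe depends on why compaction failed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ManualPartitionRepair {
    public static void main(String[] args) throws Exception {
        // Placeholder partition directory of the Hive table written by Flink.
        Path partitionDir = new Path(
                "hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72");
        FileSystem fs = FileSystem.get(partitionDir.toUri(), new Configuration());

        // Hive ignores files starting with '.' or '_', so the uncompacted files are invisible.
        // Renaming them to visible names makes their data readable without compaction.
        for (FileStatus file : fs.listStatus(partitionDir)) {
            String name = file.getPath().getName();
            if (name.startsWith(".uncompacted-")) {
                Path visible = new Path(partitionDir, name.substring(".uncompacted-".length()));
                fs.rename(file.getPath(), visible);
            }
        }
        fs.close();

        // Afterwards, register the partition in the metastore, e.g. from the Hive CLI:
        //   ALTER TABLE test.offer_69 ADD IF NOT EXISTS PARTITION (pt_dt='2021-8-10-72');
    }
}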

Flink Hive file compaction error

2021-08-10 Posted by 周瑞
Hello: When Flink writes to Hive, one of the files waiting to be compacted went missing during
compaction, and the Flink program keeps restarting. What could cause the file to go missing, and how can the Flink program be started normally in this situation?
2021-08-10 19:34:19 java.io.UncheckedIOException: 
java.io.FileNotFoundException: File does not exist: 
hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
   at 
org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
 at 
org.apache.flink.table.runtime.util.BinPacking.pack(BinPacking.java:38)  at 
org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:173)
 at java.util.HashMap.forEach(HashMap.java:1288) at 
org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:169)
  at 
org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:151)
at 
org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:141)
  at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
  at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
 at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
   at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
   at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) 
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)   at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at 
java.lang.Thread.run(Thread.java:748) Caused by: java.io.FileNotFoundException: 
File does not exist: 
hdfs://mycluster/user/hive/warehouse/test.db/offer_69/pt_dt=2021-8-10-72/.uncompacted-part-b2108114-b92b-4c37-b204-45f0150236f4-0-3
 at 
org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1583)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1576)
  at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1591)
  at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
at 
org.apache.flink.table.filesystem.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:161)
 ... 19 more

Re: flink hive batch job reports FileNotFoundException

2021-05-28 Posted by libowen
hi:
   Hello, may I ask whether you ever resolved this issue? We ran into it as well, though our environment is standalone.




--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink hive batch job reports FileNotFoundException

2021-05-27 Posted by bowen li
Hi everyone,
When I run a batch Table job that writes into Hive, a FileNotFoundException occurs: the .staging file cannot be found.
   The version is 1.12.1 and the deployment is standalone.
   The error message is as follows:
   
Caused by: java.lang.Exception: Failed to finalize execution on master
  ... 33 more
Caused by: org.apache.flink.table.api.TableException: Exception in 
finalizeGlobal
  at 
org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:94)
  at 
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:148)
  at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1368)
  ... 32 more
Caused by: java.io.FileNotFoundException: File 
hdfs://nameservice1/user/hive/warehouse/flinkdb.db/ods_csc_zcdrpf_test_utf8/.staging_1622171066985
 does not exist.
  at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:986)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:122)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1046)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1043)
  at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1053)
  at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170)
  at 
org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:137)
  at 
org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:93)
  at 
org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:92)
  ... 34 more

flink hive dimension-table lookup join fails with a snappy compression error

2021-04-12 Posted by kandy.wang
 java.lang.UnsatisfiedLinkError: 
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z

at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)

at 
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)

at 
org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)

at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)

at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)

at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)

at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1872)

at org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1886)

at 
org.apache.hadoop.mapred.SequenceFileRecordReader.(SequenceFileRecordReader.java:49)

at 
org.apache.hadoop.mapred.SequenceFileInputFormat.getRecordReader(SequenceFileInputFormat.java:64)

at 
org.apache.flink.connectors.hive.read.HiveMapredSplitReader.(HiveMapredSplitReader.java:113)

at 
org.apache.flink.connectors.hive.read.HiveTableInputFormat.open(HiveTableInputFormat.java:162)

at 
org.apache.flink.connectors.hive.read.HiveInputFormatPartitionReader.open(HiveInputFormatPartitionReader.java:86)

at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.checkCacheReload(FileSystemLookupFunction.java:128)

at 
org.apache.flink.table.filesystem.FileSystemLookupFunction.eval(FileSystemLookupFunction.java:103)

at LookupFunction$74.flatMap(Unknown Source)

at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)

at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)

at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)

at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)

at StreamExecCalc$71.processElement(Unknown Source)
How can this snappy problem be solved?
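
For reference, a minimal diagnostic sketch: an UnsatisfiedLinkError on buildSupportsSnappy usually means the Hadoop native library (libhadoop/libsnappy) is not visible to the TaskManager JVM, so one assumption-laden first step is to check what the JVM can actually see:

import org.apache.hadoop.util.NativeCodeLoader;

public class SnappyNativeCheck {
    public static void main(String[] args) {
        // True only if libhadoop.so was found on java.library.path.
        System.out.println("native hadoop loaded: " + NativeCodeLoader.isNativeCodeLoaded());
        System.out.println("java.library.path = " + System.getProperty("java.library.path"));

        if (NativeCodeLoader.isNativeCodeLoaded()) {
            // Only safe to call when the native library is loaded; otherwise it throws
            // the same UnsatisfiedLinkError seen in the stack trace above.
            System.out.println("snappy supported: " + NativeCodeLoader.buildSupportsSnappy());
        }
    }
}

If the native library is reported missing, pointing the TaskManager JVM at the Hadoop native directory (for example via env.java.opts: -Djava.library.path=/path/to/hadoop/lib/native in flink-conf.yaml, or LD_LIBRARY_PATH) is the usual fix; the exact path depends on the cluster.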

Re: Re: flink hive streaming: query returns no data

2020-10-28 Posted by Jingsong Li
Watch out for the time zone: the SQL layer uses UTC-based long values by default.
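
A minimal sketch of how the session time zone can be set on the table environment so that time comparisons line up with local event timestamps; the zone id is a placeholder and this is only an illustration of the hint above:

import java.time.ZoneId;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SessionTimeZoneExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Placeholder zone; pick the zone the event timestamps are actually in.
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));

        // Equivalent configuration key, e.g. for the SQL client:
        // table.local-time-zone: Asia/Shanghai
    }
}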

On Thu, Oct 29, 2020 at 12:12 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> After I set sink.partition-commit.trigger to process-time I can see the data;
> but after I then made the source produce a watermark, it still does not work;
> val dataStream = streamEnv.addSource(new MySource)
>
> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
>   .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
> override def extractTimestamp(element: UserInfo, recordTimestamp:
> Long): Long = element.getTs.getTime
>   }))
> The ts field of the generated UserInfo is a timestamp, so that is what the watermark is extracted from.
>
>
>
> hdxg1101300...@163.com
>
> From: Jingsong Li
> Sent: 2020-10-28 16:29
> To: user-zh
> Subject: Re: flink hive streaming: query returns no data
> Hi,
>
> Your Source does not appear to produce any watermark, so:
>
> you can either make the Source produce a proper watermark, or use the default value proc-time for 'sink.partition-commit.trigger'.
>
> Best,
> Jingsong
>
> On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
> hdxg1101300...@163.com> wrote:
>
> > Hello:
> > I am currently using Flink 1.11.2 with Hive 1.1.0;
> > when I use flink hive streaming and write data to Hive following the example,
> > I can see that files have been produced under the target directory, but a Hive query returns no data;
> > it looks like the partition information is not committed to the Hive metastore, yet the website says this feature is implemented; it just does not work for me.
> > Below is my code
> >  object StreamMain {
> >   def main(args: Array[String]): Unit = {
> > val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > streamEnv.setParallelism(3)
> >
> > val tableEnvSettings = EnvironmentSettings.newInstance()
> >   .useBlinkPlanner()
> >   .inStreamingMode()
> >   .build()
> >
> > val tableEnv = StreamTableEnvironment.create(streamEnv,
> tableEnvSettings)
> >
> >
>  
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE)
> >
> >
>  
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofSeconds(20))
> >
> > val dataStream = streamEnv.addSource(new MySource)
> >
> > val catalogName = "my_catalog"
> > val catalog = new HiveCatalog(
> >   catalogName,  // catalog name
> >   "yutest",// default database
> >
> >   "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive
> config (hive-site.xml) directory
> >   "1.1.0"   // Hive version
> > )
> > tableEnv.registerCatalog(catalogName, catalog)
> > tableEnv.useCatalog(catalogName)
> >
> > tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> > tableEnv.useDatabase("yutest")
> >
> >
> > tableEnv.createTemporaryView("users", dataStream)
> > tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
> > //  If the corresponding table already exists in Hive, this block can be omitted
> > val hiveSql = """CREATE external TABLE fs_table (
> > user_id STRING,
> > order_amount DOUBLE
> >   )
> >   partitioned by(
> >   dt string,
> >   h string,
> >   m string) stored as parquet
> >   TBLPROPERTIES (
> >
> > 'partition.time-extractor.timestamp-pattern'='$dt
> $h:$m:00',
> > 'sink.partition-commit.delay'='0s',
> > 'sink.partition-commit.trigger'='partition-time',
> >
> >
>  'sink.partition-commit.policy.kind'='metastore,success-file'
> >   )""".stripMargin
> > tableEnv.executeSql(hiveSql)
> >
> >
> > val insertSql = "insert into  fs_table SELECT userId, amount, " + "
> DATE_FORMAT(ts, '-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM users"
> > tableEnv.executeSql(insertSql)
> >   }
> > }
> > public class MySource implements SourceFunction {
> > private volatile boolean run = true;
> > String userids[] = {
> >
> > "4760858d-2bec-483c-a535-291de04b2247",
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
> >
> > "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
> >
> > "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
> >
> > "3ebfb9602ac07779||3ebfe9612a007979

Re: Re: flink hive streaming: query returns no data

2020-10-28 Posted by hdxg1101300...@163.com
After I set sink.partition-commit.trigger to process-time I can see the data;
but after I then made the source produce a watermark, it still does not work;
val dataStream = streamEnv.addSource(new MySource)
  
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
  .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
override def extractTimestamp(element: UserInfo, recordTimestamp: 
Long): Long = element.getTs.getTime
  }))
The ts field of the generated UserInfo is a timestamp, so that is what the watermark is extracted from.



hdxg1101300...@163.com
 
From: Jingsong Li
Sent: 2020-10-28 16:29
To: user-zh
Subject: Re: flink hive streaming: query returns no data
Hi,
 
Your Source does not appear to produce any watermark, so:
you can either make the Source produce a proper watermark, or use the default value proc-time for 'sink.partition-commit.trigger'.
 
Best,
Jingsong
 
On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:
 
> Hello:
> I am currently using Flink 1.11.2 with Hive 1.1.0;
> when I use flink hive streaming and write data to Hive following the example,
> I can see that files have been produced under the target directory, but a Hive query returns no data;
> it looks like the partition information is not committed to the Hive metastore, yet the website says this feature is implemented; it just does not work for me.
> Below is my code
>  object StreamMain {
>   def main(args: Array[String]): Unit = {
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamEnv.setParallelism(3)
>
> val tableEnvSettings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
>
> val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>  CheckpointingMode.EXACTLY_ONCE)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>  Duration.ofSeconds(20))
>
> val dataStream = streamEnv.addSource(new MySource)
>
> val catalogName = "my_catalog"
> val catalog = new HiveCatalog(
>   catalogName,  // catalog name
>   "yutest",// default database
>
>   "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive 
> config (hive-site.xml) directory
>   "1.1.0"   // Hive version
> )
> tableEnv.registerCatalog(catalogName, catalog)
> tableEnv.useCatalog(catalogName)
>
> tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> tableEnv.useDatabase("yutest")
>
>
> tableEnv.createTemporaryView("users", dataStream)
> tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
> //  If the corresponding table already exists in Hive, this block can be omitted
> val hiveSql = """CREATE external TABLE fs_table (
> user_id STRING,
> order_amount DOUBLE
>   )
>   partitioned by(
>   dt string,
>   h string,
>   m string) stored as parquet
>   TBLPROPERTIES (
>
> 'partition.time-extractor.timestamp-pattern'='$dt 
> $h:$m:00',
> 'sink.partition-commit.delay'='0s',
> 'sink.partition-commit.trigger'='partition-time',
>
> 
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>   )""".stripMargin
> tableEnv.executeSql(hiveSql)
>
>
> val insertSql = "insert into  fs_table SELECT userId, amount, " + " 
> DATE_FORMAT(ts, '-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') 
> FROM users"
> tableEnv.executeSql(insertSql)
>   }
> }
> public class MySource implements SourceFunction {
> private volatile boolean run = true;
> String userids[] = {
>
> "4760858d-2bec-483c-a535-291de04b2247", 
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
>
> "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", 
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
>
> "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", 
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
>
> "3ebfb9602ac07779||3ebfe9612a007979", 
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
>
> "e7e896cd939685d7||e7e8e6c1930689d7", 
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
> };
>
> @Override
>
> public void run(SourceFunction.SourceContext sourceContext) 
> throws Exception {
>
> while (run) {
>
> String userid = userids[(int) (Math.random() * (userids.length - 
> 1))];
> UserInfo userInfo = new UserInfo();
> userInfo.setUserId(userid);
> userInfo.setAmount(Math.random() * 100);
>  

Re: flink hive streaming: query returns no data

2020-10-28 Posted by Jingsong Li
Hi,

Your Source does not appear to produce any watermark, so:
you can either make the Source produce a proper watermark, or use the default value proc-time for 'sink.partition-commit.trigger'.

Best,
Jingsong

On Wed, Oct 28, 2020 at 4:13 PM hdxg1101300...@163.com <
hdxg1101300...@163.com> wrote:

> Hello:
> I am currently using Flink 1.11.2 with Hive 1.1.0;
> when I use flink hive streaming and write data to Hive following the example,
> I can see that files have been produced under the target directory, but a Hive query returns no data;
> it looks like the partition information is not committed to the Hive metastore, yet the website says this feature is implemented; it just does not work for me.
> Below is my code
>  object StreamMain {
>   def main(args: Array[String]): Unit = {
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamEnv.setParallelism(3)
>
> val tableEnvSettings = EnvironmentSettings.newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
>
> val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>  CheckpointingMode.EXACTLY_ONCE)
>
> 
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>  Duration.ofSeconds(20))
>
> val dataStream = streamEnv.addSource(new MySource)
>
> val catalogName = "my_catalog"
> val catalog = new HiveCatalog(
>   catalogName,  // catalog name
>   "yutest",// default database
>
>   "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive 
> config (hive-site.xml) directory
>   "1.1.0"   // Hive version
> )
> tableEnv.registerCatalog(catalogName, catalog)
> tableEnv.useCatalog(catalogName)
>
> tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> tableEnv.useDatabase("yutest")
>
>
> tableEnv.createTemporaryView("users", dataStream)
> tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
> //  If the corresponding table already exists in Hive, this block can be omitted
> val hiveSql = """CREATE external TABLE fs_table (
> user_id STRING,
> order_amount DOUBLE
>   )
>   partitioned by(
>   dt string,
>   h string,
>   m string) stored as parquet
>   TBLPROPERTIES (
>
> 'partition.time-extractor.timestamp-pattern'='$dt 
> $h:$m:00',
> 'sink.partition-commit.delay'='0s',
> 'sink.partition-commit.trigger'='partition-time',
>
> 
> 'sink.partition-commit.policy.kind'='metastore,success-file'
>   )""".stripMargin
> tableEnv.executeSql(hiveSql)
>
>
> val insertSql = "insert into  fs_table SELECT userId, amount, " + " 
> DATE_FORMAT(ts, '-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') 
> FROM users"
> tableEnv.executeSql(insertSql)
>   }
> }
> public class MySource implements SourceFunction {
> private volatile boolean run = true;
> String userids[] = {
>
> "4760858d-2bec-483c-a535-291de04b2247", 
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
>
> "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", 
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
>
> "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", 
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
>
> "3ebfb9602ac07779||3ebfe9612a007979", 
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
>
> "e7e896cd939685d7||e7e8e6c1930689d7", 
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
> };
>
> @Override
>
> public void run(SourceFunction.SourceContext sourceContext) 
> throws Exception {
>
> while (run) {
>
> String userid = userids[(int) (Math.random() * (userids.length - 
> 1))];
> UserInfo userInfo = new UserInfo();
> userInfo.setUserId(userid);
> userInfo.setAmount(Math.random() * 100);
> userInfo.setTs(new Timestamp(System.currentTimeMillis()));
> sourceContext.collect(userInfo);
> Thread.sleep(100);
> }
> }
>
> @Override
> public void cancel() {
> run = false;
> }
> }
> public class UserInfo implements Serializable {
> private String userId;
> private Double amount;
> private Timestamp ts;
>
> public String getUserId() {
> return userId;
> }
>
> public void setUserId(String userId) {
> this.userId = userId;
> }
>
> public Double getAmount() {
> return amount;
> }
>
> public void setAmount(Double amount) {
> this.amount = amount;
> }
>
> public Timestamp getTs() {
> return ts;
> }
>
> public void setTs(Timestamp ts) {
> this.ts = ts;
> }
> }
>
> hive (yutest)>
>  >
>  > show partitions fs_table;
> OK
> partition
> Time taken: 20.214 seconds
>
> --
> hdxg1101300...@163.com
>


-- 
Best, Jingsong Lee


flink hive streaming: query returns no data

2020-10-28 Posted by hdxg1101300...@163.com
Hello:
I am currently using Flink 1.11.2 with Hive 1.1.0;
when I use flink hive streaming and write data to Hive following the example, I can see that files have been produced under the target directory, but a Hive query returns no data;
it looks like the partition information is not committed to the Hive metastore, yet the website says this feature is implemented; it just does not work for me.
Below is my code
 object StreamMain {
  def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)

val tableEnvSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)

tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
 CheckpointingMode.EXACTLY_ONCE)

tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
 Duration.ofSeconds(20))

val dataStream = streamEnv.addSource(new MySource)

val catalogName = "my_catalog"
val catalog = new HiveCatalog(
  catalogName,  // catalog name
  "yutest",// default database
  "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\",  // Hive config 
(hive-site.xml) directory
  "1.1.0"   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.useDatabase("yutest")


tableEnv.createTemporaryView("users", dataStream)
tableEnv.executeSql("DROP TABLE IF EXISTS  fs_table ")
//  If the corresponding table already exists in Hive, this block can be omitted
val hiveSql = """CREATE external TABLE fs_table (
user_id STRING,
order_amount DOUBLE
  )
  partitioned by(
  dt string,
  h string,
  m string) stored as parquet
  TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
'sink.partition-commit.delay'='0s',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.policy.kind'='metastore,success-file'
  )""".stripMargin
tableEnv.executeSql(hiveSql)

val insertSql = "insert into  fs_table SELECT userId, amount, " + " 
DATE_FORMAT(ts, '-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') 
FROM users"
tableEnv.executeSql(insertSql)
  }
}
public class MySource implements SourceFunction {
private volatile boolean run = true;
String userids[] = {
"4760858d-2bec-483c-a535-291de04b2247", 
"67088699-d4f4-43f2-913c-481bff8a2dc5",
"72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", 
"dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
"aabbaa50-72f4-495c-b3a1-70383ee9d6a4", 
"3218bbb9-5874-4d37-a82d-3e35e52d1702",
"3ebfb9602ac07779||3ebfe9612a007979", 
"aec20d52-c2eb-4436-b121-c29ad4097f6c",
"e7e896cd939685d7||e7e8e6c1930689d7", 
"a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
};

@Override
public void run(SourceFunction.SourceContext sourceContext) 
throws Exception {

while (run) {
String userid = userids[(int) (Math.random() * (userids.length - 
1))];
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userid);
userInfo.setAmount(Math.random() * 100);
userInfo.setTs(new Timestamp(System.currentTimeMillis()));
sourceContext.collect(userInfo);
Thread.sleep(100);
}
}

@Override
public void cancel() {
run = false;
}
}
public class UserInfo implements Serializable {
private String userId;
private Double amount;
private Timestamp ts;

public String getUserId() {
return userId;
}

public void setUserId(String userId) {
this.userId = userId;
}

public Double getAmount() {
return amount;
}

public void setAmount(Double amount) {
this.amount = amount;
}

public Timestamp getTs() {
return ts;
}

public void setTs(Timestamp ts) {
this.ts = ts;
}
}

hive (yutest)>
 >
 > show partitions fs_table;
OK
partition
Time taken: 20.214 seconds



hdxg1101300...@163.com


Re: flink hive batch job reports FileNotFoundException

2020-09-18 Posted by Rui Li
Hello,

What does the job logic roughly look like? I'll give it a try.

On Thu, Sep 17, 2020 at 10:00 PM godfrey he  wrote:

> cc @Rui Li 
>
> 李佳宸 wrote on Monday, September 14, 2020 at 5:11 PM:
>
>> Hi everyone, when I run a batch Table job that writes into Hive, I get a FileNotFoundException: the .staging file cannot be found.
>> The version is 1.11.1
>> Caused by: java.io.FileNotFoundException: File
>>
>> hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
>> does not exist.
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120)
>> ~[hadoop-client-api-3.1.3.jar:?]
>> at
>>
>> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
>> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0]
>> at
>>
>> org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
>> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>> at
>>
>> org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
>> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>> at
>>
>> org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
>> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
>> at
>>
>> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>> at
>>
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
>> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>>
>> The problem does not occur in standalone mode; in per-job mode on YARN, some jobs run into it.
>>
>

-- 
Best regards!
Rui Li


Re: flink hive batch job reports FileNotFoundException

2020-09-17 Posted by godfrey he
cc @Rui Li 

李佳宸 wrote on Monday, September 14, 2020 at 5:11 PM:

> Hi everyone, when I run a batch Table job that writes into Hive, I get a FileNotFoundException: the .staging file cannot be found.
> The version is 1.11.1
> Caused by: java.io.FileNotFoundException: File
>
> hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
> does not exist.
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120)
> ~[hadoop-client-api-3.1.3.jar:?]
> at
>
> org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
> ~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0]
> at
>
> org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
> ~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
>
> org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> The problem does not occur in standalone mode; in per-job mode on YARN, some jobs run into it.
>


flink hive batch job reports FileNotFoundException

2020-09-14 Posted by 李佳宸
Hi everyone, when I run a batch Table job that writes into Hive, I get a FileNotFoundException: the .staging file cannot be found.
The version is 1.11.1
Caused by: java.io.FileNotFoundException: File
hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
does not exist.
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1053)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1113)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1110)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1120)
~[hadoop-client-api-3.1.3.jar:?]
at
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:157)
~[flink-sql-connector-hive-3.1.2_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:95)
~[flink-table-blink_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:132)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1286)
~[flink-dist_2.11-1.11.1.jar:1.11.1]

The problem does not occur in standalone mode; in per-job mode on YARN, some jobs run into it.


Flink reading a Hive table to build a Savepoint

2020-05-06 Posted by ??????
With Flink 1.10, I want to read a Hive table and use it to build a Savepoint; how can the Table be converted into a DataSet?

EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
Table table = tableEnv.sqlQuery("select * from test001");

BootstrapTransformation

Reply: How can Flink write to Hive with exactly-once semantics?

2019-12-09 Posted by hiliuxg
See https://github.com/hiliuxg/flink-orc-sink for an HDFS ORC sink with exactly-once
semantics.




------ Original Message ------
From: "jingwen jingwen"
https://issues.apache.org/jira/browse/FLINK-14249
 
  Best,
  Jingsong Lee
 
 
  --
  From: 

Re: [PROGRESS UPDATE] [DISCUSS] Flink-Hive Integration and Catalogs

2019-03-20 文章 Bowen Li
Thanks, Shaoxuan! I've sent a Chinese version to user-zh at the same time
yesterday.

From the feedback we have received so far, supporting multiple older Hive versions
is definitely one of our focuses next.

*More feedbacks are welcome from our community!*


On Tue, Mar 19, 2019 at 8:44 PM Shaoxuan Wang  wrote:

> Hi Bowen,
> Thanks for driving this. I am CCing this email/survey to user-zh@
> flink.apache.org as well.
> I heard there is a lot of interest in Flink-Hive from the field. One of
> the biggest requests raised by Hive users is "support for out-of-date
> Hive versions". A large number of users are still working on clusters
> with CDH/HDP installed with an old Hive version, say 1.2.1/2.1.1. We
> need to ensure support for these Hive versions when planning the work on
> Flink-Hive integration.
>
> *@all. "We want to get your feedbacks on Flink-Hive integration." *
>
> Regards,
> Shaoxuan
>
> On Wed, Mar 20, 2019 at 7:16 AM Bowen Li  wrote:
>
>> Hi Flink users and devs,
>>
>> We want to get your feedbacks on integrating Flink with Hive.
>>
>> Background: In Flink Forward in Beijing last December, the community
>> announced to initiate efforts on integrating Flink and Hive. On Feb 21 
>> Seattle
>> Flink Meetup <https://www.meetup.com/seattle-flink/events/258723322/>,
>> We presented Integrating Flink with Hive
>> <https://www.slideshare.net/BowenLi9/integrating-flink-with-hive-xuefu-zhang-and-bowen-li-seattle-flink-meetup-feb-2019>
>>  with
>> a live demo to local community and got great response. As of mid March now,
>> we have internally finished building Flink's brand-new catalog
>> infrastructure, metadata integration with Hive, and most common cases of
>> Flink reading/writing against Hive, and will start to submit more design
>> docs/FLIP and contribute code back to community. The reason for doing it
>> internally first and then in community is to ensure our proposed solutions
>> are fully validated and tested, gain hands-on experience and not miss
>> anything in design. You are very welcome to join this effort, from
>> design/code review, to development and testing.
>>
>> *The most important thing we believe you, our Flink users/devs, can help
>> RIGHT NOW is to share your Hive use cases and give us feedbacks for this
>> project. As we start to go deeper on specific areas of integration, your
>> feedback and suggestions will help us to refine our backlog and
>> prioritize our work, and you can get the features you want sooner! *Just
>> for example, if most users are mainly only reading Hive data, then we can
>> prioritize tuning read performance over implementing write capability.
>> A quick review of what we've finished building internally and is ready to
>> contribute back to community:
>>
>>- Flink/Hive Metadata Integration
>>   - Unified, pluggable catalog infra that manages meta-objects,
>>   including catalogs, databases, tables, views, functions, partitions,
>>   table/partition stats
>>   - Three catalog impls - A in-memory catalog, HiveCatalog for
>>   embracing Hive ecosystem, GenericHiveMetastoreCatalog for persisting
>>   Flink's streaming/batch metadata in Hive metastore
>>   - Hierarchical metadata reference as
>>   .. in SQL and Table API
>>   - Unified function catalog based on new catalog infra, also
>>   support Hive simple UDF
>>- Flink/Hive Data Integration
>>   - Hive data connector that reads partitioned/non-partitioned Hive
>>   tables, and supports partition pruning, both Hive simple and complex 
>> data
>>   types, and basic write
>>- More powerful SQL Client fully integrated with the above features
>>and more Hive-compatible SQL syntax for better end-to-end SQL experience
>>
>> *Given above info, we want to learn from you on: How do you use Hive
>> currently? How can we solve your pain points? What features do you expect
>> from Flink-Hive integration? Those can be details like:*
>>
>>- *Which Hive version are you using? Do you plan to upgrade Hive?*
>>- *Are you planning to switch Hive engine? What timeline are you
>>looking at? Until what capabilities Flink has will you consider using 
>> Flink
>>with Hive?*
>>- *What's your motivation to try Flink-Hive? Maintain only one data
>>processing system across your teams for simplicity and maintainability?
>>Better performance of Flink over Hive itself?*
>>- *What are your Hive use cases? How large is your Hive data size? Do
>>you mainly do reading, or 

Re: [PROGRESS UPDATE] [DISCUSS] Flink-Hive Integration and Catalogs

2019-03-19 文章 Shaoxuan Wang
Hi Bowen,
Thanks for driving this. I am CCing this email/survey to user-zh@
flink.apache.org as well.
I heard there is a lot of interest in Flink-Hive from the field. One of
the biggest requests raised by Hive users is "support for out-of-date
Hive versions". A large number of users are still working on clusters
with CDH/HDP installed with an old Hive version, say 1.2.1/2.1.1. We
need to ensure support for these Hive versions when planning the work on
Flink-Hive integration.

*@all. "We want to get your feedbacks on Flink-Hive integration." *

Regards,
Shaoxuan

On Wed, Mar 20, 2019 at 7:16 AM Bowen Li  wrote:

> Hi Flink users and devs,
>
> We want to get your feedbacks on integrating Flink with Hive.
>
> Background: In Flink Forward in Beijing last December, the community
> announced to initiate efforts on integrating Flink and Hive. On Feb 21 Seattle
> Flink Meetup <https://www.meetup.com/seattle-flink/events/258723322/>, We
> presented Integrating Flink with Hive
> <https://www.slideshare.net/BowenLi9/integrating-flink-with-hive-xuefu-zhang-and-bowen-li-seattle-flink-meetup-feb-2019>
>  with
> a live demo to local community and got great response. As of mid March now,
> we have internally finished building Flink's brand-new catalog
> infrastructure, metadata integration with Hive, and most common cases of
> Flink reading/writing against Hive, and will start to submit more design
> docs/FLIP and contribute code back to community. The reason for doing it
> internally first and then in community is to ensure our proposed solutions
> are fully validated and tested, gain hands-on experience and not miss
> anything in design. You are very welcome to join this effort, from
> design/code review, to development and testing.
>
> *The most important thing we believe you, our Flink users/devs, can help
> RIGHT NOW is to share your Hive use cases and give us feedbacks for this
> project. As we start to go deeper on specific areas of integration, your
> feedback and suggestions will help us to refine our backlog and
> prioritize our work, and you can get the features you want sooner! *Just
> for example, if most users are mainly only reading Hive data, then we can
> prioritize tuning read performance over implementing write capability.
> A quick review of what we've finished building internally and is ready to
> contribute back to community:
>
>- Flink/Hive Metadata Integration
>   - Unified, pluggable catalog infra that manages meta-objects,
>   including catalogs, databases, tables, views, functions, partitions,
>   table/partition stats
>   - Three catalog impls - A in-memory catalog, HiveCatalog for
>   embracing Hive ecosystem, GenericHiveMetastoreCatalog for persisting
>   Flink's streaming/batch metadata in Hive metastore
>   - Hierarchical metadata reference as
>   .. in SQL and Table API
>   - Unified function catalog based on new catalog infra, also support
>   Hive simple UDF
>- Flink/Hive Data Integration
>   - Hive data connector that reads partitioned/non-partitioned Hive
>   tables, and supports partition pruning, both Hive simple and complex 
> data
>   types, and basic write
>- More powerful SQL Client fully integrated with the above features
>and more Hive-compatible SQL syntax for better end-to-end SQL experience
>
> *Given above info, we want to learn from you on: How do you use Hive
> currently? How can we solve your pain points? What features do you expect
> from Flink-Hive integration? Those can be details like:*
>
>- *Which Hive version are you using? Do you plan to upgrade Hive?*
>- *Are you planning to switch Hive engine? What timeline are you
>looking at? Until what capabilities Flink has will you consider using Flink
>with Hive?*
>- *What's your motivation to try Flink-Hive? Maintain only one data
>processing system across your teams for simplicity and maintainability?
>Better performance of Flink over Hive itself?*
>- *What are your Hive use cases? How large is your Hive data size? Do
>you mainly do reading, or both reading and writing?*
>- *How many Hive user defined functions do you have? Are they mostly
>UDF, GenericUDF, or UDTF, or UDAF?*
>- any questions or suggestions you have? or as simple as how you feel
>about the project
>
> Again, your input will be really valuable to us, and we hope, with all of
> us working together, the project can benefits our end users. Please feel
> free to either reply to this thread or just to me. I'm also working on
> creating a questionnaire to better gather your feedbacks, watch for the
> maillist in the next couple days.
>
> Thanks,
> Bowen
>
>
>
>
>