flink sql 消费kafka 消息 写入Hive不提交分区

2021-01-14 文章 花乞丐
我这边从kafka消费信息,然后写入到Hive中,目前发现不提交分区,不提交分区的原因是watemark是负数的,不清楚这个负数的watermark是怎么出现的? 我代码也指定了watermark,但是debug的时候好像没有起作用 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re:Re:flinksql 消费kafka offset问题

2021-01-14 文章 Michael Ran
额,不用checkpoint 会比较麻烦。 以前自定义sink 的时候,会把消息信息到sink 位置进行提交。 上游source 也得改造,拉取位置也得统一,比如走redis 数据库等等 在 2021-01-15 12:41:25,"air23" 写道: >我的意思 是不使用checkpoint。 >使用'scan.startup.mode' = 'group-offsets' 去维护offset > > > > > > > > > > > > > > > > > > > > > >在 2021-01-15 11:35:16,"Michael Ran" 写道: >>下游sink还没有完

flink sql hop????????????????????

2021-01-14 文章 bigdata
??         flink1.10.1,??=-?? SELECT |DATE_FORMAT(TIMESTAMPADD(HOUR, 8, HOP_START(proctime, INTERVAL '$slide' SECOND, INTERVAL '$size' MI

flink sql hop????????????????????

2021-01-14 文章 bigdata
??         flink1.10.1,??=-??

Re:Re:flinksql 消费kafka offset问题

2021-01-14 文章 air23
我的意思 是不使用checkpoint。 使用'scan.startup.mode' = 'group-offsets' 去维护offset 在 2021-01-15 11:35:16,"Michael Ran" 写道: >下游sink还没有完成, offset 不是在checkpoint 里面的吗?下次启动会从你ck的位置恢复才对。除非你sink >是异步操作,告诉上游你sink 完成了,实际你sink失败了 >在 2021-01-15 10:29:15,"air23" 写道: >>flink消费kafka 只能使用checkpoint去维护

Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 文章 Yun Tang
Hi 1. 目前没有全局的配置 2. 开启cleanFullSnapshot 并不会物理清除数据,只是确保checkpoint数据中没有相关过期数据 祝好 唐云 From: 孙啸龙 Sent: Thursday, January 14, 2021 20:43 To: user-zh@flink.apache.org Subject: Re: Flink sql 状态过期后,checkpoint 大小没变化 你好: 非常谢谢, 本地的数据是过期了。 不好意思,还有几个疑问想请教下。 1.看文

Re:flinksql 消费kafka offset问题

2021-01-14 文章 Michael Ran
下游sink还没有完成, offset 不是在checkpoint 里面的吗?下次启动会从你ck的位置恢复才对。除非你sink 是异步操作,告诉上游你sink 完成了,实际你sink失败了 在 2021-01-15 10:29:15,"air23" 写道: >flink消费kafka 只能使用checkpoint去维护offset吗 > >我这边使用'scan.startup.mode' = 'group-offsets' > >如果中间报错了 或者停止任务,但是我下游sink还没有完成, >下次启动直接跳过这个报错的数据,会丢数据,谢谢回复

回复: Re: 请教个Flink checkpoint的问题

2021-01-14 文章 yinghua...@163.com
感谢您的答复,今天我使用cancelWithSavepoint时也还是删除了,通过你的答复知道机制如此,我再使用另外的参数来观察下。 yinghua...@163.com 发件人: Yun Tang 发送时间: 2021-01-15 11:02 收件人: user-zh 主题: Re: 回复: 请教个Flink checkpoint的问题 Hi 这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认

Re: 回复: 请教个Flink checkpoint的问题

2021-01-14 文章 Yun Tang
Hi 这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain checkpoint的数量为1而被subsume掉了,也就是被删掉了。 如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。 另外说一句,即使是已经deprecated的cancel with savepoint的用法,当新的savepoint创建成功后,旧的chec

flinksql 消费kafka offset问题

2021-01-14 文章 air23
flink消费kafka 只能使用checkpoint去维护offset吗 我这边使用'scan.startup.mode' = 'group-offsets' 如果中间报错了 或者停止任务,但是我下游sink还没有完成, 下次启动直接跳过这个报错的数据,会丢数据,谢谢回复

Re: Re:flink-sql字段类型问题

2021-01-14 文章 zhang hao
看了下源码BigInteger 转都会有问题,没有匹配的这种类型: public boolean isNullAt(int pos) { return this.fields[pos] == null; } @Override public boolean getBoolean(int pos) { return (boolean) this.fields[pos]; } @Override public byte getByte(int pos) { return (byte) this.fields[pos]; } @Override public short

Re: flink1.11使用createStatementSet报错 No operators defined in streaming topology

2021-01-14 文章 Evan
我的也是flink 1.11.0版本的,也是使用的stmtSet.execute()方式,是可以正常运行的,你可以debug检查一下你要执行的SQL语句 发件人: datayangl 发送时间: 2021-01-14 16:13 收件人: user-zh 主题: flink1.11使用createStatementSet报错 No operators defined in streaming topology flink版本: 1.11 使用createStatementSet 在一个人任务中完成多个数据从hive写入不同的kafka 代码如下: def main(args:

Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 文章 孙啸龙
你好: 非常谢谢, 本地的数据是过期了。 不好意思,还有几个疑问想请教下。 1.看文档,开启cleanFullSnapshot是只能对单个状态设置吗,没查到flink sql 开启cleanFullSnapshot的配置的地方?因为只看到StateTtlConfig是对于单个状态的设置,没有对job或者对table的config设置。 2.cleanFullSnapshot 开启后,从checkpoint恢复才会触发清理,不是在checkpoint过程中触发清理掉过期数据? > 在 2021年1月14日,下午4:48,Yun Tang 写道: > >

回复: 回复: 请教个Flink checkpoint的问题

2021-01-14 文章 yinghua...@163.com
好的,感谢您的回复! yinghua...@163.com 发件人: Evan 发送时间: 2021-01-14 18:48 收件人: user-zh 主题: 回复: 回复: 请教个Flink checkpoint的问题 是的,应该是机制问题,链接[1]打开有这样一句解释: If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (termin

Re: 回复: 请教个Flink checkpoint的问题

2021-01-14 文章 赵一旦
Evan说的这个是一个设置,但也仅影响cancel那个命令,stop还是会删除。这个点其实做的不是很好,不清楚为啥,之前Q过,没人鸟。。。 所以按照我的经验,如果是需要停止并基于保存点重启,那还好。如果计划基于检查点重启,无比提前备份检查点,然后停任务,然后复制备份回去。 在或者,直接cancel,不用stop。 Evan 于2021年1月14日周四 下午6:49写道: > 是的,应该是机制问题,链接[1]打开有这样一句解释: > > If you choose to retain externalized checkpoints on cancellation you have >

回复: 回复: 请教个Flink checkpoint的问题

2021-01-14 文章 Evan
是的,应该是机制问题,链接[1]打开有这样一句解释: If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED). [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dep

Re: Re: 请教个Flink checkpoint的问题

2021-01-14 文章 yinghua...@163.com
感谢您的答复! yinghua...@163.com 发件人: 赵一旦 发送时间: 2021-01-14 18:43 收件人: user-zh 主题: Re: Re: 请教个Flink checkpoint的问题 机制就是这样的。如下是我之前做过的测试。 启动后等待若干检查点之后做如下操作文件系统上的检查点是否保留说明 WEB UI 点击 Cancel 方式取消任务 保留 合理,因为设置了 RETAIN_ON_CANCELLATION。 通过命令生成保存点:flink savepoint ${jobId} ${savepointDir} 保留 OK 通过命令取消任务:flink c

Re: Re: 请教个Flink checkpoint的问题

2021-01-14 文章 赵一旦
机制就是这样的。如下是我之前做过的测试。 启动后等待若干检查点之后做如下操作文件系统上的检查点是否保留说明 WEB UI 点击 Cancel 方式取消任务 保留 合理,因为设置了 RETAIN_ON_CANCELLATION。 通过命令生成保存点:flink savepoint ${jobId} ${savepointDir} 保留 OK 通过命令取消任务:flink cancel ${jobId} 保留 OK 通过命令取消任务并生成保存点:flink cancel -s ${savepointDir} ${jobId} 保留 OK 通过命令停止任务(基于默认保存点目录):flink sto

Re:Re: 请教个Flink checkpoint的问题

2021-01-14 文章 邮件帮助中心
刚才又操作了一次,我重新截图了放在附件里了, 开始在18:29:29时没有看到chk-8生成,就是在18:29:29时checkpoint没有生成, 然后18:29:34查看时,checkpoint生成了 然后18:29:51查看时,checkpoint还在,此时我停止了那个任务 18:30:11去查看时,checkpoint的chk-8不见了 在 2021-01-14 18:04:27,"tison" 写道: >没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。 > >Best,

Re: Re:flink-sql字段类型问题

2021-01-14 文章 yinghua...@163.com
回复错了,抱歉! yinghua...@163.com 发件人: yinghua...@163.com 发送时间: 2021-01-14 18:16 收件人: user-zh 主题: Re: 转发:flink-sql字段类型问题 [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6 Java HotSpot(TM) 64-Bit Server VM

Re: Re: 请教个Flink checkpoint的问题

2021-01-14 文章 yinghua...@163.com
[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6 Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0 log4j:WARN No such property [datePattern] in org.

Re: 转发:flink-sql字段类型问题

2021-01-14 文章 yinghua...@163.com
[root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6 Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0 log4j:WARN No such property [datePattern] in org

Re: 请教个Flink checkpoint的问题

2021-01-14 文章 tison
没明白你说的最近一次 checkpoint 被删除啥意思,你可以列一下 checkpoint 目录的内容,你觉得应该是啥,结果是啥。 Best, tison. Evan 于2021年1月14日周四 下午5:56写道: > 代码图挂掉了,看不到代码 > > > > > 发件人: yinghua...@163.com > 发送时间: 2021-01-14 17:26 > 收件人: user-zh > 主题: 请教个Flink checkpoint的问题 > > 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateB

回复: 回复: 请教个Flink checkpoint的问题

2021-01-14 文章 yinghua...@163.com
代码如下: streamEnv.enableCheckpointing(5 * 60 * 1000); CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig(); checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); checkPointConfig.setCheckpointTimeout(1 * 60 * 1000); checkPointConfig.setMinPauseBetwe

回复: 请教个Flink checkpoint的问题

2021-01-14 文章 Evan
代码图挂掉了,看不到代码 发件人: yinghua...@163.com 发送时间: 2021-01-14 17:26 收件人: user-zh 主题: 请教个Flink checkpoint的问题 我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelW

回复: 请教个Flink checkpoint的问题

2021-01-14 文章 yinghua...@163.com
刚才代码截图没发出去,再贴下代码 streamEnv.enableCheckpointing(5 * 60 * 1000); CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig(); checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); checkPointConfig.setCheckpointTimeout(1 * 60 * 1000); checkPointConfig.setMinPauseBetweenCheckpo

请教个Flink checkpoint的问题

2021-01-14 文章 yinghua...@163.com
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下 现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息? yinghua...@163.com

回复:flink1.12 k8s session部署,TM无法启动

2021-01-14 文章 superainbower
大佬,可否提供一下你那边flink native 方式 k8s部署的测试文档地址 在2021年01月14日 15:12,Yang Wang 写道: 这个问题是在1.12.1中修复的,1.12.0里面还不能支持给TM设置ServiceAccount 具体可以看下这个ticket,https://issues.apache.org/jira/browse/FLINK-20664 另外,1.12.1正在投票,最近就会发布 Best, Yang 1120344670 <1120344...@qq.com> 于2021年1月13日周三 下午5:17写道: > *flink版本: 1.12*

转发:flink-sql字段类型问题

2021-01-14 文章 郝文强
| | 郝文强 | | 18846086...@163.com | 签名由网易邮箱大师定制 - 转发邮件信息 - 发件人: 郝文强 <18846086...@163.com> 发送日期: 2021年01月14日 17:23 发送至: d...@flink.apache.org 主题: 转发:flink-sql字段类型问题 | | 郝文强 | | 18846086...@163.com | 签名由网易邮箱大师定制 - 转发邮件信息 - 发件人: 郝文强 <18846086...@163.com>

Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 文章 Yun Tang
Hi, 你本地的数据肯定是过期了,checkpoint size没有变化是因为你的数据总量83MB,且之后没有插入新数据,导致没有触发RocksDB的compaction,所以本地的数据没有物理上清理,而在full snapshot时候,估计你并没有开启cleanFullSnapshot [1],所以导致full snapshot时候并没有删除掉过期数据。 其实你可以查询一下状态,默认情况下,已经过期的数据是无法再查询到了。 建议开启增量checkpoint即可,过期数据即使物理不删除,也因为过期而无法再读取到了,没必要过分关注UI上的checkpoint size。 [1]

flink1.11使用createStatementSet报错 No operators defined in streaming topology

2021-01-14 文章 datayangl
flink版本: 1.11 使用createStatementSet 在一个人任务中完成多个数据从hive写入不同的kafka 代码如下: def main(args: Array[String]): Unit = { FlinkUtils.initTable() val tableEnv: StreamTableEnvironment = FlinkUtils.tableEnv val streamEnv: StreamExecutionEnvironment = FlinkUtils.streamEnv streamEnv.disableOperato

Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 文章 孙啸龙
你好: 使用的state backend是rocksdb,没有开启增量,后续没有再插入过数据。 > 在 2021年1月14日,下午4:07,Yun Tang 写道: > > 使用的state backend,以及对应的checkpoint 类型是什么(是否开启incremental checkpoint)? > > 在一开始插入数据后,直到state TTL超过,期间均没有再插入数据过么?还是说一直在以一定的数据量在插入数据? > > > 祝好 > 唐云 > > From: 孙啸龙 > Sent: Th

回复: flink sql读kafka元数据问题

2021-01-14 文章 酷酷的浑蛋
官网没说在哪里读key啊 在2021年01月14日 14:52,Jark Wu 写道: kafka 读 key fields: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#key-fields On Wed, 13 Jan 2021 at 15:18, JasonLee <17610775...@163.com> wrote: hi 你写入数据的时候设置 headers 了吗 没设置的话当然是空的了 - Best Wishes Jaso

Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 文章 Yun Tang
使用的state backend,以及对应的checkpoint 类型是什么(是否开启incremental checkpoint)? 在一开始插入数据后,直到state TTL超过,期间均没有再插入数据过么?还是说一直在以一定的数据量在插入数据? 祝好 唐云 From: 孙啸龙 Sent: Thursday, January 14, 2021 15:52 To: user-zh@flink.apache.org Subject: Flink sql 状态过期后,checkpoint 大小没变化 大家好:

回复: flink sql读kafka元数据问题

2021-01-14 文章 酷酷的浑蛋
在2021年01月14日 16:03,酷酷的浑蛋 写道: 你意思是说,topic不是flink写入的,用flink sql就不能读到key? 在2021年01月13日 15:18,JasonLee<17610775...@163.com> 写道: hi 你写入数据的时候设置 headers 了吗 没设置的话当然是空的了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

回复: flink sql读kafka元数据问题

2021-01-14 文章 酷酷的浑蛋
你意思是说,topic不是flink写入的,用flink sql就不能读到key? 在2021年01月13日 15:18,JasonLee<17610775...@163.com> 写道: hi 你写入数据的时候设置 headers 了吗 没设置的话当然是空的了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/