答复: Flink Datastream实现删除操作

2024-06-04 文章 Xiqian YU
您好, Iceberg 为 Flink 实现的 connector 同时支持 DataStream API 和 Table API[1]。其 DataStream API 提供 Append(默认行为)、Overwrite、Upsert 三种可选的模式,您可以使用下面的 Java 代码片段实现: 首先创建对应数据行 Schema 格式的反序列化器,例如,可以使用 RowDataDebeziumDeserializeSchema 的生成器来快速构造一个: private RowDataDebeziumDeserializeSchema getDeserializer(

答复: 朝鲜第二次发射军事侦察卫星,对话是解决问题的“唯一出路”

2023-08-26 文章 北野 �悦
资料显示,渐冻症一般指肌萎缩侧索硬化,它是上运动神经元和下运动神经元损伤之后,导致包括球部(指延髓支配的这部分肌肉)、四肢、躯干、胸部腹部的肌肉逐渐无力和萎缩。 发件人: tison 发送时间: 2023年8月26日 10:15 收件人: user-zh@flink.apache.org ; priv...@flink.apache.org 主题: Re: 朝鲜第二次发射军事侦察卫星,对话是解决问题的“唯一出路” I suggest we should ban this spamming source.. Best,

答复: Flink1.17.1 yarn token 过期问题

2023-07-19 文章 王刚
异常栈信息 ``` 2023-07-20 11:43:01,627 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Terminating TaskManagerRunner with exit code 1. org.apache.flink.util.FlinkException: Failed to start the TaskManagerRunner. at org.apache.flink.runtime.taskexecutor.TaskManagerRunn

答复:

2023-02-23 文章 704669594
退订

答复: Re:Re:yarn api 提交报错

2022-04-08 文章 Geng Biao
Hi 周涛, Mang的建议很好,封装拼接参数是比较常见的实现。如果你没有什么特殊需求的话,推荐考虑他的建议,之后Flink版本升级之类的也一般会方便一些。 如果你因为某些原因,要继续走你目前的方式的话,我看到你的代码和YARNApplicationITCase中的代码比较接近了,你可以注意下,ITCase中的代码是在本地运行的,并且通过类似YARNApplicationITCase#startYARNWithConfig这样的方法设置好了HADOOP和Flink相关的环境变量。 在实际作业中,最终在Server侧,YARN在AM运行Flink作业的命令类似这样: /bin/bash -c

Re:答复: 回复:如何从复杂的kafka消息体定义 table

2022-02-28 文章 mack143
退订 在 2021-07-09 10:06:19,"Chenzhiyuan(HR)" 写道: >消息体里的data是实际每行数据,columnName是字段名,rawData是值,因此我想能定义table 如下: > >CREATE TABLE MyUserTable( >APPLY_PERSON_ID VARCHAR, >UPDATE_SALARY DECIMAL, >UP_AMOUNT DECIMAL, >CURRENCY VARCHAR, >EXCHANGE_RATE DECIMAL >) with ( >'connector.type' = 'kafka', >'connector

答复: 退订

2021-07-30 文章 zhao liang
如果需要取消订阅 user-zh@flink.apache.org 邮件组,是发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 发件人: 谢振爵 日期: 星期五, 2021年7月30日 15:11 收件人: user-zh 主题: 退订 退订

答复: 退订

2021-07-30 文章 zhao liang
如果需要取消订阅 user-zh@flink.apache.org 邮件组,是发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 发件人: 赵珠峰 日期: 星期五, 2021年7月30日 15:15 收件人: user-zh@flink.apache.org 主题: 退订 退订 本邮件载有秘密信息,请您恪守保密义务。未经许可不得披露、使用或允许他人使用。谢谢合作。 This email contains confidential information. Recipient is obliged to keep the informa

答复: 退订

2021-07-29 文章 zhao liang
如果需要取消订阅 user-zh@flink.apache.org 邮件组,是发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 发件人: 闫健儒 日期: 星期四, 2021年7月29日 16:57 收件人: user-zh@flink.apache.org 主题: 退订 退订

答复: 退订

2021-07-29 文章 zhao liang
如果需要取消订阅 user-zh@flink.apache.org 邮件组,发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org 就可以了 发件人: zhangjunjie1130 日期: 星期四, 2021年7月29日 16:38 收件人: user-zh@flink.apache.org 主题: 退订 退订 | | zhangjunj | | 邮箱:zhangjunjie1.

答复: 回复: 如何从复杂的kafka消息体定义 table

2021-07-08 文章 Chenzhiyuan(HR)
列转行是在json的 DDL 里面可以写,还是在获取kafka数据后java代码里再转换一次。 -邮件原件- 发件人: JasonLee [mailto:17610775...@163.com] 发送时间: 2021年7月9日 11:34 收件人: user-zh@flink.apache.org 主题: 回复: 如何从复杂的kafka消息体定义 table Hi 事实上,你的 data 是一个 jsonarray 只需要列转行取 columnName 字段就可以了. Best JasonLee 在2021年07月9日 10:06,Chenzhiyuan(H

答复: 回复:如何从复杂的kafka消息体定义 table

2021-07-08 文章 Chenzhiyuan(HR)
消息体里的data是实际每行数据,columnName是字段名,rawData是值,因此我想能定义table 如下: CREATE TABLE MyUserTable( APPLY_PERSON_ID VARCHAR, UPDATE_SALARY DECIMAL, UP_AMOUNT DECIMAL, CURRENCY VARCHAR, EXCHANGE_RATE DECIMAL ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'topic_name', 'conn

答复: Upsert Kafka 的format 为什么要求是INSERT-ONLY的

2021-03-14 文章 刘首维
Hi Shengkai, 感谢回复 让我理解一下: 在ChangelogNormalize中 1. Rowkind是未生效的 2. null表达墓碑 3. 保存全量数据的overhead 如果我的理解是对的,那么假设遇到了不论是+u i,还是-u,d 都会被理解为是一次insert,从而促使下游emit record? 我们现在有若干自定义的Format,如果为了适配Upsert Kafka,format需要对d,(-u) 事件发射value == null的Record吗 ___

答复: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询

2021-02-09 文章 范超
可能是你没有在open方法里从上下文获取mapstate e.g.: map = getRuntimeContext().getMapState(new MapStateDescriptor(mapname, xx.class, xx.class))吗 -邮件原件- 发件人: 谌祖安 [mailto:shenz...@pcitech.com] 发送时间: 2021年2月7日 星期日 16:25 收件人: user-zh 主题: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询 您好! 重载procesElement方法

Re:答复: Re:Flink Jdbc sink写入多表如何实现

2021-01-19 文章 Michael Ran
通过key 分成不同 view ,然后不同的view 写不同的表可以吧 在 2021-01-19 17:59:54,"范超" 写道: >我这边自己的做法是,根据不同的行为时间,将source拆流,然后一个事件对应一个sink >Sink的逻辑处理都是一致的,只不过配置的表不同。 > >-邮件原件- >发件人: hailongwang [mailto:18868816...@163.com] >发送时间: 2020年11月3日 星期二 0:21 >收件人: user-zh@flink.apache.org >主题: Re:Flink Jdbc sink写入多表如何实现 >

答复: Flink程序连接Kafka类型不匹配问题

2021-01-19 文章 范超
这是函数式接口。。应该用lambda替代就行了 发件人: Natasha [mailto:13631230...@163.com] 发送时间: 2020年10月30日 星期五 10:19 收件人: user-zh@flink.apache.org 主题: Flink程序连接Kafka类型不匹配问题 Hi,社区~ 我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题,这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激。 [cid:image001.png@01D6EE8E.36285D30] B

答复: Re:Flink Jdbc sink写入多表如何实现

2021-01-19 文章 范超
我这边自己的做法是,根据不同的行为时间,将source拆流,然后一个事件对应一个sink Sink的逻辑处理都是一致的,只不过配置的表不同。 -邮件原件- 发件人: hailongwang [mailto:18868816...@163.com] 发送时间: 2020年11月3日 星期二 0:21 收件人: user-zh@flink.apache.org 主题: Re:Flink Jdbc sink写入多表如何实现 Hi, 目前JDBC sink 不支持分库分表,只能自己实现一个 Sink。具体实现的话,即使 insert Statement 需要在 wri

答复: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 afweijian
通过yarn label可以实现 -邮件原件- 发件人: user-zh-return-10095-afweijian=163@flink.apache.org 代表 yujianbo 发送时间: 2020年12月21日 16:44 收件人: user-zh@flink.apache.org 主题: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点 各位大佬好: 请问Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点? -- Sent from: http://apache

Re:答复: flink 1.12如何使用RateLimiter

2020-12-10 文章 hailongwang
如果使用 FlinkKafkaConsumer010 的话,可以调用 FlinkKafkaConsumer010#setRateLimiter(new GuavaFlinkConnectorRateLimiter().setRate) https://github.com/apache/flink/blob/fe3613574f76201a8d55d572a639a4ce7e18a9db/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafk

答复: flink 1.12如何使用RateLimiter

2020-12-10 文章 zhao liang
我这里有个场景是这样的,kafka里已经有一段时间的数据,读取的时候会一股脑的都读进来,我想模拟数据是刚进来的状态,比如数据是5秒一条,我就5秒钟读1条 发件人: Danny Chan 日期: 星期四, 2020年12月10日 10:44 收件人: user-zh@flink.apache.org 主题: Re: flink 1.12如何使用RateLimiter 您好 请问是什么场景呢 ?限速的目的是什么 ? 18757141558 <18757141...@163.com> 于2020年12月9日周三 下午6:49写道: > 在源码中找到 FlinkConnectorRateL

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-07 文章 bradyMk
这面还想多请教一下: 我程序中每来一条数据都会去读MapState然后覆盖写入新的时间戳,刚刚发现某一条数据读出了两条一样的时间戳,我推断是第一个线程读出来后还没等覆盖掉,第二个线程又读了一遍,导致出现两条一样的时间戳; 所以想请问flink中MapState是线程安全的吗? - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-07 文章 bradyMk
好的~谢谢大佬解答~ - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-07 文章 Yun Tang
#rocksdb-native-metrics 祝好 唐云 From: bradyMk Sent: Monday, December 7, 2020 17:05 To: user-zh@flink.apache.org Subject: Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存 Hi~ 可是我这边write buffer以及block cache等参数设置的都不大,都远远小于我分给tm的内存,可为什么还会报超出内存的错误呢? - Best

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-07 文章 bradyMk
Hi~ 可是我这边write buffer以及block cache等参数设置的都不大,都远远小于我分给tm的内存,可为什么还会报超出内存的错误呢? - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-07 文章 Yun Tang
RocksDB只是将数据可以存储在磁盘上,Flink再周期性将磁盘上数据上传到HDFS,内存中还是有LSM的write buffer以及block cache,也还是需要使用内存的 建议升级Flink版本到1.10+,引入了managed memory功能,理论上对于内存控制是要好很多的。 祝好 唐云 From: bradyMk Sent: Monday, December 7, 2020 11:27 To: user-zh@flink.apache.org Subject: Re: 答复: flink使用

Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-06 文章 bradyMk
hi~谢谢解答; 但我的状态用的是RocksDB,实质上不应该是存的磁盘么?为什么会一直占用tm的内存呢? - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/

答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-06 文章 范超
中间状态用完了最好是clear掉。不然一直会占用tm的内存的呢 -邮件原件- 发件人: bradyMk [mailto:zhbm...@126.com] 发送时间: 2020年12月5日 星期六 17:29 收件人: user-zh@flink.apache.org 主题: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存 大家好~ 最近刚刚尝试使用flink 1.9.1 的RocksDB做增量checkpoints; 在程序种设置: val backend = new RocksDBStateBackend("hd

答复: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-22 文章 sherlock zw
多谢指点,试了下,返回 Tuple 类型作为 key 是可以按多个字段进行分组的,拼接成 String 的话应该也是可以的 final SingleOutputStreamOperator> sum = flatMap .keyBy(new KeySelector, Tuple2>() { @Override public Tuple2 getKey(Tuple3 tuple3) throws Exception { return

答复: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-19 文章 sherlock zw
我看了现在的 flink 1.11 的 keyBy 的代码,是使用的KeySelector key,但每次只能返回一个字段,不支持返回多个字段,也就说明了一次只能按一个字段去分组(PS: test.keyBy(t -> t.f0)),如果我想按多个字段进行分组的话该怎么操作呢? -邮件原件- 发件人: guanxianchun 发送时间: 2020年11月19日 20:53 收件人: user-zh@flink.apache.org 主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题 flink-1.11使用Ke

答复: Flink 1.11里如何parse出未解析的执行计划

2020-10-22 文章 刘首维
Hi, 我的做法如我所说,是用反射将parser拿出来的,比较hack但是很简单而且很稳妥 代码差不多就是下面这个样子 Flink version: custom version base on 1.11.x @PostConstruct private void setup() throws NoSuchFieldException, IllegalAccessException { final StreamTableEnvironmentImpl env = (StreamTableEnvironmentImpl) support.getStreamT

答复: Flink 1.11里如何parse出未解析的执行计划

2020-10-19 文章 刘首维
Hi, 我之前跟你有相同的需求,实现方式也跟你的思路基本类似, mock一个env 然后反射获取calciteParserSupplier 目前在生产环境运行良好 FYI 发件人: 马阳阳 发送时间: 2020年10月19日 17:57:47 收件人: Flink中文邮件列表 主题: Flink 1.11里如何parse出未解析的执行计划 Flink 1.11里的org.apache.flink.table.planner.ParserImpl的parse方法里包含了对Planner相关方法的调用,这导致

答复: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-10-12 文章 范超
要不换个kafka的topic sink测试一下。。我觉得可能是kafka那头的问题,新手只能这样子猜一下。。 -邮件原件- 发件人: Yang Peng [mailto:yangpengklf...@gmail.com] 发送时间: 2020年9月30日 星期三 18:00 收件人: user-zh 主题: Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒 感谢回复,这个任务重启了之后看不到这个in/out指标数据, 我们能查到这个任务依赖的redis的连接查询次数也降低了,好像是任务假死一样 一直在消费数据但是就是不处理

答复: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-26 文章 范超
多谢一旦哥,我大概摸清楚了这几个关系, 目前使用kafka-consumer-perf-test.sh对单broker单分区的kafka压测在8核16G的情况下是30W/s每秒的消费能力。 SINK端的写入大概是20W/s ON YARN的Per JOB模式下通过调整分区和并行度来进行扩容。 接下来的方向按照两位大佬所说应该是对应用的jvm调优来处理了。 感谢一旦哥和benchao哥 -邮件原件- 发件人: 赵一旦 [mailto:hinobl...@gmail.com] 发送时间: 2020年9月25日 星期五 14:57 收件人: user-zh@flink.ap

答复: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-24 文章 范超
感谢benchao哥这么快就回复了。我这边再多观察测试一下。 再次感谢 -邮件原件- 发件人: Benchao Li [mailto:libenc...@apache.org] 发送时间: 2020年9月24日 星期四 16:06 收件人: user-zh 主题: Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以 我们一般提升作业吞吐能力的步骤就是看作业的反压情况, - 如果作业完全没有反压,说明此时处理能力大于上游数据产生速度 - 如果作业有反压,就具体看下反压的是哪个算子,存在什么

答复: 回复:FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-24 文章 范超
磊哥,我想再多问一个问题。 若topic只有一个分区的情况下。。 我这边压了一下,网卡流量大概是30Mbit/s,不知道如何提高这个消费速度才好,压测程序是个很简单的source,并丢弃的处理。 -邮件原件- 发件人: 范超 发送时间: 2020年9月24日 星期四 10:49 收件人: user-zh@flink.apache.org 主题: 答复: 回复:FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以 感谢磊哥,后来发现确实是这个问题导致。 Source节点的并行度取决于topic

答复: 回复:FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-23 文章 范超
感谢磊哥,后来发现确实是这个问题导致。 Source节点的并行度取决于topic的分区数 -邮件原件- 发件人: 吴磊 [mailto:wuleifl...@foxmail.com] 发送时间: 2020年9月18日 星期五 16:29 收件人: user-zh 主题: 回复:FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以 hello,Source节点并行度的有效性是取决于topic对应的分区数的。比如如果你只有6个分区,那你12个并行度和6个并行度的消费速度是一样的。

答复: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-23 文章 范超
谢谢Benchao哥回复。 这几天一直忙着压测这个问题。 经多轮压测(先灌满kafka数据),再去消费。 发现确实是您说的问题中的第三个情况 由于kafka的topic只开了一个partition 所以flinkkafkaconsumer按照一个taskmanger对应了一个kafka的parition的方式进行了处理。从而导致虽然作业并发度够大,但是由于只有一个partition, 其他并发的taskmanager无法获取到更多的partition进行消费,从而导致并行度提升而作业消费能力却无法同比增大。 之后通过建立2个partition的topic,实现了消费能力的翻倍。 想再

答复: Flink-1.11.1 Kafka Table API BigInt 问题

2020-09-22 文章 刘首维
Hi, 试一下java的BigInteger呢 发件人: nashcen <2415370...@qq.com> 发送时间: 2020年9月22日 16:29:41 收件人: user-zh@flink.apache.org 主题: Flink-1.11.1 Kafka Table API BigInt 问题 *我的代码如下* 其中 updateTime 字段,是时间戳,用的BigInt。如果只查询其他String类型的字段,程序正常。但是加上这个字段,就会报错。 package com.athub.dcpoints.

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-22 文章 chuyuan
好勒,这种方案已经成功了,非常感谢。 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
可以通过SQL的where条件来过滤吧 chuyuan 于2020年9月21日周一 下午6:48写道: > 好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ -- Best, Benchao Li

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 chuyuan
好勒,谢谢,我试试这种方案,之前注册成table,是为了按条件过滤数据;麻烦问下,直接使用ddl,如何过滤kafka中的数据? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
为什么要用DataStream解析之后再注册成table呢? 可以尝试下直接用DDL声明一个source,用内置的json format来解析。 chuyuan 于2020年9月21日周一 下午4:44写道: > 我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例: > { > "properties":{ > "platformType":"APP", > "$os":"iOS", > "$screen_width":414, >

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 chuyuan
我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例: { "properties":{ "platformType":"APP", "$os":"iOS", "$screen_width":414, "$app_version":"1.0", "$is_first_day":false, "$model":"x86_64", "$device_id":"7B236

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 chuyuan
我本地依赖的是Flink1.11.1的版本,大概业务是Flink消费kafka中json数据,示例: { "properties":{ "platformType":"APP", "$os":"iOS", "$screen_width":414, "$app_version":"1.0", "$is_first_day":false, "$model":"x86_64", "$device_id":"7B236

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-21 文章 Benchao Li
Hi chuyuan, 可以详细描述下你遇到的问题么,比如下面这些信息 - 用的是哪个Flink版本 - SQL(包括DDL和query) - 数据是什么样子的 chuyuan 于2020年9月21日周一 下午2:40写道: > LEGACY('RAW', > 'ANY')对应sql中数据类型改为:MAP,仍然报错,异常: > org.apache.flink.table.api.TableException: A raw type backed by type > information has no serializable string representation. It

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-09-20 文章 chuyuan
LEGACY('RAW', 'ANY')对应sql中数据类型改为:MAP,仍然报错,异常: org.apache.flink.table.api.TableException: A raw type backed by type information has no serializable string representation. It needs to be resolved into a proper raw type. 方便说下具体实现细节吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/

答复: flink RichFilterFunction重复过滤一条数据

2020-09-18 文章 范超
Hi, 明启,我也遇到了类似的问题,会不会是因为并行度的问题导致? -邮件原件- 发件人: 明启 孙 [mailto:374060...@qq.com] 发送时间: 2020年9月15日 星期二 10:45 收件人: user-zh 主题: flink RichFilterFunction重复过滤一条数据 场景: flink消费kafka,然后过滤掉某种类型的数据,然后打印一条warn类型的数据。 在测试时,我往kafka写了一条会被过滤掉的数据,偶尔会正常打印一条warn,更多的时候会重复打印该条数据,类似重复消费该条数据。 然后我在warn之后写了一条pr

答复: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-09 文章 范超
Transient 都不参与序列化了,怎么可能从checkopont里恢复? -邮件原件- 发件人: Yun Tang [mailto:myas...@live.com] 发送时间: 2020年9月7日 星期一 12:50 收件人: user-zh@flink.apache.org 主题: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 Hi 首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1] 可以排查的思路 1. 你的state是否开

答复: flink prometheus 无法上报accumulator类型监控吗

2020-08-28 文章 范超
确实可以通过counter来实现[1] 相应的blog在[2] [1] https://github.com/mbode/flink-prometheus-example [2]https://flink.apache.org/features/2019/03/11/prometheus-monitoring.html -邮件原件- 发件人: Yun Tang [mailto:myas...@live.com] 发送时间: 2020年8月28日 星期五 14:37 收件人: user-zh@flink.apache.org 主题: Re: flink promethe

答复: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

2020-08-27 文章 范超
谢谢云哥,可以了! 解决了我的大问题。 -邮件原件- 发件人: Yun Tang [mailto:myas...@live.com] 发送时间: 2020年8月28日 星期五 13:58 收件人: user-zh 主题: Re: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息 Hi 范超 虽然看不到你的图,但是你的启动命令错误了,所有的options应该放在jar包文件地址前面[1] 1. class name 应该在 jar包地址前面 [2] 2. savepoint/checkpoint 地址

答复: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

2020-08-27 文章 范超
Hi 唐云哥,收到,我现在测试一下看看 感谢感谢 -邮件原件- 发件人: Yun Tang [mailto:myas...@live.com] 发送时间: 2020年8月28日 星期五 13:58 收件人: user-zh 主题: Re: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息 Hi 范超 虽然看不到你的图,但是你的启动命令错误了,所有的options应该放在jar包文件地址前面[1] 1. class name 应该在 jar包地址前面 [2] 2. savepoint/checkpoi

答复: 关于sink失败 不消费kafka消息的处理

2020-08-27 文章 范超
%8E%E4%BF%9D%E8%AF%81exactly-once%E8%AF%AD%E4%B9%89 -Original Message- From: user-zh-return-6980-wxchunjhyy=163@flink.apache.org On Behalf Of 范超 Sent: Wednesday, August 26, 2020 2:42 PM To: user-zh@flink.apache.org Subject: 答复: 关于sink失败 不消费kafka消息的处理 您好 BenChao ,不知道是否有可以参考的两阶段提交的Flink

答复: 关于sink失败 不消费kafka消息的处理

2020-08-26 文章 范超
> 假设,现在message 1-5 正在被sink operator publish, 1-3 已经publish了,但4,5 还没有, > 这个时候source operator成功完成了checkpoint, 这个checkpoint 里面 offset 应该要 > 5, 假设是6. > 假如这个时候publish message 4 失败了, 那么job restart from last successful > checkpoint, source operator 就会从6 开始读数据,那么4 和 5 就会丢失了, 这个理解正确吗 按照我个人理解,应该是sink环节的部分失

答复: 关于sink失败 不消费kafka消息的处理

2020-08-25 文章 范超
您好 BenChao ,不知道是否有可以参考的两阶段提交的Flink 实例或者文档资料 -邮件原件- 发件人: Benchao Li [mailto:libenc...@apache.org] 发送时间: 2020年8月26日 星期三 12:59 收件人: user-zh 主题: Re: 关于sink失败 不消费kafka消息的处理 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。 范超 于2020年8月26日周三 上午11:38写道: > 大家好,我现在有个疑问 > 目前我使用kafka作

答复: 关于sink失败 不消费kafka消息的处理

2020-08-25 文章 范超
感谢,目前也是通过打开checkpoint来改进的,待会测试一下看看是不是可以 -邮件原件- 发件人: Benchao Li [mailto:libenc...@apache.org] 发送时间: 2020年8月26日 星期三 12:59 收件人: user-zh 主题: Re: 关于sink失败 不消费kafka消息的处理 这种情况需要打开checkpoint来保证数据的不丢。如果sink没有两阶段提交,那就是at least once语义。 范超 于2020年8月26日周三 上午11:38写道: > 大家好,我现在有个疑问 > 目前我使用kafka作为sou

Re: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-24 文章 Yang Wang
plication指定jar包地址到hdfs上,看是否能够复现。 > > > Best, > xiao cai > > > 原始邮件 > 发件人: Congxian Qiu > 收件人: user-zh > 发送时间: 2020年8月24日(周一) 20:39 > 主题: Re: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 > > > Hi 理论上第一次能启动,后续的 failover 也应该是可以正常恢复的。你这边是稳定复现吗?如果能够稳定

Re: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-24 文章 xiao cai
Hi 确实可以稳定复现,failover后就会出现找不到lib包中的jar文件里的class文件,只能重启。不过我是cli模式启动的on-yarn,没有试过per-job和application,计划这两天尝试下application指定jar包地址到hdfs上,看是否能够复现。 Best, xiao cai 原始邮件 发件人: Congxian Qiu 收件人: user-zh 发送时间: 2020年8月24日(周一) 20:39 主题: Re: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 Hi 理论上第

Re: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-24 文章 Congxian Qiu
Hi 理论上第一次能启动,后续的 failover 也应该是可以正常恢复的。你这边是稳定复现吗?如果能够稳定复现的话,有可能是 bug Best, Congxian xiao cai 于2020年8月20日周四 下午2:27写道: > Hi: > 感谢答复,确实是个思路。 > > 不过个人感觉能够在启动第一个container的时候,将本地的lib中的jar文件上传到hdfs中,然后后续的failover的container能够统一从hdfs中获取,这样应该就不会有这个问题了。貌似社区在1.11版本针对jar的拷贝做了优化,我还在看这

回复:答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-19 文章 xiao cai
Hi: 感谢答复,确实是个思路。 不过个人感觉能够在启动第一个container的时候,将本地的lib中的jar文件上传到hdfs中,然后后续的failover的container能够统一从hdfs中获取,这样应该就不会有这个问题了。貌似社区在1.11版本针对jar的拷贝做了优化,我还在看这方面的内容。有进展再交流。 Best, xiao cai 原始邮件 发件人: 范超 收件人: user-zh@flink.apache.org 发送时间: 2020年8月20日(周四) 09:11 主题: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会

答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-19 文章 范超
我之前开启job的failover restart,结果也是发现yarn直接申请新的container且旧的container并没有进一步进行处理,一直导致你这样子的报错,旧的container没有绑定的task executor No TaskExecutor registered under containe_. 我这边干脆写了个脚本通过savepoint的方式来reload应用了 希望对你有帮助 -邮件原件- 发件人: xiao cai [mailto:flin...@163.com] 发送时间: 2020年8月19日 星期三 12:50 收件人: us

答复: flink sql在实时数仓中,关联hbase维表频繁变化的问题

2020-08-17 文章 zhao liang
第三条我有个大概的想法,kafka数据把原有时间戳减去一个你觉得足够的延迟时间,生成一个新的时间戳,flink用这个时间做watermark,原有时间保留用来和habse进行匹配。 发件人: Jim Chen 日期: 星期一, 2020年8月17日 16:36 收件人: user-zh 主题: flink sql在实时数仓中,关联hbase维表频繁变化的问题 大家好: 我们现在在用flink sql在做实时数仓,架构大概是kafka关联hbase维表,然后写入clickhouse。hbase维表是频繁变化的 现在遇到的几个比较棘手的问题: 1、自己在实现AsyncTableFu

答复: HBase Sink报错:UpsertStreamTableSink requires that Table has a full primary keys

2020-08-14 文章 xiao cai
Hi Jark: 感谢回答,我发现是我join的时候,是想将hbase作为维表使用的,但是我遗漏了for system_time as of语句,添加后就不会再报这个错了。 另外有个问题想请教:1.11中新版hbase connector只是指with中指定version为1.4所创建的表吗,我发现使用1.4.3的版本,也是可以正常使用的。是不是说明pk在1.4和1.4.3两个版本上都是生效的? 再次感谢。 Best Xiao Cai 发送自 Windows 10 版邮件应用 发件人: Jark Wu 发送时间: 2020年8月14日 23:23 收件人: use

答复: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 文章 zhao liang
请问table api要重构是哪个FLIP,我想关注下 发件人: Shengkai Fang 日期: 星期四, 2020年8月13日 11:09 收件人: user-zh@flink.apache.org 主题: Re: 关于FlinkSQL的一些最佳实践咨询 针对(3)社区建议使用sql api, table api现在正准备重构。 靳亚洽 于2020年8月13日周四 上午11:00写道: > 针对2, 我们公司采用的是通过提交一个解析flink sql的jar包,把sql作为jar包的参数来执行的。采用接口机将用户的udf > jar包和平台提供的解析flink sql的ja

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 Benchao Li
Hi, 目前Json Format的实现就是假设json最外层是一个json object,暂时还无法做到顶层的所有字段无限扩展。 如果是在SQL里面,可以直接定义成map类型就可以,比如: ```SQL CREATE TABLE source ( d MAP ) WITH (...) ``` Zhao,Yi(SEC) 于2020年8月11日周二 下午4:58写道: > 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: > > 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 > 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无

Re: 答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 Zhao,Yi(SEC)
刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? stEnv.connect( new Kafka() .properties(TestKafkaUtils.getKafkaProperties()) .version("universal")

答复: 关于FLinkSQL如何创建类json无限扩展的表结构问题

2020-08-11 文章 zhao liang
Hi,你图挂了,换个图床试试呢 发件人: Zhao,Yi(SEC) 日期: 星期二, 2020年8月11日 16:04 收件人: user-zh@flink.apache.org 主题: 关于FLinkSQL如何创建类json无限扩展的表结构问题 刚刚进一步发现一个方法可以做动态扩展的类型。代码如下: [cid:image001.png@01D66FF8.F697E2D0] 这种情况下,首先表有一个字段d,然后d是json无限嵌套的类型都可以。 此处有第一个疑问:如何不要d这个字段,让顶层就是一个无限扩展的map结构呢? 其次,这种效果我打印了下table的schema如下,其中d

答复: 请教flink计算一些报表需求的实现(附带任务图)

2020-08-10 文章 zhao liang
针对第5条,似乎只能让source并行度和kafka的topic的分区一致才行,另外针对最后你说的每个任务的字段和类型都不一样,那把这些信息都当做维表信息使用,你已经拆分任务了,那每个任务跑指定的一些维表数据,你的图看着有种在重复计算的样子,不知道你的具体任务信息,可否按5分钟和小时两个大算子进行统计汇聚,topn当sideooutpu输出 发件人: lfgy <15029270...@163.com> 日期: 星期二, 2020年8月11日 00:11 收件人: user-zh@flink.apache.org 主题: 请教flink计算一些报表需求的实现(附带任务图) 最近在做一个报

答复: 【Flink sql 1.10.0问题】

2020-07-26 文章 zhao liang
看不见图,最好别发图,以文字发 发件人: 忝忝向�� <153488...@qq.com> 日期: 星期日, 2020年7月26日 22:05 收件人: user-zh 主题: 【Flink sql 1.10.0问题】 Hi,all: Flink 1.10.0 sql提交报错如下,请问是什么原因呢? 谢谢. [cid:37FA1919@1626B468.F28B1D5F]

答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 刘首维
Hi, godfrey 好的,如果可以的话,有了相关讨论的jira或者mail可以cc一下我吗,谢谢啦 发件人: godfrey he 发送时间: 2020年7月22日 17:49:27 收件人: user-zh 抄送: Jark Wu; xbjt...@gmail.com; jingsongl...@gmail.com 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题 Hi,首维 感谢给出非常详细的反馈。这个问题我们之前内部也有一些讨论,但由于缺乏一些真实场景,最后维持了当前的接口。 我们

答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 刘首维
Hi, Jark 感谢你的建议! 我们这边充分相信社区在SQL/Table上的苦心孤诣,也愿意跟进Flink在新版本的变化。 先看我遇到问题本身,你的建议确实可以帮助我解决问题。我想聊一下我对问题之外的一些想法 ``` > 2. 我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能 这个我觉得也可以封装在 SinkFunction 里面。 ``` 比如上述这个问题2,我们确实可以把它做到SinkFunction中,但是我个人认为这可

Re:答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-22 文章 Michael Ran
这个需求 我们也比较类似:要获取注册的表信息,自己用stream+table 实现部分逻辑 在 2020-07-22 13:47:25,"刘首维" 写道: >Hi JingSong, > > 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL > SDK > 下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子 > > > 1. 我们现在有某种特定的Kafka数据格式,1条Kafka数据 > 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStr

答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 刘首维
Hi JingSong, 感谢回复,真心期待一个理想的解决方案~ 发件人: Jingsong Li 发送时间: 2020年7月22日 13:58:51 收件人: user-zh; Jark Wu 主题: Re: 关于1.11Flink SQL 全新API设计的一些问题 Hi 首维, 非常感谢你的信息,我们试图 去掉太灵活的“DataStream”,但是也逐渐发现一些额外的需求,再考虑和评估下你的需求。 CC: @Jark Wu Best, Jingsong On Wed, Jul 22, 2020

答复: 关于1.11Flink SQL 全新API设计的一些问题

2020-07-21 文章 刘首维
Hi JingSong, 简单介绍一下背景,我们组的一个工作就是用户在页面写Sql,我们负责将Sql处理转换成Flink作业并在我们的平台上运行,这个转换的过程依赖我们的SQL SDK 下面我举几个我们比较常用且感觉用1.11新API不太好实现的例子 1. 我们现在有某种特定的Kafka数据格式,1条Kafka数据 会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,用于处理这种情况,这样对用户是透明的。 2. 我们目前封装了一些自己的Sink,我们会在Sink之前增加一个proc

答复: Flink catalog的几个疑问

2020-07-21 文章 刘首维
hi all, 我在想如果社区提供一个unified metastore server是不是会解决这个问题,然后写一个(一系列)catalog和这个metastore对应 发件人: Jark Wu 发送时间: 2020年7月22日 11:22:56 收件人: user-zh 主题: Re: Flink catalog的几个疑问 非常欢迎贡献开源一个轻量的 catalog 实现 :) On Wed, 22 Jul 2020 at 10:53, Jingsong Li wrote: > Hi, > > HiveCat

Re: 回复:答复: flink state

2020-07-15 文章 Congxian Qiu
Robert.Zhang <173603...@qq.com> > 日期: 星期三, 2020年7月15日 15:22 > 收件人: user-zh , user-zh@flink.apache.org < > user-zh@flink.apache.org> > 主题: 回复:答复: flink state > 是这样的,问题在于我需要使用keyed state 来修改broadcast state,比如根据keyed > state把某些满足条件的key存入这个broadcast state,并在其他算子计算的时候使用这个broadcas

答复: 回复:答复: flink state

2020-07-15 文章 zhao liang
Broadcast state是无法满足你的要求的,估计你只能像我这样把涉及的state数据融入到数据流中,在算子中针对不同的类型数据做区分了,等于人工维持这个broadcast的流的变化。 发件人: Robert.Zhang <173603...@qq.com> 日期: 星期三, 2020年7月15日 15:22 收件人: user-zh , user-zh@flink.apache.org 主题: 回复:答复: flink state 是这样的,问题在于我需要使用keyed state 来修改broadcast state,比如根据keyed state把某些满足

答复: flink state

2020-07-14 文章 zhao liang
我这边有个类似的实现,需要根据维表数据改变stream的处理,自定义了一个source(从MySQL中定时刷维表数据),kafka的stream union这个维表数据流, 额外增加一个数据类型(维表类型或者事实数据)进行数据的处理,后续算子将这个维表进行不同的处理并存到对应算子的state中。 发件人: Congxian Qiu 日期: 星期二, 2020年7月14日 14:03 收件人: user-zh 主题: Re: flink state Hi Robert Boardcast state[1] 是否满足你的需求呢?另外也可以看下这篇文章[2] [1] https://c

答复: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-05 文章 陈凯
Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。 我之前提了个jira 描述了这个问题 https://issues.apache.org/jira/browse/FLINK-18196 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch: https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19 -邮件原

答复: 人为生成retract记录

2020-04-25 文章 刘首维
Hi, 我们这边做了人为生成retract记录的尝试,也是用在了binlog上,结果上还是可以的但是改造成本还是比较高的,需要自己添加对应的关系算子和优化规则。此外,这样做(有可能)会干扰执行计划的优化,期待FLIP105和95的落地! 发件人: lec ssmi 发送时间: 2020年4月26日 10:07:48 收件人: flink-user-cn 主题: 人为生成retract记录 Hi: 假设我现在将上游retract后的结果写入到kafka,然后下游程序消费kafka去做聚合操作。

答复: 关于Flink SQL UDAF中调用createAccumulator的调用问题

2020-04-21 文章 刘首维
Hi benchao, 感谢你的回复,完美解答了我的疑惑。然后我在看代码做实验的时候又派生了两个想法/问题: 1. 将accu作为一个单独的field在GroupAggFunction中,然后在snapshotState的时候才去向state更新 2. 为accu设置一个0/初始/空/幺元或者类似概念的状态,这样我们就不必去销毁acc而是去只是将其置回 或者说时区在设计这个部分的时候,有什么其他的考量吗 发件人: Benchao Li 发送时间: 2020年4月21日 18:28:09 收件人

答复: 关于Flink SQL UDAF中调用createAccumulator的调用问题

2020-04-21 文章 刘首维
Hi benchao, 非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group by的一个key应该被创建一次,可是我做实验的时候(在create acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。 为了方便你帮我分析,我来补充一下环境和场景: 版本: 1.7.2/1.9

答复: 如何 远程通知 flink 开启任务

2019-12-03 文章 zhao liang
这个查下文档的 RestAPI,里面有启动任务的接口 发件人: 李军 日期: 星期三, 2019年12月4日 10:06 收件人: user-zh 主题: 如何 远程通知 flink 开启任务 请教一个问题: 目前想用flink做批处理,但是不知道任务启动后,怎么通知flink 开始跑任务。 就比如像发个请求一样的通知; 再或者是任务接受到数据了就开始处理吗??

答复: flink sql中怎么表达窗口的提前触发或延迟触发

2019-09-11 文章 苏 欣
感谢大佬解答,对于处理窗口迟到数据的话是不是可以通过setIdleStateRetentionTime方法来设置? 发送自 Windows 10 版邮件应用 发件人: Benchao Li 发送时间: Wednesday, September 11, 2019 6:38:44 PM 收件人: user-zh@flink.apache.org 主题: Re: flink sql中怎么表达窗口的提前触发或延迟触发 目前

答复: Re: 如何优化flink内存?

2019-09-05 文章 戴嘉诚
对,我这边使用的也是相同的操作 发件人: 陈赋赟 发送时间: 2019年9月5日 16:08 收件人: user-zh@flink.apache.org 主题: Re:Re: 如何优化flink内存? HI 我在项目中有遇到过类似的情况,我说下我的想法和思路。 伊始是需要统计90天事件窗口中用户浏览事件总数,如果是在近30天内有浏览事件则累加1次,在30天内没有浏览事件但在 30天 ~ 90天内有其他浏览事件则记0次(需求比较奇葩),我们使用了滑动窗口(长度90天 步长1天 数据进来实时trigger触发计算)因为需要拿到窗口的结束时间所以一开始是用windowProcess

答复: 如何优化flink内存?

2019-09-04 文章 戴嘉诚
对,你可以自己再state中维持一整天的数据,让后根据时间戳来删除过期数据来替换滑动窗口 发件人: Yifei Qi 发送时间: 2019年9月5日 13:42 收件人: user-zh@flink.apache.org 主题: Re: 如何优化flink内存? 你的意思是自己去实现滑动窗口的功能么? 戴嘉诚 于2019年9月4日周三 下午10:51写道: > 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存 > > Yifei Qi 于2019年9月4日 周三20:07写道: > > > 大家好: > > > > > > >

答复: 关于elasticSearch table sink 构造过于复杂

2019-08-26 文章 aven . wu
你好: 可以自己构建 indexRequest 设置id,type,source 等字段 ElasticsearchSinkFunction 不知道是否满足你的需求? 发件人: Jark Wu 发送时间: 2019年8月26日 18:00 主题: Re: 关于elasticSearch table sink 构造过于复杂 > ETL作业, 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的. 据我所知,目前是不支持的。 可以去建个 JIRA 给社区提需求。 如果使用的 blink planner,可以使用 deduplicate with

答复: How to load udf jars in flink program

2019-08-15 文章 苏 欣
我们是这么做的,你们可以试一下。用-yt指定jar所在的目录,-C将jar添加进classpath 例如:flink run -m yarn-cluster -yt /external/libs -C file:///external/libs/func1.jar -C file:///external/libs/func2.jar 发送自 Windows 10 版邮件应用 发件人: 刘建刚 发送时间: Thur

答复: flink1.10版本连接hive报错

2019-08-12 文章 苏 欣
感谢各位大佬提供思路,我增加了lzo的jar后不再报这种错而且能取到hive表的数据了。 我以为在flink-shaded-hadoop-2-uber里面包含了所有hadoop相关的包所以没去考虑缺包的问题😂 附下缺少的pom内容: org.apache.hadoop hadoop-lzo 0.4.13 发送自 Windows 10 版邮件应用 发件人: zhisheng 发送时间: S

答复: 回复: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题

2019-08-11 文章 Yuan,Youjun
并不是没条消息会触发watermark,而是有一定时间间隔的,默认是200ms触发一次watermark。 当你的数据来的比较集中的时候,经常会发生最新的消息的时间戳已经过了window end,但是window还没fire的情况。 -邮件原件- 发件人: Ever <439674...@qq.com> 发送时间: Sunday, July 14, 2019 5:00 PM 收件人: user-zh 主题: 回复: flink 1.8.1 时间窗口无法关闭以及消息丢失的问题 第四条数据来的时间戳是: 03:17:55, 水印时间这时候应该是03:17:50,

答复: 恢复savepoint,除了命令行,能通过代码获取吗?

2019-08-09 文章 戴嘉诚
你好, 可以通过flink的restFul去调用保存savepoint 发件人: liu zhongfeng 发送时间: 2019年8月9日 20:28 收件人: user-zh@flink.apache.org 主题: 恢复savepoint,除了命令行,能通过代码获取吗? 如题,restore savepoint,除了run flink -s savepointpath之外,能通过代码恢复吗,因为公司集群没法输入命令行。如果可以的话,能给个小demo,或者API也可以 谢谢。 Best, Rio Liu, 刘中锋

答复: jobmanager 日志异常

2019-08-06 文章 戴嘉诚
你好, 谢谢!已经找到原因了 发件人: Biao Liu 发送时间: 2019年8月6日 13:55 收件人: user-zh 主题: Re: jobmanager 日志异常 你好, > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED > SIGNAL 15: SIGTERM. Shutting down as requested. 这是收到了 signal 15 了 [1],Wong 说得对,搜一下 yarn node manager 或者 yarn resource

答复: Re: Flink RocksDBStateBackend 问题

2019-08-05 文章 戴嘉诚
不是,文档上有说,filesystem是会把正在运行的数据存储在tm的内存中,然后触发checkpoint后,才会写入文件系统上,而rocksdb是直接把运行中的数据写到了rocksdb上,看样子是不占用运行中的tm的内存。 https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html#the-fsstatebackend `The FsStateBackend holds in-flight data in the TaskManager’s me

回复:答复: 通用序列化类

2019-07-17 文章 zhisheng2018

答复: 通用序列化类

2019-07-16 文章 戴嘉诚
你好 我这里主要是在一个自定义的http source中定时获取一堆数组id,打算转成set集合,然后广播出去,和kafka source 进行对接,然后在对kafka的每一条数据和set集合进行判断,过滤数据中的id不在数组id里面的数据。原本是用List集合,但是可能因为这里面的id数量会比较多,所以就想改成hashset 集合来进行判断,达到提高这个过滤的目的。 那我还是用回List集合来广播吧 谢谢 发件人: Congxian Qiu 发送时间: 2019年7月16日 20:43 收件人: user-zh@flink.apache.org 主题: Re: 通用序列化类

答复: Scala 异步 io 实现

2019-07-08 文章 venn
Thanks for your attention, as your words the " org.apache.flink.streaming.api.functions.async.RichAsyncFunction " extend " org.apache.flink.streaming.api.functions.asyn.AsyncFunction", but Scala AsyncDataStream.orderedWait parameter AsyncFunction full path is " org.apache.flink.streaming.api.s

答复: 注册缓存文件的热更新问题

2019-07-04 文章 戴嘉诚
好的,谢谢 发件人: Biao Liu 发送时间: 2019年7月5日 10:39 收件人: user-zh 主题: Re: 注册缓存文件的热更新问题 据我所知,没有 自己写代码实现吧 戴嘉诚 于2019年7月5日周五 上午10:36写道: > 好的,那我想问问,如果要定期更新文件的这个场景,flink有没有其他功能是否支持呢? > 谢谢! > > 发件人: Biao Liu > 发送时间: 2019年7月5日 10:20 > 收件人: user-zh > 主题: Re: 注册缓存文件的热更新问题 > > 这个接口只会在提交 job 时工作一次,不会检测更新 > > Xintong

答复: 注册缓存文件的热更新问题

2019-07-04 文章 戴嘉诚
好的,那我想问问,如果要定期更新文件的这个场景,flink有没有其他功能是否支持呢? 谢谢! 发件人: Biao Liu 发送时间: 2019年7月5日 10:20 收件人: user-zh 主题: Re: 注册缓存文件的热更新问题 这个接口只会在提交 job 时工作一次,不会检测更新 Xintong Song 于2019年7月4日周四 下午7:39写道: > 你好, > > 这个应该是不可以的。 > > Thank you~ > > Xintong Song > > > > On Thu, Jul 4, 2019 at 4:29 PM 戴嘉诚 wrote: > > > 大家好:

答复: 为何会报"Window can only be defined over a time attribute column."??

2019-06-20 文章 Chennet Steven
貌似问题变成这样子 程序流程如下:WM=watermark source(没有WM)-->转换A-->TableA(没有WM)--->转换B--->TableB(没有WM)>转换C(sql TUMBLE)-->TableC-->Sink 为了让转换C的Sql能够成功执行,如何在TableB上Assign一个Watermark?? From stevenchen webchat 38798579 发件人: Chennet Steven 发送时间: Friday, June 21, 2019 1

答复: 关于在Flink-process中调用其他系统API的问题

2019-06-18 文章 季 鸿飞
Hi:大家好! 关于在Flink-process中调用其他系统API的问题,已解决! 感谢大家! 发送自 Windows 10 版邮件应用 发件人: 季 鸿飞 发送时间: Monday, June 17, 2019 9:40:54 PM 收件人: user-zh@flink.apache.org 主题: 关于在Flink-process中调用其他系统API的问题 大家好: 我在Broadcast广播

答复: Re:flink kafka source在并行分布式下是怎么确定一个subtask消费哪个kafka partition的?

2019-06-12 文章 Shi Quan
1. 会运行在每个subtask中; 2. 可以获取多个KTP 3. 代码请阅AbstractPartitionDiscorver中的setAndCheckDiscoveredPartition方法和KafkaTopicPartitionAssigner中的assign方法。大体来说会根据并行度(subtask总数)、subtask index、topic partiion来具体分配。 发送自 Windows 10 版邮件应用 _

  1   2   >