Hi,
在 `MailboxProcessor#runMailboxLoop` 中分别计算 default mailbox actions(处理业务数据) 和
event mailbox actions(checkpoint 同步阶段、timer等) 的时间占比,假设分别为t1,t2,都介于[0,1]之间。
那么理论上 t1 + t2 + idle的占比 = 1;这样可以根据 t1, t2 的值来判断单个线程的 CPU 是否跑满了。
Best,
Hailong
在 2021-01-29 12:25:56,"1305332" <1305...@163.com>
需要的 window size 大吗,可以使用 min-batch 的 no-window agg 绕过?
Best,
Hailong
在 2020-12-30 17:41:50,"孙啸龙" 写道:
>Hi,大家好:
>
>版本:1.12.0
>方式:Flink sql
>问题:双流join后是回撤流,不能窗口计算,这种应用场景是怎么处理的?
这个应该是 `kudu-client.jar` 里面应该打进去的吧。
可以看下 jar -tf kudu-client.jar | grep 'com.stumbleupon.async.Callback'
ps: 你的 close 方法有 npe,应该是 客户端还没构建出来,可以判空下。
Best,
Hailong
在 2020-12-30 17:24:48,"superainbower" 写道:
>HI,大家好:
>我有一个应用场景,利用Flinksql读取Kafka数据写入Kudu,由于官方没有Kudu Connector,自定义了一个Kudu Sink
你在启动之后才把 jar 包放进去的吗,重启下 SQL Client 试试?
在 2020-12-30 15:26:59,"jiangjiguang719" 写道:
>使用 SQL Client,进行hive查询时报错:
>命名有了flink-connector-hive_2.11-1.12.0.jar,还是报java.lang.ClassNotFoundException:
>org.apache.flink.connectors.hive.HiveSource
>麻烦看一下
>
>
>报错信息:
>
>Flink SQL> select count(*) from
根据 keyGroup 的实现特性,并发度最好是 2 的 n 次方。
在 2020-12-28 10:38:23,"赵一旦" 写道:
>是否有必要将并行度设置为128的约数我意思是。
>
>Shengkai Fang 于2020年12月28日周一 上午10:38写道:
>
>> hi, 如果热点是某个key的数据量较大造成的,那么re-partition依旧无法解决这个问题。
>> 个人认为最好的解决办法是基于window的 mini-batch 以及 local-global agg,社区正在解这类问题,可以关注下[1]
>>
>>
不是的。在提交运行之后,如果那两个 insert 是从同一张表 select 出来的话,是会分流发送到 table1 和 table2,并没有先后顺序。
在 2020-12-21 10:45:25,"占英华" 写道:
>这样是不是第一条select和第二条的select出来的结果会有差异,因为执行第一条有耗时,第二条执行时查询的结果是在耗时后查询得到的
>
>> 在 2020年12月21日,11:14,hailongwang <18868816...@163.com> 写道:
>>
>>
>
可以的,比如将结果写入table1,table2 ……
Insert into table1 ……;
Insert into table2 ……;
Best,
Hailong
在 2020-12-19 08:30:23,"占英华" 写道:
>Flink sql的dml语句可以将结果写入不同的sink表中吗?如果可以可以怎么处理?
Hi,
可以试下 CAST(eventTime AS TIMESTAMP)
Best,
Hailong
在 2020-12-19 11:14:53,"ゞ野蠻遊戲χ" 写道:
>大家好!
>
>当我把DataStream流转成Table,并且指定了rowtime,然后使用带有udtf的sql传入tableEnv.sql(),抛出如下错误:Rowtime
> attributes must not be in the input rows of a regular join. As a workaround
>you can cast the time
saction_sn STRING,
>>>> > > >> > > transaction_type BIGINT,
>>>> > > >> > > merchant_id BIGINT,
>>>> > > >> > > transaction_id BIGINT,
>>>> > > >> > > status BIGINT
>>>> > > >>
Hi,
1. projection prune 可查看:CoreRules.PROJECT_REMOVE,
FlinkLogicalCalcRemoveRule.INSTANCE
2. projection push into tablesource 可查看:PushProjectIntoTableSourceScanRule
Best,
Hailong
在 2020-12-15 20:57:32,"SmileSmile" 写道:
>hi,社区的各位,是否有了解flink sql的列裁剪的实现原理?
>
Hi,
Kafka010TableSourceSinkFactory 是表示使用 legacy property keys,
Kafka010DynamicTableFactory 表示使用 new property keys[1]
从你的报错来看,是使用了 new
property。所以需要在/META-INF/services/org.apache.flink.table.factories.Factory
存在值:org.apache.flink.streaming.connectors.kafka.table.Kafka010DynamicTableFactory
并且
Hi,
Window agg 使用可以参考[1],其中 first argument 可以是 Process time 或者 Eventime。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
Best,
Hailong
在 2020-12-14 09:41:12,"guoliubi...@foxmail.com" 写道:
>TUMBLE第一个参数需要的就是bigint,你这边time_local
Hi,
订阅 Flink 中文邮件需发送至 user-zh-subscr...@flink.apache.org 更多详细情况可以参考[1]
[1] https://flink.apache.org/community.html#mailing-lists
Best,
Hailong
At 2020-12-10 20:18:27, "China_fei" wrote:
>aaa
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
如果使用 FlinkKafkaConsumer010 的话,可以调用 FlinkKafkaConsumer010#setRateLimiter(new
GuavaFlinkConnectorRateLimiter().setRate)
Hi,
是的,感觉你是对的。
`JdbcOutputFormat` 会被 wrap 在 `OutputFormatSinkFunction` 中,而
`OutputFormatSinkFunction` 没有继承 `CheckpointedFunction`,所以没法在 snapshotState
时候调用format.flush。
WDYT @Jark @ Leonard
Best,
Hailong
在 2020-12-09 17:13:14,"jie mei" 写道:
>Hi, Community
>
>JDBC
http://apache-flink.147419.n8.nabble.com/FlinkSQL-JsonObject-td9166.html#a9259
这个邮件列表有相似的问题,你看下有没有帮助。
PS:1.12 即将发布,也支持了 Raw 类型[1],也可以使用这个类型,然后代码自己 UDF 再处理。使用 Raw 类型也有个好处是,Source
消费不会因为 format 解析慢导致任务的瓶颈在拉数据慢,因为往往 Source 的并发度最大也只能是中间件的分区数,比如 Kakfa。
[1]
理论上定义成 String 是可以拿到这个 JsonObject string 的值的。你调试看看为什么为 null?
如果确定为 null 的话,这个应该是 1.11 的一个 bug。
PS:我在 1.12 上测试了下,嵌套的定义成 String 可以拿到这个值。
在 2020-12-07 14:42:12,"xiao cai" 写道:
>String不行,取出来的值是null
>
>
> Original Message
>Sender: silence
>Recipient: user-zh
>Date: Monday, Dec 7, 2020 14:26
方便发下完整的 SQL 不?
在 2020-12-07 16:31:42,"xushanshan" <1337220...@qq.com> 写道:
>业务场景:
>滑动窗口大小5分钟,滑动频率1分钟,使用事件事件做watermark,发现滑动窗口的计算结果输出包含delete状态的数据且窗口计算的触发频率不是配置的1分钟,问题的原因是什么?
>
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,
其中 条件是
`Where A.event_time < B.event_time + 30 s and A.event_time > B.event_time - 30
s ` 吧
可以参考以下例子[1],看下有木有写错。
[1]
Schema 不太确定的话,那么下游怎么用这个数据呢?
Best,
Hailong
在 2020-12-07 15:21:16,"xiao cai" 写道:
>ROW需要写明具体的字段类型,比如:
>ROW<`id` string, …>,但是我并没有办法知道jsonObject中具体的schema
>
>
> Original Message
>Sender: 李轲
>Recipient: user-zh
>Date: Monday, Dec 7, 2020 16:14
>Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型
>
>
Hi,
你是需要对维表里面的某些字段进行计算过滤等吗,可以参考:https://github.com/apache/flink/blob/53a4b4407816c2780fed2f8995affbebc1f58c3c/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala#L162
Best,
Hailong
在 2020-12-05 14:11:26,"leiyanrui"
这个应该只是个 error 的日志,方便也发下retry times = 1 和 retry times = 2 的日志看下吗
在 2020-12-03 16:17:27,"yanzi" 写道:
>hi Leonard:
>
>报错信息如下:
>[2020-12-02 22:01:03.403] [ERROR] [jdbc-upsert-output-format-thread-1]
>[org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat] >>> JDBC
>executeBatch error,
Hi,
你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 `JDCB` 的
Connector?
Best,
Hailong
在 2020-12-03 14:44:18,"xuzh" 写道:
>错误:
>
>
>Caused by: org.apache.flink.table.api.ValidationException: Multiple factories
>for identifier 'jdbc' that implement
Hi,
你的 Flink 版本是哪个呢。从报错来看你在用 legacy planner,可以使用 blink planner 试试。
Best,
Hailong
在 2020-12-03 10:02:08,"18293503878" <18293503...@163.com> 写道:
>大家使用Flink SQL的tumble函数时,将结果表转换为流,报如下错误的异常吗
>Exception in thread "main" java.lang.NoSuchMethodError:
应该出现问题的之前任务都重启了下?
感觉是类加载顺序的问题,因为从栈看,正确的栈应该会出现 `PhoenixPreparedStatement`,但是却是
`AvaticaPreparedStatement `,说明是先加载到了 Avatica 下面的类。
1. 可以在启动的 tm 的 jvm 中加一个 `-verbose` 看下每次重启类是从哪个包加载出来的,是否符合预期,
2. 可以在 lookupfunction 里面直接使用 PhoenixPreparedStatement 类,而不是 `PreparedStatement`
接口,看能不能绕过。
Best ,
Hi,
退订需发邮件到 user-zh-unsubscr...@flink.apache.org 更多详细情况可以参考[1]
[1] https://flink.apache.org/community.html#mailing-lists
Best,
Hailong
在 2020-12-02 10:32:02,"541122...@qq.com" <541122...@qq.com> 写道:
>退订
>
>
>
>541122...@qq.com
Hi,
退订需发邮件到 user-zh-unsubscr...@flink.apache.org 更多详细情况可以参考[1]
[1] https://flink.apache.org/community.html#mailing-lists
Best,
Hailong
At 2020-12-02 10:23:08, "程明" wrote:
>
Hi,
引用 Jark 对邮件列表中另一个相关的问题的回答,详情可查看[1]。
希望对你有帮助。
[1] http://apache-flink.147419.n8.nabble.com/flink-sql-td8884.html#a
Best,
Hailong
在 2020-12-01 10:09:21,"王羽凡" 写道:
>flink-sql-client执行建表:
>
>CREATE TABLE source_xxx (
> id INT,
> ctime TIMESTAMP
>) WITH (
> 'connector' = 'kafka',
>
Hi,
Collect 函数返回 Multiset 类型 ,可以使用 Map 试试
Best,
Hailong
在 2020-12-01 18:03:15,"chegg_work" 写道:
>大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么?
Hi,
从你的堆栈看,你自定义的 “com.custom.jdbc.table.JdbcRowDataLookupFunction” 函数引用的
PreparedStatement 包不对。
具体实现可以参考:https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
我理解如果 phoenix 支持标准的
Hi,
这个错误应该是找到了老的这个类的 .class 文件。
你可以编译下整个工程"mvn clean install -DskipTests"。
Best,
Hailong
在 2020-11-27 14:20:42,"zhy" 写道:
>hi、
>感谢,确实是这个问题,我现在跑测试用例遇到下面问题,请问什么原因哪,我看文件是存在的
>Error:(32, 25) object contrib is not a member of package org.apache.flink
>import
Hi,
`Percentile` 函数应该是 hive 内置的 UDF,在 Flink SQL 可以直接使用 Hive 的内置 UDF,
具体使用方式可查阅文档[1].
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_functions.html
Best,
Hailong
在 2020-11-26 21:55:35,"爱成绕指柔" <1194803...@qq.com> 写道:
>你好:
> 目前flink
Hi,
+I 表示是一条 insert 的数据,其它类型的可以查看:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/types/RowKind.java
其中 NPE 问题,你是用哪个版本的呢? 方便复制下 sql 和 udtf 吗,我看下能不能复现。
PS:“第一次调用正常” 是指?
Best,
Hailong
在 2020-11-26 17:25:03,"bulterman" <15618338...@163.com> 写道:
Hi,
是的,个人觉得可以提供一个配置项来控制 task Name。
完整的 task name 有助于排查问题等,简短的 task name 有助于在生产环境中 metric 的采集,可以极大较少发送的网络开销,存储空间等。
已建立个了 issue :https://issues.apache.org/jira/browse/FLINK-20375
Best,
Hailong
在 2020-11-24 14:19:40,"Luna Wong" 写道:
>FlinkSQL 生成的Metrics数据
另一张表可以这么定义:
String rTable = "CREATE TABLE r_table ( " +
" r_a INT, " +
" r_b string, " +
" r_pt AS now(), " +
"WATERMARK FOR r_pt AS r_pt" +
") WITH ( " +
" 'connector' = 'datagen', " +
Hi,
因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。
而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。
Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。
Best,
Hailong
在 2020-11-25 16:23:27,"Asahi Lee" <978466...@qq.com> 写道:
>你好! 我需要将事件时间的流同处理时间的流做Interval Join时提示错误,我是用的是flink
Hi,
据我所知,FlinkSQL 不支持将迟到的数据输出到侧流中。
如果你下游使用的是 window 的话,可以通过设置
`table.exec.emit.late-fire.enabled` 和 `table.exec.emit.late-fire.delay` 来触发晚于
watermark 到达的数据。
其中允许等待晚与 watermark 的数据的时间由 `table.exec.state.ttl` 控制,等价于 Datastream 中的
allowedLateness,
故 window 的最大等待时间为 watermark 的 outOfOrder +
这个错误感觉是 Hbase 的错误。具体实现的话,你可以参考社区的 HBaseSinkFunction[1] 的实现。
[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
Best,
Hailong
在 2020-11-24 09:32:55,"bradyMk" 写道:
>请教各位:
Hi,
不可以的,其中链接[1] 是Flink SQL 支持的所有内置函数,链接[2] 是 Flink SQL 允许自己定义函数,来满足个性化需求。
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html
Best,
Hailong
在 2020-11-24
数据库中主键的设置跟 primary key 定义的一样不?
Best,
Hailong
在 2020-11-23 13:15:01,"赵一旦" 写道:
>如题,按照官方文档,当mysql表定义了primary key的时候,会使用UpsertTableSink,并且会使用insert on
>duplicate方式写入。
>
>但我在使用中,发现报了 duplicate entry的错误。例如:
>Caused by: com.mysql.jdbc.exceptions.jdbc4.
>MySQLIntegrityConstraintViolationException:
是指在 Create Table 时候可以设置每一列的 default value,当这个列的值不存在时候,直接使用 default value 值?
就像传统的 DB 一样。
Best,
Hailong
在 2020-11-20 16:21:28,"Jark Wu" 写道:
>你说的补全字段是指什么?有没有具体的例子?自动推导 schema 么?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 17:09, 孟小鹏 wrote:
>
>> 目前遇到一个痛点 FlinkSQL可以在DDL时 补全字段并设置默认值吗?这样少了去处理ETL的步骤
>>
Hi sparklelj,
Global window 的是所有相同的 key 的元素会在一个 window里,它没有 window end,所以需要自己实现 custom
trigger 来触发 window 的计算[1]。
它属于 keyed window,并不是只能有一个 window 实例( windowAll 只有一个 window 实例)。
所以看下是不是用法有错误呢,你的 ‘ StreamToBatchWindow’ 类是继承了哪个接口的?
[1]
可以 grep 看下哪些 jar 包包含这 2 个类的?
在 2020-11-20 08:51:59,"m13162790856" 写道:
>HI:
> 偶现的 ,但是最近几次出现的概率还是比较大的, 之前怀疑classload , 我的jar包做了分离, 不会把任何包打进去,
> 所以包能确保每次启动都是一样,很奇怪这种情况
>
>
>在 2020年11月19日 17:14,hailongwang<18868816...@163.com> 写道:
>
>
Hi,
这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。
如果 `org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child
classload 加载了,
而 `org.apache.kafka.common.serialization.Deserializer` 被 parent classload
加载了,那么会有问题。
你的 jar 包里面打了 kakfa-connector 的包了吗,如果集群上有,可以provided 看下。
希望对你有帮助。
Best,
Hailong
Hi,
你需要继承 RichFunction,然后实现下 open 方法。
其中在 open 方法里面需要,
1. 建立 Mysql 连接
2. 获得 PreparedStatement 对象
3. 根据 SQL 获得 ResultSet
4. 遍历 ResultSet load 在内存中
5. 释放连接资源
在 2020-11-18 22:58:52,"ゞ野蠻遊戲χ" 写道:
>大家好!
>如何使用DataStream在任务初始化时候加载mysql数据到flink任务的内存中,请给我一个demo。
>谢谢,
>jiazhi
Hi,
可以看下以下的文章是否对你有帮助[1][2]。
如果不需要启动时候获得一次已有的快照,可以设置下 snapshot.mode 参数[3]
[1]
https://dba.stackexchange.com/questions/134923/query-execution-was-interrupted-max-statement-time-exceeded
[2] https://mysqlserverteam.com/server-side-select-statement-timeouts/
[3]
感觉还有其它 root cause,可以看下还有其它日志不?
Best,
Hailong
At 2020-11-18 15:52:57, "赵一旦" wrote:
>2020-11-18 16:51:37
>org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
>Partition b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
>not found.
>at
是的,或者自己可以 cherry-pick 这个 MR,然后编译把。
在 2020-11-18 16:47:59,"赵一旦" 写道:
>这个问题现在应该还是无解的吧。那个topic-pattern看了下文档也不存在还,1.11不支持对吗?
>
>hailongwang <18868816...@163.com> 于2020年10月26日周一 下午8:37写道:
>
>> Hi s_hongliang,
>> 目前的 Master 分支(1.12) 版本支持一个 source 指定消费多个 topics
应该是 window 还没达到触发的条件,可以看下 watermark 是否在推进
在 2020-11-18 15:29:54,"huang botao" 写道:
>Hi ,请教一个奇怪的问题:
>
>streamSource.flatMap(new ComeIntoMaxFlatMapFunction())
>
>.assignTimestampsAndWatermarks(new
>CommonAssignerPeriodWatermarks<>(Time.seconds(1).toMilliseconds()))
>
>
抱歉,描述错了。。
你的业务需求是流表数据与 CDC mysql 流数据互相关联还是 CDC mysql 流数据去关联维表数据的呢
在 2020-11-18 11:59:52,"hailongwang" <18868816...@163.com> 写道:
>我看你的 SQL 和 截图上的算子名称,应该是用的流表 JOIN[1],而不是维表 JOIN[2] ?
>你的业务需求是流表数据与 CDC mysql 数据互相关联还是流表单边去关联 CDC mysql 数据呢?
>如果是流表 JOIN 的话,也可以看下是否有 join k
我看你的 SQL 和 截图上的算子名称,应该是用的流表 JOIN[1],而不是维表 JOIN[2] ?
你的业务需求是流表数据与 CDC mysql 数据互相关联还是流表单边去关联 CDC mysql 数据呢?
如果是流表 JOIN 的话,也可以看下是否有 join key数据倾斜问题导致单个 task 压力大,而导致 checkpoint 不成功。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins
[2]
实是存在了一个flink-sql-connector-kafka-0.10_2.12-1.11.1.jar的包。
>请问下这个问题要如何解决
>
>
>
>
>
>
>
>--
>
>kingdomad
>
>
>
>
>
>
>
>在 2020-11-17 17:08:10,"hailongwang" <18868816...@163.com> 写道:
>>从你的报错上看,你集群上应该是存在 shade 后的 kakfa 0.10 的版本
从你的报错上看,你集群上应该是存在 shade 后的 kakfa 0.10 的版本,然后导致先加载到了这个下面的 kafka client 的类。
Shade 后的 kakfa 0.10的版本 的 artifactId 为:
flink-sql-connector-kafka-0.10_${scala.binary.version}
在 2020-11-17 15:47:08,"kingdomad" 写道:
>flink 1.11.1-2.12消费kafka0.10.1.1的时候报错。
>在idea调试没报错,提交到yarn集群就报错了。求助。
>
>
>使用的consumer如下:
Hello,
我使用 MySQLDialect 在本地确认了下,
1. 在数据库需要建主键,因为建了主键 “INSERT INTO ... ON DUPLICATE KEY UPDATE”[1] 语句的 upsert
语义才会生效。
2. 需要在 DDL 中定义 'PRIMARY KEY',因为需要根据 ‘PRIMARY KEY’ 确认是否使用 'upsert query' [2]
[1]
Hi,
这个版本是支持的。
其中插入语句是 "insert into " 而不是 “update into”?
在 2020-11-16 17:04:23,"鱼子酱" <384939...@qq.com> 写道:
>请问使用mysql数据库时,使用flinksql,已经设置主键的情况下,相同主键的记录没有更新,而是越来越多,
>是目前不支持还是我使用的方法不对呢?
>版本:flink 1.11.1
>
>关键的2个sql如下
>
>create table open_and_close_terminal_minute_1 (
> request_date
ent time。 如果整个使用sql表达,怎么把time attribute待下去
>
>
>
>
>
>
>
>
>
>
>在 2020-11-16 15:53:44,"hailongwang" <18868816...@163.com> 写道:
>>Hi zhou,
>> 你是指的 createTemporaryView 这个方法吗,这个方法上也可以指定字段,例子可以查看[1]。
>>其中 createTemporaryView 的实现也是间接调用了
Hi zhou,
你是指的 createTemporaryView 这个方法吗,这个方法上也可以指定字段,例子可以查看[1]。
其中 createTemporaryView 的实现也是间接调用了 fromDataStream 方法[2]。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#create-a-view-from-a-datastream-or-dataset
[2]
Hi,
订阅中文用户邮件需发送至 user-zh-subscr...@flink.apache.org , 更多详细情况可以参考[1]
[1] https://flink.apache.org/community.html#mailing-lists
Best,
hailong
At 2020-11-13 17:12:40, "黑色" wrote:
据我所知,“超时”并不会导致 failure counter 加 1,也就是说“超时”并不是“错误”,或者说 Exception。
我觉得是否可以看下 checkpoint 抛了什么 exception 导致超过了最大可容能的数量(默认应该是有异常就会重启)
如果这个 Exception 是期望的或者因为 HDFS 等原因无法避免的话,那么可以适当加大 tolerableCpFailureNumber。
在 2020-11-13 09:13:34,"史 正超" 写道:
>这是个思路,谢谢回复,我先试下。
>
>发件人:
wrote:
>可以建个 issue 支持下 first_value 和 last_value 的 merge 方法。
>
>On Thu, 12 Nov 2020 at 20:37, hailongwang <18868816...@163.com> wrote:
>
>> Hi me,
>>
>>
>> HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane
>> 进行合并,故也需要调用 UDAF 的 merge
Hi me,
HOP window 上使用的 UDAF 需要实现 merge 方法,因为 HOP window 在触发计算时会将多个 pane 进行合并,故也需要调用
UDAF 的 merge 方法将多个 accumulator 合并成一个。而 first_value 和 last_value 不支持 merge。
Best,
hailong
在 2020-11-12 17:07:58,"李世钰" 写道:
>1. FLINK版本 flink1.11
>
>
>
>
>2. 使用的是useBlinkPlanner
>
>
>
>
>3.执行sql
>
>SELECT
这个应该就是长时间没有数据传输导致的 链接不可用,其中可能是:
1、kakfa 的数据和稀疏,数据达到的时间间隔大于 “wait_timeout“
2、一直没有 join 上 mysql 的数据导致的。
可以设置下 数据库的 wait_timeout 看下
PS,如果这个场景,自动恢复应该是没问题的,但是需要确定下根本原因,看是正常的还是异常的,怎么去避免。
最好设置下 checkpoint,这个 kafka 的 offset 是在checkpoint 成功的时候才 ack的,这样就不会导致
这条数据被自动ack而丢弃的。
如果开启 checkpoint 的话,下游支持 upsert
这个场景是跟 session 的特性有点像,但是感觉用 session window 不合理。
因为如果一直没有触发报警,那么历史数据都会在 window 中,或者说 state 中,但是其实只要记录最新的一条就好了。
在 2020-11-12 14:34:59,"Danny Chan" 写道:
>感觉你这个应该是一个 session window 的需求, 超时时间就是 session 的 gap,session 触发的时刻就是报警逻辑
>
>Lei Wang 于2020年11月11日周三 下午2:03写道:
>
>> 有很多边缘机器人设备(我们称为 robot)往
Hi xiexinyuan341,
我理解这边有 2 个问题:
1. “偶尔会出现连接超时”,这个的话有具体的堆栈吗。如果是因为长时间没有数据的查询导致 connection invalid
话,这个在1.12,1.11.3 中应该是解决了[1].
2. 你的 source 是什么组件呢?程序抛异常的话,自动重启或者手动重启话,如果是 “最少一次” 语义的话,应该还是会 join 上 sink
到下游的;或者可以开启 checkpoint,保证 flink 内部的 “精确一次”。
[1]
会多一个 outputConversion 类型转换算子
如果是 DataStream 转 Table API,会多一个 inputConversion 类型转换算子
在 2020-11-11 20:25:31,"Luna Wong" 写道:
>Table API 转 DataStream为啥会出现性能损耗
>
>hailongwang <18868816...@163.com> 于2020年11月11日周三 下午6:28写道:
>>
>> 我理解是使用 使用 Kafka consumer 时使用 `
我理解是使用 使用 Kafka consumer 时使用 `CanalJsonDeserializationSchema` 序列化类就好了?
而不是再实现一个 Connector。
在 2020-11-11 16:56:58,"LittleFall" <1578166...@qq.com> 写道:
>明白了,多谢。
>
>是 Canal-Json 格式的 Kafka Connector.
>
>我们的一个产品 (TiCDC) 已经实现了输出 Canal-Json 格式的 changelog 到 Kafka 中,现在可以方便地使用 table
>api 对接 flink。
>
根据你 Job 的并发和指定的 TM 的规格来计算出 TM 的数量。
在 2020-11-11 16:14:41,"kingdomad" 写道:
>我发现1.11版本的yarn-session.sh废弃了-n参数,那如何指定taskmanager数量?
>
>
>
>
>
>
>
>
>
>
>
>
>
>--
>
>kingdomad
>
可以的,将 table 转换成 datastream,但是会多一层转换的性能消耗。
方便说下哪个 Connector 有现成的 Table Connector 可以满足需求,但是 Datastream
Connector不满足需求呢,具体是什么功能呢
在 2020-11-11 16:08:08,"LittleFall" <1578166...@qq.com> 写道:
>非常感谢你的回复!
>
>问下另一个问题,现在有这样一个场景:
>
>1. table api 的计算无法满足一些需求,需要使用 stream api 进行计算;
>2. 有现成可用的 table api
有更完整的堆栈不?
在 2020-11-11 10:28:02,"丁浩浩" <18579099...@163.com> 写道:
>当我使用flink cdc 对多张表进行关联查询时其中的一张表总是会有锁超时的情况,导致任务无法正常启动,
>请问这种情况应该如何处理?
>org.apache.kafka.connect.errors.ConnectException: Lock wait timeout exceeded;
>try restarting transaction Error code: 1205; SQLSTATE: 40001.
> at
>
Hi Lei,
我理解这篇文章少介绍了 keyby 的逻辑。
可以keyby(robotId),然后在 processFunction 里面使用 ValueState 存储最近一次 robot 的到达时间,
同时注册一个 20min 的timer来触发检测,在检测时候,取出 ValueState 的值都是同一个 robotId的。
Best,
hailong
在 2020-11-11 12:54:22,"Lei Wang" 写道:
>有很多边缘机器人设备(我们称为 robot)往 Kafka 中发消息,如果超过一定时间没有收到消息我们就认为 robot 掉线了。
>
>比如
Hi LittleFall,
这个可能不好对比,最好结合你的需求。
你可以理解为 Table API 为 我们内置了很多标准的算子,比如说 join,unionall 等,简化了我们自己实现的成本。
而 DataStream API 比较灵活,但处理逻辑都需要自己定义。
如果你的需求需要用到 state 或者 timer 的能力的,那么就需要使用 DataStream API。
希望这些对比有帮助。
Best ,
hailong
在 2020-11-10 13:42:39,"LittleFall" <1578166...@qq.com> 写道:
Hi leiyanrui,
当 sink 的并发 小于 kafka partition 个数,同时 sink 并发只有一个时,FixPartitioner
会导致只会往一个分区发数据。
详见 FixPartitioner[1] 的实现,其中 `parallelInstanceId` 表示 subtask 的
编号,从0开始;`partitions.length` 表示该 topic 的分区个数,
最后返回该 subtask 应该往哪个分区发数据。
[1]
Hi,
需要将 null cast 成某个具体的值,比如:
if(type=1,2,cast(null as int))
Best,
Hailong
在 2020-11-10 19:14:44,"丁浩浩" <18579099...@163.com> 写道:
>Select
> id,
> name,
> if(type=1,2,null)
>From
> user ;
>当我执行上面的sql的时候提示我
>[ERROR] Could not execute SQL statement. Reason:
Hi,
从你的报错来看,是 in 不支持隐式 CAST。
你要么可以把 type 定义成 INT,要不把后面的值 CAST 成 TINYINT。
Best,
Hailong Wang
在 2020-11-10 10:41:47,"丁浩浩" <18579099...@163.com> 写道:
>我使用flink sql cdc取连接 mysql表的时候,当我的mysql表type这个字段类型是tinyint类型时 使用type
>in(1,2,3,4,5)会报以下的错误,只有当我把字段类型改成int的时候才能使用in,这是符合预期的吗,当字段类型不匹配的时候 flink
Hi Bob,
可以设置下参数 'state.backend.rocksdb.memory.fixed-per-slot' [1] 看下有没有效果。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#state-backend-rocksdb-memory-fixed-per-slot
Best,
Hailong Wang
在 2020-11-08 10:50:29,"元始(Bob Hu)" <657390...@qq.com> 写道:
Hi,
退订需发邮件到 user-zh-unsubscr...@flink.apache.org 更多详细情况可以参考[1].
[1] https://flink.apache.org/community.html#mailing-lists
Best,
Hailong Wang
在 2020-11-07 18:44:44,"Yi Huang" 写道:
>退订
Hi bradyMk,
Bulk-encoded Formats 只能在 Checkpoint 时滚动,详见文档一[1].
Best,
Hailong Wang
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/streamfile_sink.html#bulk-encoded-formats
在 2020-11-06 10:47:33,"bradyMk" 写道:
>Hi,guoliang_wang1335
Hi si_tianqiang,
自定义 UDF 可以解决你的问题吗?
比如 接收 kakfa 的数据字段定义成 hbaseQuery,然后自定义 UDF 去根据 query 查询数据。
Best,
Hailong Wang
在 2020-11-06 10:41:53,"site" 写道:
>看了官网的示例,发现sql中传入的值都是固定的,我有一个场景是从kafka消息队列接收查询条件,然后通过flink-sql映射hbase表进行查询并写入结果表。我使用了将消息队列映射表再join数据表的方式,回想一下这种方式很不妥,有什么好的方法实现sql入参的动态查询呢?
Hi silence,
目前有个 issue [1]在跟进创建 UDF 时候添加 jar 包。
PS:目前在我的内部版本,是扩展了 类似 ADD Dependency 语法,在 job 提交运行时候会把 jar 包等加载到所运行的 classpath 下,
这样就可以让用户在 SQL 中注册 UDF,自己定义 Connector等,但是是非标准 SQL。
[1] https://issues.apache.org/jira/browse/FLINK-14055
Best,
Hailong Wang
在 2020-11-06 09:34:27,"silence" 写道:
Hi,
这个报错只是在规定的时间内没有完成 Savepoint,导致客户端连接 Master 超时,
具体的原因需要看下 Jobmaster 的日志。
PS:在任务一直重启、反压的情况下,一般 Savepoint 都会失败。
Best,
Hailong Wang
在 2020-11-06 09:33:48,"张锴" 写道:
>本人在使用flink savepoint 保存快照的时候,遇到错误,目前不清楚是因为什么导致这个问题,路过的大佬帮忙看下。
>
>flink 版本1.10.1
>
>
>执行 flink savepoint
Hi,
可以看下 Jark 的 《基于 Flink SQL CDC 的实时数据同步方案》文章 [1]. 其中在最后的 Q 中描述了
"首先需要 kafka 在分区中保证有序,同一个 key 的变更数据需要打入到同一个 kafka 的分区里面,这样 flink 读取的时候才能保证顺序。"
个人认为,需要 Update 的 key 可以更 canal 采集到 kakfa 的 hash key 一致,这样就保证了有序?
[1] https://mp.weixin.qq.com/s/QNJlacBUlkMT7ksKKSNa5Q
Best,
Hailong Wang
在
可以确认下 union all 之后的数据是不是根据 group by 的 key 相互覆盖的情况。
在 2020-11-05 13:24:20,"夜思流年梦" 写道:
>
>
>
>
>
>
>
>
>
>flink 版本是1.11的版本了
>
>
>
>
>
>
>
>
>在 2020-11-05 00:02:12,"hailongwang" <18868816...@163.com> 写道:
>>Hi
Hi,
你是 on-yarn 的模式吗?
JobManager 并不是 worker,只是控制 Checkpoint ,接收 TM 的心跳等,可以看下在这个之前的其它日志。
还可以看下 ZK 是否正常等。
On-yarn 的话,也可以看下 NM 对这个AM处理 的日志。
Best,
Hailong Wang
在 2020-11-05 15:03:11,"赵一旦" 写道:
>JobManager responsible for ff2118284beed21ac220ee7cc0a639c0 lost the
>leadership.
>
>
>
Hi liangji,
CP 超时的原因一般是因任务而议的。从你提供的 2 张截图来看,卡在第二个 operator 的 subtask3 上。
上下两个 operator 之间的关系是 forworad 还是 reblance 呢?如果是 forward 的话,可以看下是不是数据倾斜,subtask3
需要处理的数据量比较多。
如果是 reblance 的话,以为 subtask1 和 subtask2 都成功了,所以上游的 barrier 应该都往下发了,所以
subtask3也收到了上游的 barrier,而 reblance 数据量都一样,所以可以看下是不是 sink
Hi wind,
从这行报错堆栈来看:` at
org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) `
,
应该是在对 row.startTime 或者 row. duration validate 阶段,推断类型时识别出不兼容类型,可以检测下用法有没有错误。
Best,
Hailong Wang
在 2020-11-04 16:29:37,"wind.fly@outlook.com" 写道:
>Hi,all
>
Hi liaobiao,
你的 flink 版本是什么呢?
根据你的 SQL,如果是版本是 <= 1.10 的话,会根据 MetaDataHander 识别出你的 group by 后面的 key 作为 upsert
key,这样就会产生覆盖的情况。
你看下结果是否是这种情况的?
Best,
Hailong Wang
在 2020-11-04 17:20:23,"夜思流年梦" 写道:
>开发者好:
> 目前有此场景:算不同部门的总收入和所有部门的总收入,打算把两部分SQL union all ,但是实际情况发现 union
>
Hi Asahi,
因为 对于 Records Sent 等指标 Flink 只统计内部的 Metrics,对于 Source input 和 Sink 的output
没有这些指标。
所以你的任务应该是 chain 成了一个operator,导致没有指标。如果真的需要看的话,可以点 UI 上 Metrics tab 进行选择查看。
或者可以将 operator 并发度设置成不一样导致没有 chain在一起;
PS:在生产上建议用 chain,它是在StrreamGraph 转 JobGraph上的优化,这样会减少数据网络的传递的开销以及序列化和反序列化等。
Best,
Hi wangleigis,
退订需发邮件到 user-zh-unsubscr...@flink.apache.org 更多详细情况可以参考[1] [1]
https://flink.apache.org/community.html#mailing-lists
Best,
Hailong Wang
在 2020-11-04 17:59:45,"wangleigis" 写道:
>
>
>
>退订
>
>
>
>
>--
>
>祝:工作顺利,完事如意!
>
>
Hi marble,
使用 Datastream 开发的话,Kafka connector
的使用可参考文献1[1];EventTime以及WaterMark的使用可以参考文献2[2]。
对应的中文文档对应在文献3和4.
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
[2]
Hi xiao,
从报错来看,这个 SQL 应该是 match 了 `StreamExecJoinRule`,而 regular join 不能有 rowtime 属性。
应该是因为你的 kafkaSouce1 table 的 rowtime 经过 group by 后使用了 last_value
导致不是时间属性类型->`TimeIndicatorRelDataType`,而在 rule 进行判断后没有
windowBounds,所以就报了现在这个错误了。
Best,
Hailong Wang
在 2020-11-03 18:27:51,"xiao cai" 写道:
Hi bradyMk,
在 on yarn 的模式下,如果某个container 被kill 了,是会重新拉起的。
至于整个 job 被kill 了,这种情况应该是自己手动显示的去停止把?
最于重启的话,重启次数可以设置个非常大的数字(~无限重启),但是一旦 job 一直这么重启,我个人任务就算重新拉起也是没用的把?
这个时候应该结合平台的告警策略来进行人工干预了。
Best,
Hailong Wang
在 2020-11-03 09:32:50,"bradyMk" 写道:
Hi,
Flink 自动重启策略[1]可以满足你的需求?
一般来说,如果 container 挂的话,yarn 会重新拉起。Job 因为某些异常失败,flink 也有 策略进行拉起。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#fault-tolerance
Best,
Hailong Wang
在 2020-10-30 10:51:29,"bradyMk" 写道:
Hi zjfplayer,
可以使用 Now()
函数,详见:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html#temporal-functions
Now() 函数是 not deterministic,意思是对于每条记录都会重新计算。
Best,
Hailong Wang
在 2020-10-30 10:17:27,"zjfpla...@hotmail.com" 写道:
Hi,
目前JDBC sink 不支持分库分表,只能自己实现一个 Sink。具体实现的话,即使 insert Statement 需要在
writeRecord 阶段根据你的数据的 key 进行生成。
其中还需要将 key 和 statement 的对应关系缓存起来。
之前我内部的版本也支持了这个需求,因为后来在 DB 层面支持分库分表,所以在升级版本时候去掉了。(个人觉得这个是不是 DB 层面应该支持的?)
Best,
Hailong Wang
在 2020-10-30 15:10:33,"张健" 写道:
>hi,
>
>
Hi,
应该是长时间没有传输数据导致 Connection invalid,具体修复 issue
可见:https://issues.apache.org/jira/browse/FLINK-16681
看这个 Fix 应该是在1.12 和 即将发布的 1.11.3 上。
Best,
Hailong Wang
在 2020-11-02 13:20:54,"史 正超" 写道:
>是这样的,因为重试的时候
Hi Natasha,
没看到你上传的附件图呢,重新贴下不?
Best,
Hailong Wang
在 2020-10-29 16:52:00,"Natasha" <13631230...@163.com> 写道:
hi,社区~
我想把Flink连接Kafka封装成通用的方法,但是在使用java时,类型转换上遇到了问题(附件图),这个问题网上搜索到的资料很少,刚入门不久所以也不是很明白其中的原理,请各位同行指点我一下,不胜感激
谢谢!
Hi,
这个应该是下游算子有压力,可以根据 Inpool 指标查看哪个算子有瓶颈,然后对应的进行处理。
Best,
Hailong Wang
在 2020-10-27 18:57:55,"1548069580" <1548069...@qq.com> 写道
>各位好:
>最近遇到一个问题,上游有反压的情况下任务运行一段时间后出现上下游数据均停滞的情况,通过jstack命令发现,source算子阻塞了,同时观察到下游也在等待数据。堆栈如下:
>"Legacy Source Thread - Source: Custom Source (1/2)" #95 prio=5
Hi s_hongliang,
1、如果用 DataStream API 的话,可以需要使用 State 对需要被关联的表进行存储,并且设置 TTL。
2、如果使用 SQL 的话:
2.1、可以将需要被关联的数据存入Hbase 或者 Mysql,然后保证只有当天的数据,在 SQL 中使用 Temporal Table[1] 关联。
2.2、使用 temporal-table-function[2] ,设置StateRetentionTime同时过滤掉关联上昨天的数据。
[1]
Hi BenChen,
1. 可以保证需要 watermark 算子之前的算子和前面的算子不是 Forward 。
2. 如果是自己实现的 Connector 的话,可能定时检测调用 SourceFunction#markAsTemporarilyIdle 来标记为
idle,我看目前 Kafka 是刚启动时候进行检测。
Best,
Hailong Wang
在 2020-10-28 17:54:22,"BenChen" 写道:
>Hi
Hi marble,
看到你是在 window 内一直使用 agg 累加的,所以可以使用 filesystem backend
加速,但是可能内存会相对耗的比较多。因为rocksdb backend的话,每一条数据都会有一次put 和 get 的 IO 操作,故会比较慢些。
至于你提到的为什么 24h size,2s slide 的窗口没有延迟,5 min,1s 的连续 trigger
缺延迟了。这两者的行为不一样,其实没有什么可比的。
对于第二种,trigger 是依靠 timer 注册触发的,这样的话每秒都需要进行触发(如果是 process time),这样可能会太密集了。
1 - 100 of 133 matches
Mail list logo