hi,这种情况似乎像是反压造成的,数据流反压会导致算子来不及处理checkpoint事件,watermark消息也会因为反压无法发送到下游算子。
建议观察下反压的情况[1],如果是这样的话,再针对反压源头进行优化处理。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/back_pressure.html
restart 于2020年10月9日周五 上午11:48写道:
> 大家好,请假一个问题:
>场景是这样的,flink消费kafka,清洗后按分、时、天的维度(其中小
flink sql似乎不能设置rebalance,在Data Stream API可以设。
一种思路是拆分sql逻辑,在Data Stream API上创建kafka source后再reblance成一张新表,再做后续dml sql。
另一种思路就是kafka topic增加一下分区
Asahi Lee <978466...@qq.com> 于2020年9月28日周一 下午1:56写道:
> 你好! 使用flink
> SQL,如何设置rebalance呢?-- 原始邮件 --
> 发件人: "zilong&n
应该是没有的,可以自己改造下kafka source来实现。
话说flink自动反压流控不能满足场景需要吗
me 于2020年9月27日周日 下午5:45写道:
> flink版本1.11
> flink连接kafka使用的是 flink addSource特性
>
>
> 原始邮件
> 发件人: me
> 收件人: user-zh
> 发送时间: 2020年9月27日(周日) 17:22
> 主题: 当kafka有大量数据积压并且flink冷启动,flink端读取kafka有没有限流的参数?
>
>
> 当kafka有大量数据积压并且flink冷启动,flink端读取kafka有
如果是Data Stream API的话,可以考虑在目标算子上使用自定义metrics来展示数据延时情况
郭士榕 于2020年9月26日周六 下午9:15写道:
> Hi,All
>
>
> 想问下大家如果要展示Flink任务的当前延时情况,有什么比较好的方法吗?用LatencyMarker是否可以,用API/JMX层面来获取的histogram能否汇总成一个数字?
需要搭配事务性存储机制来使用,能够保证预提交成功的数据能最终被commit成功。
详情可以参考孙金城老师关于这一部分的讲解和代码实现[1]
[1]https://www.bilibili.com/video/BV1yk4y1z7Lr?p=33
高亮 于2020年9月25日周五 上午11:14写道:
> 各位大佬,请教一下二阶段提交的问题,preCommit预提交失败回滚已经很清楚了,就是在commit阶段提交如果失败会怎么,比较迷惑。
>
>
>
> 我自己测试了一下,发现只要是commit失败会造成数据丢失,但是看了下方法注释,说是失败了后会重启flink恢复到最近的state,继
flink用的自己的序列化机制。从chk恢复的时候,在open方法里会进行状态数据的注入。
按我的理解,该transient标记有没有都可以从chk恢复,但一般加入transient可以明确只有open方法中的数据注入这一种方法。
至于不加上transient是否可能产生其他影响,就不太清楚了。
范超 于2020年9月10日周四 上午9:35写道:
> Transient 都不参与序列化了,怎么可能从checkopont里恢复?
>
> -邮件原件-
> 发件人: Yun Tang [mailto:myas...@live.com]
> 发送时间: 2020年9月7日
Hi Xiao,
我这边实践过程中发现,该参数只能删除jobmanager对应的metrics group,不能删除tm的。
我们开启了randomJobNameSuffix,该参数会让JM和TM的metrics信息分属不同metrics group。
感觉这可能是一个bug?
xiao cai 于2020年9月1日周二 下午4:57写道:
> Hi:
> 可以试试在flink-conf.yaml中添加:
> metrics.reporter.promgateway.deleteOnShutdown: true
>
>
> Best,
> Xiao
> 原始邮件
> 发件人: br
你说的资源共享是指slot sharing吗,同一个job的task默认都在一个slot group中,都是可以共享slot的。
但同一个task的多个并发实例不能共享slot,多个实例需要分配在不同的slot中。
另外,关于“slot中存在资源共享时,1个slot会有多个thread”。如果是指数据处理的thread,那我理解应该是1个slot只有1个thread。
liangji 于2020年9月1日周二 下午4:45写道:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flin
失败,而标记这个checkpoint的快照整体失败。
> > 从而重启消费会从source的1开始重新消费
> >
> >
> > -邮件原件-
> > 发件人: Benchao Li [mailto:libenc...@apache.org]
> > 发送时间: 2020年8月27日 星期四 10:06
> > 收件人: user-zh
> > 主题: Re: 关于sink失败 不消费kafka消息的处理
> >
> > Hi Eleanore,
不太清楚你定时读mysql是需要做什么,如果是维表join的话考虑temporal table join[1],通过设置ttl时间和数量来更新缓存[2]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/temporal_tables.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#how-to-create-a-jdb
Hi Eleanore,这个问题我可以提供一点理解作为参考
1.chk与at least once
checkpoint机制的思想就是做全局一致性的快照,失败恢复时数据的消费位点会回滚到上一次chk n的进度,
然后进行数据重放,这样就保证了数据不会缺失,至少被消费一次。
2. sink2PC
在chk机制下,数据重放时一般sink端的数据不能回滚,就会有重复数据。如果是upsert sink那仍然是一致的,
否则需要通过2PC的预提交来将chk n+1成功前的数据写到临时存储,等chk n+1完成再真正写入的物理存储。如果
在chk n+1之前任务失败回滚了,那临时存储的数据也可以回滚
按我的理解,参考aggregate(AggregateFunction aggFunction,
ProcessWindowFunction windowFunction)方法,
窗口中的状态数据是存在某个聚合函数里的,processWindowFunction只是处理窗口的结果,需要通过context获取对应的窗口state来做清理。
x <35907...@qq.com> 于2020年8月25日周二 下午6:25写道:
>
> 按天开窗,按分钟触发,重写了ProcessWindowFunction的clear方法,希望在窗口结束的时候,将状态清理,可实际执行中,跨天状态并未清理,代买
Song
>
>
>
> On Mon, Aug 24, 2020 at 5:26 PM shizk233
> wrote:
>
> > Hi all,
> >
> > 请教一下,flink自从1.10开始默认GC就是G1了,在taskmanager.sh脚本中也能看到。
> > 在*默认设置*下,能观察到本地flink使用的G1,但on
> yarn运行时却发现使用的是PS,想请教下这是为什么?是yarn会对应用有一些默认设置吗?
> >
> > 我搜索了一些相关资料,但仍然没有
Hi all,
请教一下,flink自从1.10开始默认GC就是G1了,在taskmanager.sh脚本中也能看到。
在*默认设置*下,能观察到本地flink使用的G1,但on yarn运行时却发现使用的是PS,想请教下这是为什么?是yarn会对应用有一些默认设置吗?
我搜索了一些相关资料,但仍然没有搞清楚这是怎么回事,希望有了解的朋友帮忙解答下。感谢!
备注:我可以通过在flink-conf.yaml中设置env.java.opts: -XX:+UseG1GC来使flink on yarn也使用G1。
ob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
>
>
> 祝好
> 唐云
>
> ____
> From: shizk233
> Sent: Friday, August 21, 2020 10:51
> To: user-zh@flink.apache.org
> Subject: Re: state序列化问题
>
>
ListState的格式应该是
> ListState, 而不是
> ListState>,后者表示有一个list,list中的每一个元素均是一个list
>
> ListState 本身并不属于java的collection,所以不存在ArrayList 与 LinkedList的区别。
>
> 祝好
> 唐云
> ____
> From: shizk233
> Sent: Thursday, August 20, 2020 18:00
> To: user-zh@flink.apa
理解了!感谢🙏!
Benchao Li 于2020年8月20日周四 下午6:00写道:
> 不同的key应该用的是同一个state的实例,但是state下面会处理不同的key对应的state,也就是key对于用户来说是透明的。
> 比如你用一个MapState,那就是约等于每个key都有一个Map实例,不同key之间是独立的。
>
> shizk233 于2020年8月20日周四 下午5:03写道:
>
> > 谢谢大佬解答。
> > 想再问一下,在并发1的情况下,一个算子里的MapState应该只有一个实例,
> &g
Hi all,
请教一下,State应该是通过StateDescriptor提取的类型信息来序列化/反序列化,
那么如果声明为接口类型,如ListState>,但实际存入的是ArrayList/LinkedList,
会对类型信息提取产生不良影响吗?
按我的理解,ArrayList和LinkedList在序列化时的bytes组成结构应该是不太一样的。
但是都可以作为List>来声明。
请求野生的大佬支援一下!
"user-zh"
> <
> libenc...@apache.org>;
> 发送时间: 2020年8月20日(星期四) 下午4:39
> 收件人: "user-zh"
> 主题: Re: KeyedCoProcessFunction中的状态数据是否存在资源竞争的可能
>
>
>
> Hi,
>
> 问题1&2 都不存在多线程的问题。Flink底
谢谢大佬解答。
想再问一下,在并发1的情况下,一个算子里的MapState应该只有一个实例,
那么数据流上的不同key(不是map里的hash key)是怎么保证只获取到对应的那部分map state数据呢?
按我的理解,按key分流后,每个子流应该是只有自己的state,但从算子实例考虑,好像只有1个state实例存在。
Benchao Li 于2020年8月20日周四 下午4:40写道:
> Hi,
>
> 问题1&2 都不存在多线程的问题。Flink底层来保证这些方法都是在同一个线程串行执行的。
>
> shizk233 于20
Hi all,
请教一下,KeyedCoProcessFunction比较特殊,有两个输入,对应两个ProcessElement方法。
问题1:
如果在这两个Process方法中都对同一个MapState进行修改,是否会存在资源竞争的关系?
还是这两个方法是顺序执行的?
问题2:
虽然有不同的key,但函数只有一个实例,其中的MapState应该也是一个实例,那么不同key下的
Process过程是并发执行的还是顺序执行的,会竞争MapState资源吗?
Congxian Qiu 于2020年8月19日周三 下午1:58写道:
> Hi
> 从栈看应该是 deserialize 的时候出错了,另外 kryo 可以,Pojo 不行,能否检查下,是否满足 POJO
> 的一些要求[1]呢?
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/schema_evolution.html#pojo-types
> Best,
> Congxian
>
>
&
Hi all,
请教一下反序列化的问题,我有一个KeyedCoProcessFunction,输入是log流和rule流。
数据流如下:
logSource
.connect(ruleSource)
.keyby(...)
.process(My KeyedCoProcessFunction<>)
.keyby(...)
.print()
其中CoProcess函数中有两个MapState分别做log缓存和rule缓存。
结构为Map> logState,Map> ruleState.
T在实例化函数时确定,为MyLog类型。
运行时遇到了如下错误,看样子似乎是在下游算子反序列化数据时
考虑修改一下json解析的逻辑来处理异常数据?
赵一旦 于2020年8月18日周二 上午11:59写道:
> 有没有小伙伴生产中有类似问题呢,都怎么解决的呢?
> 我指通过FlinkSQL方式,结合datastream api方式倒是好解决,只需要把容易出错的地方换成datastream
> api,然后捕获所有异常即可。
>
> 赵一旦 于2020年8月17日周一 下午7:15写道:
>
> > kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢?
> >
> > 以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flin
有没有可能把维表数据也作为数据流从kafka输入呢
Jim Chen 于2020年8月17日周一 下午4:36写道:
> 大家好:
> 我们现在在用flink sql在做实时数仓,架构大概是kafka关联hbase维表,然后写入clickhouse。hbase维表是频繁变化的
> 现在遇到的几个比较棘手的问题:
> 1、自己在实现AsyncTableFunction做异步io的时候,发现性能还是不够。后来就加入本地缓存,但是缓存一致性出现问题,不知道该如何解决
> 2、写入hbase的时候,是批量写的,无法保证有序,维表频繁变化的话,顺序不对,会造成结果有问题
> 3、hbas
ctx.timestamp()其实就是获取的StreamRecord的时间戳,也就是事件被提取出来的时间戳。
这个方法一般需要使用event time,并且在数据流上assign过timestamp和watermark。
ゞ野蠻遊戲χ 于2020年8月16日周日 下午7:57写道:
> 大家好
>
>
> 当我在KeyedProcessFunction的processElement方法中获取processingTime,就像这样ctx.timestamp(),返回为null,我改如何在processElement中获取processingTime?
>
>
> 谢谢!
flink框架里用的slf4j吧,log4j2只是一种具体实现,应该是可以直接替换掉的。
就是把flink发行包下log4j2相关的jar替换成log4j的jar,当然,相应的配置文件也要改成log4j支持的配置。
caozhen 于2020年8月13日周四 下午3:39写道:
> flink1.11好像是用的log4j2,我的mainjar用到了log4j, 两者类有冲突,导致JM、TM日志为空。
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file
`;
> 一种是通过类似于UDAF的方式来实现的,继承的是`AggregateFunction`
> 他们都在`org.apache.flink.table.planner.functions.aggfunctions`
> 包里面(flink-table-planner-blink模块)
>
> shizk233 于2020年8月12日周三 上午10:39写道:
>
> > hi all,
> >
> > 请教一下,flink sql内置的众多functions[1]有对应的Java实现类吗?我只在blink tab
hi all,
请教一下,flink sql内置的众多functions[1]有对应的Java实现类吗?我只在blink table
planner模块下的functions package里找到了一部分,并且是基于Expresstion的。
问题来源:我试图在flink sql里去做debug,如果是自定义的udf可以打断点在实现上,但内置函数没找到相应的实现,似乎也没有相应的文档在这一块。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFuncti
Hi,这个日志全是 有点头大。。。
我刚想到,除了task重启外,还有一种情况是task没有调度成功。
你能通过flink web ui观察到task的状态吗,都是RUNNING吗?
如果一直是schedule,那应该是缺少对应的资源进行调度,需要检查下task manager提供的slot资源以及任务所需的资源。
如果是running、failed、schedule的不断切换,那需要检查task manager的日志,应该有warn。
Bruce 于2020年8月10日周一 下午6:12写道:
> 下面是附件的内容,请问是因为什么导致重启呢?
>
>
> 2阶段提交demo
hi,附件挂了,没看到饿。可以考虑挂gist放链接出来。
不过看这个信息,checkpoint的失败是由于任务状态变更导致的(应该是任务重启了,所以running变scheduled了)。
建议往任务重启的方向排查一下。
Bruce 于2020年8月10日周一 下午5:01写道:
> 您好,这里有个问题反馈下!
>
> 读取hive表的数据转化为流;按照jdbc两阶段提交方式写入oracle数据库,
> 没有抛任何异常但是checkpoint失败:
> job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but S
这个应该根据业务特性来决定吧。
如果是一些大型的streaming任务,需要长期稳定运行并且有良好的隔离性,则可以考虑perjob模式。
如果需要经常性提交一些小任务(常见于batch任务)或者说有一批相关联的任务,彼此隔离性要求也不高的,可以考虑session模式。
感觉说到底还是业务隔离性与资源的权衡。
Dream-底限 于2020年8月10日周一 下午4:21写道:
> hi、
> FlinkOnYarn集群部署是推荐使用yarn-session模式所有任务共用一个,还是推荐使用preJob模式每个任务起一个小集群
>
“By default, Flink allows subtasks to share slots even if they are subtasks
of different tasks, so long as they are from the same job.”
参考官网描述[1],应该只有相同job下的task可以共享slot。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html
wangl...@geekplus.com 于2020
flink官网有一个kerberos相关的说明文档[1],不知是否能帮助到你。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-kerberos.html
zjfpla...@hotmail.com 于2020年8月10日周一 下午3:15写道:
> Flink on yarn模式
>
>
>
> zjfpla...@hotmail.com
>
> 发件人: zjfpla...@hotmail.com
> 发送时间: 2020-08-10 15:09
> 收件人: user-z
具体原因不太清楚,但建议使用context.timeService().currentEventTime()和currentProcessingTime()来获取当前的时间。
排查方法的话,不知道你有没有做算子的单元测试,如果还没有的话可以通过flink test util[1][2]做单元测试来debug排查,
可以比较明确的观察到timeService上的Timer状态。
[1]
https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
[2]
https://ci.a
看到背压。 我不断产生数据100w以上了。
>
>
>
>
>
> -- 原始邮件 ------
> 发件人: shizk233 发送时间: 2020年8月3日 23:03
> 收件人: user-zh@flink.apache.org 主题: 回复:flink-1.11 模拟背压
>
>
>
> source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗
>
> kcz <573693...@qq.co
source没有看到数据消费具体是指什么,web ui上task的numOfRecordsIn吗
kcz <573693...@qq.com> 于2020年8月3日周一 下午7:29写道:
> 我想看下背压的指标数据,我往kafka发送了100w数据,但是source我也没有看到数据被消费,是我哪里模拟错了吗
> public static void main(String[] args) throws Exception{
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecution
在yarn上各节点起作用的环境变量应该是用-yD设置
Zhou Zach 于2020年8月3日周一 下午6:35写道:
> Hi all,
>
> 通过如下方式设置HBASE_CONF_PATH变量,提交到yarn时,发现HBASE_CONF_PATH没有生效,
>
>
> /opt/flink-1.11.1/bin/flink run-application -t yarn-application \
> -DHBASE_CONF_PATH='/etc/hbase/conf' \
>
>
> 请问flink提交job时,怎样设置环境变量?
如JasonLee所说,你可以在FlinkKafkaConsumerBase的notifyCheckpointComplete方法中看到提交offset的逻辑。
值得注意的是,此处的chk完成指的是整个链路上的chk完成,而不是kafka source的chk完成。
JasonLee <17610775...@163.com> 于2020年7月30日周四 下午9:59写道:
> hi
> 提交offset到Kafka是在ck成功之后 如果没有开启ck的话 需要设置自动提交提交offset
>
>
> | |
> JasonLee
> |
> |
> 邮箱:17610775...@163
可以检查下在patition3上有没有成功提交过offsets。负值可能是没有提交过情况下的默认值(我猜这是个不变值)。
bradyMk 于2020年7月29日周三 下午6:36写道:
> flink1.9.1
>
> 在WebUI中查看Source__Custom_Source.KafkaConsumer.topic.geek-event-target.partition.3.committedOffsets指标为负值,查看官网释义:对于每个分区,最后一次成功提交到Kafka的偏移量。
> 但我这里为什么是负值呢?
> <
> http://apache-flink.147419.n
似乎楼主一开始说的checkpoint成功是指source
算子的checkpoint成功?但notifyCheckpointComplete函数要求的是整个链路的chk成功。
这个时候offset为100的消息必然已经被sink算子处理完成了,因为触发chk的屏障消息必然在offset100的消息之后到达sink算子。
hk__lrzy 于2020年7月29日周三 下午5:53写道:
> 你是说emit之后的offset commit么?可以看下
> `Kafka09Fetcher`的runFetchLoop方法
>
>
> 在2020年07月29日 15:09,shuwen zho
恭喜!
罗显宴 <15927482...@163.com> 于2020年7月23日周四 上午1:14写道:
> 感谢shizk233大佬,我这个问题终于得到解决,我主要是通过全窗口加mapstate实现的
> best
> shizk233
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月21日 15:04,罗显宴 写道:
> hi,我想到解决办法了,可以用全局
新增一条((A类型,13点),1)的记录,在13点得到最终结果(13点,2)。
而这个结果和我理解的你的需求不太一样,我理解的情况应该是12点输出(12点,1),13点输出(13点,1),因为目前只有A类型的数据。
期望的输出应该是无限时间上的去重类型统计,每隔1小时输出(几点,当前所有的类型总数)。
我觉得目前的设计可能和描述的需求不太一致?你看看是不是这么回事儿
Best,
shizk233
罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:53写道:
> 好的,
> 输入:
> 心功能不全和心律失常用药,
Hi,
有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗
Best,
shizk233
罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:
> hi,
>
> CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:1
Hi,
我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
你可以让acc做个累加,然后结果输出里把acc的值带上看看。
Best,
shizk233
罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:
>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> 罗显宴
> 邮箱:1
Hi,
Flink
metrics里有一项是task相关的指标currentWatermark,从中可以知道subtask_index,task_name,watermark三项信息,应该能帮助排查watermark的推进情况。
Best,
shizk233
snack white 于2020年7月20日周一 下午3:51写道:
> HI:
> flink job 跑一段时间 watermark 不推进,任务没挂,source 是 kafka ,kafka 各个partition
> 均有数据, flink job statue backend 为
Hi,
从你举的这个例子考虑,仍然可以使用ContinusEventTimeTrigger来持续触发结果更新的,只不过整个窗口可以根据结束条件考虑别的,比如Global窗口。
Best,
shizk233
罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午2:09写道:
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:159274
Hi,
累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
Best,
shizk233
罗显宴 <15927482...@163.com> 于2020年7月20日周一 上午1:18写道:
>
> 大家好,怎么实现一个滚动窗口内的连续递增元素个数,比如每小时累计用户登录数,比如说凌晨1点登录了500人,到了两点就累计到1200,到3点累计到这样实现一个递增曲线,在网上看到云邪大佬写的sql但是不会写datastream
> api,希望
Hi drewfranklin,
flink使用event time,然后类似下面这样可以吗?
Pattern.begin("a").next("b").within(Time.minutes(1));
Best,
shizk233
drewfranklin 于2020年7月14日周二 上午11:05写道:
> Hello all.
> 想请教下各位。
>
> 我有个用户开户超时断点的场景。调研了一下,想通过flink cep 来实现。
>
> 但是我定义pattern 后发现,我的这个
Hi nicygan,
unable to create new native thread指的是无法创建checkpoint线程,并不是内存占用过大。
这种情况一般有3种可能的原因:
1.flink应用开启太多线程
2.机器上句柄设置太小
3.机器上的其他应用开启太多线程
建议排查一下机器上的ulimit设置(文件句柄会影响应用能开启的线程数),和flink metrics里监控到的线程数变化。
Best,
shizk233
nicygan 于2020年7月14日周二 上午10:31写道:
> dear all:
>
> 我有一个消费kafka数据写到p
Hi Jiazhi,
1.如果数据流量不是很大的话,按每条数据触发也没问题。另外,基于事件时间的情况,提前触发可以选择ContinuousEventTimeTrigger,可以查看Trigger接口的实现找到你想要的trigger。
2.窗口结束后会自动释放。一般对于Global窗口需要手动设置TTL
Best,
shizk233
ゞ野蠻遊戲χ 于2020年7月7日周二 下午10:27写道:
> 大家好!
>
> 想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:
> 1、在使用Tumbling窗口的时候
抱歉,我似乎回错邮件了,应该使用回复全部?
Congxian Qiu 于2020年7月6日周一 下午7:22写道:
> Hi
> 想 debug checkpoint 文件的话,可以参考下这个 UT[1]
>
> [1]
>
> https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
> Best,
> Congxian
>
>
> Z-Z 于2020年7月
Hi Z-Z,
如果你想查看的是程序中的state内容,建议触发一次savepoint并搭配state processor api来查询。
参考
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html
Best,
shizk233
Congxian Qiu 于2020年7月6日周一 下午7:22写道:
> Hi
> 想 debug checkpoint 文件的话,可以参考下这个 UT[1]
>
> [
Hi Sun ZHu,
关于方法4,我记得kafka有时间轮功能,可以做到延迟消息的,可以了解一下。
Best,
shizk233
Sun.Zhu <17626017...@163.com> 于2020年7月4日周六 上午12:23写道:
> 感谢benchao和forideal的方案,
> 方法1.使用udf,查不到 sleep 等一下在查
> --这个可以尝试
> 方法2.在 join operator处数据等一会再去查
> —我们使用的是flink sql,不是streaming,所以该方案可能行不通
> 方法3.如果没有 jo
Hi air23,
sql似乎不支持相关的设置,可以通过env或配置文件设置所有蒜子的并行度。
你可以试试流转表,可以做到细粒度的控制。
Best,
shizk233
air23 于2020年7月2日周四 下午6:40写道:
> hi
> 就是我用
>flink sql 通过ddl读取和写入kafka怎么设置并行度呢?
>flink sql 通过ddl写入kafka怎么自定义分区呢?
>
>
> 这样才能保证提高消费能力。和保证数据有序。 但是好像没有发现再table模式 或者sql 语句上设置 或者做自定义分区。
>
>
>
>
>
>
Hi LakeShen,
感谢!就是这个!我这就去试一下
Thanks,
Xuhui Mao
LakeShen 于2020年6月30日周二 下午2:06写道:
> Hi shizk233,
>
> 可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681。
>
> 这个就是长时间没有数据,导致 connection 断开问题。
>
> Best,
> LakeShen
>
> shizk233 于2020年6月30日周二 下午1:34写道:
>
以参考这个jira
> https://issues.apache.org/jira/browse/FLINK-12494
> 1. Throw execption and let flink runtime handle it;
> 2. Handle it in OutputFormat;
>
>
> | |
> Zhonghan Tang
> |
> |
> 13122260...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> On 06/30/2020 11:53,shizk233 wrote
Hi All,
最近使用flink处理数据写入mysql sink,但由于业务场景在晚上没有数据流入,会触发mysql wait
timeout限制(默认的8小时)导致连接失效。
即使在mysql url中添加了autoReconnect=true参数,仍会产生相应的异常,具体信息见下。
版本信息:
flink 1.10.1
mysql server 5.6.47
mysql Connector/J 5.1.49
请问:
1.flink的jdbc connector是否可以采用连接池模型?如果只使用一个connection,是否可以添加某种心跳机制以保持active?
2.连接失效后是否
58 matches
Mail list logo