Hi,
首先统一一下表述,当前只有1小时的滚动窗口,不存在20分钟的窗口,trigger只是输出结果,trigger的间隔和窗口大小无关。
按目前的设计,在11-12点的窗口里,输入x条类型A的数据,agg都记为1条,但1小时窗口会触发3次20s的trigger,
也就是会最多输出3条(A类型,12点)的数据(同一个1小时窗口,WindowEnd都是12点)。
这3条数据进入MapState后,写下了((A类型,12点),1)的记录并都注册了12点+1的timer,
那么timer在12点的时候会输出的结果就是(12点,1)。
你好
明白了,感谢 , 我文档没看清楚哈
在 2020-07-21 11:44:23,"Xingbo Huang" 写道:
>Hi,
>你需要添加配置,如果你没有使用RocksDB作为statebackend的话,你直接配置t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
>True)就行,如果你用了的话,就需要配置off-heap
例如:
mysql表:
CREATE TABLE `test` (
`id` int(11) NOT NULL,
`name` varchar(255) NOT NULL,
`time` datetime NOT NULL,
`status` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
CREATE TABLE `status` (
`id` int(11) NOT NULL,
`name` varchar(255) NOT NULL,
PRIMARY
请问怎么反编译checkpoint文件啊,我想知道state写到checkpoint文件没有
_default_
OPERATOR_STATE_DISTRIBUTION_MODE SPLIT_DISTRIBUTE
VALUE_SERIALIZER
Gorg.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfigzS酿
脂?sr -org.apache.flink.runtime.state.JavaSerializerFSX韦4
? xr
好的,
输入:
心功能不全和心律失常用药,1,时间戳
心功能不全和心律失常用药,1,时间戳
抗利尿剂,1,时间戳
血管收缩剂,1,时间戳
血管紧张素II受体拮抗剂,1,时间戳
这里的时间戳就是eventtime了
比如前三条是在一个20秒窗口中,所以应该分为两个窗口:
心功能不全和心律失常用药和抗利尿剂,但是我是计数药物的种类的
所以不管窗口有多少元素我还是置为1,所以输出的接口就是窗口之和,即为2
接下来20秒都多了2个窗口而且和前面的医药种类不一样,所以在原来基础上再加2
输出4
即输出:
2020-7-20 19:00:00,2
2020-7-20 19:00:20,4
Hi,
你需要添加配置,如果你没有使用RocksDB作为statebackend的话,你直接配置t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
True)就行,如果你用了的话,就需要配置off-heap
memory了,table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
??
??Flink
??kafka??checkpointEXACTLY_ONCE
??
Producer attempted an operation with an old epoch.Either there is a newer
producer with the same transactionalId, or the producer's transaction has been
expired by the broker
Hi,
有点不太理解,能举个例子说明一下result那部分你预期的输入(几条sample 数据就行)和期望的输出结果吗
Best,
shizk233
罗显宴 <15927482...@163.com> 于2020年7月21日周二 上午11:29写道:
> hi,
>
> CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
>
>
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
> 签名由网易邮箱大师定制
官方例子:
https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
按照例子写了程序,也安装了pyflink
|
python -m pip install apache-flink
|
代码:
|
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import
默认创建的是Flink表,Hive端不可见。
你想创建Hive表的话,用Hive dialect。
Best,
Jingsong
On Tue, Jul 21, 2020 at 11:31 AM felixzh wrote:
> 参照文档
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive
> 通过flink创建表:CREATE TABLE Orders (product STRING, amount INT)
>
参照文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive
通过flink创建表:CREATE TABLE Orders (product STRING, amount INT)
在beeline端可见表,但是desc看不到字段,select * from orders也不可用
hi,
CountAgg是对一个窗口进行聚合,而一个窗口中的元素都是根据医药类别category分区而来的,都是一样的,所以我做累加就直接置为1了,你的意思是让我在CuontAgg上做累加吗
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月21日 11:10,shizk233 写道:
Hi,
我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
??
??pyflinkdemo??
----
??:
Hi,
我猜是因为设的1小时滚动窗口,WindowFunction里拿到的WindowEnd就是1小时的END,
而acc其实没有变化,也就是每隔20s触发拿到的结果是一样的,在MapState里也会忽略重复值。
你可以让acc做个累加,然后结果输出里把acc的值带上看看。
Best,
shizk233
罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午8:44写道:
>
> 大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
> 罗显宴
> 邮箱:15927482...@163.com
>
>
Hi Z-Z
这种情况比较奇怪的。你这个是稳定复现的吗?能否分享一个稳定复现的作业代码,以及相关步骤呢?我尝试本地复现一下
Best,
Congxian
Z-Z 于2020年7月20日周一 下午4:17写道:
> 通过 cli 命令是 在jobmanager目录 执行 bin/flink run -d -p 1 -s {savepointuri}
> /data/test.jar 这种会报莫名其妙的错误,如之前的邮件
> 通过webui就是在http://jobmanager:8081; submit new
>
Hi
ListState 中的 value 是一个整体,所以一次性会取回来,现在 RocksDBMapStat 中有一些操作
(iterator/entries/values 等)是使用 next 方法加载的。
Best,
Congxian
蒋佳成(Jiacheng Jiang) <920334...@qq.com> 于2020年7月21日周二 上午9:28写道:
> 大家好:
>
> 我发现RocksDBListState#get方法是一次性把数据全加载到内存中,如果list太大这样会不会造成内存溢出。可不可以在next方法中才加载数据?
可以写已有的表,相关的配置 [1] 需要添加到表的property当中。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing
On Mon, Jul 20, 2020 at 5:14 PM Dream-底限 wrote:
> hi
> 好的,想问一下stream写hive表的时候:
> 1、一定要在flink内部先建立hive表吗?
>
Hi??Sinkinsertinsert
??
??AppendStreamTableSink requires that Table has only insert changes.??
chengyanan1...@foxmail.com
??
?? 2020-07-20 16:23
user-zh
?? ?? pyflink1.11.0window
HI ??
Hi,??jar??1??45jar
----
??:
??RocksDBListState#get??list??next??
Hi,
> 在 2020年7月20日,20:15,酷酷的浑蛋 写道:
>
> 1. flink1.11解压后,启动会报:
> java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
第一步就报错,应该是你本地环境问题,后面加的包应该都是不需要的,你本地用的JDK版本是多少呀?
祝好
Leonard Xu
大佬,不好意思,可能图片看不到,我把代码发一次,刚学习flink半个月,好多不懂,希望大佬莫嫌烦
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 20:38,罗显宴<15927482...@163.com> 写道:
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
看看异常信息, 是不是你的insert mode没配置对。
BTW, 你粘贴的文本带有很多"", 有点影响可读性。
Best,
Shuiqiang
奇怪的不朽琴师 <1129656...@qq.com> 于2020年7月20日周一 下午4:23写道:
> HI :
> 我现在有一个新的问题,我在此基础上加了一个关联,再写入kafka时报错,如下
> Traceback (most recent call last):
> File
> "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
不好意思,刚才发的快,没来得及解释,
这里aggregate算子主要做了一个预聚合,把窗口的个数置为一,然后用windowResult输出结果,然后对窗口分区,最后用mapState处理递增
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:
不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
我运行的时候,他直接按1小时窗口输出了,并没有按20秒连续输出递增
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 14:09,罗显宴<15927482...@163.com> 写道:
不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,
1. flink1.11解压后,启动会报:
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
然后将jersey-client-1.9.jar、jersey-core-1.9.jar复制到lib下
2. 再次启动,报错:
Caused by: java.lang.ClassNotFoundException: javax.ws.rs.ext.MessageBodyReader
然后将javax.ws.rs-api-2.1.1.jar复制到lib下
3. 再次启动,报错:
Caused
好的,谢谢大佬,我用这个试试
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由 网易邮箱大师 定制
在2020年07月20日 15:11,shizk233 写道:
Hi,
从你举的这个例子考虑,仍然可以使用ContinusEventTimeTrigger来持续触发结果更新的,只不过整个窗口可以根据结束条件考虑别的,比如Global窗口。
Best,
shizk233
罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午2:09写道:
>
>
>
Hi,
看起来像是依赖冲突问题,报错信息看起来是classpath加载到了两个相同的jar, javax.ws.rs-api-2.1.1.jar
这个jar包是你集群需要的吗?
可以把你场景说细点,比如这个问题如何复现,这样大家可以帮忙一起排查
祝好,
Leonard Xu
> 在 2020年7月20日,15:36,酷酷的浑蛋 写道:
>
>
>
> Flink1.11启动时报错:
> java.lang.LinkageError: ClassCastException: attempting to
>
flinkflink-1.8.1
org.apache.flink.runtime.executiongraph.ExecutionGraphException: Trying to
eagerly schedule a task whose inputs are not ready (result type:
PIPELINED_BOUNDED, partition consumable: false, producer state: SCHEDULED,
producer slot: null).at
忽略
sjlsumait...@163.com
是的。
但是不管怎么滚动,最终都是checkpoint完成后文件才可见
On Mon, Jul 20, 2020 at 7:10 PM Dream-底限 wrote:
> hi、
> 对于下面这两个的滚动方式,是选优先到达的吗,就是1min的checkpoint和128mb的file size,不管哪个先到都会滚动生成新的文件
>
> 》可以,默认下 128MB 滚动,Checkpoint 滚动
>
> Jingsong Li 于2020年7月20日周一 下午6:12写道:
>
> > Hi Dream,
> >
> > > 1.一定要在flink内部先建立hive表吗?
> >
hi、
对于下面这两个的滚动方式,是选优先到达的吗,就是1min的checkpoint和128mb的file size,不管哪个先到都会滚动生成新的文件
》可以,默认下 128MB 滚动,Checkpoint 滚动
Jingsong Li 于2020年7月20日周一 下午6:12写道:
> Hi Dream,
>
> > 1.一定要在flink内部先建立hive表吗?
>
> 不用,哪边建无所谓
>
> > 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
>
> 可以,默认下 128MB 滚动,Checkpoint 滚动。
>
> Best,
这flink1.11啥情况啊,一启动就报
java.lang.LinkageError: ClassCastException: attempting to
castjar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
to
jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
JM checkpoint ??
18:08:07.615 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 116 @ 1595239687615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:07.628 [flink-akka.actor.default-dispatcher-420] INFO
Hi Dream,
> 1.一定要在flink内部先建立hive表吗?
不用,哪边建无所谓
> 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
可以,默认下 128MB 滚动,Checkpoint 滚动。
Best,
Jingsong
On Mon, Jul 20, 2020 at 5:15 PM Dream-底限 wrote:
> hi
> 好的,想问一下stream写hive表的时候:
> 1、一定要在flink内部先建立hive表吗?
> 2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
>
Hi Dream,
可以详述下你的测试场景吗?
Best,
Jingsong
On Mon, Jul 20, 2020 at 5:40 PM Dream-底限 wrote:
> hi、
> 请问这个问题最后怎么解决了,数据能滚动写入hive了嘛,我这面开启了checkpoint之后hive也是没数据
>
> 李佳宸 于2020年7月16日周四 下午10:39写道:
>
> > 好的,谢谢~~~
> >
> > JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道:
> >
> > > hi
> > >
hi、
请问这个问题最后怎么解决了,数据能滚动写入hive了嘛,我这面开启了checkpoint之后hive也是没数据
李佳宸 于2020年7月16日周四 下午10:39写道:
> 好的,谢谢~~~
>
> JasonLee <17610775...@163.com> 于2020年7月16日周四 下午8:22写道:
>
> > hi
> > 需要开启checkpoint
> >
> >
> > | |
> > JasonLee
> > |
> > |
> > 邮箱:17610775...@163.com
> > |
> >
> > Signature is customized by
hi
好的,想问一下stream写hive表的时候:
1、一定要在flink内部先建立hive表吗?
2、如果直接写hive内(hue建表)已经建好的hive表可以吗,文件会有滚动策略吗
Rui Li 于2020年7月20日周一 下午4:44写道:
> tableEnv.executeSql就已经提交作业了,不需要再执行execute了哈
>
> On Mon, Jul 20, 2020 at 4:29 PM Dream-底限 wrote:
>
> > hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
> >
> > 异常:
> > The
tableEnv.executeSql就已经提交作业了,不需要再执行execute了哈
On Mon, Jul 20, 2020 at 4:29 PM Dream-底限 wrote:
> hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
>
> 异常:
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error:
hi,我这面请一个一个kafka到hive的程序,但程序无法运行,请问什么原因:
异常:
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: No operators defined in streaming topology. Cannot
generate StreamGraph.
at
Hi,
Flink
metrics里有一项是task相关的指标currentWatermark,从中可以知道subtask_index,task_name,watermark三项信息,应该能帮助排查watermark的推进情况。
Best,
shizk233
snack white 于2020年7月20日周一 下午3:51写道:
> HI:
> flink job 跑一段时间 watermark 不推进,任务没挂,source 是 kafka ,kafka 各个partition
> 均有数据, flink job statue backend 为 memory
HI ??
??kafka
Traceback (most recent call last):
File
"/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147,
in deco
return f(*a, **kw)
File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py",
cli ?? ??jobmanager bin/flink run -d -p 1 -s {savepointuri}
/data/test.jar
webui??http://jobmanager:8081; submit new
job??jar??savepoint path
??idea??local??
----
??:
"user-zh"
HI:
flink job 跑一段时间 watermark 不推进,任务没挂,source 是 kafka ,kafka 各个partition
均有数据, flink job statue backend 为 memory 。有debug 的姿势推荐吗? 看过 CPU GC 等指标,看不出来有异常。
Best regards!
white
Flink1.11启动时报错:
java.lang.LinkageError: ClassCastException: attempting to
castjar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class
to
jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
at
看不到图片信息,换一个图床工具上传图片吧
Luan Cooper 于2020年7月17日周五 下午4:11写道:
> 附一个 Job Graph 信息,在 Cal 处挂了
> [image: image.png]
>
> On Fri, Jul 17, 2020 at 4:01 PM Luan Cooper wrote:
>
>> 实际有 20 左右个字段,用到的 UDF 有 COALESCE / CAST / JSON_PATH / TIMESTAMP 类
>> *是指 UDF 返回了 NULL 导致的吗?*
>>
>>
>> On Fri, Jul 17, 2020 at
Hi,
从你举的这个例子考虑,仍然可以使用ContinusEventTimeTrigger来持续触发结果更新的,只不过整个窗口可以根据结束条件考虑别的,比如Global窗口。
Best,
shizk233
罗显宴 <15927482...@163.com> 于2020年7月20日周一 下午2:09写道:
>
>
> 不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
> | |
> 罗显宴
> |
> |
> 邮箱:15927482...@163.com
> |
>
Hi
这个调试可以在 IDEA 进行的。
另外你说的通过 web ui 提交没有问题。请问下,是同一个 savepoint 通过 flink run 提交有问题,通过 web ui
提交没有问题吗?如果是的,能否分享下你的操作过程和命令呢?
Best,
Congxian
Z-Z 于2020年7月20日周一 上午11:33写道:
> 这是taskmanager新报的一个错,还是跟之前一样,用cli提交报错,用webui提交就没问题:
> 2020-07-20 03:29:25,959 WARN
>
不是,是连续累计,比如我在某个医药网站上爬取有关药物,每小时统计爬取到的新增药物种类,然后一直这样进行下去,然后这个网站爬完了,可以换另一个网站,
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由网易邮箱大师定制
在2020年7月20日 11:47,shizk233 写道:
Hi,
累计是仅在一天之内累计吗,这样的话可以开个一天的Tumbling
Window,然后使用ContinusEventTimeTrigger每小时触发一下输出结果。
Best,
shizk233
罗显宴 <15927482...@163.com> 于2020年7月20日周一
50 matches
Mail list logo