具体是什么样的 exception?Kafka 的重连和 Task 重启是不同的事情。前者取决于 Kafka 的配置和异常的类型,后者取决于 Flink
的重启策略。
Best,
Paul Lam
> 2020年9月11日 11:42,marble.zh...@coinflex.com.invalid
> 写道:
>
> 自己顶一下, 有人给些建议吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
自己顶一下, 有人给些建议吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
大家好, flink第二个窗口无数据时,怎么填充第一个窗口数据;flink是以事件为驱动,这种需求能实现吗?通过状态可以保存上一个窗口数据吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi??flink 1.11.1??s3??
2020-09-11 11:06:14,680 INFO
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 0
checkpointing for checkpoint with id=1 (max part counter=2).
2020-09-11 11:06:14,685 WARN com.amazonaws.services.s3.internal.Mimetypes
1、timestamp也是测过的,结果是一样的,后面才改成varchar的 2、上面的例子后面就是贴测试结果
Mysql-cdc只是source,我的理解您pvuv_test_back应该用jdbc作为sink。
Best,
TonyChen
> 2020年9月10日 下午3:54,杨帅统 写道:
>
> pvuv_test_back
flink用的自己的序列化机制。从chk恢复的时候,在open方法里会进行状态数据的注入。
按我的理解,该transient标记有没有都可以从chk恢复,但一般加入transient可以明确只有open方法中的数据注入这一种方法。
至于不加上transient是否可能产生其他影响,就不太清楚了。
范超 于2020年9月10日周四 上午9:35写道:
> Transient 都不参与序列化了,怎么可能从checkopont里恢复?
>
> -邮件原件-
> 发件人: Yun Tang [mailto:myas...@live.com]
> 发送时间:
Hi??flink1.11.1??s3S3plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.11.1.jar
2020-09-10 22:23:07
java.lang.UnsupportedOperationException: This s3 file system implementation
does not support recoverable writers.
at
Hi
> insert into sk
> values(localtimestamp,current_timestamp,localtimestamp,current_timestamp);
1. 你flink里声明的字段类型和数据库的类型不匹配,需要保持一致,数据库里是varchar,flink是timestamp,完整类型映射可以参考[1]
2. 你插入的两个字段(ocaltimestamp,current_timestamp)的值可以贴出来看看?
Best
Leonard
Flink1.10.0 的checkpoint越来越大,
但是,我到TM机器上看,flink-io-xxx 目录也没有那么大,是统计指标大问题吗?
307
> COMPLETED
> 30/30 20:55:40 20:55:54 14s 8.62 GB 0 B
> 306
> COMPLETED
> 30/30 20:50:40 20:50:55 15s 8.59 GB 0 B
> 305
> COMPLETED
> 30/30 20:45:40 20:45:54 13s 8.56 GB 0 B
> 304
> COMPLETED
> 30/30 20:40:40 20:40:55
flink_cdc
flink +debezium Benchao Li-2??
----
??: "silence"http://apache-flink.147419.n8.nabble.com/
CREATE TABLE product2 (
id INT,
prod_nm STRING,
primary key(id) NOT ENFORCED -- '??'
)
WITH (
'connector' = 'jdbc',
'url' =
'jdbc:mysql://10.12.5.37:3306/rs_report?useUnicode=truecharacterEncoding=UTF-8',
'table-name' = 'sink',
'driver' =
1. 没有找到rest endpoint的原因应该是你flink run提交任务的时候没有指定namespace
你用下面的命令应该就可以了
./bin/flink run -d -e kubernetes-session
-Dkubernetes.cluster-id=flink-cluster -Dkubernetes.namespace=flink
examples/streaming/WindowJoin.jar
2. 没有资源有可能是Flink的ResourceManager没有足够的权限向K8s申请Pod,确认你已经配置了正确的service
account[1]。
个人理解有几种实现方案
1、通过主键加LAST_VALUE()使用最新的记录进行计算
2、通过flink-cdc connector source
3、自己根据操作类型写计算逻辑
--
Sent from: http://apache-flink.147419.n8.nabble.com/
如果只是聚合的结果,像sum这种函数,可以先减去原来的值,然后再加上更新后的值。但如果是count(distinct)呢?还是需要把具体的每个值都存起来把。
Benchao Li 于2020年9月10日周四 下午3:26写道:
> sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。
> 当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。
>
> last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。
>
> lec ssmi
首先,我使用的flink版本是1.11,K8S版本是v1.17。
启动的集群的脚本命令是:
./bin/kubernetes-session.sh -Dkubernetes.cluster-id=flink-cluster
-Dkubernetes.jobmanager.service-account=flink
-Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=2
-Dtaskmanager.numberOfTaskSlots=4 -Dkubernetes.namespace=flink
lib 目录下的包如上图 flink是本地模式 也尝试过重启 ./bin/start-cluster.sh 命令 但还是提示一样的错误信息
在 2020-09-09 21:16:58,"Leonard Xu" 写道:
>Hi
>
>这个错误是jar包没有正确地加载,看代码应该没啥问题,添加jar包后需要重启下集群,你测试的时候重启了吗?
>
>
>祝好
>Leonard
>
>> 在 2020年9月9日,16:48,杨帅统 写道:
>>
>> 公司希望将MySQLA库的数据实时同步到B库中,我想通过fink1.11的CDC功能不知道是否可行。
>>
sql 算子内部会自动处理这些状态。 这个状态只是聚合的中间结果,并不需要保留原始数据。
当然这个聚合的中间结果状态,也可以指定state retention time来清理一些过期的状态。
last_value只是一个聚合函数,没啥特殊的地方,而且只是按照处理时间获取最后一条数据的聚合函数。
lec ssmi 于2020年9月10日周四 下午2:35写道:
> 上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗?
> 感觉底层和 last_value() group by id是一样的。
>
> Benchao Li
Hi all,
最近在看Flink资源管理部分的源码,Flink 1.11.1 版本在本地IDE调试的时候发现
slotPool.requestNewAllocatedSlot 方法申请到 PhysicalSlot 的 ResourceProfile
信息比较奇怪(见图)。
请问这是因为本地IDE运行的原因,还是说缺少某些配置信息?
Thanks!
Hi,
退订需要发邮件到 user-zh-unsubscr...@flink.apache.org
可以参考 https://flink.apache.org/zh/community.html#section-1
程 婕 于2020年9月10日周四 下午1:56写道:
> 退订,谢谢
>
>
>
上述说的这种特性,应该也是要依赖于状态把。如果变化的间隔时间超过了状态的保存时长,还能生效吗?
感觉底层和 last_value() group by id是一样的。
Benchao Li 于2020年9月10日周四 上午10:34写道:
>
> 1.11中中新增了changelog的支持。目前内置有canal[1]和debezium[2]两个format可以读取binlog数据形成changelog。
> 如果还有自己的binlog格式,也可以自定义format来实现。
>
>
21 matches
Mail list logo