大家好,
在RocksDBStateBackend的pom中看到是使用了
frocksdbjni,看了下这个包是dataArtisans自己的。而RocksDBStateBackend是有提供Java
sdk的,叫RocksJava。RocksDBStateBackend为什么不直接用 RocksJava呢?
| |
Feifan Wang
|
|
zoltar9...@163.com
|
签名由网易邮箱大师定制
【现象】相同配置的作业(checkpoint interval :3分钟,作业逻辑:regular
join),flink1.9运行正常,flink1.12运行一段时间后,checkpoint制作耗时增大,最后checkpoint制作失败。
【分析】了解到flink1.10后对于checkpoint机制进行调整,接收端在barrier对齐时不会缓存单个barrier到达后的数据,意味着发送方必须在barrier对齐后等待credit
feedback来传输数据,因此发送方会产生一定的冷启动,影响到延迟和网络吞吐量,因此调整checkpoint
Hi,Jacob
官网有这么一段:`我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner
`
链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D
Hi, 请问:
env.setParallelism(8);
source = select * from table1,
Table filterTable = source.filter(x-> x>10).limit(1);
try (CloseableIterator rows = filterTable.execute().collect()) {
while (rows.hasNext()) {
Row r = rows.next();
String a = r.getField(1).toString();
Hi, 请问:
env.setParallelism(8);
source = select * from table1,
Table filterTable = source.filter(x-> x>10);
try (CloseableIterator rows = filterTable.execute().collect()) {
while (rows.hasNext()) {
Row r = rows.next();
String a = r.getField(1).toString();
在SQL中,如果开启了 local-global 参数:set table.optimizer.agg-phase-strategy=TWO_PHASE;
或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
set
table.optimizer.distinct-agg.split.bucket-num=1024;
还需要对应的将SQL改写为两段式吗?
例如:
原SQL:
SELECT day,
你好,问题定位到了吗?
我也遇到了相同的问题,感觉和checkpoint interval有关
我有两个相同的作业(checkpoint interval
设置的是3分钟),一个运行在flink1.9,一个运行在flink1.12,1.9的作业稳定运行,1.12的运行5小时就会checkpoint
制作失败,抛异常 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
tolerable failure threshold.
当我把checkpoint
看FlinkYarnSessionCli代码:
final Configuration configuration =
applyCommandLineOptionsToConfiguration(cmd);
final ClusterClientFactory
yarnClusterClientFactory =
clusterClientServiceLoader.getClusterClientFactory(configuration);