【疑问】RocksDBStateBackend为什么使用单独封装的frocksdbjni,而不用RocksDB官方提供的RocksJava

2021-03-23 文章 zoltar9264
大家好,
在RocksDBStateBackend的pom中看到是使用了 
frocksdbjni,看了下这个包是dataArtisans自己的。而RocksDBStateBackend是有提供Java 
sdk的,叫RocksJava。RocksDBStateBackend为什么不直接用 RocksJava呢?


| |
Feifan Wang
|
|
zoltar9...@163.com
|
签名由网易邮箱大师定制



相同的作业配置 ,Flink1.12 版本的作业checkpoint耗时增加以及制作失败,Flink1.9的作业运行正常

2021-03-23 文章 Haihang Jing
【现象】相同配置的作业(checkpoint interval :3分钟,作业逻辑:regular
join),flink1.9运行正常,flink1.12运行一段时间后,checkpoint制作耗时增大,最后checkpoint制作失败。

【分析】了解到flink1.10后对于checkpoint机制进行调整,接收端在barrier对齐时不会缓存单个barrier到达后的数据,意味着发送方必须在barrier对齐后等待credit
feedback来传输数据,因此发送方会产生一定的冷启动,影响到延迟和网络吞吐量,因此调整checkpoint
interval为10分钟进行对比测试,发现调整后(interval为10),flink1.12上运行的作业运行正常。
相关issue:https://issues.apache.org/jira/browse/FLINK-16404

【问题】1.想咨询下大家有遇到过相同的情况么?
2.flink1.12的作业checkpoint间隔对作业的影响具体有多大?官方有测试么?

checkpoint interval为3分钟的flink1.12作业运行5小时后,checkpoint制作失败,具体异常栈:

org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
failure threshold.

at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:96)

at
org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1924)

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1897)

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)

at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2038)

at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)


Re: Flink 消费kafka ,写ORC文件

2021-03-23 文章 Robin Zhang
Hi,Jacob
  
官网有这么一段:`我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner
`
链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D

  
希望对你有帮助。

Best,
Robin


Jacob wrote
> 【现状如下】
> 
> Flink Job消费kafka消息,每半个小时将消费到的消息进行一系列聚合操作(flink 窗口聚合),然后写入一个orc文件。
> 据了解,flink写orc的桶分配策略[1],有两种:
> 
> 一种是基于时间,即按时间为目录创建orc文件。[test/realtime/ : 为根目录]
> 
> test/realtime/
> └── 2021-03-23--07
> ├── part-0-0.orc
> ├── part-0-1.orc
> └── 2021-03-23--08
> ├── part-0-0.orc
> ├── part-0-1.orc
> 
> 一种是将所有部分文件放在一个目录下:
> 
> test/realtime/
> ├── part-0-0.orc
> ├── part-0-1.orc
> ├── part-0-2.orc
> ├── part-0-3.orc
> 
> 【问题】
> 
> 最终需求是想按照partition将每半个小时的orc文件load到hive,hive表dt为分区字段,值为时间戳,如:
> 
> hive> show partitions table_demo;
> OK
> dt=161645580
> dt=161645760
> dt=161645940
> dt=161646121
> dt=161646301
> Time taken: 0.134 seconds, Fetched: 5 row(s)
> 
> 因此希望每个orc文件的所在目录名都是dt=`时间戳`的格式:
> 
> http://apache-flink.147419.n8.nabble.com/file/t1162/dir.png; 
> 
> 用flink实现这些功能后,发现这两种桶分配策略都不能实现上述需求。
> 
> 不知如何实现?之前一直是自己写代码实现聚合、写orc的操作,目录文件名一切东西完全可控,现在用flink自带的功能实现,发现不太容易实现上述需求了
> 
> [1].https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D
> 
> 
> 
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink sql 写hive并行度设置问题

2021-03-23 文章 ggc
Hi, 请问:

env.setParallelism(8);
source = select * from table1,  

Table filterTable = source.filter(x-> x>10).limit(1);

try (CloseableIterator rows = filterTable.execute().collect()) {
  while (rows.hasNext()) {
  Row r = rows.next();
  String a = r.getField(1).toString();
  String b = r.getField(2).toString();
  String c = r.getField(3).toString();

  // 往结果表写入数据
  stableEnv.executeSql("insert into table2 values('a', 'b','c') ");
  }
}

发现source 和 最后写入表的sink都是1个并行度,只有filter 使用了8个并行度,这是正常的吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink sql 并行度问题

2021-03-23 文章 ggc
Hi, 请问:

env.setParallelism(8);
source = select * from table1,  

Table filterTable = source.filter(x-> x>10);

try (CloseableIterator rows = filterTable.execute().collect()) {
  while (rows.hasNext()) {
  Row r = rows.next();
  String a = r.getField(1).toString();
  String b = r.getField(2).toString();
  String c = r.getField(3).toString();

  // 往结果表写入数据
  stableEnv.executeSql("insert into table2 values('a', 'b','c') ");
  }
}

发现source 和 最后写入表的sink都是1个并行度,只有filter 使用了8个并行度,这是正常的吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink sql count distonct 优化

2021-03-23 文章 guomuhua
在SQL中,如果开启了 local-global 参数:set table.optimizer.agg-phase-strategy=TWO_PHASE;
或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
 set
table.optimizer.distinct-agg.split.bucket-num=1024;
还需要对应的将SQL改写为两段式吗?
例如:
原SQL:
SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,

对所需DISTINCT字段buy_id模1024自动打散后,SQL:
SELECT day, SUM(cnt) total
FROM (
SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
FROM T GROUP BY day, MOD(buy_id, 1024))
GROUP BY day

还是flink会帮我自动改写SQL,我不用关心?

另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子
 





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.12.0 隔几个小时Checkpoint就会失败

2021-03-23 文章 Haihang Jing
你好,问题定位到了吗?
我也遇到了相同的问题,感觉和checkpoint interval有关
我有两个相同的作业(checkpoint interval
设置的是3分钟),一个运行在flink1.9,一个运行在flink1.12,1.9的作业稳定运行,1.12的运行5小时就会checkpoint
制作失败,抛异常 org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint
tolerable failure threshold.
当我把checkpoint interval调大到10分钟后,1.12的作业也可以稳定运行,所以我怀疑和制作间隔有关。
看到过一个issuse,了解到flink1.10后对于checkpoint机制进行调整,接收端在barrier对齐时不会缓存单个barrier到达后的数据,意味着发送方必须在barrier对齐后等待credit
feedback来传输数据,因此发送方会产生一定的冷启动,影响到延迟和网络吞吐量。但是不确定是不是一定和这个相关,以及如何定位影响。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink-1.11.2版本,客户端如何设置dynamic properties

2021-03-23 文章 easonliu30624700
看FlinkYarnSessionCli代码:
final Configuration configuration =
applyCommandLineOptionsToConfiguration(cmd);
final ClusterClientFactory 
yarnClusterClientFactory =
clusterClientServiceLoader.getClusterClientFactory(configuration);
configuration.set(DeploymentOptions.TARGET,
YarnDeploymentTarget.SESSION.getName());

final YarnClusterDescriptor yarnClusterDescriptor =
(YarnClusterDescriptor)
yarnClusterClientFactory.createClusterDescriptor(configuration);

动态参数也是解析成key、value,传给configuration,最后在YarnClusterClientFactory工厂类里面创建YarnClusterDescriptor对象。

所以,当前动态参数设置不生效是bug,还是哪里使用姿势不对。有大佬能解答一下吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/