
On Thu, Sep 17, 2020 at 12:46 PM kandy.wang <kandy1...@163.com> wrote:

> @Jingsong Li
> public TableSink createTableSink(TableSinkFactory.Context context) {
>     CatalogTable table = checkNotNull(context.getTable());
>     Preconditions.checkArgument(table instanceof CatalogTableImpl);
>     boolean isGeneric =
>             Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>     if (!isGeneric) {
>         // Hive table: the flag below selects the Hadoop mapred writer or Flink's native ORC/Parquet writer
>         return new HiveTableSink(
>                 context.getConfiguration().get(
>                         HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>                 context.isBounded(),
>                 new JobConf(hiveConf),
>                 context.getObjectIdentifier(),
>                 table);
>     } else {
>         // Generic (non-Hive) table: delegate to the regular table-sink factory lookup
>         return TableFactoryUtil.findAndCreateTableSink(context);
>     }
> }
> In HiveTableFactory there is an option, table.exec.hive.fallback-mapred-writer, which defaults to true. It
> controls whether ORC and Parquet files are written with Hadoop's built-in mapred writer or with Flink's native writer:
> If it is false, Flink's native writer is used to write Parquet and ORC files;
> If it is true, Hadoop's mapred record writer is used to write Parquet and ORC
> files.
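A minimal sketch of the selection described above, written without Flink on the classpath: a plain `Map` stands in for the session configuration, a missing key is treated as `true` (the stated default), and `WriterChoice` and `HiveWriterSelection` are hypothetical names for illustration, not Flink types.

```java
import java.util.Map;

// Hypothetical model of the writer choice described above; not Flink's implementation.
enum WriterChoice { HADOOP_MAPRED_WRITER, FLINK_NATIVE_WRITER, GENERIC_TABLE_SINK }

final class HiveWriterSelection {
    // Option key as quoted in this thread; its default is true.
    static final String FALLBACK_KEY = "table.exec.hive.fallback-mapred-writer";

    static WriterChoice choose(Map<String, String> conf, boolean isGenericTable) {
        if (isGenericTable) {
            // Non-Hive tables go through the generic table-sink lookup.
            return WriterChoice.GENERIC_TABLE_SINK;
        }
        // A missing key falls back to the default of "true".
        boolean fallback = Boolean.parseBoolean(conf.getOrDefault(FALLBACK_KEY, "true"));
        return fallback ? WriterChoice.HADOOP_MAPRED_WRITER : WriterChoice.FLINK_NATIVE_WRITER;
    }

    public static void main(String[] args) {
        System.out.println(choose(Map.of(), false));                      // HADOOP_MAPRED_WRITER
        System.out.println(choose(Map.of(FALLBACK_KEY, "false"), false)); // FLINK_NATIVE_WRITER
    }
}
```

In an actual job the option can be set on the table environment before the INSERT is submitted, e.g. `tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-writer", false)`.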
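> After setting this option to false, TPS reached 300,000 with the same resources.
> I suppose the two ORC implementations simply differ in performance? Also, our storage format is ORC: are there any ORC
> parameters we could tune, for example something related to async flush?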
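For scale, here is the per-subtask arithmetic for the two results in this thread (about 70,000 TPS with the default mapred writer and 300,000 TPS with the native writer, both at parallelism 40). It assumes the load is spread evenly over the 40 subtasks; `ThroughputMath` is just an illustrative helper.

```java
import java.util.Locale;

// Back-of-the-envelope arithmetic with the figures reported in this thread.
final class ThroughputMath {
    // Records per second per subtask, assuming an even spread across subtasks.
    static double perSubtask(double totalRecordsPerSecond, int parallelism) {
        return totalRecordsPerSecond / parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 40;           // 40 Kafka partitions, operator parallelism 40
        double mapredWriter = 70_000;   // fallback-mapred-writer = true (default)
        double nativeWriter = 300_000;  // fallback-mapred-writer = false
        System.out.println(perSubtask(mapredWriter, parallelism)); // 1750.0
        System.out.println(perSubtask(nativeWriter, parallelism)); // 7500.0
        System.out.println(String.format(Locale.ROOT, "speedup: %.1fx", nativeWriter / mapredWriter)); // speedup: 4.3x
    }
}
```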
> On 2020-09-17 11:21:43, "Jingsong Li" <jingsongl...@gmail.com> wrote:
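> >Sink parallelism
> >As I understand it, you want to configure the sink parallelism separately. This has been under discussion for a while, with no conclusion yet.
> >
> >HDFS performance
> >You need to find out what exactly the HDFS bottleneck is: network, request count, connection count, or disk IO.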
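On the request-count side: each writer subtask writes its own part files into every partition it receives records for, so the number of files (and HDFS create and rename calls) grows with the writer parallelism. A rough lower bound, assuming all 40 subtasks see records for every 5-minute partition and each writes exactly one file per partition; in practice files also roll on size, on the rollover interval and, for bulk formats such as ORC, on every checkpoint, so the real count is higher. `FileCountEstimate` is an illustrative name.

```java
// Rough lower bound on part files written per day; an illustration, not a measurement.
final class FileCountEstimate {
    // Number of time partitions per day for a given partition length in minutes.
    static long partitionsPerDay(int partitionMinutes) {
        return (24L * 60) / partitionMinutes;
    }

    // Assumes every writer subtask writes exactly one file into every partition.
    static long minFilesPerDay(int partitionMinutes, int writerParallelism) {
        return partitionsPerDay(partitionMinutes) * writerParallelism;
    }

    public static void main(String[] args) {
        System.out.println(partitionsPerDay(5));   // 288
        System.out.println(minFilesPerDay(5, 40)); // 11520
    }
}
```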
> >
> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang <kandy1...@163.com> wrote:
> >
> >> The scenario is simple: Kafka to Hive (kafka2hive).
> >> -- write into Hive every 5 minutes
> >>
> >> INSERT INTO hive.temp_.hive_5min
> >> SELECT
> >>   arg_service,
> >>   time_local,
> >>   .....
> >>   FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300), 'yyyyMMdd'),
> >>   FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300), 'HHmm')  -- one partition every 5 minutes
> >> FROM hive.temp_.kafka_source_pageview /*+ OPTIONS('properties.group.id'='kafka_hive_test',
> >>   'scan.startup.mode'='earliest-offset') */;
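The partition expressions above can be reproduced in Java to check which `dt`/`hm` a given moment maps to. This sketch assumes `UNIX_TIMESTAMP()/300` is integer division on BIGINT, so `/300 * 300` floors to the start of a 5-minute bucket, and takes the formatting time zone as a parameter because the zone FROM_UNIXTIME uses depends on the job's settings. Note that UNIX_TIMESTAMP() is the current time at which the row is processed, not a field of the record, so when replaying from 'earliest-offset' all historical records land in the partitions of the moment they are processed.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Mirrors FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300), 'yyyyMMdd' / 'HHmm'),
// assuming integer division and an explicitly supplied time zone.
final class FiveMinutePartition {
    static final long BUCKET_SECONDS = 300;
    private static final DateTimeFormatter DT = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter HM = DateTimeFormatter.ofPattern("HHmm");

    // Floors epoch seconds to the start of its 5-minute bucket.
    static long bucketStart(long epochSeconds) {
        return Math.floorDiv(epochSeconds, BUCKET_SECONDS) * BUCKET_SECONDS;
    }

    // Returns {dt, hm} for the bucket containing epochSeconds.
    static String[] partition(long epochSeconds, ZoneId zone) {
        Instant start = Instant.ofEpochSecond(bucketStart(epochSeconds));
        return new String[] {
            DT.withZone(zone).format(start),
            HM.withZone(zone).format(start)
        };
    }

    public static void main(String[] args) {
        String[] p = partition(1_600_000_000L, ZoneId.of("UTC"));
        System.out.println(p[0] + " " + p[1]); // 20200913 1225
    }
}
```

`FiveMinutePartition` is a hypothetical helper; `1_600_000_000L` is 2020-09-13 12:26:40 UTC, which falls into the 12:25 bucket.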
> >>
> >> -- Kafka source table definition
> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
> >>   arg_service string COMMENT 'arg_service',
> >>   ....
> >> ) WITH (
> >>   'connector' = 'kafka',
> >>   'topic' = '...',
> >>   'properties.bootstrap.servers' = '...',
> >>   'properties.group.id' = 'flink_etl_kafka_hive',
> >>   'scan.startup.mode' = 'group-offsets',
> >>   'format' = 'json',
> >>   'json.fail-on-missing-field' = 'false',
> >>   'json.ignore-parse-errors' = 'true'
> >> );
> >> -- Hive sink table definition
> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
> >> ....
> >> )
> >> PARTITIONED BY (dt string , hm string) stored as orc location
> >> 'hdfs://ssdcluster/....._5min' TBLPROPERTIES(
> >>   'sink.partition-commit.trigger'='process-time',
> >>   'sink.partition-commit.delay'='0 min',
> >>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >>   'sink.rolling-policy.check-interval' ='30s',
> >>   'sink.rolling-policy.rollover-interval'='10min',
> >>   'sink.rolling-policy.file-size'='128MB'
> >> );
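A simplified model of what the rolling and commit options in this DDL mean, to make the conditions explicit. It is not Flink's implementation: `SinkPolicies` is a hypothetical class, '128MB' is interpreted as 128 MiB, and the real sink evaluates these conditions at its own points (the time-based roll check runs every 'sink.rolling-policy.check-interval', here 30s, and partitions are committed after a checkpoint completes).

```java
import java.time.Duration;

// Simplified model of the sink options above; not Flink's implementation.
final class SinkPolicies {
    static final long FILE_SIZE_BYTES = 128L * 1024 * 1024;  // 'sink.rolling-policy.file-size'='128MB'
    static final Duration ROLLOVER = Duration.ofMinutes(10);  // 'sink.rolling-policy.rollover-interval'='10min'
    static final Duration COMMIT_DELAY = Duration.ZERO;       // 'sink.partition-commit.delay'='0 min'

    // Roll the in-progress file once it is large enough or has been open long enough.
    static boolean shouldRoll(long sizeBytes, long fileCreatedMillis, long nowMillis) {
        return sizeBytes >= FILE_SIZE_BYTES
                || nowMillis - fileCreatedMillis >= ROLLOVER.toMillis();
    }

    // 'process-time' trigger: commit once wall-clock time passes partition creation time + delay.
    static boolean shouldCommit(long partitionCreatedMillis, long nowMillis) {
        return nowMillis > partitionCreatedMillis + COMMIT_DELAY.toMillis();
    }
}
```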
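> >> At first glance the bottleneck seems to be writing to HDFS, even though our HDFS is already SSD-backed. With 40 Kafka partitions
> >> and operator parallelism 40, TPS only reaches about 60,000 to 70,000, and raising the parallelism brings no improvement.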
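A possible explanation for why raising the parallelism above 40 did not help: each Kafka partition is read by exactly one source subtask, so with 40 partitions at most 40 source subtasks receive data. If the writer is chained to the source (same parallelism, forward connection), the extra writer subtasks sit idle as well. The sketch below counts busy subtasks under a simplified round-robin assignment; Flink's actual assigner additionally shifts the starting subtask based on the topic name, which does not change the count. `KafkaAssignment` is an illustrative name.

```java
// Simplified round-robin model of Kafka partition assignment to source subtasks.
final class KafkaAssignment {
    // Number of source subtasks that receive at least one partition.
    static int busySubtasks(int kafkaPartitions, int sourceParallelism) {
        boolean[] busy = new boolean[sourceParallelism];
        for (int p = 0; p < kafkaPartitions; p++) {
            busy[p % sourceParallelism] = true; // partition p -> subtask p mod N
        }
        int count = 0;
        for (boolean b : busy) {
            if (b) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(busySubtasks(40, 40)); // 40
        System.out.println(busySubtasks(40, 80)); // 40: the other 40 subtasks get no partitions
    }
}
```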
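> >> Can Flink SQL change the parallelism of an individual operator? I would like to change only the parallelism of the
> >> StreamingFileWriter operator; is there a good way to do that? Also, are there any tuning parameters for StreamingFileWriter
> >> that could improve performance?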
> >>
> >>
> >>
> >>
> >> On 2020-09-16 19:29:50, "Jingsong Li" <jingsongl...@gmail.com> wrote:
> >> >Hi,
> >> >
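> >> >Could you share the specific test scenario? Do you have a baseline to compare against, for example a hand-written DataStream job, to see how large the performance gap is?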
> >> >
> >> >Also, could you take a look at jstack output during the stress test?
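One way to read jstack output under load: take several dumps of a TaskManager (`jstack <pid>`), concatenate them, and count the top stack frame of the writer threads; a frame that dominates across dumps shows where those threads spend their time (for example ORC encoding versus waiting on the HDFS output stream). A minimal sketch, assuming the standard HotSpot jstack text format and that the writer's task thread name contains `StreamingFileWriter` (an assumption; pass a different fragment as the first argument otherwise). `JstackTopFrames` is an illustrative name.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Counts the top stack frame of every thread whose name contains a fragment,
// across one or more concatenated jstack dumps (HotSpot text format assumed).
final class JstackTopFrames {
    static Map<String, Integer> countTopFrames(String dump, String threadNameFragment) {
        Map<String, Integer> counts = new HashMap<>();
        boolean inMatchingThread = false;
        boolean topFrameSeen = false;
        for (String line : dump.split("\\R")) {
            if (line.startsWith("\"")) {
                // Header line, e.g. "thread name" #42 prio=5 ... runnable
                int end = line.indexOf('"', 1);
                String name = end > 0 ? line.substring(1, end) : line;
                inMatchingThread = name.contains(threadNameFragment);
                topFrameSeen = false;
            } else if (inMatchingThread && !topFrameSeen && line.trim().startsWith("at ")) {
                // The first "at ..." line after the header is the top frame.
                counts.merge(line.trim().substring(3), 1, Integer::sum);
                topFrameSeen = true;
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        String dump = new String(System.in.readAllBytes(), StandardCharsets.UTF_8);
        String fragment = args.length > 0 ? args[0] : "StreamingFileWriter";
        countTopFrames(dump, fragment).entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .forEach(e -> System.out.println(e.getValue() + "  " + e.getKey()));
    }
}
```

For example: `cat dump1.txt dump2.txt dump3.txt | java JstackTopFrames.java StreamingFileWriter` (Java 11+ single-file launch).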
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang <kandy1...@163.com> wrote:
> >> >
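> >> >> In our stress test of streaming writes into Hive via StreamingFileWriter, with 40 Kafka partitions,
> >> >> source and writer operator parallelism of 40, and Kafka consumed from the earliest offset, TPS only reached 70,000.
> >> >> Has streaming writing to Hive been stress-tested? What throughput can it reach?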
> >> >
> >> >
> >> >
> >> >--
> >> >Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee

Best, Jingsong Lee
