Does the flink jdbc connector support greenplum?

2020-09-16 Post by xuzh
flink jdbc connector greenplum

Re: Re: Re: Re: StreamingFileWriter load-test performance

2020-09-16 Post by Jingsong Li
Could you try the latest 1.11.2?

https://flink.apache.org/downloads.html

On Thu, Sep 17, 2020 at 1:33 PM kandy.wang  wrote:

> 是master分支代码
> 那你说的这个情况,刚好是table.exec.hive.fallback-mapred-writer默认是true 的情况
> 出现的,现在改成false 就走到else 部分 就暂时没这个问题了
> if (userMrWriter) {
>builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner,
> rollingPolicy, outputFileConfig);
> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
> } else {
>Optional> bulkFactory =
> createBulkWriterFactory(partitionColumns, sd);
>if (bulkFactory.isPresent()) {
>   builder = StreamingFileSink.forBulkFormat(
> new org.apache.flink.core.fs.Path(sd.getLocation()),
> new
> FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
> .withBucketAssigner(assigner)
> .withRollingPolicy(rollingPolicy)
> .withOutputFileConfig(outputFileConfig);
> LOG.info("Hive streaming sink: Use native parquet writer.");
> } else {
>   builder = bucketsBuilderForMRWriter(recordWriterFactory, sd,
> assigner, rollingPolicy, outputFileConfig);
> LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because
> BulkWriter Factory not available.");
> }
> }
> 在 2020-09-17 13:21:40,"Jingsong Li"  写道:
> >是最新的代码吗?
> >1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
> >它是影响性能的,1.11.2已经投票通过,即将发布
> >
> >On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
> >
> >> @Jingsong Li
> >>
> >> public TableSink createTableSink(TableSinkFactory.Context context) {
> >>CatalogTable table = checkNotNull(context.getTable());
> >> Preconditions.checkArgument(table instanceof CatalogTableImpl);
> >>
> >>boolean isGeneric =
> >>
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
> >>
> >>if (!isGeneric) {
> >> return new HiveTableSink(
> >> context.getConfiguration().get(
> >>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> >> context.isBounded(),
> >> new JobConf(hiveConf),
> >> context.getObjectIdentifier(),
> >> table);
> >> } else {
> >> return TableFactoryUtil.findAndCreateTableSink(context);
> >> }
> >> }
> >>
> >>
> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
> >> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
> >>
> >> If it is false, using flink native writer to write parquet and orc
> files;
> >>
> >> If it is true, using hadoop mapred record writer to write parquet and
> orc
> >> files
> >>
> >> 将此参数调整成false后,同样的资源配置下,tps达到30W
> >>
> >> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
> >> 一些相关的参数 ?
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
> >> >Sink并行度
> >> >我理解是配置Sink并行度,这个一直在讨论,还没结论
> >> >
> >> >HDFS性能
> >> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
> >> >
> >> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
> >> >
> >> >> 场景很简单,就是kafka2hive
> >> >> --5min入仓Hive
> >> >>
> >> >> INSERT INTO  hive.temp_.hive_5min
> >> >>
> >> >> SELECT
> >> >>
> >> >>  arg_service,
> >> >>
> >> >> time_local
> >> >>
> >> >> .
> >> >>
> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
> >> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
> >> >>
> >> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('
> properties.group.id
> >> '='kafka_hive_test',
> >> >> 'scan.startup.mode'='earliest-offset') */;
> >> >>
> >> >>
> >> >>
> >> >> --kafka source表定义
> >> >>
> >> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
> >> >>
> >> >> arg_service string COMMENT 'arg_service',
> >> >>
> >> >> 
> >> >>
> >> >> )WITH (
> >> >>
> >> >>   'connector' = 'kafka',
> >> >>
> >> >>   'topic' = '...',
> >> >>
> >> >>   'properties.bootstrap.servers' = '...',
> >> >>
> >> >>   'properties.group.id' = 'flink_etl_kafka_hive',
> >> >>
> >> >>   'scan.startup.mode' = 'group-offsets',
> >> >>
> >> >>   'format' = 'json',
> >> >>
> >> >>   'json.fail-on-missing-field' = 'false',
> >> >>
> >> >>   'json.ignore-parse-errors' = 'true'
> >> >>
> >> >> );
> >> >> --sink hive表定义
> >> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
> >> >> 
> >> >> )
> >> >> PARTITIONED BY (dt string , hm string) stored as orc location
> >> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
> >> >>   'sink.partition-commit.trigger'='process-time',
> >> >>   'sink.partition-commit.delay'='0 min',
> >> >>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
> >> >>
>  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >> >>   'sink.rolling-policy.check-interval' ='30s',
> >> >>   'sink.rolling-policy.rollover-interval'='10min',
> >> >>   'sink.rolling-policy.file-size'='128MB'
> >> >> );
> >> >> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40
> >> >> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。
> >> >> 就是flink sql可以
> >> >>
> >>
> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
> >> >> 这块,有没有什么可以提升性能相关的优化参数?
> >> >>
> >> >>
> >> >>
> 

Re:Re: Re: Re: StreamingFileWriter load-test performance

2020-09-16 Post by kandy.wang
This is code from the master branch.
The situation you describe is exactly what happens with table.exec.hive.fallback-mapred-writer at its default of true;
after changing it to false the else branch is taken and the problem no longer shows up for now.
if (userMrWriter) {
    builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
    LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
} else {
    Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
    if (bulkFactory.isPresent()) {
        builder = StreamingFileSink.forBulkFormat(
                new org.apache.flink.core.fs.Path(sd.getLocation()),
                new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
            .withBucketAssigner(assigner)
            .withRollingPolicy(rollingPolicy)
            .withOutputFileConfig(outputFileConfig);
        LOG.info("Hive streaming sink: Use native parquet writer.");
    } else {
        builder = bucketsBuilderForMRWriter(recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
        LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
    }
}
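For reference, a minimal sketch (assuming a Flink 1.11 Blink-planner TableEnvironment; it is not taken from the job above) of flipping this option programmatically, which is what routes the sink into the else branch shown above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveWriterSwitchSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // false => use Flink's native bulk writers for ORC/Parquet instead of the
        // Hadoop mapred RecordWriter (the "else" branch in the code above).
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.exec.hive.fallback-mapred-writer", false);
    }
}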
在 2020-09-17 13:21:40,"Jingsong Li"  写道:
>是最新的代码吗?
>1.11.2解了一个bug:https://issues.apache.org/jira/browse/FLINK-19121
>它是影响性能的,1.11.2已经投票通过,即将发布
>
>On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:
>
>> @Jingsong Li
>>
>> public TableSink createTableSink(TableSinkFactory.Context context) {
>>CatalogTable table = checkNotNull(context.getTable());
>> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>>
>>boolean isGeneric =
>> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>>
>>if (!isGeneric) {
>> return new HiveTableSink(
>> context.getConfiguration().get(
>>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
>> context.isBounded(),
>> new JobConf(hiveConf),
>> context.getObjectIdentifier(),
>> table);
>> } else {
>> return TableFactoryUtil.findAndCreateTableSink(context);
>> }
>> }
>>
>> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
>> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>>
>> If it is false, using flink native writer to write parquet and orc files;
>>
>> If it is true, using hadoop mapred record writer to write parquet and orc
>> files
>>
>> 将此参数调整成false后,同样的资源配置下,tps达到30W
>>
>> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
>> 一些相关的参数 ?
>>
>>
>>
>>
>>
>> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
>> >Sink并行度
>> >我理解是配置Sink并行度,这个一直在讨论,还没结论
>> >
>> >HDFS性能
>> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
>> >
>> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
>> >
>> >> 场景很简单,就是kafka2hive
>> >> --5min入仓Hive
>> >>
>> >> INSERT INTO  hive.temp_.hive_5min
>> >>
>> >> SELECT
>> >>
>> >>  arg_service,
>> >>
>> >> time_local
>> >>
>> >> .
>> >>
>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
>> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>> >>
>> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id
>> '='kafka_hive_test',
>> >> 'scan.startup.mode'='earliest-offset') */;
>> >>
>> >>
>> >>
>> >> --kafka source表定义
>> >>
>> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>> >>
>> >> arg_service string COMMENT 'arg_service',
>> >>
>> >> 
>> >>
>> >> )WITH (
>> >>
>> >>   'connector' = 'kafka',
>> >>
>> >>   'topic' = '...',
>> >>
>> >>   'properties.bootstrap.servers' = '...',
>> >>
>> >>   'properties.group.id' = 'flink_etl_kafka_hive',
>> >>
>> >>   'scan.startup.mode' = 'group-offsets',
>> >>
>> >>   'format' = 'json',
>> >>
>> >>   'json.fail-on-missing-field' = 'false',
>> >>
>> >>   'json.ignore-parse-errors' = 'true'
>> >>
>> >> );
>> >> --sink hive表定义
>> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
>> >> 
>> >> )
>> >> PARTITIONED BY (dt string , hm string) stored as orc location
>> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>> >>   'sink.partition-commit.trigger'='process-time',
>> >>   'sink.partition-commit.delay'='0 min',
>> >>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
>> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>> >>   'sink.rolling-policy.check-interval' ='30s',
>> >>   'sink.rolling-policy.rollover-interval'='10min',
>> >>   'sink.rolling-policy.file-size'='128MB'
>> >> );
>> >> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40
>> >> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。
>> >> 就是flink sql可以
>> >>
>> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
>> >> 这块,有没有什么可以提升性能相关的优化参数?
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-16 19:29:50,"Jingsong Li"  写道:
>> >> >Hi,
>> >> >
>> >> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
>> >> >
>> >> >另外,压测时是否可以看下jstack?
>> >> >
>> >> >Best,
>> >> >Jingsong
>> >> >
>> >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
>> >> >
>> >> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40
>> >> ,source
>> >> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
>> >> >> 

Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-16 Post by Jingsong Li
Thanks ZhuZhu for driving the release.

Best,
Jingsong

On Thu, Sep 17, 2020 at 1:29 PM Zhu Zhu  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/09/17/release-1.11.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348575
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Thanks,
> Zhu
>


-- 
Best, Jingsong Lee


[ANNOUNCE] Apache Flink 1.11.2 released

2020-09-16 Post by Zhu Zhu
The Apache Flink community is very happy to announce the release of Apache
Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2020/09/17/release-1.11.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348575

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Thanks,
Zhu


Re: Re: Re: StreamingFileWriter load-test performance

2020-09-16 Post by Jingsong Li
Is this the latest code?
1.11.2 fixed a bug: https://issues.apache.org/jira/browse/FLINK-19121
It affects performance; the 1.11.2 vote has passed and the release will be out soon.

On Thu, Sep 17, 2020 at 12:46 PM kandy.wang  wrote:

> @Jingsong Li
>
> public TableSink createTableSink(TableSinkFactory.Context context) {
>CatalogTable table = checkNotNull(context.getTable());
> Preconditions.checkArgument(table instanceof CatalogTableImpl);
>
>boolean isGeneric =
> Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));
>
>if (!isGeneric) {
> return new HiveTableSink(
> context.getConfiguration().get(
>   HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
> context.isBounded(),
> new JobConf(hiveConf),
> context.getObjectIdentifier(),
> table);
> } else {
> return TableFactoryUtil.findAndCreateTableSink(context);
> }
> }
>
> HiveTableFactory中,有个配置table.exec.hive.fallback-mapred-writer默认是true,控制是否使用Hadoop
> 自带的mr writer还是用flink native 实现的 writer去写orc parquet格式。
>
> If it is false, using flink native writer to write parquet and orc files;
>
> If it is true, using hadoop mapred record writer to write parquet and orc
> files
>
> 将此参数调整成false后,同样的资源配置下,tps达到30W
>
> 这个不同的ORC实现,可能性能本身就存在差异吧? 另外我们的存储格式是orc,orc有没有一些可以优化的参数,async  flush
> 一些相关的参数 ?
>
>
>
>
>
> 在 2020-09-17 11:21:43,"Jingsong Li"  写道:
> >Sink并行度
> >我理解是配置Sink并行度,这个一直在讨论,还没结论
> >
> >HDFS性能
> >具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
> >
> >On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
> >
> >> 场景很简单,就是kafka2hive
> >> --5min入仓Hive
> >>
> >> INSERT INTO  hive.temp_.hive_5min
> >>
> >> SELECT
> >>
> >>  arg_service,
> >>
> >> time_local
> >>
> >> .
> >>
> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
> >> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
> >>
> >> FROM hive.temp_.kafka_source_pageview/*+ OPTIONS('properties.group.id
> '='kafka_hive_test',
> >> 'scan.startup.mode'='earliest-offset') */;
> >>
> >>
> >>
> >> --kafka source表定义
> >>
> >> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
> >>
> >> arg_service string COMMENT 'arg_service',
> >>
> >> 
> >>
> >> )WITH (
> >>
> >>   'connector' = 'kafka',
> >>
> >>   'topic' = '...',
> >>
> >>   'properties.bootstrap.servers' = '...',
> >>
> >>   'properties.group.id' = 'flink_etl_kafka_hive',
> >>
> >>   'scan.startup.mode' = 'group-offsets',
> >>
> >>   'format' = 'json',
> >>
> >>   'json.fail-on-missing-field' = 'false',
> >>
> >>   'json.ignore-parse-errors' = 'true'
> >>
> >> );
> >> --sink hive表定义
> >> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
> >> 
> >> )
> >> PARTITIONED BY (dt string , hm string) stored as orc location
> >> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
> >>   'sink.partition-commit.trigger'='process-time',
> >>   'sink.partition-commit.delay'='0 min',
> >>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >>   'sink.rolling-policy.check-interval' ='30s',
> >>   'sink.rolling-policy.rollover-interval'='10min',
> >>   'sink.rolling-policy.file-size'='128MB'
> >> );
> >> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40
> >> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。
> >> 就是flink sql可以
> >>
> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
> >> 这块,有没有什么可以提升性能相关的优化参数?
> >>
> >>
> >>
> >>
> >> 在 2020-09-16 19:29:50,"Jingsong Li"  写道:
> >> >Hi,
> >> >
> >> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
> >> >
> >> >另外,压测时是否可以看下jstack?
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
> >> >
> >> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40
> >> ,source
> >> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
> >> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
> >> >
> >> >
> >> >
> >> >--
> >> >Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: checkpoint question

2020-09-16 Post by 明启 孙
Thanks for the explanation.

smq

发件人: Yun Tang
发送时间: 2020年9月17日 10:30
收件人: user-zh
主题: Re: checkpoint问题

Hi

checkpoint使用operator id进行一一映射进行恢复,请参照 
设置id[1],以及如果checkpoint中存在某个operator但是修改后的作业并不存在该operator时的处理逻辑[2]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

祝好
唐云

From: smq <374060...@qq.com>
Sent: Thursday, September 17, 2020 7:02
To: user-zh 
Subject: checkpoint问题

如果我的程序逻辑修改,还能用之前的checkpoint吗



Re:Re: Re: StreamingFileWriter load-test performance

2020-09-16 Post by kandy.wang
@Jingsong Li

public TableSink createTableSink(TableSinkFactory.Context context) {
    CatalogTable table = checkNotNull(context.getTable());
    Preconditions.checkArgument(table instanceof CatalogTableImpl);

    boolean isGeneric = Boolean.parseBoolean(table.getProperties().get(CatalogConfig.IS_GENERIC));

    if (!isGeneric) {
        return new HiveTableSink(
                context.getConfiguration().get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER),
                context.isBounded(),
                new JobConf(hiveConf),
                context.getObjectIdentifier(),
                table);
    } else {
        return TableFactoryUtil.findAndCreateTableSink(context);
    }
}

In HiveTableFactory there is a config option, table.exec.hive.fallback-mapred-writer, default true, which controls whether the Hadoop mapred writer or the Flink native writer is used to write ORC/Parquet files.

If it is false, the Flink native writer is used to write parquet and orc files;

if it is true, the Hadoop mapred record writer is used to write parquet and orc files.

After setting this parameter to false, with the same resource configuration, TPS reaches 300k.

Maybe the two ORC implementations simply differ in performance? Also, our storage format is ORC; are there any ORC parameters worth tuning, e.g. async flush and related settings?





在 2020-09-17 11:21:43,"Jingsong Li"  写道:
>Sink并行度
>我理解是配置Sink并行度,这个一直在讨论,还没结论
>
>HDFS性能
>具体可以看HDFS到底什么瓶颈,是网络还是请求数还是连接数还是磁盘IO
>
>On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:
>
>> 场景很简单,就是kafka2hive
>> --5min入仓Hive
>>
>> INSERT INTO  hive.temp_.hive_5min
>>
>> SELECT
>>
>>  arg_service,
>>
>> time_local
>>
>> .
>>
>> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
>> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>>
>> FROM hive.temp_.kafka_source_pageview/*+ 
>> OPTIONS('properties.group.id'='kafka_hive_test',
>> 'scan.startup.mode'='earliest-offset') */;
>>
>>
>>
>> --kafka source表定义
>>
>> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>>
>> arg_service string COMMENT 'arg_service',
>>
>> 
>>
>> )WITH (
>>
>>   'connector' = 'kafka',
>>
>>   'topic' = '...',
>>
>>   'properties.bootstrap.servers' = '...',
>>
>>   'properties.group.id' = 'flink_etl_kafka_hive',
>>
>>   'scan.startup.mode' = 'group-offsets',
>>
>>   'format' = 'json',
>>
>>   'json.fail-on-missing-field' = 'false',
>>
>>   'json.ignore-parse-errors' = 'true'
>>
>> );
>> --sink hive表定义
>> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
>> 
>> )
>> PARTITIONED BY (dt string , hm string) stored as orc location
>> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>>   'sink.partition-commit.trigger'='process-time',
>>   'sink.partition-commit.delay'='0 min',
>>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
>>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>>   'sink.rolling-policy.check-interval' ='30s',
>>   'sink.rolling-policy.rollover-interval'='10min',
>>   'sink.rolling-policy.file-size'='128MB'
>> );
>> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40
>> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。
>> 就是flink sql可以
>> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
>> 这块,有没有什么可以提升性能相关的优化参数?
>>
>>
>>
>>
>> 在 2020-09-16 19:29:50,"Jingsong Li"  写道:
>> >Hi,
>> >
>> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
>> >
>> >另外,压测时是否可以看下jstack?
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
>> >
>> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40
>> ,source
>> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
>> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
>> >
>> >
>> >
>> >--
>> >Best, Jingsong Lee
>>
>
>
>-- 
>Best, Jingsong Lee


Re: Re: StreamingFileWriter load-test performance

2020-09-16 Post by Jingsong Li
Sink parallelism:
I understand this as configuring the sink parallelism; it has been under discussion for a while with no conclusion yet.

HDFS performance:
Look at what exactly the HDFS bottleneck is: network, request count, connection count, or disk IO.

On Wed, Sep 16, 2020 at 8:16 PM kandy.wang  wrote:

> 场景很简单,就是kafka2hive
> --5min入仓Hive
>
> INSERT INTO  hive.temp_.hive_5min
>
> SELECT
>
>  arg_service,
>
> time_local
>
> .
>
> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'),
> FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  5min产生一个分区
>
> FROM hive.temp_.kafka_source_pageview/*+ 
> OPTIONS('properties.group.id'='kafka_hive_test',
> 'scan.startup.mode'='earliest-offset') */;
>
>
>
> --kafka source表定义
>
> CREATE TABLE hive.temp_vipflink.kafka_source_pageview (
>
> arg_service string COMMENT 'arg_service',
>
> 
>
> )WITH (
>
>   'connector' = 'kafka',
>
>   'topic' = '...',
>
>   'properties.bootstrap.servers' = '...',
>
>   'properties.group.id' = 'flink_etl_kafka_hive',
>
>   'scan.startup.mode' = 'group-offsets',
>
>   'format' = 'json',
>
>   'json.fail-on-missing-field' = 'false',
>
>   'json.ignore-parse-errors' = 'true'
>
> );
> --sink hive表定义
> CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (
> 
> )
> PARTITIONED BY (dt string , hm string) stored as orc location
> 'hdfs://ssdcluster/._5min' TBLPROPERTIES(
>   'sink.partition-commit.trigger'='process-time',
>   'sink.partition-commit.delay'='0 min',
>   'sink.partition-commit.policy.class'='...CustomCommitPolicy',
>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>   'sink.rolling-policy.check-interval' ='30s',
>   'sink.rolling-policy.rollover-interval'='10min',
>   'sink.rolling-policy.file-size'='128MB'
> );
> 初步看下来,感觉瓶颈在写hdfs,hdfs 这边已经是ssd hdfs了,kafka的分区数=40
> ,算子并行度=40,tps也就达到6-7万这样子,并行度放大,性能并无提升。
> 就是flink sql可以
> 改局部某个算子的并行度,想单独改一下StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
> 这块,有没有什么可以提升性能相关的优化参数?
>
>
>
>
> 在 2020-09-16 19:29:50,"Jingsong Li"  写道:
> >Hi,
> >
> >可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
> >
> >另外,压测时是否可以看下jstack?
> >
> >Best,
> >Jingsong
> >
> >On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
> >
> >> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40
> ,source
> >> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
> >> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
> >
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


Re: FlinkSQL 1.11.1 reading Kafka and writing Hive (parquet) OOM issue

2020-09-16 Post by Jingsong Li
You probably mean controlling the parallelism of the sink; that is still under discussion.

On Wed, Sep 16, 2020 at 10:26 PM wangenbao <156827...@qq.com> wrote:

> 感谢回复
> 目前确实使用keyBy,能把并行度提高,分散数据到多个TaskManager中,但遇见个问题
> <
> http://apache-flink.147419.n8.nabble.com/file/t959/QQ%E6%88%AA%E5%9B%BE20200916221935.png>
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t959/QQ%E6%88%AA%E5%9B%BE20200916222005.png>
>
>
> 不知道能不能直接控制Insert语句的并行度
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best, Jingsong Lee


Re: About an official k8s operator

2020-09-16 Post by Yang Wang
The Flink project itself has not developed a K8s operator. The two most widely used are the ones developed by lyft [1] and google [2];
both are already used in production, support standalone job/application deployments on K8s, and do not yet support the native [3] integration.

If you want to implement a K8s operator yourself that supports the native mode, I did a POC earlier which you can use as a reference [4].


[1]. https://github.com/lyft/flinkk8soperator
[2]. https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[3].
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
[4]. https://github.com/wangyang0918/flink-native-k8s-operator


Best,
Yang

Harold.Miao  于2020年9月17日周四 上午10:14写道:

> hi flink
>
> 请教一下官方在支持k8s operator部署这块有什么计划吗?
>
> 谢谢
>
>
> --
>
> Best Regards,
> Harold Miao
>


Flink SQL TableSource reuse: aggregating multiple metrics from the same source, the engine creates multiple copies of the same source

2020-09-16 Post by Kevin Dai
Scenario:
Two Kafka sources are created via Flink SQL, deduplicated, then combined with UNION ALL into a temporary view.
Flink SQL then reads the temporary view to compute aggregate metrics, and the results are written to Redis.
Problem:
The Flink SQL planner creates the same two sources again for every aggregation.

According to the description of the following Blink planner options, the engine should optimize this and reuse identical sources:
- table.optimizer.reuse-source-enabled
- table.optimizer.reuse-sub-plan-enabled

Has anyone run into a similar problem?
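For reference, a minimal sketch (assuming a Blink-planner TableEnvironment; illustrative only, not taken from the job described above) of where these two optimizer options live; both default to true, and source reuse only applies while sub-plan reuse is enabled:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReuseSourceConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Both options already default to true.
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.optimizer.reuse-source-enabled", true);
    }
}

Note that reuse can only kick in for statements that are optimized together into a single plan (for example via one StatementSet), which may or may not match the setup above.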









Re: K8s native deployment failure

2020-09-16 Post by Yang Wang
This error looks like the TM timed out when registering with the JM. Are you using an HA or a non-HA deployment?

With HA, the TM talks to the JM directly via the JM pod IP; in that case log into the pod and check whether the network is reachable.
With non-HA, the TM registers with the JM through the service; in that case check whether the K8s kube proxy is working properly.

Also: do all TMs fail to register, or only some of them? That also helps rule out network problems.


Best,
Yang

yanzhibo  于2020年9月16日周三 下午5:25写道:

> 一个job manager pod 提交job后,申请taskmanager失败
>
>
> Taskmanager 的异常
>
> Fatal error occurred in TaskExecutor akka.tcp://
> flink@179.10.251.70:6122/user/rpc/taskmanager_0.
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> Could not register at the ResourceManager within the specified maximum
> registration duration 30 ms. This indicates a problem with this
> instance. Terminating now.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> 2020-09-16 09:14:39,077 ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal
> error occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> Could not register at the ResourceManager within the specified maximum
> registration duration 30 ms. This indicates a problem with this
> instance. Terminating now.
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> ~
>
>
> Jobmanger 异常
>
> 0d5f8478a2ab4e17d816810752f669eb) switched from SCHEDULED to FAILED on not
> deployed.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources.
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> 

Re: flink table api cannot set a column inside a nested json as rowtime

2020-09-16 Post by Jark Wu
1. Only top-level fields can be designated as rowtime. To use a nested field as rowtime, first use a computed column (supported only in DDL) to generate a top-level column.
2. The Descriptor API has many problems and is missing a lot of functionality, so it is not recommended; use DDL instead. The Descriptor API will be reworked in version 1.12.


Best,
Jark
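As an illustration of point 1, a minimal sketch (Flink 1.11 DDL executed from Java; the table, topic and field names are made up and only loosely follow the schema quoted below) of lifting a nested field to the top level with a computed column and declaring it as the rowtime:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NestedRowtimeSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tEnv.executeSql(
            "CREATE TABLE metric_table (\n" +
            "  metric ROW<time_stamp TIMESTAMP(3), index_name STRING, `value` DECIMAL(18, 2)>,\n" +
            "  ts AS metric.time_stamp,\n" +                      // computed top-level column
            "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +  // ts becomes the rowtime attribute
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'metrics',\n" +
            "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
            "  'format' = 'json'\n" +
            ")");
    }
}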


On Thu, 17 Sep 2020 at 10:41, kylin  wrote:

> flink版本1.7.2
>
> flink table api从kafka读取json数据,JsonSchema如下图所示
> 发现rowtime无法从子json中的字段指定,麻烦帮忙确认下rowtime是否只能从顶层的字段来指定?
> tableEnv.connect(
>   new Kafka()
> .version("0.10")
> .topic(topic_in)
> .property("bootstrap.servers", brokers)
> .property("group.id", "TableApiT2")
> .startFromLatest()
> ).withFormat(
>   new Json()
> .jsonSchema(
>   """
> |{
> |  "type": 'object',
> |  "properties": {
> |   "metric": {
> |"type": 'object',
> | "properties": {
> |   "time_stamp": {
> |   "type": 'string',
> |   format: 'date-time'
> |  },
> |  "event_time": {
> |   "type": 'string'
> |  },
> |  "cluster": {
> |   "type": 'string'
> |  },
> |  "host": {
> |   "type": 'string'
> |  },
> |  "instance": {
> |   "type": 'string'
> |  },
> |  "index_name": {
> |   "type": 'string'
> |  },
> |  "index_num": {
> |   "type": 'string'
> |  },
> |  "value": {
> |   "type": 'number'
> |  }
> |}
> |   },
> |   "source": {
> |"type": 'string'
> |   }
> |  }
> |}
> |""".stripMargin
> )
> ).withSchema(
>   new Schema()
> .field("metric",
>   Types.ROW_NAMED(
> Array("time_stamp", "event_time", "cluster", "host", "instance",
> "index_name", "index_num", "value"),
> Types.SQL_TIMESTAMP,
> Types.STRING,
> Types.STRING,
> Types.STRING,
> Types.STRING,
> Types.STRING,
> Types.STRING,
> Types.BIG_DEC
>   )
> ) //***如何指定上面row类型中time_stamp为rowtime
> ).inAppendMode()
>   .registerTableSource("metricTable")
>
>
>


flink table api cannot set a column inside a nested json as rowtime

2020-09-16 Post by kylin
flink version 1.7.2

The flink table api reads json data from kafka; the JsonSchema is shown below.
I find that rowtime cannot be specified from a field inside the nested json. Could you help confirm whether rowtime can only be specified from a top-level field?
tableEnv.connect(
  new Kafka()
.version("0.10")
.topic(topic_in)
.property("bootstrap.servers", brokers)
.property("group.id", "TableApiT2")
.startFromLatest()
).withFormat(
  new Json()
.jsonSchema(
  """
|{
|  "type": 'object',
|  "properties": {
|   "metric": {
|"type": 'object',
| "properties": {
|   "time_stamp": {
|   "type": 'string',
|   format: 'date-time'
|  },
|  "event_time": {
|   "type": 'string'
|  },
|  "cluster": {
|   "type": 'string'
|  },
|  "host": {
|   "type": 'string'
|  },
|  "instance": {
|   "type": 'string'
|  },
|  "index_name": {
|   "type": 'string'
|  },
|  "index_num": {
|   "type": 'string'
|  },
|  "value": {
|   "type": 'number'
|  }
|}
|   },
|   "source": {
|"type": 'string'
|   }
|  }
|}
|""".stripMargin
)
).withSchema(
  new Schema()
.field("metric",
  Types.ROW_NAMED(
Array("time_stamp", "event_time", "cluster", "host", "instance", 
"index_name", "index_num", "value"),
Types.SQL_TIMESTAMP,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.BIG_DEC
  )
) //*** how do I specify time_stamp in the row type above as the rowtime?
).inAppendMode()
  .registerTableSource("metricTable")




Re: checkpoint question

2020-09-16 Post by Yun Tang
Hi

Checkpoint state is restored via a one-to-one mapping on operator ids. Please see how to assign ids [1], and how to handle the case where the checkpoint contains state for an operator that no longer exists in the modified job [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state
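As a side note, a minimal sketch (a made-up job, unrelated to the one in this thread) of the uid() assignment that link [1] describes, so checkpoint state can be mapped back to the same operators after the job logic changes:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorUidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
                .map(String::toUpperCase).uid("upper-case-map") // stable id for this operator's state
                .print().uid("print-sink");
        env.execute("uid-example");
    }
}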

Best regards,
Yun Tang

From: smq <374060...@qq.com>
Sent: Thursday, September 17, 2020 7:02
To: user-zh 
Subject: checkpoint问题

如果我的程序逻辑修改,还能用之前的checkpoint吗


Re: How do I register a UDAF that supports multiple return types?

2020-09-16 Post by Jark Wu
Have a look at the docs on custom type inference for this:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#custom-type-inference
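Not an answer to the type-inference part, but for context a minimal single-type sketch (made-up names; a LAST_VALUE-style UDAF fixed to STRING) of the AggregateFunction pieces being discussed; the custom type inference described in the linked page is what would let one class serve several types:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

public class LastValueUdafSketch {

    // Accumulator: remembers the latest non-null value seen so far.
    public static class LastAcc {
        public String last;
    }

    // getValue() has to return one concrete type, here STRING.
    public static class MyLastValue extends AggregateFunction<String, LastAcc> {
        @Override
        public LastAcc createAccumulator() {
            return new LastAcc();
        }

        @Override
        public String getValue(LastAcc acc) {
            return acc.last;
        }

        public void accumulate(LastAcc acc, String value) {
            if (value != null) {
                acc.last = value;
            }
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Registering another class under the same name would indeed replace this one.
        tEnv.createTemporarySystemFunction("MY_LAST_VALUE", MyLastValue.class);
    }
}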

On Wed, 16 Sep 2020 at 20:46, silence  wrote:

> 如题,最近想实现一些类似于LAST_VALUE之类的UDAF,看了官网文档自己写了一下目前有以下一些疑问:
>
> 
> 1、聚合结果需要重写AggregateFunction的getValue方法,而该方法需要返回固定的数据类型,如果要实现不同返回值的UDAF是否需要进行多个实现?
>
> 
> ​2、如果是需要多个实现类的话如何注册到同一个方法名上?测试发现后注册的UDAF会覆盖之前的注册,也就是只有最后注册的UDAF生效,还是只能支持一种数据类型
> 
> ​3、看了源码中的aggFuction的注册过程,发现也是对不同的数据类型进行了多次实现,然后在使用时根据参数的类型进行不同的实现类的创建,最后的疑问就是现有基于现有的flink
> api如果实现类似的效果呢?
> 感谢大佬们的解答
>
>


Flink 1.11 datastream writing to a hive parquet table throws an exception

2020-09-16 Post by zilong xiao
[image: image.png]


About an official k8s operator

2020-09-16 Post by Harold.Miao
hi flink

Does the community have any plan for supporting k8s operator based deployment?

Thanks


-- 

Best Regards,
Harold Miao


checkpoint question

2020-09-16 Post by smq
If I modify my program logic, can I still use the previous checkpoint?

Re: FlinkSQL 1.11.1 reading Kafka and writing Hive (parquet) OOM issue

2020-09-16 Post by wangenbao
Thanks for the reply.
I am indeed using keyBy now, which raises the parallelism and spreads the data across multiple TaskManagers, but I ran into a problem:
<http://apache-flink.147419.n8.nabble.com/file/t959/QQ%E6%88%AA%E5%9B%BE20200916221935.png>
<http://apache-flink.147419.n8.nabble.com/file/t959/QQ%E6%88%AA%E5%9B%BE20200916222005.png>

I don't know whether the parallelism of the INSERT statement can be controlled directly.





How do I register a UDAF that supports multiple return types?

2020-09-16 Post by silence
As the subject says: I recently wanted to implement some UDAFs similar to LAST_VALUE. After reading the official docs and writing one myself, I have the following questions:

1. The aggregate result requires overriding AggregateFunction's getValue method, which has to return a fixed data type. To implement UDAFs with different return types, do I need multiple implementations?

2. If multiple implementation classes are needed, how can they be registered under the same function name? In my tests a UDAF registered later overrides the earlier registration, i.e. only the last registered UDAF takes effect, so only one data type is supported.

3. Looking at how the built-in agg functions are registered in the source code, they also provide multiple implementations for different data types and pick the implementation class at use time based on the argument types. So my final question: how can a similar effect be achieved with the existing flink api?
Thanks a lot for any answers.



Re:Re: StreamingFileWriter load-test performance

2020-09-16 Post by kandy.wang
The scenario is simple: kafka2hive.
-- ingest into Hive every 5 minutes

INSERT INTO  hive.temp_.hive_5min

SELECT

 arg_service,

time_local

.

FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd'), 
FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')  -- a new partition every 5 minutes

FROM hive.temp_.kafka_source_pageview/*+ 
OPTIONS('properties.group.id'='kafka_hive_test', 
'scan.startup.mode'='earliest-offset') */;



-- kafka source table definition

CREATE TABLE hive.temp_vipflink.kafka_source_pageview (

arg_service string COMMENT 'arg_service',



)WITH (

  'connector' = 'kafka',

  'topic' = '...',

  'properties.bootstrap.servers' = '...',

  'properties.group.id' = 'flink_etl_kafka_hive',

  'scan.startup.mode' = 'group-offsets',

  'format' = 'json',

  'json.fail-on-missing-field' = 'false',

  'json.ignore-parse-errors' = 'true'

);
-- sink hive table definition
CREATE TABLE temp_vipflink.vipflink_dm_log_app_pageview_5min (

)
PARTITIONED BY (dt string , hm string) stored as orc location 
'hdfs://ssdcluster/._5min' TBLPROPERTIES(
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.delay'='0 min',
  'sink.partition-commit.policy.class'='...CustomCommitPolicy',
  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
  'sink.rolling-policy.check-interval' ='30s',
  'sink.rolling-policy.rollover-interval'='10min',
  'sink.rolling-policy.file-size'='128MB'
);
From an initial look, the bottleneck seems to be writing to HDFS, and the HDFS cluster is already SSD-backed. With 40 Kafka partitions
and operator parallelism = 40, TPS only reaches about 60-70k, and increasing the parallelism brings no improvement.
Can Flink SQL change the parallelism of one specific operator? I'd like to change the parallelism of just the StreamingFileWriter operator; is there a good way to do that?
And for StreamingFileWriter itself, are there any tuning parameters that could improve performance?




在 2020-09-16 19:29:50,"Jingsong Li"  写道:
>Hi,
>
>可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
>
>另外,压测时是否可以看下jstack?
>
>Best,
>Jingsong
>
>On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:
>
>> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40 ,source
>> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
>> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
>
>
>
>-- 
>Best, Jingsong Lee


Re: StreamingFileWriter load-test performance

2020-09-16 Post by Jingsong Li
Hi,

Could you share the concrete test scenario? Do you have a comparison, for example against a hand-written DataStream job, to see the performance gap?

Also, could you capture a jstack during the load test?

Best,
Jingsong

On Wed, Sep 16, 2020 at 2:03 PM kandy.wang  wrote:

> 压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40 ,source
> writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
> 想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少



-- 
Best, Jingsong Lee


Re: In a job with multiple kafka sources using event time, how to prevent the fast stream's watermark from overriding the slow stream

2020-09-16 Post by hao kong
Many thanks. I will try to implement it and see whether it can be done by combining a process function with the backpressure mechanism.
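For reference, a minimal sketch (topic, group and the timestamp parsing are placeholders) of the per-partition watermark assignment on the Kafka source that doc [2] in the quoted reply refers to; watermarks are then generated inside each source rather than after the streams are merged:

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SourceWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);

        // Assigning the strategy on the consumer makes watermarks per Kafka partition.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                        .withTimestampAssigner((event, ts) -> extractEventTime(event)));

        env.addSource(consumer).print();
        env.execute("per-partition-watermarks");
    }

    // Hypothetical helper: parse the event time out of the record payload.
    private static long extractEventTime(String event) {
        return System.currentTimeMillis();
    }
}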

Congxian Qiu  于2020年9月16日周三 下午1:55写道:

> Hi
> I don't quite understand why the source with less data would override the source with more data here. Could you describe it in more detail?
> If it is because some sources push the watermark up so that data from the other sources becomes late, then as I understand it this cannot be solved directly for now, because the watermark is at the task level. If you want
> finer-grained watermarks within a single Flink job, you could try processfunction [1] and build a similar mechanism yourself.
> You can also have a look at this doc [2] to see whether it helps in your scenario.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
> Best,
> Congxian
>
>
> hao kong  于2020年9月16日周三 上午10:24写道:
>
> > hello, I have a job with multiple Kafka sources. They all contain some historical data. If an event-time
> > window is used, the source with less data will override the source with more data via the watermark.
> >
> >
> The plan I can think of so far is a scheduler on the sources that caches two records per source in redis or zookeeper, compares them centrally, and lets the record with the smaller timestamp enter the downstream processing first. That is fairly complex to implement. Does anyone have a better idea? Many thanks.
> >
>


K8s native deployment failure

2020-09-16 Post by yanzhibo
After one job manager pod submits a job, requesting taskmanagers fails.


Taskmanager exception:

Fatal error occurred in TaskExecutor 
akka.tcp://flink@179.10.251.70:6122/user/rpc/taskmanager_0.
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: 
Could not register at the ResourceManager within the specified maximum 
registration duration 30 ms. This indicates a problem with this instance. 
Terminating now.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.Actor.aroundReceive(Actor.scala:517) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.12-1.11.1.jar:1.11.1]
2020-09-16 09:14:39,077 ERROR 
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] - Fatal error 
occurred while executing the TaskManager. Shutting it down...
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: 
Could not register at the ResourceManager within the specified maximum 
registration duration 30 ms. This indicates a problem with this instance. 
Terminating now.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1251)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$18(TaskExecutor.java:1237)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
 ~


Jobmanger 异常

0d5f8478a2ab4e17d816810752f669eb) switched from SCHEDULED to FAILED on not 
deployed.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate the required slot within slot request timeout. Please make 
sure that the cluster has enough resources.
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
 ~[flink-dist_2.12-1.11.1.jar:1.11.1]
at 
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) 
~[?:1.8.0_265]
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
 ~[?:1.8.0_265]
at 

Re: Re: Flink SQL format question

2020-09-16 Post by Jark Wu
Only a single character is supported. Non-printable characters are supported.
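For reference, a minimal sketch (using the Flink 1.11 'kafka' connector DDL rather than the legacy connector.* properties quoted below; names are placeholders) of the two csv delimiter options:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CsvDelimiterSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tEnv.executeSql(
            "CREATE TABLE csv_sink (\n" +
            "  a STRING,\n" +
            "  b STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'out',\n" +
            "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
            "  'format' = 'csv',\n" +
            "  'csv.field-delimiter' = ';',\n" +        // a single character
            "  'csv.line-delimiter' = U&'\\000A'\n" +   // newline as a Unicode literal, one backslash at the SQL level
            ")");
    }
}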

On Wed, 16 Sep 2020 at 15:24, guaishushu1...@163.com 
wrote:

> OK, thanks. For field separation, does csv.field-delimiter currently only support simple characters such as , and ; ?
>
> --
> guaishushu1...@163.com
>
>
> *发件人:* Jark Wu 
> *发送时间:* 2020-09-16 15:21
> *收件人:* user-zh ; Shuai Xia
> 
> *抄送:* guaishushu1...@163.com
> *主题:* Re: 回复:Flink SQL format问题
> 文档 [1] 中写的有问题,应该是 U&'\000A'   只有一个反斜线。
>
> Best,
> Jark
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/csv.html#csv-line-delimiter
>
> On Wed, 16 Sep 2020 at 12:53, Shuai Xia 
> wrote:
>
>> U&'\\000A' — and the code also used U& ?
>>
>>
>> --
>> 发件人:guaishushu1...@163.com 
>> 发送时间:2020年9月16日(星期三) 10:50
>> 收件人:Shuai Xia 
>> 主 题:Re: 回复:Flink SQL format问题
>>
>>
>>
>> 表定义:
>> 'connector.type' = 'kafka',
>> 'connector.version' = 'universal',
>> 'connector.topic' = '',
>> 'connector.properties.bootstrap.servers' = '',
>> 'connector.properties.zookeeper.connect' = '',
>> 'connector.properties.group.id' = '',
>> 'connector.properties.client.id' = '',
>> 'connector.startup-mode' = 'latest-offset',
>> 'format.type' = 'csv',
>> 'csv.field-delimiter' = '\001',
>> 'csv.line-delimiter' = U&'\\000A'
>> 错误:
>> aused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> Could not find a suitable table factory for
>> 'org.apache.flink.table.factories.TableSinkFactory' in
>> (com.dataplatform.flink.util.FlinkDebugThread)
>> [2020-09-16 10:46:52,533] INFO ---the classpath.
>> (com.dataplatform.flink.util.FlinkDebugThread)
>> bugThread)
>> [2020-09-16 10:46:52,533] INFO ---  at
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> (com.dataplatform.flink.util.Flink
>> DebugThread)
>> [2020-09-16 10:46:52,533] INFO ---  at
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> (com.dataplatform.flink.util.Flink
>> DebugThread)
>> [2020-09-16 10:46:52,533] INFO ---  at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> (com.dataplatform.flink.util.FlinkDebugThread)
>> [2020-09-16 10:46:52,533] INFO ---  at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> (com.dataplatform.flink.util.FlinkDebugThread)
>> [2020-09-16 10:46:52,533] INFO ---  at
>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> (com.dataplatform.flink.util.FlinkDebugThread)
>> [2020-09-16 10:46:52,533] INFO ---  at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> (com.dataplatform.flink.util.FlinkDebugThread)
>> [2020-09-16 10:46:52,533] INFO ---  at
>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> (com.dataplatform.flink.util.FlinkDebugThread)
>> [2020-09-16 10:46:52,533] INFO ---  at
>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> (com.dataplatform.flink.util.FlinkDebugThread)
>>
>>
>>
>>
>> guaishushu1...@163.com
>> 发件人: Shuai Xia
>> 发送时间: 2020-09-16 10:33
>> 收件人: user-zh; guaishushu1103
>> 主题: 回复:Flink SQL format问题
>> Hi, please paste the code and the error.
>>
>> --
>> 发件人:guaishushu1...@163.com 
>> 发送时间:2020年9月16日(星期三) 10:29
>> 收件人:user-zh 
>> 主 题:Flink SQL format问题
>>
>> csv.field-delimiter
>> csv.line-delimiter
>> I'd like to ask: the official docs say these two options can be set to specify the line delimiter and the field delimiter, but when I set them on a kafka sink table I get a syntax error??? Very strange.
>>
>>
>>
>> guaishushu1...@163.com
>>
>


Problem joining two views based on over windows

2020-09-16 Post by chen310
There are two views, each defined by an over window. Both count records with the same ruleName over a window that reaches back a fixed interval from the current event time hitDateTime: one reaches back 15 minutes, the other 30 minutes.

The hitAt field is a long timestamp, and hitDateTime is hitAt converted to a DateTime.

create view v1 as 
select
ruleName ,
hitAt ,
hitDateTime,

count(*) over w1 as count15minByRuleName,
0 as count30minByRuleName

from common_rule_param_result_topic_middle
window w1 as (partition by ruleName order by hitDateTime asc RANGE BETWEEN
INTERVAL '15' minute preceding AND CURRENT ROW);


create view v2 as 
select
ruleName ,
hitAt ,
hitDateTime,

0 count15minByRuleName,
count(*) over w1 as count30minByRuleName

from common_rule_param_result_topic_middle
window w1 as (partition by ruleName order by hitDateTime asc RANGE BETWEEN
INTERVAL '30' minute preceding AND CURRENT ROW);


Now I want to join the two views so that count15minByRuleName and count30minByRuleName can be output together in a single result.


select 
v1.ruleName,
v1.hitAt,
v1.count15minByRuleName,
v2.count30minByRuleName

from v1 join v2 
on v1.ruleName=v2.ruleName and v1.hitAt=v2.hitAt;


In the output, count15minByRuleName and count30minByRuleName are always identical, even in cases where count30minByRuleName should be larger than count15minByRuleName.

What could be the reason? Is this kind of join not allowed?















Re: Reply: Flink SQL format question

2020-09-16 Post by Jark Wu
The doc [1] has a mistake there; it should be U&'\000A' with only a single backslash.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/csv.html#csv-line-delimiter

On Wed, 16 Sep 2020 at 12:53, Shuai Xia 
wrote:

> U&'\\000A',代码也用了U&?
>
>
> --
> 发件人:guaishushu1...@163.com 
> 发送时间:2020年9月16日(星期三) 10:50
> 收件人:Shuai Xia 
> 主 题:Re: 回复:Flink SQL format问题
>
>
>
> 表定义:
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = '',
> 'connector.properties.bootstrap.servers' = '',
> 'connector.properties.zookeeper.connect' = '',
> 'connector.properties.group.id' = '',
> 'connector.properties.client.id' = '',
> 'connector.startup-mode' = 'latest-offset',
> 'format.type' = 'csv',
> 'csv.field-delimiter' = '\001',
> 'csv.line-delimiter' = U&'\\000A'
> 错误:
> aused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSinkFactory' in
> (com.dataplatform.flink.util.FlinkDebugThread)
> [2020-09-16 10:46:52,533] INFO ---the classpath.
> (com.dataplatform.flink.util.FlinkDebugThread)
> bugThread)
> [2020-09-16 10:46:52,533] INFO ---  at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> (com.dataplatform.flink.util.Flink
> DebugThread)
> [2020-09-16 10:46:52,533] INFO ---  at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> (com.dataplatform.flink.util.Flink
> DebugThread)
> [2020-09-16 10:46:52,533] INFO ---  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> (com.dataplatform.flink.util.FlinkDebugThread)
> [2020-09-16 10:46:52,533] INFO ---  at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> (com.dataplatform.flink.util.FlinkDebugThread)
> [2020-09-16 10:46:52,533] INFO ---  at
> scala.collection.Iterator$class.foreach(Iterator.scala:891)
> (com.dataplatform.flink.util.FlinkDebugThread)
> [2020-09-16 10:46:52,533] INFO ---  at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> (com.dataplatform.flink.util.FlinkDebugThread)
> [2020-09-16 10:46:52,533] INFO ---  at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> (com.dataplatform.flink.util.FlinkDebugThread)
> [2020-09-16 10:46:52,533] INFO ---  at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> (com.dataplatform.flink.util.FlinkDebugThread)
>
>
>
>
> guaishushu1...@163.com
> 发件人: Shuai Xia
> 发送时间: 2020-09-16 10:33
> 收件人: user-zh; guaishushu1103
> 主题: 回复:Flink SQL format问题
> Hi,麻烦代码以及报错贴一下
>
> --
> 发件人:guaishushu1...@163.com 
> 发送时间:2020年9月16日(星期三) 10:29
> 收件人:user-zh 
> 主 题:Flink SQL format问题
>
> csv.field-delimiter
> csv.line-delimiter
> 想问下大佬们 官方文档说是可以设置这两个值,指定行分隔和字段分隔,但是在设置kafka sinkTable的时候会出现语法错误???很奇怪
>
>
>
> guaishushu1...@163.com
>


Re: Streaming File Sink does not generate the _SUCCESS marker file

2020-09-16 Post by kandy.wang
Add this option: 'sink.partition-commit.policy.kind'='metastore,success-file'

That should work.
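For reference, a minimal sketch (Flink 1.11 SQL filesystem sink; path and schema are placeholders) of where the partition-commit policy lives. Note these sink.partition-commit.* options belong to the table/SQL filesystem and hive connectors, and 'success-file' is the policy that writes the _SUCCESS marker when a partition is committed:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SuccessFileSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tEnv.executeSql(
            "CREATE TABLE fs_sink (\n" +
            "  id STRING,\n" +
            "  dt STRING\n" +
            ") PARTITIONED BY (dt) WITH (\n" +
            "  'connector' = 'filesystem',\n" +
            "  'path' = 'hdfs:///tmp/fs_sink',\n" +
            "  'format' = 'parquet',\n" +
            "  'sink.partition-commit.trigger' = 'process-time',\n" +
            "  'sink.partition-commit.delay' = '0 s',\n" +
            "  'sink.partition-commit.policy.kind' = 'success-file'\n" + // writes _SUCCESS on commit
            ")");
    }
}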


在 2020-09-16 15:01:35,"highfei2011"  写道:
>Hi,各位好!
>  目前遇到一个问题,在使用 FLink -1.11.0 消费 Kafka 数据后,使用 Streaming File Sink 的 
> BucketAssigner 的分桶策略 sink 到 hdfs ,默认没有生成 _SUCCESS 标记文件。
>  我在配置中新增了 
>val hadoopConf = new Configuration()
>hadoopConf.set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true”)   
> 
>
>
>但是输出目录里,还是没有 _SUCCESS 文件,麻烦帮出出主意呢,再次谢谢各位!
>
>
>Best,
>Yang


Streaming File Sink does not generate the _SUCCESS marker file

2020-09-16 Post by highfei2011
Hi everyone,
  I've run into a problem: after consuming Kafka data with Flink 1.11.0 and sinking to HDFS with the Streaming File Sink using a BucketAssigner bucketing strategy, no _SUCCESS marker file is generated by default.
  I added the following to the configuration:
val hadoopConf = new Configuration()
hadoopConf.set(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true")


But the output directory still has no _SUCCESS file. Could you help me with some ideas? Thanks again, everyone!


Best,
Yang

Flink SQL create view question

2020-09-16 Post by guaishushu1...@163.com
When create view and LATERAL TABLE are used together, a column-not-found exception occurs.

Syntax:
CREATE TABLE billing_data_test (
message  STRING


create view v1 as
select T.*
from billing_data_test,
LATERAL TABLE(SplitUdtf(message)) as T(scate1,  scate2,  belong_local1,  ssrc2, 
 gift,  coupon,  local_type);

Exception:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 
'message' not found in any table (com.dataplatform.flink.util.FlinkDebugThread)
[2020-09-16 14:32:04,857] INFO ---  at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
(com.dataplatform.flink.util.FlinkDebugThread)





guaishushu1...@163.com


StreamingFileWriter load-test performance

2020-09-16 Post by kandy.wang
In our load test we found that when writing to Hive in streaming mode with StreamingFileWriter, with 40 Kafka partitions and source/writer operator parallelism = 40, consuming Kafka from the beginning, TPS only reaches 70k.
I'd like to know: has streaming writing to Hive been load-tested? What throughput can it reach?