Re:Flink操作 kafka ,hive遇到kerberos过期

2020-07-29 文章 felixzh
遇到kerberos过期问题,应该是你使用的是ticket cache,而不是keytab文件做认证

















在 2020-07-28 17:58:51,"lydata"  写道:
> 请问下 Flink操作 kafka ,hive遇到kerberos过期  有什么解决方法吗?


RE: flink消费kafka提交偏移量

2020-07-29 文章 venn
可以参考下这个:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/
kafka.html#kafka-consumers-offset-committing-behaviour-configuration

-Original Message-
From: user-zh-return-6007-wxchunjhyy=163@flink.apache.org
 On Behalf Of 小学
生
Sent: 2020年7月30日 10:57
To: user-zh 
Subject: flink消费kafka提交偏移量

各位大佬,我自己使用flink1.11时,消费kafka数据源时候,使用group offset,在外
部的kafka tool发现offset没有及时提交,请问下这个在flink中怎么保证呢


Re: flink 1.11 streaming 写入hive 5min表相关问题

2020-07-29 文章 Jingsong Li
Hi,

1.checkpoint会强制滚动
2.目前最简单的思路是加大checkpoint interval,另一个思路是在partition commit时触发hive去compaction。
3.success文件的生成依赖checkpoint interval,所以会有一定延迟。

Best,
Jingsong

On Thu, Jul 30, 2020 at 1:14 PM kandy.wang  wrote:

> 现象:
> CREATE TABLE test.xxx_5min (
>
> ..
>
> ) PARTITIONED BY (dt string , hm string) stored as orc  TBLPROPERTIES(
>
>   'sink.partition-commit.trigger'='process-time',
>
>   'sink.partition-commit.delay'='5 min',
>
>   'sink.partition-commit.policy.kind'='metastore,success-file',
>
>   'sink.rolling-policy.file-size'='128MB',
>
>   'sink.rolling-policy.check-interval' ='30s',
>
>   'sink.rolling-policy.rollover-interval'='5min'
> );
> dt = FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd')
> hm = FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')
> 5min产生一个分区, ,checkpoint频率:30s
> 问题:
> 1.flink 1.11 
> steaming写入为什么是1min产生一个文件,而且文件大小没有到128M,如果参数sink.rolling-policy.rollover-interval'='5min
> 文件滚动时间 5min 滚动大小128M生效的话,就不应该产生这么小的问题,文件大小没有按照预期控制,为啥?
>  2.小文件问题该如何解决?有什么好的思路
> 3. 标记文件_Success文件为啥上报延迟? 如果是
> 12:30的分区,5min的分区,理论上应该12:35左右的时候就应该提交partition?
>
>
>
>


-- 
Best, Jingsong Lee


flink 1.11 streaming 写入hive 5min表相关问题

2020-07-29 文章 kandy.wang
现象:
CREATE TABLE test.xxx_5min (

..

) PARTITIONED BY (dt string , hm string) stored as orc  TBLPROPERTIES(

  'sink.partition-commit.trigger'='process-time',

  'sink.partition-commit.delay'='5 min',

  'sink.partition-commit.policy.kind'='metastore,success-file',

  'sink.rolling-policy.file-size'='128MB',

  'sink.rolling-policy.check-interval' ='30s',

  'sink.rolling-policy.rollover-interval'='5min'

); 
dt = FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'MMdd')
hm = FROM_UNIXTIME((UNIX_TIMESTAMP()/300 * 300) ,'HHmm')
5min产生一个分区, ,checkpoint频率:30s
问题:
1.flink 1.11 
steaming写入为什么是1min产生一个文件,而且文件大小没有到128M,如果参数sink.rolling-policy.rollover-interval'='5min
 文件滚动时间 5min 滚动大小128M生效的话,就不应该产生这么小的问题,文件大小没有按照预期控制,为啥?
 2.小文件问题该如何解决?有什么好的思路
3. 标记文件_Success文件为啥上报延迟? 如果是 12:30的分区,5min的分区,理论上应该12:35左右的时候就应该提交partition?

?????? flink-1.11 hive-1.2.1 ddl ????????????

2020-07-29 文章 kcz
sorry,idea??log4j??process-time ?? 
process time??log??




--  --
??: 
   "user-zh"

http://connector.properties.group.id/>;' 
= 'domain_testGroup',\n" +
>   
 "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
>   
 "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
>   
 "\t'update-mode' = 'append',\n" +
>   
 "\t'format.type' = 'json',\n" +
>   
 "\t'format.derive-schema' = 'true'\n" +
>   
 ")");


Best
Leonard
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options
 
;

Re: 关于window过程中barrier的问题

2020-07-29 文章 Congxian Qiu
Hi Shuwen
barrier 是和 checkpoint 相关的逻辑,用来触发 checkpoint 的,你可以认为 barrier
和数据的顺序必须是严格保证的,不然没法保证 exactlyonce 的语义。

   假设某个算子 B 有两个上游 A1 和 A2,那么 A1 和 A2 的 barrier 都发送的 B 之后,B 才会开始做
checkpoint,假设 A1 的 barrier 在 10:00 到了,A2 的 barrier 在 10:01 才到,那么 10:00 -
10:01 这段时间内,A1 发送到 B 的数据是否会被处理取决于是 exactlyonce,还是 at least once。如果是 exactly
once 语义,则不会处理(堆积在 B 这里),如果是 at least once 语义则会处理并且发送到下游。

   另外也可以阅读一下社区相关的文档[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/learn-flink/fault_tolerance.html#how-does-state-snapshotting-work
Best,
Congxian


shuwen zhou  于2020年7月29日周三 下午4:55写道:

> 大家好,想请教一个关于barrier的问题
> 如果我有如下算子
> .window()
> .reduce()
> 假设barrier和元素顺序是
> tuple 和 barrier
> 当tuple流经窗口期,barrier是会阻塞等待tuple从窗口流出之后再达到下一个算子,还是不在窗口阻塞,直接流到下一个算子呢?
>
>
>
> Best Wishes,
> Shuwen Zhou
>


flink????kafka??????????

2020-07-29 文章 ??????
flink1.11kafkagroup 
offset??kafka tooloffset??flink

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-29 文章 鱼子酱
感谢!

flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
StateBackend backend =new
FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);


这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
RocksDBStateBackend:
 
FsStateBackend:
 




--
Sent from: http://apache-flink.147419.n8.nabble.com/


?????? Sql??kafka????????????????

2020-07-29 文章 op
?? 1.10??connector type


--  --
??: 
   "user-zh"



Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-29 文章 鱼子酱
感谢!

flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

StateBackend backend =new
FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

这样看,目前是RocksDBStateBackend*增量模式*这边可能存在一些问题。

下面两种都能成功清理
RocksDBStateBackend:
 

FsStateBackend:
 





Benchao Li-2 wrote
> 这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
> 这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
> state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月29日周三 上午9:47写道:
> 
>> 您好:
>>
>> 我按照您说的试了看了一下watermark,
>> 发现可以 正常更新,相关的计算结果也没发现问题。
>> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
>> ;
>> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
>> ;
>> ;
>>
>>
>>
>> Congxian Qiu wrote
>> > Hi
>> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
>> 越来越大的情况,或许可以检查下
>> > watermark[1]
>> >
>> > [1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > 鱼子酱 <
>>
>> > 384939718@
>>
>> >> 于2020年7月28日周二 下午2:45写道:
>> >
>> >> Hi,社区的各位大家好:
>> >> 我目前生产上面使用的是1.8.2版本,相对稳定
>> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>> >>
>> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> >> 状态后端使用的是rocksdb 的增量模式
>> >> StateBackend backend =new
>> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> >> 设置了官网文档中找到的删除策略:
>> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
>> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> >> Time.minutes(7));
>> >>
>> >> 请问是我使用的方式不对吗?
>> >>
>> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>> >>
>> >>
>> >>
>> >> 版本影响:flink1.10.1 flink1.11.1
>> >> planner:blink planner
>> >> source : kafka source
>> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> sql:
>> >> insert into  result
>> >> select request_time ,request_id ,request_cnt ,avg_resptime
>> >> ,stddev_resptime ,terminal_cnt
>> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
>> >> from
>> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,commandId as request_id
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>> >>
>> >> union all
>> >>
>> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
>> >> )
>> >>
>> >>
>> >> source:
>> >>
>> >> create table log (
>> >>   eventTime bigint
>> >>   ,times timestamp(3)
>> >>   
>> >>   ,commandId integer
>> >>   ,watermark for times as times - interval '5' second
>> >> )
>> >> with(
>> >>  'connector' = 'kafka-0.10',
>> >>  'topic' = '……',
>> >>  'properties.bootstrap.servers' = '……',
>> >>  'properties.group.id' = '……',
>> >>  'scan.startup.mode' = 'latest-offset',
>> >>  'format' = 'json'
>> >> )
>> >>
>> >> sink1:
>> >> create table result (
>> >>   request_time varchar
>> >>   ,request_id integer
>> >>   ,request_cnt bigint
>> >>   ,avg_resptime double
>> >>   ,stddev_resptime double
>> >>   ,insert_time varchar
>> >> ) with (
>> >>   'connector' = 'kafka-0.10',
>> >>   'topic' = '……',
>> >>   'properties.bootstrap.servers' = '……',
>> >>   'properties.group.id' = '……',
>> >>   'scan.startup.mode' = 'latest-offset',
>> >>   'format' = 'json'
>> >> )
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>

Re: 近期使用flinksql(1.11.0)发现的一些问题

2020-07-29 文章 wxpcc
Q1, 可以使用 DROP TABLE IF EXISTS table;



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Sql往kafka表写聚合数据报错

2020-07-29 文章 Jark Wu
抱歉哈,1.11 提供的 Canal 和 Debezium 还不支持 sink 。 预计会在 1.12 中提供。

On Wed, 29 Jul 2020 at 12:51, Benchao Li  wrote:

> 你可以用Canal或者Debezium format来写入kafka,那样就支持update和delete消息了。
>
> op <520075...@qq.com> 于2020年7月29日周三 上午11:59写道:
>
> > 如下,想用sql直接往kafka写聚合结果,版本是1.11,请问能有什么办法解决,还是只能转换成datastream??
> >
> >
> > 谢谢
> >
> >
> > Exception in thread "main" org.apache.flink.table.api.TableException:
> > Table sink 'default_catalog.default_database.mvp_rtdwb_user_business'
> > doesn't support consuming update changes which is produced by node
> > GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS
> > text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS
> > be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS
> > share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS
> follow_count,
> > SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11)
> > AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS
> event_time])
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: 近期使用flinksql(1.11.0)发现的一些问题

2020-07-29 文章 Jark Wu
Hi,

1. 这个功能应该是漏加了,我建了个 issue 去支持这个功能:
https://issues.apache.org/jira/browse/FLINK-18756
2. update-mode 属性对于 kafka 来说一直都是没有用的(因为只支持 append-only)。所以自 1.10
,这个属性就变成可选了,文档中也不再标识出来。
1.11 中新版的 connector 的实现中,也没有这个属性。
3. 目前 DDL 建表语句将表元信息存到 catalog 中,是不会走校验逻辑的。校验逻辑现在发生在 query 编译期。 这个确实和
fail-fast 原则有点相悖。

Best,
Jark

On Wed, 29 Jul 2020 at 17:31, Ryiyi <18668118...@163.com> wrote:

> 1. create table语句不支持create talbe if not exists:
> 不支持if not exists语法在实际使用时特别麻烦,每次重新执行SQL都需要先删除上次执行创建的table。
> Q1: CREATE TABLE IF NOT EXITS语法个人理解实现并不特别麻烦,社区为什么还没实现?
> 2. flink1.11创建kafka sink表时不再支持update-mode属性:
> 创建kafka
> sink表时报不支持udpate-mode属性的语法检验错误。但查看flink1.11源码中仍存在多个测试类在使用update-mode属性。
> Q2:从flink1.11开始,kafka数据源的建表语句是否明确不再支持update-mode?(官网示例中已删除,但源码测试类中仍存在)
> 3. update-mode属性校验失败,任务推出,但该表却已经在catalog中生成
> Q3:建表语句的属性校验失败,任务异常退出,但该表却已经在catalog中生成。感觉不太合理
>
>
> 麻烦了解的大佬抽空解答下,多谢!


Re: flink1.11 可以使用processtime开窗,但是无法使用eventtime开窗

2020-07-29 文章 admin
你指定时间语义是EventTime了吗
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


> 2020年7月29日 上午9:56,111  写道:
> 
> 
> 
> 
> 
> 
> 
> 您好,请教一个问题,谢谢:
> 很简单的json,
> {"num":100,"ts":1595949526874,"vin":""}
> {"num":200,"ts":1595949528874,"vin":""}
> {"num":200,"ts":1595949530880,"vin":""}
> {"num":300,"ts":1595949532883,"vin":""}
> {"num":100,"ts":1595949534888,"vin":""}
> {"num":300,"ts":1595949536892,"vin":""}
> 我就想使用eventtime开窗,但是亲测使用procetime可以,但是eventtime死活都不行,郁闷,望指教。
> public class FlinkKafka {
> public static void main(String[] args) throws Exception{
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> final EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
> settings);
> 
>String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" +
> " ts BIGINT,\n" +
> " num INT ,\n" +
> " vin STRING ,\n" +
> " pts AS PROCTIME() ,  \n" +  //处理时间
> " rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, '-MM-dd HH:mm:ss')), 
> \n " +
> "  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND \n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = 'kkb',\n" +
> " 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" +
> " 'properties.group.id' = 'mm',\n" +
> " 'format' = 'json',\n" +
> " 'scan.startup.mode' = 'latest-offset' \n" +
> ")";
>tableEnv.executeSql(kafkaSourceTable);
> 
>String queryWindowAllDataSql = "SELECT * from kafkaSourceTable  group 
> by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)";
> final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);
> 
>windowAllTable.printSchema();
>tableEnv.toAppendStream(windowAllTable, Row.class).print();
>
> System.out.println("--");
>env.execute("job");
> 
>}
> 
> }
> 
> 
> ---
> 请看,我这里String queryWindowAllDataSql = "SELECT * from kafkaSourceTable  group 
> by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)"
> 如果使用TUMBLE(pts, INTERVAL '5' SECOND)",即使用processtime就没有任何问题,可以每格几秒输出所有的内容。
> 打印结果:
> root
> |-- ts: BIGINT
> |-- num: INT
> |-- vin: STRING
> |-- pts: TIMESTAMP(3) NOT NULL *PROCTIME*
> |-- rowtime: TIMESTAMP(3) *ROWTIME*
> 
> 
> --
> 11> 1595949629063,500,,2020-07-28T15:20:29.066,2020-07-28T23:20:29
> 7> 1595949627062,500,,2020-07-28T15:20:27.101,2020-07-28T23:20:27
> 7> 1595949631067,100,,2020-07-28T15:20:31.071,2020-07-28T23:20:31
> 12> 1595949633072,500,,2020-07-28T15:20:33.077,2020-07-28T23:20:33
> 11> 1595949637081,400,,2020-07-28T15:20:37.085,2020-07-28T23:20:37
> 2> 1595949635077,400,,2020-07-28T15:20:35.082,2020-07-28T23:20:35
> 11> 1595949639085,100,,2020-07-28T15:20:39.089,2020-07-28T23:20:39
> 1> 1595949643093,200,,2020-07-28T15:20:43.096,2020-07-28T23:20:43
> 
> 
> 但是如果我使用TUMBLE(rowtime, INTERVAL '5' 
> SECOND),也就是想使用eventtime开窗,就没有任何的结果输出,一直在等待。
> 版本是flink1.11.0
> 
> 
> 望指教,谢谢!
> 
> 



Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-29 文章 鱼子酱
感谢!

flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

StateBackend backend =new
FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

这样看,有可能是RocksDBStateBackend*增量模式*这边可能存在一些问题。

下面两种都能成功清理
RocksDBStateBackend:
 

FsStateBackend:
 


Benchao Li-2 wrote
> 这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
> 这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
> state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月29日周三 上午9:47写道:
> 
>> 您好:
>>
>> 我按照您说的试了看了一下watermark,
>> 发现可以 正常更新,相关的计算结果也没发现问题。
>> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
>> ;
>> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
>> ;
>> ;
>>
>>
>>
>> Congxian Qiu wrote
>> > Hi
>> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
>> 越来越大的情况,或许可以检查下
>> > watermark[1]
>> >
>> > [1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > 鱼子酱 <
>>
>> > 384939718@
>>
>> >> 于2020年7月28日周二 下午2:45写道:
>> >
>> >> Hi,社区的各位大家好:
>> >> 我目前生产上面使用的是1.8.2版本,相对稳定
>> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>> >>
>> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> >> 状态后端使用的是rocksdb 的增量模式
>> >> StateBackend backend =new
>> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> >> 设置了官网文档中找到的删除策略:
>> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
>> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> >> Time.minutes(7));
>> >>
>> >> 请问是我使用的方式不对吗?
>> >>
>> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>> >>
>> >>
>> >>
>> >> 版本影响:flink1.10.1 flink1.11.1
>> >> planner:blink planner
>> >> source : kafka source
>> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> sql:
>> >> insert into  result
>> >> select request_time ,request_id ,request_cnt ,avg_resptime
>> >> ,stddev_resptime ,terminal_cnt
>> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
>> >> from
>> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,commandId as request_id
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>> >>
>> >> union all
>> >>
>> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
>> >> )
>> >>
>> >>
>> >> source:
>> >>
>> >> create table log (
>> >>   eventTime bigint
>> >>   ,times timestamp(3)
>> >>   
>> >>   ,commandId integer
>> >>   ,watermark for times as times - interval '5' second
>> >> )
>> >> with(
>> >>  'connector' = 'kafka-0.10',
>> >>  'topic' = '……',
>> >>  'properties.bootstrap.servers' = '……',
>> >>  'properties.group.id' = '……',
>> >>  'scan.startup.mode' = 'latest-offset',
>> >>  'format' = 'json'
>> >> )
>> >>
>> >> sink1:
>> >> create table result (
>> >>   request_time varchar
>> >>   ,request_id integer
>> >>   ,request_cnt bigint
>> >>   ,avg_resptime double
>> >>   ,stddev_resptime double
>> >>   ,insert_time varchar
>> >> ) with (
>> >>   'connector' = 'kafka-0.10',
>> >>   'topic' = '……',
>> >>   'properties.bootstrap.servers' = '……',
>> >>   'properties.group.id' = '……',
>> >>   'scan.startup.mode' = 'latest-offset',
>> >>   'format' = 'json'
>> >> )
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.c

回复: flink1.11 sql 发布到yarn session时找不到hbase相关的类

2020-07-29 文章 wind.fly....@outlook.com
Hi,Xintong:
是把相关的jar配到hadoop_classpath然后再export吗?

Best,
Junbao Zhang

发件人: Xintong Song 
发送时间: 2020年7月30日 10:28
收件人: user-zh 
主题: Re: flink1.11 sql 发布到yarn session时找不到hbase相关的类

export HADOOP_CLASSPATH 就可以了


Thank you~

Xintong Song



On Wed, Jul 29, 2020 at 6:43 PM wind.fly@outlook.com <
wind.fly@outlook.com> wrote:

> Hi,all:
> 最近在升级flink1.11,sql中用到hbase connctor,发布到yarn-session时,报如下异常:
> 2020-07-29 11:49:55
> org.apache.hadoop.hbase.DoNotRetryIOException:
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.hadoop.hbase.util.ByteStringer
> at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.translateException(RpcRetryingCaller.java:248)
> at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:221)
> at
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:388)
> at
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:362)
> at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:142)
> at
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.hadoop.hbase.util.ByteStringer
> at
> org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:1053)
> at
> org.apache.hadoop.hbase.protobuf.RequestConverter.buildScanRequest(RequestConverter.java:496)
> at
> org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:402)
> at
> org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:274)
> at
> org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:62)
> at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:219)
> ... 7 more
>
> 注意到官方文档有Note:
> Note: To use HBase connector in SQL Client or Flink cluster, it’s highly
> recommended to add HBase dependency jars to Hadoop classpath. Flink will
> load all jars under Hadoop classpath automatically, please refer to HBase,
> MapReduce, and the CLASSPATH<
> https://hbase.apache.org/book.html#hbase.mapreduce.classpath> about how
> to add HBase dependency jars to Hadoop classpath.
>   但是在yarn session下怎么设置classpath呢
>


Re: flink1.11 sql 发布到yarn session时找不到hbase相关的类

2020-07-29 文章 Xintong Song
export HADOOP_CLASSPATH 就可以了


Thank you~

Xintong Song



On Wed, Jul 29, 2020 at 6:43 PM wind.fly@outlook.com <
wind.fly@outlook.com> wrote:

> Hi,all:
> 最近在升级flink1.11,sql中用到hbase connctor,发布到yarn-session时,报如下异常:
> 2020-07-29 11:49:55
> org.apache.hadoop.hbase.DoNotRetryIOException:
> java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.hadoop.hbase.util.ByteStringer
> at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.translateException(RpcRetryingCaller.java:248)
> at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:221)
> at
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:388)
> at
> org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:362)
> at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:142)
> at
> org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.hadoop.hbase.util.ByteStringer
> at
> org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:1053)
> at
> org.apache.hadoop.hbase.protobuf.RequestConverter.buildScanRequest(RequestConverter.java:496)
> at
> org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:402)
> at
> org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:274)
> at
> org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:62)
> at
> org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:219)
> ... 7 more
>
> 注意到官方文档有Note:
> Note: To use HBase connector in SQL Client or Flink cluster, it’s highly
> recommended to add HBase dependency jars to Hadoop classpath. Flink will
> load all jars under Hadoop classpath automatically, please refer to HBase,
> MapReduce, and the CLASSPATH<
> https://hbase.apache.org/book.html#hbase.mapreduce.classpath> about how
> to add HBase dependency jars to Hadoop classpath.
>   但是在yarn session下怎么设置classpath呢
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-29 文章 鱼子酱
感谢!

flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

StateBackend backend =new
FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

这样看,有可能是RocksDBStateBackend*增量模式*这边可能存在一些问题。

下面两种都能成功清理
RocksDBStateBackend:
 

FsStateBackend:
 


Benchao Li-2 wrote
> 这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
> 这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
> state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月29日周三 上午9:47写道:
> 
>> 您好:
>>
>> 我按照您说的试了看了一下watermark,
>> 发现可以 正常更新,相关的计算结果也没发现问题。
>> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
>> ;
>> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
>> ;
>> ;
>>
>>
>>
>> Congxian Qiu wrote
>> > Hi
>> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
>> 越来越大的情况,或许可以检查下
>> > watermark[1]
>> >
>> > [1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > 鱼子酱 <
>>
>> > 384939718@
>>
>> >> 于2020年7月28日周二 下午2:45写道:
>> >
>> >> Hi,社区的各位大家好:
>> >> 我目前生产上面使用的是1.8.2版本,相对稳定
>> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>> >>
>> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> >> 状态后端使用的是rocksdb 的增量模式
>> >> StateBackend backend =new
>> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> >> 设置了官网文档中找到的删除策略:
>> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
>> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> >> Time.minutes(7));
>> >>
>> >> 请问是我使用的方式不对吗?
>> >>
>> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>> >>
>> >>
>> >>
>> >> 版本影响:flink1.10.1 flink1.11.1
>> >> planner:blink planner
>> >> source : kafka source
>> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> sql:
>> >> insert into  result
>> >> select request_time ,request_id ,request_cnt ,avg_resptime
>> >> ,stddev_resptime ,terminal_cnt
>> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
>> >> from
>> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,commandId as request_id
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>> >>
>> >> union all
>> >>
>> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
>> >> )
>> >>
>> >>
>> >> source:
>> >>
>> >> create table log (
>> >>   eventTime bigint
>> >>   ,times timestamp(3)
>> >>   
>> >>   ,commandId integer
>> >>   ,watermark for times as times - interval '5' second
>> >> )
>> >> with(
>> >>  'connector' = 'kafka-0.10',
>> >>  'topic' = '……',
>> >>  'properties.bootstrap.servers' = '……',
>> >>  'properties.group.id' = '……',
>> >>  'scan.startup.mode' = 'latest-offset',
>> >>  'format' = 'json'
>> >> )
>> >>
>> >> sink1:
>> >> create table result (
>> >>   request_time varchar
>> >>   ,request_id integer
>> >>   ,request_cnt bigint
>> >>   ,avg_resptime double
>> >>   ,stddev_resptime double
>> >>   ,insert_time varchar
>> >> ) with (
>> >>   'connector' = 'kafka-0.10',
>> >>   'topic' = '……',
>> >>   'properties.bootstrap.servers' = '……',
>> >>   'properties.group.id' = '……',
>> >>   'scan.startup.mode' = 'latest-offset',
>> >>   'format' = 'json'
>> >> )
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.c

Flink Weekly | 每周社区动态更新 - 2020/07/30

2020-07-29 文章 王松
大家好,本文为 Flink Weekly 的第二十四期,由王松整理,李本超Review。

本期主要内容包括:近期社区开发进展、邮件问题答疑、Flink 最新社区动态及技术文章推荐等。


社区开发进展 Release

[releases] Flink 1.11.1 正式发布!

具体信息参考:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-11-1-released-td43335.html
Vote

[vote] 伍翀发起Refactor Descriptor API to register connectors in Table API的投票

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-129-Refactor-Descriptor-API-to-register-connector-in-Table-API-td43420.html

[vote] Shuiqiang Chen发起支持 Python DataStream API (Stateless part) 的投票

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-130-Support-for-Python-DataStream-API-Stateless-Part-td43424.html
Discuss

[connector] 李本超发起了关于对齐 InputFormat#nextRecord 返回为空值语义的讨论。目前还没有明确的相关 java
doc,flink 中通常有三种处理方式:

   1.

   将 null 作为输入的结尾
   2.

   跳过 null
   3.

   假定 InputFormat#nextRecord 中的值不能为 null

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Align-the-semantic-of-returning-null-from-InputFormat-nextRecord-td43379.html

[releases] Robert Metzger发起了关于发布 Flink 1.12 计划的讨论,并决定在9月底之前冻结master的功能。

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Planning-Flink-1-12-td43348.html#a43383

[connector] Israel Ekpo发起了关于在 DataStream、Table 和 SQL Connectors 中支持 Azure
平台的讨论,并列出了相关的issue,来跟踪这些 connectors

对 Azure 平台做出的贡献,目前在用户邮件列表中已经有大约50个 Azure 相关的主题,这也证明了 Flink 在Azure平台上的使用度

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-Azure-Platform-Support-in-DataStream-Table-and-SQL-Connectors-td43342.html

[connector] Seth Wiesman 发起了关于使用 LIKE 子句创建的 DataGen 表中的时间戳处理问题,目前 DataGen
表只支持 FLINK SQL 的部分字段类型,

比如 TIMESTAMP(3) 就不支持,文档中建议是使用计算列创建事件时间属性。在 DataGen 表中使用 LIKE 子句时,如果物理表是
kafka 表就会报错。

Seth Wiesman 给出了两种解决方式:

   1.

   在datagen表中支持TIMESTAMP
   2.

   放宽 LIKE 子句的约束,允许使用计算列覆盖物理列

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Handling-of-Timestamp-in-DataGen-table-created-via-LIKE-td43433.html

[release] Robert Metzger
发起了关于过时blockers和不稳定build的讨论,希望将此作为长期的讨论组,定期同步过时的blocker和不稳定的build,并列出了一些test

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Release-1-12-Stale-blockers-and-build-instabilities-td43477.html
用户问题

[sql] 刘首维 提问如果基于Flink1.11新的API去开发的话,如何获取到DataStream?并列举了几个使用场景:

   1.

   我们现在有某种特定的Kafka数据格式,1条Kafka数据
   会对应转换n(n为正整数)条Row数据,我们的做法是在emitDataStream的时候增加了一个process/FlatMap阶段,
   2.

   用于处理这种情况,这样对用户是透明的。
   3.

   我们目前封装了一些自己的Sink,我们会在Sink之前增加一个process/Filter 用来做缓冲池/微批/数据过滤等功能
   4.

   调整或者指定Source/Sink并行度为用户指定值,我们也是在DataStream层面上去做的
   5.

   对于一些特殊Source Sink,他们会和KeyBy操作组合(对用户透明),我们也是在DataStream层面上去做的

云邪进行了回答,

   1.

   场景1建议做在 DeserializationSchema。
   2.

   场景2建议封装在 SinkFunction。
   3.

   场景3社区有计划在 FLIP-95 之上支持,会提供并发(或者分区)推断的能力。
   4.

   场景4可以通过引入类似 SupportsPartitioning 的接口实现。

并建了一个issue来跟进 [https://issues.apache.org/jira/browse/FLINK-18674]

http://apache-flink.147419.n8.nabble.com/1-11Flink-SQL-API-td5261.html#a5275

[sql] Dream-底限 提问如何在eval()方法中传递Row类型?

godfrey he、云邪和李本超进行了回答,可以参考[
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
]进行实现,

但是Dream-底限需要的是 Row[] 作为eval参数,目前并不支持,社区有 issue[
https://issues.apache.org/jira/browse/FLINK-17855] 正在跟进解决,针对 Dream-底限
打平array的具体场景,

本超提出参考 [
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins]
的”Expanding arrays into a relation“部分使用flink内置方法解决

http://apache-flink.147419.n8.nabble.com/flink1-11-tablefunction-td5229.html

[sql] junbaozhang 提出flink 1.11 executeSql查询kafka表print没有输出?

godfrey he进行了回答,1.11的 TableResult.collect() 和 TableResult.print() 方法在流模式下,

都是exactly once语义,需要配置checkpoint才能得到结果。

http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-kafka-print-td5367.html#a5370
活动博客文章及其他

共享很重要 —— Flink SQL 中的 catalogs

https://flink.apache.org/2020/07/23/catalogs.html

Flink 1.11 SQL 使用攻略

https://mp.weixin.qq.com/s/BBRw3sR323d-jaxxONYknQ

高能预警!Apache Flink Meetup · 上海站返场啦

https://mp.weixin.qq.com/s/2k4os3FakPde8IGPtSvglA

你与30W奖金只差一个 Apache Flink 极客挑战赛的报名

https://mp.weixin.qq.com/s/IW6VKWVTrzO1lTDZxJfPXQ


Re: flink on yarn 读取 hbase数据时 ,Task失败,具体描述如下

2020-07-29 文章 Leonard Xu
Hi,张锴

这个描述看起来没有用的信息呢,既然有任务失败,失败的日志和异常信息可以贴出来看看。或者贴一个可以复现这个失败的case.

> 在 2020年7月29日,17:02,张锴  写道:
> 
> flink获取Hbase数据并计算
> 在本地测试没问题,提交到Yarn上出现Task任务失败,无相关日志输出,task任务一直重启。任务失败的地方在数据计算部分。
> 语言:Scala,无堆栈信息输出

Best
Leonard

Re: 使用datagen connector生成无界数据,一秒时间窗口的聚合操作,一直没有数据输出打印

2020-07-29 文章 Leonard Xu
Hi

> 
>bsTableEnv.executeSql("SELECT f_random, count(1) " +
>"FROM datagen " +
>"GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();

TableResult.print() 方法目前只支持了 exactly-once 语义,在 streaming 
模式下必须设置checkpoint才能work,
你配置下checkpoint之后再试下,支持 At Least Once 的方法在1.12里应该会支持,支持后可以不用设置 checkpoint。

祝好
Leonard



Re: flink-1.11 hive-1.2.1 ddl 无法写入数据

2020-07-29 文章 Leonard Xu
Hi, kcz

看connector的properties还是1.10的格式,你换成1.11试试[1].


> 在 2020年7月29日,15:07,kcz <573693...@qq.com> 写道:
> 
>  tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
>"\thost STRING,\n" +
>"\turl STRING,\n" +
>"\tpublic_date STRING\n" +
>") WITH (\n" +
>"\t'connector.type' = 'kafka',\n" +
>"\t'connector.version' = 'universal',\n" +
>"\t'connector.startup-mode' = 'latest-offset',\n" +
>"\t'connector.topic' = 'sendMessage',\n" +
>"\t'connector.properties.group.id 
> ' = 'domain_testGroup',\n" +
>"\t'connector.properties.zookeeper.connect' = 
> '127.0.0.1:2181',\n" +
>"\t'connector.properties.bootstrap.servers' = 
> '127.0.0.1:9092',\n" +
>"\t'update-mode' = 'append',\n" +
>"\t'format.type' = 'json',\n" +
>"\t'format.derive-schema' = 'true'\n" +
>")");


Best
Leonard
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options
 


Re:flink-1.11 hive-1.2.1 ddl 无法写入数据

2020-07-29 文章 hailongwang



有什么异常信息吗




在 2020-07-29 14:07:26,"kcz" <573693...@qq.com> 写道:
>确认数据源有数据,全部代码如下,但是hive就是没有数据
>
>package com.hive;
>
>import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>import org.apache.flink.streaming.api.CheckpointingMode;
>import org.apache.flink.streaming.api.TimeCharacteristic;
>import 
>org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.SqlDialect;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.catalog.hive.HiveCatalog;
>
>import java.time.Duration;
>
>public class HiveTest {
>private static final String path = "hdfs_path";
>public static void main(String []args)  {
>System.setProperty("HADOOP_USER_NAME", "work");
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>env.setParallelism(1);
>env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>// 同一时间只允许进行一个检查点
>env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
>env.setStateBackend(new FsStateBackend(path));
>EnvironmentSettings tableEnvSettings = 
> EnvironmentSettings.newInstance()
>.useBlinkPlanner()
>.inStreamingMode()
>.build();
>StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env,tableEnvSettings);
>
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>  CheckpointingMode.EXACTLY_ONCE);
>
> tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>  Duration.ofSeconds(20));
>
>String name= "myhive";
>String defaultDatabase = "situation";
>String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
>String version = "1.2.1";
>
>HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
> version);
>tableEnv.registerCatalog("myhive", hive);
>
>// set the HiveCatalog as the current catalog of the session
>tableEnv.useCatalog("myhive");
>tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");
>tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");
>
>
>tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
>"\thost STRING,\n" +
>"\turl STRING,\n" +
>"\tpublic_date STRING\n" +
>") WITH (\n" +
>"\t'connector.type' = 'kafka',\n" +
>"\t'connector.version' = 'universal',\n" +
>"\t'connector.startup-mode' = 'latest-offset',\n" +
>"\t'connector.topic' = 'sendMessage',\n" +
>"\t'connector.properties.group.id' = 'domain_testGroup',\n" +
>"\t'connector.properties.zookeeper.connect' = 
> '127.0.0.1:2181',\n" +
>"\t'connector.properties.bootstrap.servers' = 
> '127.0.0.1:9092',\n" +
>"\t'update-mode' = 'append',\n" +
>"\t'format.type' = 'json',\n" +
>"\t'format.derive-schema' = 'true'\n" +
>")");
>
>tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");
>
>String hiveSql = "\n" +
>"  CREATE TABLE situation.fs_table (\n" +
>" \n" +
>"host STRING,\n" +
>"url STRING,\n" +
>"public_date STRING\n" +
>"  \n" +
>"  ) PARTITIONED BY (\n" +
>"ts_date STRING,\n" +
>"ts_hour STRING,\n" +
>"ts_minute STRING\n" +
>"  ) STORED AS PARQUET\n" +
>"  TBLPROPERTIES (\n" +
>"'sink.partition-commit.trigger' = 'process time',\n" +
>"'sink.partition-commit.delay' = '1 min',\n" +
>"'sink.partition-commit.policy.kind' = 
> 'metastore,success-file',\n" +
>"'partition.time-extractor.timestamp-pattern' = '$ts_date 
> $ts_hour:$ts_minute:00'\n" +
>"  )\n" +
>"  ";
>tableEnv.executeSql(hiveSql);
>
>tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>
>tableEnv.executeSql("INSERT INTO  situation.fs_table SELECT host, 
> url,public_date," +
>" DATE_FORMAT(public_date,'-MM-dd') 
> ,DATE_FORMAT(public_date,'HH') ,DATE_FORMAT(public_date,'mm')  FROM 
> situation.source_table");
>
>
>
>}
>}


Re:使用datagen connector生成无界数据,一秒时间窗口的聚合操作,一直没有数据输出打印

2020-07-29 文章 hailongwang
Hi Asahi Lee:
 我在 master 上的 flink-sql-client 模块中建了一个类,复制你的代码控制台是有输出的,你使用的版本是什么的?
Best,
Hailong Wang




在 2020-07-29 15:35:30,"Asahi Lee" <978466...@qq.com> 写道:
>以下程序运行,控制台一直没有数据输出1. 程序package kafka;
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
>public class DataGenTest {
>
>public static void main(String[] args) {
>
>StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment bsTableEnv = 
> StreamTableEnvironment.create(bsEnv, bsSettings);
>
>String sourceTableDDL = "CREATE TABLE datagen ( " +
>" f_random INT, " +
>" f_random_str STRING, " +
>" ts AS localtimestamp, " +
>" WATERMARK FOR ts AS ts " +
>") WITH ( " +
>" 'connector' = 'datagen', " +
>" 'rows-per-second'='20', " +
>" 'fields.f_random.min'='1', " +
>" 'fields.f_random.max'='10', " +
>" 'fields.f_random_str.length'='10' " +
>")";
>
>bsTableEnv.executeSql(sourceTableDDL);
>
>bsTableEnv.executeSql("SELECT f_random, count(1) " +
>"FROM datagen " +
>"GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();
>
>}
>
>}2. 控制台,log4j:WARN No appenders could be found for logger 
>(org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize 
>the log4j system properly. log4j:WARN See 
>http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
>+-+--+ |f_random |   EXPR$1 | 
>+-+--+


flink1.11 sql 发布到yarn session时找不到hbase相关的类

2020-07-29 文章 wind.fly....@outlook.com
Hi,all:
最近在升级flink1.11,sql中用到hbase connctor,发布到yarn-session时,报如下异常:
2020-07-29 11:49:55
org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.NoClassDefFoundError: 
Could not initialize class org.apache.hadoop.hbase.util.ByteStringer
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.translateException(RpcRetryingCaller.java:248)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:221)
at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:388)
at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:362)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:142)
at 
org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.hadoop.hbase.util.ByteStringer
at 
org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:1053)
at 
org.apache.hadoop.hbase.protobuf.RequestConverter.buildScanRequest(RequestConverter.java:496)
at 
org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:402)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:274)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:62)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:219)
... 7 more

注意到官方文档有Note:
Note: To use HBase connector in SQL Client or Flink cluster, it’s highly 
recommended to add HBase dependency jars to Hadoop classpath. Flink will load 
all jars under Hadoop classpath automatically, please refer to HBase, 
MapReduce, and the 
CLASSPATH about 
how to add HBase dependency jars to Hadoop classpath.
  但是在yarn session下怎么设置classpath呢


flink1.9.1 在WebUI中查看committedOffsets指标为负值

2020-07-29 文章 bradyMk
flink1.9.1
在WebUI中查看Source__Custom_Source.KafkaConsumer.topic.geek-event-target.partition.3.committedOffsets指标为负值,查看官网释义:对于每个分区,最后一次成功提交到Kafka的偏移量。
但我这里为什么是负值呢?
 

希望能得到指导,万分感谢~



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink on yarn 读取 hbase数据时 ,Task失败,具体描述如下

2020-07-29 文章 张锴
flink获取Hbase数据并计算
在本地测试没问题,提交到Yarn上出现Task任务失败,无相关日志输出,task任务一直重启。任务失败的地方在数据计算部分。
语言:Scala,无堆栈信息输出


关于window过程中barrier的问题

2020-07-29 文章 shuwen zhou
大家好,想请教一个关于barrier的问题
如果我有如下算子
.window()
.reduce()
假设barrier和元素顺序是
tuple 和 barrier
当tuple流经窗口期,barrier是会阻塞等待tuple从窗口流出之后再达到下一个算子,还是不在窗口阻塞,直接流到下一个算子呢?



Best Wishes,
Shuwen Zhou


回复: Flink使用Kafka作为source时checkpoint成功提交offset的机制

2020-07-29 文章 hk__lrzy
你是说emit之后的offset commit么?可以看下
`Kafka09Fetcher`的runFetchLoop方法


在2020年07月29日 15:09,shuwen zhou 写道:
比如读到一条offset值为100的消息,有Kafka Consumer source,一个Process算子和一个Sink算子
那么在这条消息到达Process算子,还未到达Sink算子时,提交offset是100吗?

On Wed, 29 Jul 2020 at 14:51, venn  wrote:

checkpoint成功时就会把它的offset提交,可以看下这个类:  FlinkKafkaConsumerBase  的 这个方法:
notifyCheckpointComplete

-Original Message-
From: user-zh-return-5976-wxchunjhyy=163@flink.apache.org
 On Behalf Of
shuwen zhou
Sent: 2020年7月29日 14:24
To: user-zh@flink.apache.org
Subject: Flink使用Kafka作为source时checkpoint成功提交offset的机制

大家好,请教一个问题,

当Flink在使用Kafka作为source,并且checkpoint开启时,提交的offset是在消息被链路最后一个算子处理之后才会把这条offset提交吗?
还是说这个消息只要从Kafka source读取后,不论链路处理到哪个算子, checkpoint成功时就会把它的offset提交呢?

另外有大神指路这段代码具体在哪个文件吗?
谢谢!

--
Best Wishes,
Shuwen Zhou



--
Best Wishes,
Shuwen Zhou 


近期使用flinksql(1.11.0)发现的一些问题

2020-07-29 文章 Ryiyi
1. create table语句不支持create talbe if not exists:
不支持if not exists语法在实际使用时特别麻烦,每次重新执行SQL都需要先删除上次执行创建的table。
Q1: CREATE TABLE IF NOT EXITS语法个人理解实现并不特别麻烦,社区为什么还没实现?
2. flink1.11创建kafka sink表时不再支持update-mode属性:
创建kafka 
sink表时报不支持udpate-mode属性的语法检验错误。但查看flink1.11源码中仍存在多个测试类在使用update-mode属性。
Q2:从flink1.11开始,kafka数据源的建表语句是否明确不再支持update-mode?(官网示例中已删除,但源码测试类中仍存在)
3. update-mode属性校验失败,任务推出,但该表却已经在catalog中生成
Q3:建表语句的属性校验失败,任务异常退出,但该表却已经在catalog中生成。感觉不太合理


麻烦了解的大佬抽空解答下,多谢!

RE: Flink使用Kafka作为source时checkpoint成功提交offset的机制

2020-07-29 文章 venn
可以这样理解,实际上souce 算子只知道这条数据发出去了,不知道这条数据执行到哪里的

-Original Message-
From: user-zh-return-5981-wxchunjhyy=163@flink.apache.org 
 On Behalf Of shuwen 
zhou
Sent: 2020年7月29日 15:10
To: user-zh@flink.apache.org
Subject: Re: Flink使用Kafka作为source时checkpoint成功提交offset的机制

比如读到一条offset值为100的消息,有Kafka Consumer source,一个Process算子和一个Sink算子
那么在这条消息到达Process算子,还未到达Sink算子时,提交offset是100吗?

On Wed, 29 Jul 2020 at 14:51, venn  wrote:

> checkpoint成功时就会把它的offset提交,可以看下这个类:  FlinkKafkaConsumerBase  的 这个方法:
> notifyCheckpointComplete
>
> -Original Message-
> From: user-zh-return-5976-wxchunjhyy=163@flink.apache.org
>  On Behalf Of 
> shuwen zhou
> Sent: 2020年7月29日 14:24
> To: user-zh@flink.apache.org
> Subject: Flink使用Kafka作为source时checkpoint成功提交offset的机制
>
> 大家好,请教一个问题,
>
> 当Flink在使用Kafka作为source,并且checkpoint开启时,提交的offset是在消息被链路最后一个算子处理之后才会把这条
> offset提交吗?
> 还是说这个消息只要从Kafka source读取后,不论链路处理到哪个算子, checkpoint成功时就会把它的offset提交呢?
>
> 另外有大神指路这段代码具体在哪个文件吗?
> 谢谢!
>
> --
> Best Wishes,
> Shuwen Zhou
>


--
Best Wishes,
Shuwen Zhou 


????datagen connector??????????????????????????????????????????????????????????

2020-07-29 文章 Asahi Lee
1. package kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataGenTest {

public static void main(String[] args) {

StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);

String sourceTableDDL = "CREATE TABLE datagen ( " +
" f_random INT, " +
" f_random_str STRING, " +
" ts AS localtimestamp, " +
" WATERMARK FOR ts AS ts " +
") WITH ( " +
" 'connector' = 'datagen', " +
" 'rows-per-second'='20', " +
" 'fields.f_random.min'='1', " +
" 'fields.f_random.max'='10', " +
" 'fields.f_random_str.length'='10' " +
")";

bsTableEnv.executeSql(sourceTableDDL);

bsTableEnv.executeSql("SELECT f_random, count(1) " +
"FROM datagen " +
"GROUP BY TUMBLE(ts, INTERVAL '1' second), f_random").print();

}

}2. log4j:WARN No appenders could be found for logger 
(org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the 
log4j system properly. log4j:WARN See 
http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 
+-+--+ |f_random |   EXPR$1 | 
+-+--+

Re: Flink使用Kafka作为source时checkpoint成功提交offset的机制

2020-07-29 文章 shuwen zhou
比如读到一条offset值为100的消息,有Kafka Consumer source,一个Process算子和一个Sink算子
那么在这条消息到达Process算子,还未到达Sink算子时,提交offset是100吗?

On Wed, 29 Jul 2020 at 14:51, venn  wrote:

> checkpoint成功时就会把它的offset提交,可以看下这个类:  FlinkKafkaConsumerBase  的 这个方法:
> notifyCheckpointComplete
>
> -Original Message-
> From: user-zh-return-5976-wxchunjhyy=163@flink.apache.org
>  On Behalf Of
> shuwen zhou
> Sent: 2020年7月29日 14:24
> To: user-zh@flink.apache.org
> Subject: Flink使用Kafka作为source时checkpoint成功提交offset的机制
>
> 大家好,请教一个问题,
>
> 当Flink在使用Kafka作为source,并且checkpoint开启时,提交的offset是在消息被链路最后一个算子处理之后才会把这条offset提交吗?
> 还是说这个消息只要从Kafka source读取后,不论链路处理到哪个算子, checkpoint成功时就会把它的offset提交呢?
>
> 另外有大神指路这段代码具体在哪个文件吗?
> 谢谢!
>
> --
> Best Wishes,
> Shuwen Zhou
>


-- 
Best Wishes,
Shuwen Zhou 


Re: flink 1.11 rest api saveppoint接口 异常

2020-07-29 文章 taowang
好哒,我的自动更新逻辑依赖了这个 api,不过现在我用其他方式先处理了。
感谢相应,祝好~


 原始邮件 
发件人: Congxian Qiu
收件人: user-zh
发送时间: 2020年7月29日(周三) 13:34
主题: Re: flink 1.11 rest api saveppoint接口 异常


Hi 创建了一个 Issue[1] 来跟进这个问题 [1] https://issues.apache.org/jira/browse/FLINK-18748 
Best, Congxian Congxian Qiu  于2020年7月29日周三 下午1:24写道: > 
Hi taowang > 感谢你的更新,这个地方应该是 savepoint trigger 的逻辑有问题,现在确实 > 
setMinPauseBetweenCheckpoints 会影响 savepoint,我创建一个 issue 来跟进一下这个问题 > > Best, > 
Congxian > > > taowang  于2020年7月29日周三 下午12:29写道: > >> 
我再次确认了一下,可能是因为我设置了checkpoint的setMinPauseBetweenCheckpoints,所以在上一次 >> checkpoint 
和这个间隔之间触发 savepoint 不会生效,但是接口返回了IN_PROGRESS 的状态,我觉得这里应该是有点问题的。 >> >> >> 原始邮件 >> 
发件人: taowang >> 收件人: user-zh 
>> 发送时间: 2020年7月28日(周二) 18:53 >> 主题: Re: flink 1.11 rest api saveppoint接口 异常 >> 
>> >> 是的,其实无论是否开启了unaligned checkpoint,我在调用这个接口的时候都没有 checkpoint 在做。 >> 
而且等待的话,我认为如果有正在做的,那么正在做的 checkpoint执行完成之后新的 savepoint >> 
应该会开始执行吧,但我看到的现象是等了半个小时依旧是 IN_PROGRESS状态,正常状态下,一个 checkpoint 的执行时间也就几秒钟,正常的 >> 
savpoint 执行完成最多也只需要几分钟。 原始邮件 发件人: Congxian Qiu >> 收件人: 
user-zh 发送时间: 2020年7月28日(周二) 18:09 主题: >> Re: flink 
1.11 rest api saveppoint接口 异常 Hi 开启 unalign checkpoint 的情况下,如果有 >> checkpoint 
正在做的话,那么 savepoint 会等待的[1],但是把 unaligned checkpoint >> 
关闭之后,还有这个现象看上去不太符合预期。关闭之后这种现象出现的时候,也有 checkpoint 正在做吗? [1] >> 
https://issues.apache.org/jira/browse/FLINK-17342 Best, Congxian taowang >> 
 于2020年7月28日周二 下午5:05写道: > 在升级了 flink >> 
1.11之后,我在使用的时候发现 rest api 的 /jobs/:jobid/savepoints 接口表现有点异常: > 在 flink >> 1.10 
时:当请求该接口后,在 flink ui 可以看到 savepoint > >> 被触发,/jobs/:jobid/savepoints/:triggerid 
返回IN_PROGRESS,等 savepoint > >> 
成功之后jobs/:jobid/savepoints/:triggerid返回COMPLETED。 > 但是在flink >> 
1.11中:经常出现(不是必现,但是概率也不低) /jobs/:jobid/savepoints > >> 
接口正常返回,/jobs/:jobid/savepoints/:triggerid 也返回IN_PROGRESS,但是在flink ui 中看不到 > >> 
savepoint 被触发,而且/jobs/:jobid/savepoints/:triggerid 一直返回IN_PROGRESS。 > >> 
我怀疑这个是不是和我开了 unaligned checkpoint 有关,但是我在 > >> 
flink-config.yaml中把execution.checkpointing.unaligned设置为false还是会出现这种问题,请问大家有什么了解吗?
 >> > > > rest api flink docs 链接: > >> 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html#jobs-jobid-savepoints
 >> > > > 祝好~ > >

flink-1.11 hive-1.2.1 ddl ????????????

2020-07-29 文章 kcz
hive

package com.hive;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.time.Duration;

public class HiveTest {
private static final String path = "hdfs_path";
public static void main(String []args)  {
System.setProperty("HADOOP_USER_NAME", "work");
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.setStateBackend(new FsStateBackend(path));
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env,tableEnvSettings);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
 CheckpointingMode.EXACTLY_ONCE);

tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
 Duration.ofSeconds(20));

String name= "myhive";
String defaultDatabase = "situation";
String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
String version = "1.2.1";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
version);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");
tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");


tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
"\thost STRING,\n" +
"\turl STRING,\n" +
"\tpublic_date STRING\n" +
") WITH (\n" +
"\t'connector.type' = 'kafka',\n" +
"\t'connector.version' = 'universal',\n" +
"\t'connector.startup-mode' = 'latest-offset',\n" +
"\t'connector.topic' = 'sendMessage',\n" +
"\t'connector.properties.group.id' = 'domain_testGroup',\n" +
"\t'connector.properties.zookeeper.connect' = 
'127.0.0.1:2181',\n" +
"\t'connector.properties.bootstrap.servers' = 
'127.0.0.1:9092',\n" +
"\t'update-mode' = 'append',\n" +
"\t'format.type' = 'json',\n" +
"\t'format.derive-schema' = 'true'\n" +
")");

tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");

String hiveSql = "\n" +
"  CREATE TABLE situation.fs_table (\n" +
" \n" +
"host STRING,\n" +
"url STRING,\n" +
"public_date STRING\n" +
"  \n" +
"  ) PARTITIONED BY (\n" +
"ts_date STRING,\n" +
"ts_hour STRING,\n" +
"ts_minute STRING\n" +
"  ) STORED AS PARQUET\n" +
"  TBLPROPERTIES (\n" +
"'sink.partition-commit.trigger' = 'process time',\n" +
"'sink.partition-commit.delay' = '1 min',\n" +
"'sink.partition-commit.policy.kind' = 
'metastore,success-file',\n" +
"'partition.time-extractor.timestamp-pattern' = '$ts_date 
$ts_hour:$ts_minute:00'\n" +
"  )\n" +
"  ";
tableEnv.executeSql(hiveSql);

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

tableEnv.executeSql("INSERT INTO  situation.fs_table SELECT host, 
url,public_date," +
" DATE_FORMAT(public_date,'-MM-dd') 
,DATE_FORMAT(public_date,'HH') ,DATE_FORMAT(public_date,'mm')  FROM 
situation.source_table");



}
}