Re: 请问同一个flink history server能够支持多个flink application cluster吗?

2021-08-19 文章 Chenyu Zheng
History Server的API也是使用jobid作为区分

  *   /config
  *   /jobs/overview
  *   /jobs/
  *   /jobs//vertices
  *   /jobs//config
  *   /jobs//exceptions
  *   /jobs//accumulators
  *   /jobs//vertices/
  *   /jobs//vertices//subtasktimes
  *   /jobs//vertices//taskmanagers
  *   /jobs//vertices//accumulators
  *   /jobs//vertices//subtasks/accumulators
  *   /jobs//vertices//subtasks/
  *   /jobs//vertices//subtasks//attempts/
  *   
/jobs//vertices//subtasks//attempts//accumulators
  *   /jobs//plan


From: Chenyu Zheng 
Reply-To: "user-zh@flink.apache.org" 
Date: Friday, August 20, 2021 at 11:43 AM
To: "user-zh@flink.apache.org" 
Subject: 请问同一个flink history server能够支持多个flink application cluster吗?

您好,

我们目前在k8s上以flink application模式运行作业,现在希望部署一个history server方便debug。但是根据文档,flink 
historyserver貌似只支持单个cluster下不同job的使用方法,如果存在多个cluster,相同的jobID将会出现错误。

请问对于多个application cluster,history使用的最佳姿势是什么样的?

谢谢[cid:image001.png@01D795B8.6430A670]


请问同一个flink history server能够支持多个flink application cluster吗?

2021-08-19 文章 Chenyu Zheng
您好,

我们目前在k8s上以flink application模式运行作业,现在希望部署一个history server方便debug。但是根据文档,flink 
historyserver貌似只支持单个cluster下不同job的使用方法,如果存在多个cluster,相同的jobID将会出现错误。

请问对于多个application cluster,history使用的最佳姿势是什么样的?

谢谢[cid:image001.png@01D795B8.6430A670]


Re:Re: Re:Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 文章 李航飞
你好:

SELECT window_start,window_end,SUM(price),item 
 FROM TABLE(
CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '1' MINUTES,INTERVAL'10' HOUR))

GROUP BY window_start,window_end,item
语句没有问题,正常每1分钟输出一次,过期时间代码已注释,
public ChangelogMode  getChanglogMode(ChangelogMode arg0){
   return ChangelogMode.upsert();
}
实现RedisMapper 方法  落地redis 有输出语句,每1分钟都会落地一次,我确定数据每次都一样
这upsert 不合理啊
在 2021-08-20 11:15:17,"Caizhi Weng"  写道:
>Hi!
>
>之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink
>确实应该每分钟收到一条消息。
>
>sink 在 streaming 场景可以接收 upsert 消息,只要 getChangelogMode 返回的是 upsert 即可。
>
>李航飞  于2021年8月20日周五 上午10:03写道:
>
>> 你好:
>> 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?
>>
>>
>>
>> 在 2021-08-20 09:10:44,"李航飞"  写道:
>> >你好:
>> >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。
>> >
>> >
>> >我在RichMapFunction接口里面实现open方法
>> >设置过StateTtlConfig;
>> >之后在RedisConmmand.SETEX设置过期时间
>> >都注释了,但upsert()方法还是没效
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >在 2021-08-19 17:44:02,"Caizhi Weng"  写道:
>> >>Hi!
>> >>
>> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>> >>
>> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>> >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>> >>
>> >>李航飞  于2021年8月19日周四 下午5:03写道:
>> >>
>> >>> 版本 flink1.13.2
>> >>> 具体场景
>> >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>> >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>> >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>> >>>
>> >>>
>> >>> 问题:
>> >>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>> >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>> >>>
>> >>>
>>


退订

2021-08-19 文章 Bruce Zhang
邮件退订

Re: Re:Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 文章 Caizhi Weng
Hi!

之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink
确实应该每分钟收到一条消息。

sink 在 streaming 场景可以接收 upsert 消息,只要 getChangelogMode 返回的是 upsert 即可。

李航飞  于2021年8月20日周五 上午10:03写道:

> 你好:
> 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?
>
>
>
> 在 2021-08-20 09:10:44,"李航飞"  写道:
> >你好:
> >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。
> >
> >
> >我在RichMapFunction接口里面实现open方法
> >设置过StateTtlConfig;
> >之后在RedisConmmand.SETEX设置过期时间
> >都注释了,但upsert()方法还是没效
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2021-08-19 17:44:02,"Caizhi Weng"  写道:
> >>Hi!
> >>
> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
> >>
> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
> >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
> >>
> >>李航飞  于2021年8月19日周四 下午5:03写道:
> >>
> >>> 版本 flink1.13.2
> >>> 具体场景
> >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
> >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
> >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
> >>>
> >>>
> >>> 问题:
> >>> 测试发现,每1分钟都会输出一次,落地的数据一样,
> >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
> >>>
> >>>
>


Re:Re: Re: cumulate函数和比较函数连用报错

2021-08-19 文章 李航飞
你好:
具体场景是对agg结果之前进行过滤,现在通过create view进行提取过滤了
现在我想通过DynameicTable的方式,以upsert写入redis里面




在 2021-08-20 10:31:18,"Caizhi Weng"  写道:
>Hi!
>
>具体的条件是什么样的呢?理论上如果是和原本的 group by ... having ... 等价的语句应该是支持的。
>
>李航飞  于2021年8月18日周三 下午4:34写道:
>
>> 哦哦,好的,谢谢你。比较函数不可以连用,简单的一些的条件(where a = '2')可以用,这个功能后续还会调整吗
>> 在 2021-08-18 16:21:20,"Caizhi Weng"  写道:
>> >Hi!
>> >
>> >目前 window tvf 只能应用于 window agg 和 window top-n 两种场景。如果 where 条件是用来对 window
>> >agg 的结果进行过滤的,可以使用 having 而不是 where;若是对进入 window 之前的数据进行过滤的,可以 create view。
>> >
>> >李航飞  于2021年8月18日周三 下午3:55写道:
>> >
>> >> 通过flinksql建立数据处理通道
>> >> SELECT window_start,window_end,SUM(price)
>> >>
>> >> FROM TABLE(
>> >>
>> >> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL
>> '10'
>> >> MINUTES))
>> >>
>> >> GROUP BY window_start,window_end;
>> >>
>> >> 大致语句如上,该语句通过 StreamTableEnvironment对象 env.sqlQuery(sql)执行成功,没有问题
>> >> 关键一步是 StatementSet对象 sta.execute() 执行报错
>> >> java.lang.UnsupportedOperationException:
>> >> Currently Flink doesn't support individual window table-valued function
>> >> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
>> >>  Please use window table-valued function with aggregate together using
>> >> window_start and window_end as group keys.
>> >> 执行环境是flink1.13.1  去掉where条件可以正常执行,加上就不行。
>> >>
>> >>
>>


Re: Re: cumulate函数和比较函数连用报错

2021-08-19 文章 Caizhi Weng
Hi!

具体的条件是什么样的呢?理论上如果是和原本的 group by ... having ... 等价的语句应该是支持的。

李航飞  于2021年8月18日周三 下午4:34写道:

> 哦哦,好的,谢谢你。比较函数不可以连用,简单的一些的条件(where a = '2')可以用,这个功能后续还会调整吗
> 在 2021-08-18 16:21:20,"Caizhi Weng"  写道:
> >Hi!
> >
> >目前 window tvf 只能应用于 window agg 和 window top-n 两种场景。如果 where 条件是用来对 window
> >agg 的结果进行过滤的,可以使用 having 而不是 where;若是对进入 window 之前的数据进行过滤的,可以 create view。
> >
> >李航飞  于2021年8月18日周三 下午3:55写道:
> >
> >> 通过flinksql建立数据处理通道
> >> SELECT window_start,window_end,SUM(price)
> >>
> >> FROM TABLE(
> >>
> >> CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '2' MINUTES,INTERVAL
> '10'
> >> MINUTES))
> >>
> >> GROUP BY window_start,window_end;
> >>
> >> 大致语句如上,该语句通过 StreamTableEnvironment对象 env.sqlQuery(sql)执行成功,没有问题
> >> 关键一步是 StatementSet对象 sta.execute() 执行报错
> >> java.lang.UnsupportedOperationException:
> >> Currently Flink doesn't support individual window table-valued function
> >> CUMULATE(time_col=[ts], max_size=[10 min], step=[2 min]).
> >>  Please use window table-valued function with aggregate together using
> >> window_start and window_end as group keys.
> >> 执行环境是flink1.13.1  去掉where条件可以正常执行,加上就不行。
> >>
> >>
>


Re: Flink SQL Api不支持TIMESTAMP(p) WITH TIME ZONE 类型的列

2021-08-19 文章 changfeng
你好
  感谢解答,我仔细看了下Flink Table API & SQL Data Types页面: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/
 

 , 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#data-types-in-the-table-api
 

 这一节末尾的表格是否就是Flink当前已经支持的所有数据类型呢?


> 2021年8月19日 下午9:10,Leonard Xu  写道:
> 
> Hello,
> 
> Flink 还不支持 TIMESTAMP WITH TIME ZONE 类型,
> 
> 目前支持的有: 
> TIMESTAMP WITHOUT TIME ZONE, 缩写为 TIMESTAMP
> TIMESTAMP WITH LOCAL TIME ZONE,缩写为TIMESTAMP_LTZ
> 
> 祝好,
> Leonard
> 
>> 在 2021年8月19日,20:51,changfeng  写道:
>> 
>> ` TIMESTAMP(6) WITH TIME ZONE
> 



Re:Re:Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 文章 李航飞
你好:
我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?



在 2021-08-20 09:10:44,"李航飞"  写道:
>你好:
>我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。
>
>
>我在RichMapFunction接口里面实现open方法
>设置过StateTtlConfig;
>之后在RedisConmmand.SETEX设置过期时间
>都注释了,但upsert()方法还是没效
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-08-19 17:44:02,"Caizhi Weng"  写道:
>>Hi!
>>
>>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>>
>>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>>
>>李航飞  于2021年8月19日周四 下午5:03写道:
>>
>>> 版本 flink1.13.2
>>> 具体场景
>>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>>>
>>>
>>> 问题:
>>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>>>
>>>


Re:Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 文章 李航飞
你好:
我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。


我在RichMapFunction接口里面实现open方法
设置过StateTtlConfig;
之后在RedisConmmand.SETEX设置过期时间
都注释了,但upsert()方法还是没效














在 2021-08-19 17:44:02,"Caizhi Weng"  写道:
>Hi!
>
>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>
>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>
>李航飞  于2021年8月19日周四 下午5:03写道:
>
>> 版本 flink1.13.2
>> 具体场景
>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>>
>>
>> 问题:
>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>>
>>


Re: Process suspend when get Hana connection in open method of sink function

2021-08-19 文章 Chesnay Schepler
If the Hana driver cannot be loaded then the most likely reason is that 
the dependency is not actually on the classpath.

Please double-check that your user jar bundles the dependency.

On 18/08/2021 15:05, Chenzhiyuan(HR) wrote:


Dear all:

I have a problem when I want to sink data to Hana database.

Process is suspended when get Hana connection in the open method of 
sink function as below.


My flink version is 1.10.

public class HrrmPayValueSumToHana extends 
RichSinkFunction  {


@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = HrrmUtils./getHanaConnection/();    // process is 
suspended here

}

@Override public void invoke() {
  …….
}
@Override public void close() throws Exception {
  ……….
}

}

public static Connection getHanaConnection() { Connection con = null; 
try { Class./forName/(HrrmConstants./HANA_DRIVER_CLASS/); con = 
DriverManager./getConnection/(HrrmConstants./HANA_SOURCE_DRIVER_URL/, 
    HrrmConstants./HANA_SOURCE_USER/, 
HrrmConstants./HANA_SOURCE_PASSWORD/);    } catch (Exception e) { /LOG/.error("---hana get connection has exception , msg = ", e); 
    } return con; }


Hana driver dependency as below:

     com.sap.cloud.db.jdbc     
ngdbc     2.3.62 






Re: Flink SQL Api不支持TIMESTAMP(p) WITH TIME ZONE 类型的列

2021-08-19 文章 Leonard Xu
Hello,

Flink 还不支持 TIMESTAMP WITH TIME ZONE 类型,

目前支持的有: 
TIMESTAMP WITHOUT TIME ZONE, 缩写为 TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE,缩写为TIMESTAMP_LTZ

祝好,
Leonard

> 在 2021年8月19日,20:51,changfeng  写道:
> 
> ` TIMESTAMP(6) WITH TIME ZONE



Re: Flink SQL Api不支持TIMESTAMP(p) WITH TIME ZONE 类型的列

2021-08-19 文章 Caizhi Weng
Hi!

目前 timestamp 相关类型只支持普通的 timestamp 还有 timestamp with local time zone
这两种。如果是有把 Fri Mar 26 12:27:05 IST 2021 这种带时区的 string 转成 timestamp 的需求,建议使用
date_format 函数。

changfeng  于2021年8月19日周四 下午8:52写道:

> 你好, 我最近在使用Flink 1.13.1版本的SQL Api时,遇到了不支持TIMESTAMP(p) WITH TIME ZONE
> 类型数据的问题:
>  使用SQL: CREATE TABLE source ( `ctimestamp` TIMESTAMP(6) WITH TIME ZONE)
> WITH ('connector' = 'print’) 创建表,报错:
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> "TIME" at line 1, column 55.
> Was expecting:
> "LOCAL" ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:450)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:213)
> at
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
> at
> org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
> at
> org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
> at
> org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
> ... 30 more
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered
> "TIME" at line 1, column 55.
> Was expecting:
> "LOCAL" ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39782)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39593)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TimeZoneOpt(FlinkSqlParserImpl.java:25946)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.DateTimeTypeName(FlinkSqlParserImpl.java:25892)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlTypeName(FlinkSqlParserImpl.java:25168)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TypeName(FlinkSqlParserImpl.java:24787)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ExtendedDataType(FlinkSqlParserImpl.java:4990)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TypedColumn(FlinkSqlParserImpl.java:4866)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TableColumn(FlinkSqlParserImpl.java:4491)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5197)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6233)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20934)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3415)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3918)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:261)
> at
> org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
> ... 32 more
>
> 而使用SQL: CREATE TABLE source ( `ctimestamp` TIMESTAMP(6) WITH LOCAL TIME
> ZONE) WITH ('connector' = 'print’) 创建表则不会有问题,请问Flink SQL
> Api当前是否支持TIMESTAMP(p) WITH TIME ZONE 类型数据或者有相关Bug。
>
>


Flink SQL Api不支持TIMESTAMP(p) WITH TIME ZONE 类型的列

2021-08-19 文章 changfeng
你好, 我最近在使用Flink 1.13.1版本的SQL Api时,遇到了不支持TIMESTAMP(p) WITH TIME ZONE 类型数据的问题:
 使用SQL: CREATE TABLE source ( `ctimestamp` TIMESTAMP(6) WITH TIME ZONE) WITH 
('connector' = 'print’) 创建表,报错:
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "TIME" 
at line 1, column 55.
Was expecting:
"LOCAL" ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:450)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:213)
at 
org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at 
org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at 
org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
... 30 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" 
at line 1, column 55.
Was expecting:
"LOCAL" ...

at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39782)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39593)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TimeZoneOpt(FlinkSqlParserImpl.java:25946)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.DateTimeTypeName(FlinkSqlParserImpl.java:25892)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlTypeName(FlinkSqlParserImpl.java:25168)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TypeName(FlinkSqlParserImpl.java:24787)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ExtendedDataType(FlinkSqlParserImpl.java:4990)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TypedColumn(FlinkSqlParserImpl.java:4866)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.TableColumn(FlinkSqlParserImpl.java:4491)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5197)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6233)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20934)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3415)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3918)
at 
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:261)
at 
org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 32 more

而使用SQL: CREATE TABLE source ( `ctimestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE) 
WITH ('connector' = 'print’) 创建表则不会有问题,请问Flink SQL Api当前是否支持TIMESTAMP(p) WITH 
TIME ZONE 类型数据或者有相关Bug。



Re: 请问如何从源码构建flink docker镜像?

2021-08-19 文章 Caizhi Weng
Hi!

如果只是改了 java 代码,可以先把 flink 编译出来,然后基于最新的镜像,把你编译出来的 flink(位于
flink-dist/target/flink-)的 lib 目录里的所有 jar COPY 到镜像的 lib 目录即可。

Chenyu Zheng  于2021年8月19日周四 下午7:38写道:

> Hi,
>
> 我最近对于手头的源码进行了些许修改,请问如何从源码构建docker镜像?这将方便我进行下一步测试
>
> 谢谢
>


请问如何从源码构建flink docker镜像?

2021-08-19 文章 Chenyu Zheng
Hi,

我最近对于手头的源码进行了些许修改,请问如何从源码构建docker镜像?这将方便我进行下一步测试

谢谢


Re: flink-connector-redis连接器upsert()模式插入问题

2021-08-19 文章 Caizhi Weng
Hi!

可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。

如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。

李航飞  于2021年8月19日周四 下午5:03写道:

> 版本 flink1.13.2
> 具体场景
> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>
>
> 问题:
> 测试发现,每1分钟都会输出一次,落地的数据一样,
> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>
>


flink-connector-redis连接器upsert()模式插入问题

2021-08-19 文章 李航飞
版本 flink1.13.2
具体场景
flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次


问题:
测试发现,每1分钟都会输出一次,落地的数据一样,
根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?



关于InfluxDB中大量监控指标的count为0

2021-08-19 文章 Camile Sing
Hi!
我最近在为Flink集群搭建监控,使用了InfluxDB+Grafana。在关注每秒同步行数以及每秒同步数据大小时,我使用了以下相关指标:

taskmanager_job_task_numRecordsIn、taskmanager_job_task_Shuffle_Netty_Input_numBytesInLocalPerSecond


 让我感到的奇怪的是,当我去查询InfluxDB时,指标的count都为空

> select * from
> taskmanager_job_task_Shuffle_Netty_Input_numBytesInLocalPerSecond where
> count != 0;
> > select * from taskmanager_job_task_numRecordsIn where count != 0;

  所有指标的count都是空的

 > select count(*) from taskmanager_job_task_numRecordsIn where count = 0;

name: taskmanager_job_task_numRecordsIn

time count_count

 ---

0325289

> select count(*) from
> taskmanager_job_task_Shuffle_Netty_Input_numBytesInLocalPerSecond where
> count = 0;

name: taskmanager_job_task_Shuffle_Netty_Input_numBytesInLocalPerSecond

time count_count count_rate

 --- --

0325328  325328

   请问这是正常现象吗?如果这是正常的,我该如何在界面上展示我的同步行数/s以及同步大小/s?因为现在展示出来都是一条线,即为0
[image: image.png]
[image: image.png]