[jira] [Updated] (FLINK-33657) Insert message in top n without row number doesn't consider its number and may be incorrect

2023-11-27 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-33657:

Summary: Insert message in top n without row number doesn't consider its 
number and may be incorrect  (was: Insert message in top n without row number 
doesn't consider its number and may not be correct)

> Insert message in top n without row number doesn't consider its number and 
> may be incorrect
> 
>
> Key: FLINK-33657
> URL: https://issues.apache.org/jira/browse/FLINK-33657
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.2, 1.17.1
>Reporter: zlzhang0122
>Priority: Major
>
> The new insert message in top n without row number doesn't consider its 
> order and is just passed via collectInsert() to the next operator; this may 
> be incorrect when the next operator collects all the top n records and 
> aggregates them.
>  
> For example:
> create table user_info(
> user_id int,
> item_id int,
> app string,
> dt timestamp
> ) with(
> 'connector'='kafka',
> ...
> );
> create table redis_sink (
> redis_key string,
> hash_key string,
> hash_value string
> )
> with (
> 'connector' = 'redis',
> 'command' = 'hmset',
> 'nodes' = 'xxx',
> 'additional-ttl' = 'xx'
> );
> create view base_lastn
> as select * from(
> select user_id,item_id,app,dt,row_number() over(partition by item_id, app 
> order by dt desc) as rn from user_info
> )t where rn<=5;
> insert into redis_sink
> select
> concat('id', cast(item_id as string)) as redis_key, app as hash_key, 
> cast(user_id as string) as hash_value from base_lastn where rn=1;
> insert into redis_sink
> select
> concat('list_', cast(item_id as string)) as redis_key, app as hash_key, 
> listagg(cast(user_id as string), ',') as hash_value from base_lastn 
> group by item_id, app;
>  
> There can be a scenario where the value at top 1 does not appear as the 
> first or the last value of the top 5.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33657) Insert message in top n without row number doesn't consider its number and may not be correct

2023-11-27 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-33657:

Description: 
The new insert message in top n without row number doesn't consider its order 
and is just passed via collectInsert() to the next operator; this may not be 
correct when the next operator collects all the top n records and aggregates 
them.
 
For example:
create table user_info(
user_id int,
item_id int,
app string,
dt timestamp
) with(
'connector'='kafka',
...
);
create table redis_sink (
redis_key string,
hash_key string,
hash_value string
)
with (
'connector' = 'redis',
'command' = 'hmset',
'nodes' = 'xxx',
'additional-ttl' = 'xx'
);
create view base_lastn
as select * from(
select user_id,item_id,app,dt,row_number() over(partition by item_id, app order 
by dt desc) as rn from user_info
)t where rn<=5;
insert into redis_sink
select
concat('id', cast(item_id as string)) as redis_key, app as hash_key, 
cast(user_id as string) as hash_value from base_lastn where rn=1;
insert into redis_sink
select
concat('list_', cast(item_id as string)) as redis_key, app as hash_key, 
listagg(cast(user_id as string), ',') as hash_value from base_lastn 
group by item_id, app;
 
There can be a scenario where the value at top 1 does not appear as the first 
or the last value of the top 5.

  was:
The new insert message in top n without row number doesn't consider its order 
and is just passed via collectInsert to the next operator; this may not be 
correct when the next operator collects all the top n records and aggregates 
them.
 
For example:
create table user_info(
user_id int,
item_id int,
app string,
dt timestamp
) with(
'connector'='kafka',
...
);
create table redis_sink (
redis_key string,
hash_key string,
hash_value string
)
with (
'connector' = 'redis',
'command' = 'hmset',
'nodes' = 'xxx',
'additional-ttl' = 'xx'
);
create view base_lastn
as select * from(
select user_id,item_id,app,dt,row_number() over(partition by item_id, app order 
by dt desc) as rn from user_info
)t where rn<=5;
insert into redis_sink
select
concat('id', cast(item_id as string)) as redis_key, app as hash_key, 
cast(user_id as string) as hash_value from base_lastn where rn=1;
insert into redis_sink
select
concat('list_', cast(item_id as string)) as redis_key, app as hash_key, 
listagg(cast(user_id as string), ',') as hash_value from base_lastn 
group by item_id, app;
 
There can be a scenario where the value at top 1 does not appear as the first 
or the last value of the top 5.


> Insert message in top n without row number doesn't consider its number and 
> may not be correct
> --
>
> Key: FLINK-33657
> URL: https://issues.apache.org/jira/browse/FLINK-33657
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.2, 1.17.1
>Reporter: zlzhang0122
>Priority: Major
>
> The new insert message in top n without row number doesn't consider its 
> order and is just passed via collectInsert() to the next operator; this may 
> not be correct when the next operator collects all the top n records and 
> aggregates them.
>  
> For example:
> create table user_info(
> user_id int,
> item_id int,
> app string,
> dt timestamp
> ) with(
> 'connector'='kafka',
> ...
> );
> create table redis_sink (
> redis_key string,
> hash_key string,
> hash_value string
> )
> with (
> 'connector' = 'redis',
> 'command' = 'hmset',
> 'nodes' = 'xxx',
> 'additional-ttl' = 'xx'
> );
> create view base_lastn
> as select * from(
> select user_id,item_id,app,dt,row_number() over(partition by item_id, app 
> order by dt desc) as rn from user_info
> )t where rn<=5;
> insert into redis_sink
> select
> concat('id', cast(item_id as string)) as redis_key, app as hash_key, 
> cast(user_id as string) as hash_value from base_lastn where rn=1;
> insert into redis_sink
> select
> concat('list_', cast(item_id as string)) as redis_key, app as hash_key, 
> listagg(cast(user_id as string), ',') as hash_value from base_lastn 
> group by item_id, app;
>  
> There can be a scenario where the value at top 1 does not appear as the 
> first or the last value of the top 5.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33657) Insert message in top n without row number doesn't consider its number and may not be correct

2023-11-27 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-33657:

Description: 
The new insert message in top n without row number doesn't consider its order 
and is just passed via collectInsert() to the next operator; this may be 
incorrect when the next operator collects all the top n records and aggregates 
them.
 
For example:
create table user_info(
user_id int,
item_id int,
app string,
dt timestamp
) with(
'connector'='kafka',
...
);
create table redis_sink (
redis_key string,
hash_key string,
hash_value string
)
with (
'connector' = 'redis',
'command' = 'hmset',
'nodes' = 'xxx',
'additional-ttl' = 'xx'
);
create view base_lastn
as select * from(
select user_id,item_id,app,dt,row_number() over(partition by item_id, app order 
by dt desc) as rn from user_info
)t where rn<=5;
insert into redis_sink
select
concat('id', cast(item_id as string)) as redis_key, app as hash_key, 
cast(user_id as string) as hash_value from base_lastn where rn=1;
insert into redis_sink
select
concat('list_', cast(item_id as string)) as redis_key, app as hash_key, 
listagg(cast(user_id as string), ',') as hash_value from base_lastn 
group by item_id, app;
 
There can be a scenario where the value at top 1 does not appear as the first 
or the last value of the top 5.

  was:
The new insert message in top n without row number doesn't consider its order 
and is just passed via collectInsert() to the next operator; this may not be 
correct when the next operator collects all the top n records and aggregates 
them.
 
For example:
create table user_info(
user_id int,
item_id int,
app string,
dt timestamp
) with(
'connector'='kafka',
...
);
create table redis_sink (
redis_key string,
hash_key string,
hash_value string
)
with (
'connector' = 'redis',
'command' = 'hmset',
'nodes' = 'xxx',
'additional-ttl' = 'xx'
);
create view base_lastn
as select * from(
select user_id,item_id,app,dt,row_number() over(partition by item_id, app order 
by dt desc) as rn from user_info
)t where rn<=5;
insert into redis_sink
select
concat('id', cast(item_id as string)) as redis_key, app as hash_key, 
cast(user_id as string) as hash_value from base_lastn where rn=1;
insert into redis_sink
select
concat('list_', cast(item_id as string)) as redis_key, app as hash_key, 
listagg(cast(user_id as string), ',') as hash_value from base_lastn 
group by item_id, app;
 
There can be a scenario where the value at top 1 does not appear as the first 
or the last value of the top 5.


> Insert message in top n without row number doesn't consider its number and 
> may not be correct
> --
>
> Key: FLINK-33657
> URL: https://issues.apache.org/jira/browse/FLINK-33657
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.2, 1.17.1
>Reporter: zlzhang0122
>Priority: Major
>
> The new insert message in top n without row number doesn't consider its 
> order and is just passed via collectInsert() to the next operator; this may 
> be incorrect when the next operator collects all the top n records and 
> aggregates them.
>  
> For example:
> create table user_info(
> user_id int,
> item_id int,
> app string,
> dt timestamp
> ) with(
> 'connector'='kafka',
> ...
> );
> create table redis_sink (
> redis_key string,
> hash_key string,
> hash_value string
> )
> with (
> 'connector' = 'redis',
> 'command' = 'hmset',
> 'nodes' = 'xxx',
> 'additional-ttl' = 'xx'
> );
> create view base_lastn
> as select * from(
> select user_id,item_id,app,dt,row_number() over(partition by item_id, app 
> order by dt desc) as rn from user_info
> )t where rn<=5;
> insert into redis_sink
> select
> concat('id', cast(item_id as string)) as redis_key, app as hash_key, 
> cast(user_id as string) as hash_value from base_lastn where rn=1;
> insert into redis_sink
> select
> concat('list_', cast(item_id as string)) as redis_key, app as hash_key, 
> listagg(cast(user_id as string), ',') as hash_value from base_lastn 
> group by item_id, app;
>  
> There can be a scenario where the value at top 1 does not appear as the 
> first or the last value of the top 5.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33657) Insert message in top n without row number doesn't consider its number and may not be correct

2023-11-27 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-33657:
---

 Summary: Insert message in top n without row number doesn't 
consider its number and may not be correct
 Key: FLINK-33657
 URL: https://issues.apache.org/jira/browse/FLINK-33657
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.17.1, 1.16.2
Reporter: zlzhang0122


The new insert message in top n without row number doesn't consider its order 
and is just passed via collectInsert to the next operator; this may not be 
correct when the next operator collects all the top n records and aggregates 
them.
 
For example:
create table user_info(
user_id int,
item_id int,
app string,
dt timestamp
) with(
'connector'='kafka',
...
);
create table redis_sink (
redis_key string,
hash_key string,
hash_value string
)
with (
'connector' = 'redis',
'command' = 'hmset',
'nodes' = 'xxx',
'additional-ttl' = 'xx'
);
create view base_lastn
as select * from(
select user_id,item_id,app,dt,row_number() over(partition by item_id, app order 
by dt desc) as rn from user_info
)t where rn<=5;
insert into redis_sink
select
concat('id', cast(item_id as string)) as redis_key, app as hash_key, 
cast(user_id as string) as hash_value from base_lastn where rn=1;
insert into redis_sink
select
concat('list_', cast(item_id as string)) as redis_key, app as hash_key, 
listagg(cast(user_id as string), ',') as hash_value from base_lastn 
group by item_id, app;
 
There can be a scenario where the value at top 1 does not appear as the first 
or the last value of the top 5.
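
To make the failure mode concrete, here is a hypothetical trace (the user ids 
and the emission order are invented for illustration):

{code:java}
// Suppose that, for one (item_id, app) key, the top 5 by dt desc is, in rank
// order: rn=1 -> u5, rn=2 -> u4, rn=3 -> u3, rn=4 -> u2, rn=5 -> u1.
// Without the row number, the top-n operator may emit its insert messages in
// state-iteration order rather than rank order, e.g.:
//   collectInsert(u3); collectInsert(u5); collectInsert(u1);
//   collectInsert(u4); collectInsert(u2);
// The downstream listagg aggregation then produces 'u3,u5,u1,u4,u2', so the
// rn=1 value (u5) is neither the first nor the last element of the
// aggregated top-5 list.
{code}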



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31495) Improve metrics tab on flink ui

2023-03-20 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17702988#comment-17702988
 ] 

zlzhang0122 commented on FLINK-31495:
-

IMHO this is not a problem; if you mouse over the selection, you'll see the 
whole metric name.

> Improve metrics tab on flink ui
> ---
>
> Key: FLINK-31495
> URL: https://issues.apache.org/jira/browse/FLINK-31495
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: zhang haoyan
>Priority: Major
> Attachments: image-2023-03-17-15-13-06-815.png
>
>
> When a metric name is too long, the select component cannot display the 
> whole metric name.
> !image-2023-03-17-15-13-06-815.png|width=289,height=154!
> We should use a modal or make the options horizontally scrollable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31529) Let yarn client exit early before JobManager running

2023-03-20 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17702617#comment-17702617
 ] 

zlzhang0122 commented on FLINK-31529:
-

IMHO I agree with this, and we have implemented an option to deal with this.

> Let yarn client exit early before JobManager running
> 
>
> Key: FLINK-31529
> URL: https://issues.apache.org/jira/browse/FLINK-31529
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Reporter: Weihua Hu
>Priority: Major
>
> Currently the YarnClusterDescriptor always waits for the yarn application 
> status to be RUNNING even if we use the detach mode. 
> In batch mode, the queue resources are insufficient in most cases, so the 
> job manager may take a long time waiting for resources, and the client also 
> keeps waiting. If the flink client is killed (for some other reason), the 
> cluster will be shut down too.
> We need an option to let the Flink client exit early. Using the detach 
> option or introducing a new option are both OK.
> Looking forward to other suggestions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31196) Flink on YARN honors env.java.home

2023-02-23 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692966#comment-17692966
 ] 

zlzhang0122 commented on FLINK-31196:
-

Maybe a duplicate of this? 
[FLINK-22091|https://issues.apache.org/jira/browse/FLINK-22091]

> Flink on YARN honors env.java.home
> --
>
> Key: FLINK-31196
> URL: https://issues.apache.org/jira/browse/FLINK-31196
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.16.1
>Reporter: Prabhu Joseph
>Priority: Major
>
> Flink on YARN should honor env.java.home and launch the JobManager and 
> TaskManager containers with the configured env.java.home.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-28283) Improving the logs of flink when jobs start and deploy

2023-02-10 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686987#comment-17686987
 ] 

zlzhang0122 edited comment on FLINK-28283 at 2/10/23 9:45 AM:
--

[~xtsong] Right you are; actually what I had in mind is to cache these logs for 
some short time and then aggregate and print them, but indeed this may cause 
some problems such as timeliness and loss of logs, so it is fine with me to 
close this ticket.

[~chesnay] Sure, we can hide the log messages via log4j filters.


was (Author: zlzhang0122):
[~xtsong] Right you are; actually what I had in mind is to cache these logs for 
some short time, but indeed this may cause some problems such as timeliness and 
loss of logs, so it is fine with me to close this ticket.

[~chesnay] Sure, we can hide the log messages via log4j filters.

> Improving the logs of flink when jobs start and deploy
> 
>
> Key: FLINK-28283
> URL: https://issues.apache.org/jira/browse/FLINK-28283
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
>
> When running a large job with many operators and subtasks on flink, the 
> JobManager and TaskManager will produce huge logs of subtask execution 
> messages such as "XXX switched from CREATED to SCHEDULED", "XXX switched 
> from SCHEDULED to DEPLOYING", "XXX switched from DEPLOYING to RUNNING", "XXX 
> switched from RUNNING to CANCELING", "XXX switched from CANCELING to 
> CANCELED", etc.
> Maybe we can do some improvement here, such as aggregating these messages to 
> reduce the log volume, or changing the log level and logging only the 
> failure messages and subtasks, etc. Not so sure about the solution, but 
> these messages are really too much. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28283) Improving the logs of flink when jobs start and deploy

2023-02-10 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686987#comment-17686987
 ] 

zlzhang0122 commented on FLINK-28283:
-

[~xtsong] Right you are; actually what I had in mind is to cache these logs for 
some short time, but indeed this may cause some problems such as timeliness and 
loss of logs, so it is fine with me to close this ticket.

[~chesnay] Sure, we can hide the log messages via log4j filters.

> Improving the logs of flink when jobs start and deploy
> 
>
> Key: FLINK-28283
> URL: https://issues.apache.org/jira/browse/FLINK-28283
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
>
> When running a large job with many operators and subtasks on flink, the 
> JobManager and TaskManager will produce huge logs of subtask execution 
> messages such as "XXX switched from CREATED to SCHEDULED", "XXX switched 
> from SCHEDULED to DEPLOYING", "XXX switched from DEPLOYING to RUNNING", "XXX 
> switched from RUNNING to CANCELING", "XXX switched from CANCELING to 
> CANCELED", etc.
> Maybe we can do some improvement here, such as aggregating these messages to 
> reduce the log volume, or changing the log level and logging only the 
> failure messages and subtasks, etc. Not so sure about the solution, but 
> these messages are really too much. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28909) Add ribbon filter policy option in RocksDBConfiguredOptions

2022-08-15 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579671#comment-17579671
 ] 

zlzhang0122 commented on FLINK-28909:
-

[~Yanfei Lei] Thanks for your reply and yes, you are right. There is still a 
lot of work to do to support the Ribbon filter policy.

> Add ribbon filter policy option in RocksDBConfiguredOptions
> ---
>
> Key: FLINK-28909
> URL: https://issues.apache.org/jira/browse/FLINK-28909
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.14.2, 1.15.1
>Reporter: zlzhang0122
>Priority: Minor
> Fix For: 1.16.0, 1.15.2
>
>
> Ribbon filters can efficiently enhance reads and reduce the disk and memory 
> usage of RocksDB; they have been supported by RocksDB since 6.15. (For more 
> details see 
> [http://rocksdb.org/blog/2021/12/29/ribbon-filter.html|http://rocksdb.org/blog/2021/12/29/ribbon-filter.html])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28909) Add ribbon filter policy option in RocksDBConfiguredOptions

2022-08-10 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-28909:
---

 Summary: Add ribbon filter policy option in 
RocksDBConfiguredOptions
 Key: FLINK-28909
 URL: https://issues.apache.org/jira/browse/FLINK-28909
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.15.1, 1.14.2
Reporter: zlzhang0122
 Fix For: 1.16.0, 1.15.2


Ribbon filters can efficiently enhance reads and reduce the disk and memory 
usage of RocksDB; they have been supported by RocksDB since 6.15. (For more 
details see 
[http://rocksdb.org/blog/2021/12/29/ribbon-filter.html|http://rocksdb.org/blog/2021/12/29/ribbon-filter.html])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28598) ClusterEntrypoint can't get the real exit reason when shutting down

2022-07-18 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-28598:
---

 Summary: ClusterEntrypoint can't get the real exit reason when 
shutting down
 Key: FLINK-28598
 URL: https://issues.apache.org/jira/browse/FLINK-28598
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN, Runtime / Task
Affects Versions: 1.15.1, 1.14.2
Reporter: zlzhang0122


When the cluster is starting and some error occurs, the ClusterEntrypoint will 
shut down the cluster asynchronously, but if it can't get a Throwable, the 
shutdown reason will be null; this can actually happen, for example when it is 
a user code problem. 

I think we can get the real exit reason caused by the user code and pass it to 
the diagnostics parameter; this may help users a lot.
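
A rough sketch of the idea (hypothetical code, not the actual ClusterEntrypoint 
implementation; extractRootFailure and the shutDownAsync signature are invented 
for illustration):

{code:java}
// Hypothetical: preserve the real failure as the diagnostics string instead
// of passing a null reason when no Throwable was propagated.
Throwable cause = extractRootFailure(); // hypothetical helper, may return null
String diagnostics = (cause == null)
        ? "Cluster entrypoint has been closed spontaneously."
        : ExceptionUtils.stringifyException(cause); // org.apache.flink.util.ExceptionUtils
shutDownAsync(ApplicationStatus.UNKNOWN, diagnostics); // hypothetical signature
{code}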



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24306) group by index throw SqlValidatorException

2022-07-08 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17564148#comment-17564148
 ] 

zlzhang0122 commented on FLINK-24306:
-

[~mvis...@pivotal.io] Hi, could you help to confirm and review it?

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1, 1.14.2
>Reporter: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> Everything is fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> We will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-24306) group by index throw SqlValidatorException

2022-07-06 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563173#comment-17563173
 ] 

zlzhang0122 commented on FLINK-24306:
-

Hi, [~jark]  I have found the reason and created a pr to fix it, can you help 
to confirm and review it?

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1, 1.14.2
>Reporter: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> Everything is fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> We will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-24306) group by index throw SqlValidatorException

2022-07-06 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-24306:

Affects Version/s: 1.14.2

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1, 1.14.2
>Reporter: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> Everything is fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> We will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28386) Trigger an immediate checkpoint after all sources finished

2022-07-05 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562941#comment-17562941
 ] 

zlzhang0122 commented on FLINK-28386:
-

For a bounded job in this scenario, IMO, maybe a savepoint is better? Why 
should we use a checkpoint instead of a savepoint?

> Trigger an immediate checkpoint after all sources finished
> --
>
> Key: FLINK-28386
> URL: https://issues.apache.org/jira/browse/FLINK-28386
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing
>Reporter: Yun Gao
>Priority: Major
>
> Currently, for a bounded job in streaming mode, by default it will wait for 
> one more checkpoint to commit the last piece of data. If the checkpoint 
> period is long, the waiting time might also be long. To optimize this 
> situation, we could eagerly trigger a checkpoint after all sources are 
> finished. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28283) Improving the logs of flink when jobs start and deploy

2022-06-28 Thread zlzhang0122 (Jira)
zlzhang0122 created an issue:

 Summary: Improving the logs of flink when jobs start and deploy
 Key: FLINK-28283
 URL: https://issues.apache.org/jira/browse/FLINK-28283
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.14.2
Assignee: Unassigned
Created: 28/Jun/22 10:17
Priority: Major
Reporter: zlzhang0122


When running a large job with many operators and subtasks on flink, the 
JobManager and TaskManager will produce huge logs of subtask execution 
messages such as "XXX switched from CREATED to SCHEDULED", "XXX switched from 
SCHEDULED to DEPLOYING", "XXX switched from DEPLOYING to RUNNING", "XXX 
switched from RUNNING to CANCELING", "XXX switched from CANCELING to 
CANCELED", etc. Maybe we can do some improvement here, such as aggregating 
these messages to reduce the log volume, or changing the log level and logging 
only the failure messages and subtasks, etc. Not so sure about the solution, 
but these messages are really too much.

[jira] [Created] (FLINK-28197) Flink doesn't delete the YARN application files when the submission fails in application mode

2022-06-22 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-28197:
---

 Summary: Flink doesn't delete the YARN application files when the 
submission fails in application mode
 Key: FLINK-28197
 URL: https://issues.apache.org/jira/browse/FLINK-28197
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.14.2
Reporter: zlzhang0122
 Fix For: 1.16.0


When users submit a Flink job to YARN and the submission fails in yarn 
application mode, the YARN application files such as Flink binaries, libraries, 
etc. won't be deleted and will exist permanently unless users delete them 
manually.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than its upstream task

2022-05-18 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17538931#comment-17538931
 ] 

zlzhang0122 commented on FLINK-27608:
-

[~Thesharing] First, thanks for your quick response and really detailed 
explanation. And yes, I agree with you, there is only one scenario here because 
it is a distributed environment. The reason why it takes such a long time to 
deploy the upstream tasks is that the upstream tasks have a large state to 
restore, and sometimes this may happen very frequently. So the problem comes 
back to the beginning: the config of taskmanager.network.request-backoff.max is 
not easy to decide, and can we have some better solution to deal with it? Thanks 
again!!

> Flink may throw PartitionNotFound Exception if the downstream task reached 
> Running state earlier than its upstream task
> 
>
> Key: FLINK-27608
> URL: https://issues.apache.org/jira/browse/FLINK-27608
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
> Attachments: exception.txt
>
>
> Flink streaming job deployment may throw a PartitionNotFound Exception if 
> the downstream task reaches the Running state earlier than its upstream task 
> and the maximum backoff for partition requests has passed. But the config of 
> taskmanager.network.request-backoff.max is not easy to decide. Can we use a 
> loop awaiting the upstream task's partition to become ready?
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than its upstream task

2022-05-16 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537500#comment-17537500
 ] 

zlzhang0122 edited comment on FLINK-27608 at 5/16/22 12:01 PM:
---

[~Thesharing] Thanks for your detailed reply! I think the scenario you have 
mentioned is very useful and is one of the scenarios. The case I have met is 
another scenario: in that case, the akka message may be missed or time out, and 
I have uploaded [^exception.txt] to describe it. Correct me if I'm wrong. 
Thanks!


was (Author: zlzhang0122):
[~Thesharing] Thanks for your detailed reply! I think the scenario you have 
mentioned is very useful and is one of the scenarios. The case I have met is 
another scenario: in that case, the akka message may be missed or time out, and 
I have uploaded [^exception.txt] about that to describe it. Correct me if I'm 
wrong. Thanks!

> Flink may throw PartitionNotFound Exception if the downstream task reached 
> Running state earlier than its upstream task
> 
>
> Key: FLINK-27608
> URL: https://issues.apache.org/jira/browse/FLINK-27608
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
> Attachments: exception.txt
>
>
> Flink streaming job deployment may throw a PartitionNotFound Exception if 
> the downstream task reaches the Running state earlier than its upstream task 
> and the maximum backoff for partition requests has passed. But the config of 
> taskmanager.network.request-backoff.max is not easy to decide. Can we use a 
> loop awaiting the upstream task's partition to become ready?
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than its upstream task

2022-05-16 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537500#comment-17537500
 ] 

zlzhang0122 edited comment on FLINK-27608 at 5/16/22 11:59 AM:
---

[~Thesharing] Thanks for your detailed reply! I think the scenario you have 
mentioned is very useful and is one of the scenarios. The case I have met is 
another scenario: in that case, the akka message may be missed or time out, and 
I have uploaded [^exception.txt] about that to describe it. Correct me if I'm 
wrong. Thanks!


was (Author: zlzhang0122):
[~Thesharing] Thanks for your detailed reply. I think the scenario you have 
mentioned is very useful and is one of the scenarios. The case I have met is 
another scenario: in that case, the akka message may be missed or time out, and 
I have uploaded [^exception.txt] about that to describe it. Correct me if I'm 
wrong?

> Flink may throw PartitionNotFound Exception if the downstream task reached 
> Running state earlier than its upstream task
> 
>
> Key: FLINK-27608
> URL: https://issues.apache.org/jira/browse/FLINK-27608
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
> Attachments: exception.txt
>
>
> Flink streaming job deployment may throw a PartitionNotFound Exception if 
> the downstream task reaches the Running state earlier than its upstream task 
> and the maximum backoff for partition requests has passed. But the config of 
> taskmanager.network.request-backoff.max is not easy to decide. Can we use a 
> loop awaiting the upstream task's partition to become ready?
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than its upstream task

2022-05-16 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537500#comment-17537500
 ] 

zlzhang0122 edited comment on FLINK-27608 at 5/16/22 11:58 AM:
---

[~Thesharing] Thanks for your detailed reply. I think the scenario you have 
mentioned is very useful and is one of the scenarios. The case I have met is 
another scenario: in that case, the akka message may be missed or time out, and 
I have uploaded [^exception.txt] about that to describe it. Correct me if I'm 
wrong?


was (Author: zlzhang0122):
[~Thesharing] Thanks for your detailed reply. I think the scenario you have 
mentioned is very useful and is one of the scenarios. The case I have met is 
another scenario: in that case, the akka message may be missed or time out, and 
I have uploaded a log about that to describe it. Correct me if I'm wrong?

> Flink may throw PartitionNotFound Exception if the downstream task reached 
> Running state earlier than its upstream task
> 
>
> Key: FLINK-27608
> URL: https://issues.apache.org/jira/browse/FLINK-27608
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
> Attachments: exception.txt
>
>
> Flink streaming job deployment may throw a PartitionNotFound Exception if 
> the downstream task reaches the Running state earlier than its upstream task 
> and the maximum backoff for partition requests has passed. But the config of 
> taskmanager.network.request-backoff.max is not easy to decide. Can we use a 
> loop awaiting the upstream task's partition to become ready?
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than its upstream task

2022-05-16 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537500#comment-17537500
 ] 

zlzhang0122 commented on FLINK-27608:
-

[~Thesharing] Thanks for your detailed reply. I think the scenario you have 
mentioned is very useful and is one of the scenarios. The case I have met is 
another scenario: in that case, the akka message may be missed or time out, and 
I have uploaded a log about that to describe it. Correct me if I'm wrong?

> Flink may throw PartitionNotFound Exception if the downstream task reached 
> Running state earlier than its upstream task
> 
>
> Key: FLINK-27608
> URL: https://issues.apache.org/jira/browse/FLINK-27608
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
> Attachments: exception.txt
>
>
> Flink streaming job deployment may throw a PartitionNotFound Exception if 
> the downstream task reaches the Running state earlier than its upstream task 
> and the maximum backoff for partition requests has passed. But the config of 
> taskmanager.network.request-backoff.max is not easy to decide. Can we use a 
> loop awaiting the upstream task's partition to become ready?
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than its upstream task

2022-05-16 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-27608:

Attachment: exception.txt

> Flink may throw PartitionNotFound Exception if the downstream task reached 
> Running state earlier than its upstream task
> 
>
> Key: FLINK-27608
> URL: https://issues.apache.org/jira/browse/FLINK-27608
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
> Attachments: exception.txt
>
>
> Flink streaming job deployment may throw a PartitionNotFound Exception if 
> the downstream task reaches the Running state earlier than its upstream task 
> and the maximum backoff for partition requests has passed. But the config of 
> taskmanager.network.request-backoff.max is not easy to decide. Can we use a 
> loop awaiting the upstream task's partition to become ready?
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than its upstream task

2022-05-13 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-27608:

Component/s: Runtime / Network

> Flink may throw PartitionNotFound Exception if the downstream task reached 
> Running state earlier than its upstream task
> 
>
> Key: FLINK-27608
> URL: https://issues.apache.org/jira/browse/FLINK-27608
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.14.2
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.16.0
>
>
> Flink streaming job deployment may throw a PartitionNotFound Exception if 
> the downstream task reaches the Running state earlier than its upstream task 
> and the maximum backoff for partition requests has passed. But the config of 
> taskmanager.network.request-backoff.max is not easy to decide. Can we use a 
> loop awaiting the upstream task's partition to become ready?
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27608) Flink may throw PartitionNotFound Exception if the downstream task reached Running state earlier than its upstream task

2022-05-13 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-27608:
---

 Summary: Flink may throw PartitionNotFound Exception if the 
downstream task reached Running state earlier than its upstream task
 Key: FLINK-27608
 URL: https://issues.apache.org/jira/browse/FLINK-27608
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.2
Reporter: zlzhang0122
 Fix For: 1.16.0


Flink streaming job deployment may throw a PartitionNotFound Exception if the 
downstream task reaches the Running state earlier than its upstream task and 
the maximum backoff for partition requests has passed. But the config of 
taskmanager.network.request-backoff.max is not easy to decide. Can we use a 
loop awaiting the upstream task's partition to become ready?
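
A rough sketch of such a waiting loop (hypothetical code; partitionProvider, 
isPartitionReady, and requestPartition are invented names, not the actual 
Flink networking API):

{code:java}
// Hypothetical: instead of failing once the maximum backoff is exceeded,
// keep polling with a capped exponential backoff until the upstream result
// partition has been registered, then issue the partition request.
long backoffMillis = initialBackoffMillis;
while (!partitionProvider.isPartitionReady(partitionId)) {
    Thread.sleep(backoffMillis);
    backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis);
}
partitionProvider.requestPartition(partitionId);
{code}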

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2022-04-28 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17529298#comment-17529298
 ] 

zlzhang0122 commented on FLINK-25068:
-

[~gaoyunhaii] [~airblader] Can you help to review this pr? thanks!

> Show the maximum parallelism (number of key groups) of a job in Web UI
> --
>
> Key: FLINK-25068
> URL: https://issues.apache.org/jira/browse/FLINK-25068
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Assignee: zlzhang0122
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> Now, Flink uses the maximum parallelism as the number of key groups to 
> distribute keys. The maximum parallelism can be set manually, or flink will 
> set it up automatically. Sometimes the value is useful and we may want to 
> know it; maybe we can expose it in the Web UI.
> By doing this, we can easily know the max parallelism we can suggest the 
> user scale to when they are facing a lack of throughput, and we can know 
> which subtask will process a special value so we can find the log soon.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2022-03-22 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510417#comment-17510417
 ] 

zlzhang0122 commented on FLINK-25068:
-

[~chesnay] can you help to review the pr?

> Show the maximum parallelism (number of key groups) of a job in Web UI
> --
>
> Key: FLINK-25068
> URL: https://issues.apache.org/jira/browse/FLINK-25068
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Assignee: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Now, Flink uses the maximum parallelism as the number of key groups to 
> distribute keys. The maximum parallelism can be set manually, or flink will 
> set it up automatically. Sometimes the value is useful and we may want to 
> know it; maybe we can expose it in the Web UI.
> By doing this, we can easily know the max parallelism we can suggest the 
> user scale to when they are facing a lack of throughput, and we can know 
> which subtask will process a special value so we can find the log soon.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2022-03-22 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-25068:

Fix Version/s: 1.16.0

> Show the maximum parallelism (number of key groups) of a job in Web UI
> --
>
> Key: FLINK-25068
> URL: https://issues.apache.org/jira/browse/FLINK-25068
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Assignee: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> Now, Flink uses the maximum parallelism as the number of key groups to 
> distribute keys. The maximum parallelism can be set manually, or flink will 
> set it up automatically. Sometimes the value is useful and we may want to 
> know it; maybe we can expose it in the Web UI.
> By doing this, we can easily know the max parallelism we can suggest the 
> user scale to when they are facing a lack of throughput, and we can know 
> which subtask will process a special value so we can find the log soon.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26788) AbstractDeserializationSchema should add a cause when throwing a FlinkRuntimeException

2022-03-21 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-26788:

Description: 
In the constructor method of the class AbstractDeserializationSchema which has 
no parameters, it throw a FlinkRuntimeException, but it has no cause, which 
will lead more difficult to find the root cause, we should use the 
{code:java}
FlinkRuntimeException(String message, Throwable cause) {code}
instead of 
{code:java}
FlinkRuntimeException(String message) {code}
see 
[here|https://github.com/apache/flink/blob/ff3336266e5ea6c440916c1ae4ab9440e2ceb0bf/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java#L107].

 

  was:
In the no-argument constructor of the class AbstractDeserializationSchema, a 
FlinkRuntimeException is thrown, but without a cause, which makes it more 
difficult to find the root cause. We should use 
{code:java}
FlinkRuntimeException(String message, Throwable cause) {code}
instead of 
{code:java}
FlinkRuntimeException(String message) {code}
.

 


> AbstractDeserializationSchema should add a cause when throwing a 
> FlinkRuntimeException
> 
>
> Key: FLINK-26788
> URL: https://issues.apache.org/jira/browse/FLINK-26788
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.12.7, 1.13.6, 1.14.4
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
>
> In the no-argument constructor of the class AbstractDeserializationSchema, a 
> FlinkRuntimeException is thrown, but without a cause, which makes it more 
> difficult to find the root cause. We should use 
> {code:java}
> FlinkRuntimeException(String message, Throwable cause) {code}
> instead of 
> {code:java}
> FlinkRuntimeException(String message) {code}
> see 
> [here|https://github.com/apache/flink/blob/ff3336266e5ea6c440916c1ae4ab9440e2ceb0bf/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java#L107].
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26788) AbstractDeserializationSchema should add a cause when throwing a FlinkRuntimeException

2022-03-21 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-26788:
---

 Summary: AbstractDeserializationSchema should add a cause when throwing 
a FlinkRuntimeException
 Key: FLINK-26788
 URL: https://issues.apache.org/jira/browse/FLINK-26788
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Affects Versions: 1.14.4, 1.13.6, 1.12.7
Reporter: zlzhang0122
 Fix For: 1.15.0


In the no-argument constructor of the class AbstractDeserializationSchema, a 
FlinkRuntimeException is thrown, but without a cause, which makes it more 
difficult to find the root cause. We should use 
{code:java}
FlinkRuntimeException(String message, Throwable cause) {code}
instead of 
{code:java}
FlinkRuntimeException(String message) {code}
.
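
A minimal sketch of the suggested change (hypothetical code, not the actual 
Flink source; the message text and the elided type extraction are assumed for 
illustration):

{code:java}
try {
    // ... reflective extraction of the produced type, which may fail ...
} catch (Exception e) {
    // Pass the underlying exception as the cause instead of dropping it,
    // so the root cause is preserved in the stack trace.
    throw new FlinkRuntimeException(
            "Could not determine the type produced by this deserialization schema.", e);
}
{code}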

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2022-03-02 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17500462#comment-17500462
 ] 

zlzhang0122 commented on FLINK-25068:
-

[~airblader] can you help to review the pr?

> Show the maximum parallelism (number of key groups) of a job in Web UI
> --
>
> Key: FLINK-25068
> URL: https://issues.apache.org/jira/browse/FLINK-25068
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Assignee: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
>
> Now, Flink uses the maximum parallelism as the number of key groups to 
> distribute keys. The maximum parallelism can be set manually, or flink will 
> set it up automatically. Sometimes the value is useful and we may want to 
> know it; maybe we can expose it in the Web UI.
> By doing this, we can easily know the max parallelism we can suggest the 
> user scale to when they are facing a lack of throughput, and we can know 
> which subtask will process a special value so we can find the log soon.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26129) Using fail method instead of kill method when yarn application deployment error occurs

2022-02-15 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17492968#comment-17492968
 ] 

zlzhang0122 commented on FLINK-26129:
-

[~kkl0u] [~aljoscha] What do you think about this? Any suggestion is very much 
appreciated!

> Using fail method instead of kill method when yarn application deployment 
> error occurs
> ---
>
> Key: FLINK-26129
> URL: https://issues.apache.org/jira/browse/FLINK-26129
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.14.3
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
>
> When deploying an application on yarn, Flink adds a DeploymentFailureHook, 
> which kicks in if the deployment duration exceeds the applicationSubmitTimeout 
> (this situation always occurs when the resources are insufficient, etc.).
> Now it uses yarnClient.killApplication() to kill this failed deployment; I 
> think maybe we should use yarnClient.failApplicationAttempt() to fail the 
> deployment. IMO, using the fail method instead of the kill method can trigger 
> the retry action and is more reasonable.
> The change is very simple and only affects the yarn application deployment.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-26129) Using fail method instead of kill method when yarn application deployment error occurs

2022-02-14 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-26129:
---

 Summary: Using fail method instead of kill method when yarn 
application deployment error occurs
 Key: FLINK-26129
 URL: https://issues.apache.org/jira/browse/FLINK-26129
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.14.3
Reporter: zlzhang0122
 Fix For: 1.15.0


When deploying an application on yarn, Flink adds a DeploymentFailureHook, 
which kicks in if the deployment duration exceeds the applicationSubmitTimeout 
(this situation always occurs when the resources are insufficient, etc.).

Now it uses yarnClient.killApplication() to kill this failed deployment; I 
think maybe we should use yarnClient.failApplicationAttempt() to fail the 
deployment. IMO, using the fail method instead of the kill method can trigger 
the retry action and is more reasonable.

The change is very simple and only affects the yarn application deployment.
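
A minimal sketch of the suggested swap inside the deployment failure hook 
(surrounding code elided; looking up the current attempt id via the 
application report is an assumption about how it would be obtained):

{code:java}
// Instead of killing the whole application on submit timeout:
//     yarnClient.killApplication(appId);
// fail only the current attempt, so that YARN's retry policy can kick in:
ApplicationReport report = yarnClient.getApplicationReport(appId);
yarnClient.failApplicationAttempt(report.getCurrentApplicationAttemptId());
{code}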



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25634) flink-read-onyarn-configuration

2022-01-13 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17475295#comment-17475295
 ] 

zlzhang0122 commented on FLINK-25634:
-

A lower version of yarn may have some bugs, such as YARN-6153. Could you try it 
on a higher version?

> flink-read-onyarn-configuration
> ---
>
> Key: FLINK-25634
> URL: https://issues.apache.org/jira/browse/FLINK-25634
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.2
>Reporter: 宇宙先生
>Priority: Major
> Attachments: image-2022-01-13-10-02-57-803.png, 
> image-2022-01-13-10-03-28-230.png, image-2022-01-13-10-07-02-908.png, 
> image-2022-01-13-10-09-36-890.png, image-2022-01-13-10-10-44-945.png, 
> image-2022-01-13-10-14-18-918.png, image-2022-01-13-14-19-43-539.png, 
> image-2022-01-13-16-33-24-459.png, image-2022-01-13-16-33-46-046.png, 
> image-2022-01-13-16-35-37-432.png, image-2022-01-13-19-17-01-097.png, 
> image-2022-01-13-19-17-10-731.png, image-2022-01-13-19-17-15-511.png
>
>
> In the flink source code:
> !image-2022-01-13-10-14-18-918.png!
> Set the number of retries for failed YARN ApplicationMasters/JobManagers in 
> high availability mode. This value is usually limited by YARN.
> By default, it's 1 in the standalone case and 2 in the high availability case.
>  
> In my cluster, the number of retries for failed YARN ApplicationMasters is 2, 
> and yarn's configuration also looks like this:
> !image-2022-01-13-10-07-02-908.png!
> But it keeps restarting when my task fails:
> !image-2022-01-13-10-10-44-945.png!
> I would like to know the reason why the configuration is not taking effect.
> Sincere thanks!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25634) flink-read-onyarn-configuration

2022-01-13 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17475265#comment-17475265
 ] 

zlzhang0122 commented on FLINK-25634:
-

[~chenyu.mr] Which version of yarn do you use?

> flink-read-onyarn-configuration
> ---
>
> Key: FLINK-25634
> URL: https://issues.apache.org/jira/browse/FLINK-25634
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.2
>Reporter: 宇宙先生
>Priority: Major
> Attachments: image-2022-01-13-10-02-57-803.png, 
> image-2022-01-13-10-03-28-230.png, image-2022-01-13-10-07-02-908.png, 
> image-2022-01-13-10-09-36-890.png, image-2022-01-13-10-10-44-945.png, 
> image-2022-01-13-10-14-18-918.png, image-2022-01-13-14-19-43-539.png, 
> image-2022-01-13-16-33-24-459.png, image-2022-01-13-16-33-46-046.png, 
> image-2022-01-13-16-35-37-432.png
>
>
> In the flink source code:
> !image-2022-01-13-10-14-18-918.png!
> Set the number of retries for failed YARN ApplicationMasters/JobManagers in 
> high availability mode. This value is usually limited by YARN.
> By default, it's 1 in the standalone case and 2 in the high availability case.
>  
> In my cluster, the number of retries for failed YARN ApplicationMasters is 2, 
> and yarn's configuration also looks like this:
> !image-2022-01-13-10-07-02-908.png!
> But it keeps restarting when my task fails:
> !image-2022-01-13-10-10-44-945.png!
> I would like to know the reason why the configuration is not taking effect.
> Sincere thanks!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

2022-01-12 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17475155#comment-17475155
 ] 

zlzhang0122 commented on FLINK-24207:
-

[~roman] OK, I will check this, thx!!

> Add support of KeyedState in TwoPhaseCommitSinkFunction
> ---
>
> Key: FLINK-24207
> URL: https://issues.apache.org/jira/browse/FLINK-24207
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
>
> Now, the implementation of TwoPhaseCommitSinkFunction is based on operator 
> state, but operator state will do a deep copy when taking a checkpoint, so a 
> large operator state may produce an OOM error. Adding support for KeyedState 
> in TwoPhaseCommitSinkFunction may be a good choice to avoid the OOM error 
> and give users more convenience.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25634) flink-read-onyarn-configuration

2022-01-12 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17475136#comment-17475136
 ] 

zlzhang0122 commented on FLINK-25634:
-

You can find the config yarn.application-attempt-failures-validity-interval in 
YarnConfigOptions; it is a Flink config option.

> flink-read-onyarn-configuration
> ---
>
> Key: FLINK-25634
> URL: https://issues.apache.org/jira/browse/FLINK-25634
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.2
>Reporter: 宇宙先生
>Priority: Major
> Attachments: image-2022-01-13-10-02-57-803.png, 
> image-2022-01-13-10-03-28-230.png, image-2022-01-13-10-07-02-908.png, 
> image-2022-01-13-10-09-36-890.png, image-2022-01-13-10-10-44-945.png, 
> image-2022-01-13-10-14-18-918.png, image-2022-01-13-14-19-43-539.png
>
>
> in the Flink source code:
> !image-2022-01-13-10-14-18-918.png!
> Set the number of retries for failed YARN ApplicationMasters/JobManagers in 
> high availability mode. This value is usually limited by YARN.
> By default, it's 1 in the standalone case and 2 in the high availability 
> case.
>  
> In my cluster, the number of retries for failed YARN ApplicationMasters is 
> 2, and YARN's configuration also looks like this:
> !image-2022-01-13-10-07-02-908.png!
> But the job keeps restarting when my task fails:
> !image-2022-01-13-10-10-44-945.png!
> I would like to know why the configuration is not taking effect.
> Sincere thanks!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25634) flink-read-onyarn-configuration

2022-01-12 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17475136#comment-17475136
 ] 

zlzhang0122 edited comment on FLINK-25634 at 1/13/22, 7:04 AM:
---

You can find the config yarn.application-attempt-failures-validity-interval in 
[YarnConfigOptions|https://github.com/apache/flink/blob/dbbf2a36111da1faea5c901e3b008cc788913bf8/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java#L106];
it is a Flink config option.


was (Author: zlzhang0122):
You can find the config yarn.application-attempt-failures-validity-interval in 
YarnConfigOptions; it is a Flink config option.

> flink-read-onyarn-configuration
> ---
>
> Key: FLINK-25634
> URL: https://issues.apache.org/jira/browse/FLINK-25634
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.2
>Reporter: 宇宙先生
>Priority: Major
> Attachments: image-2022-01-13-10-02-57-803.png, 
> image-2022-01-13-10-03-28-230.png, image-2022-01-13-10-07-02-908.png, 
> image-2022-01-13-10-09-36-890.png, image-2022-01-13-10-10-44-945.png, 
> image-2022-01-13-10-14-18-918.png, image-2022-01-13-14-19-43-539.png
>
>
> in the Flink source code:
> !image-2022-01-13-10-14-18-918.png!
> Set the number of retries for failed YARN ApplicationMasters/JobManagers in 
> high availability mode. This value is usually limited by YARN.
> By default, it's 1 in the standalone case and 2 in the high availability 
> case.
>  
> In my cluster, the number of retries for failed YARN ApplicationMasters is 
> 2, and YARN's configuration also looks like this:
> !image-2022-01-13-10-07-02-908.png!
> But the job keeps restarting when my task fails:
> !image-2022-01-13-10-10-44-945.png!
> I would like to know why the configuration is not taking effect.
> Sincere thanks!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25634) flink-read-onyarn-configuration

2022-01-12 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17475096#comment-17475096
 ] 

zlzhang0122 commented on FLINK-25634:
-

I have encountered a similar case and found that the config 
yarn.application-attempts is also affected by the config 
yarn.application-attempt-failures-validity-interval; the default value of the 
second config is 10 seconds, so you can go ahead and check that config. Maybe 
it is the same case.
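
For reference, a minimal flink-conf.yaml sketch of how the two options 
interact (the values below are illustrative assumptions, not recommendations):

{code:java}
# Allow up to 4 ApplicationMaster/JobManager attempts in HA mode.
yarn.application-attempts: 4
# Only failures within this window (milliseconds) count towards the attempt
# limit above; the default is 10000 (10 seconds).
yarn.application-attempt-failures-validity-interval: 600000
{code}

With the default 10-second window, failures that are spread more than 10 
seconds apart never exhaust the attempt limit, which is why the job can keep 
restarting.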

> flink-read-onyarn-configuration
> ---
>
> Key: FLINK-25634
> URL: https://issues.apache.org/jira/browse/FLINK-25634
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.11.2
>Reporter: 宇宙先生
>Priority: Major
> Attachments: image-2022-01-13-10-02-57-803.png, 
> image-2022-01-13-10-03-28-230.png, image-2022-01-13-10-07-02-908.png, 
> image-2022-01-13-10-09-36-890.png, image-2022-01-13-10-10-44-945.png, 
> image-2022-01-13-10-14-18-918.png
>
>
> in the Flink source code:
> !image-2022-01-13-10-14-18-918.png!
> Set the number of retries for failed YARN ApplicationMasters/JobManagers in 
> high availability mode. This value is usually limited by YARN.
> By default, it's 1 in the standalone case and 2 in the high availability 
> case.
>  
> In my cluster, the number of retries for failed YARN ApplicationMasters is 
> 2, and YARN's configuration also looks like this:
> !image-2022-01-13-10-07-02-908.png!
> But the job keeps restarting when my task fails:
> !image-2022-01-13-10-10-44-945.png!
> I would like to know why the configuration is not taking effect.
> Sincere thanks!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

2022-01-12 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474472#comment-17474472
 ] 

zlzhang0122 edited comment on FLINK-24207 at 1/12/22, 11:55 AM:


[~roman] Sorry for the late reply. In our use case, although the third-party 
component can't support transactions or idempotence, we still want to ensure 
end-to-end exactly-once using 2PC, so we use ListState to cache the data. 
When checkpointing, we snapshot the ListState, and when the 
CheckpointCoordinator calls notifyCheckpointComplete, we commit the ListState 
to the third-party component and delete it from the state. Of course, the 
commit or delete may fail; we use retries or fail the whole job to deal with 
that situation. For the commit of the same batch of data, the third-party 
component can ensure idempotence.


was (Author: zlzhang0122):
[~roman] Sorry for the late reply. In our use case, although the third-party 
component can't support transactions or idempotence, we still want to ensure 
end-to-end exactly-once using 2PC, so we use ListState to cache the data. 
When checkpointing, we snapshot the ListState, and when the 
CheckpointCoordinator calls notifyCheckpointComplete, we commit the ListState 
to the third-party downstream and delete it from the state. Of course, the 
commit or delete may fail; we use retries or fail the whole job to deal with 
that situation. For the commit of the same batch of data, the third-party 
component can ensure idempotence.

> Add support of KeyedState in TwoPhaseCommitSinkFunction
> ---
>
> Key: FLINK-24207
> URL: https://issues.apache.org/jira/browse/FLINK-24207
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
>
> Now, the implementation of TwoPhaseCommitSinkFunction is based on operator 
> state, but operator state is deep-copied when taking a checkpoint, so a 
> large operator state may produce an OOM error. Adding support for KeyedState 
> in TwoPhaseCommitSinkFunction may be a good choice to avoid the OOM error 
> and give users more convenience.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

2022-01-12 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474472#comment-17474472
 ] 

zlzhang0122 commented on FLINK-24207:
-

[~roman] Sorry for the late reply. In our use case, although the third-party 
component can't support transactions or idempotence, we still want to ensure 
end-to-end exactly-once using 2PC, so we use ListState to cache the data. 
When checkpointing, we snapshot the ListState, and when the 
CheckpointCoordinator calls notifyCheckpointComplete, we commit the ListState 
to the third-party downstream and delete it from the state. Of course, the 
commit or delete may fail; we use retries or fail the whole job to deal with 
that situation. For the commit of the same batch of data, the third-party 
component can ensure idempotence.
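
A minimal sketch of this pattern, assuming a hypothetical ExternalClient 
stand-in for the third-party component and ignoring concurrent checkpoints 
for brevity:

{code:java}
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

/** Buffers records in operator ListState and commits them once the
 *  checkpoint that snapshotted them completes. */
public class BufferingTwoPhaseSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    /** Hypothetical stand-in for the third-party component's client. */
    static class ExternalClient {
        static void commitBatch(List<String> batch) {
            // idempotent commit to the external system happens here
        }
    }

    private transient ListState<String> pending; // uncommitted batch
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value); // phase 1: only cache, nothing is written out yet
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        pending.update(buffer); // persist the uncommitted batch in state
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        pending = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("pending", String.class));
        for (String v : pending.get()) {
            buffer.add(v); // restore uncommitted records after a failover
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Phase 2: commit; on error we retry or fail the job, and the
        // external system deduplicates re-commits of the same batch.
        // (A production version would track batches per checkpointId.)
        ExternalClient.commitBatch(new ArrayList<>(buffer));
        buffer.clear();
        pending.clear();
    }
}
{code}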

> Add support of KeyedState in TwoPhaseCommitSinkFunction
> ---
>
> Key: FLINK-24207
> URL: https://issues.apache.org/jira/browse/FLINK-24207
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
>
> Now, the implementation of TwoPhaseCommitSinkFunction is based on operator 
> state, but operator state will do a deep copy when taking checkpoint, so 
> large operator state may produce a OOM error. Add support of KeyedState in 
> TwoPhaseCommitSinkFunction maybe a good choice to avoid the OOM error and 
> give users more convenience.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2022-01-12 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474408#comment-17474408
 ] 

zlzhang0122 edited comment on FLINK-25068 at 1/12/22, 10:07 AM:


Since [FLINK-12607|https://issues.apache.org/jira/browse/FLINK-12607] has 
been merged, the implementation can be done more easily. [~trohrmann] Could 
you assign me this ticket?


was (Author: zlzhang0122):
Since FLINK-12607 has been merged, the implementation can be done more 
easily. [~trohrmann] Could you assign me this ticket?

> Show the maximum parallelism (number of key groups) of a job in Web UI
> --
>
> Key: FLINK-25068
> URL: https://issues.apache.org/jira/browse/FLINK-25068
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Assignee: zlzhang0122
>Priority: Major
>
> Now, Flink uses the maximum parallelism as the number of key groups to 
> distribute keys. The maximum parallelism can be set manually, or Flink will 
> set it up automatically. Sometimes the value is useful and we may want to 
> know it, so maybe we can expose it in the Web UI.
> By doing this, we can easily know the max parallelism to suggest when users 
> need to scale because of a lack of throughput, and we can know which subtask 
> will process a specific key so we can find the logs quickly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2022-01-12 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474408#comment-17474408
 ] 

zlzhang0122 commented on FLINK-25068:
-

Since FLINK-12607 has been merged, the implementation can be done more 
easily. [~trohrmann] Could you assign me this ticket?

> Show the maximum parallelism (number of key groups) of a job in Web UI
> --
>
> Key: FLINK-25068
> URL: https://issues.apache.org/jira/browse/FLINK-25068
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Priority: Major
>
> Now, Flink uses the maximum parallelism as the number of key groups to 
> distribute keys. The maximum parallelism can be set manually, or Flink will 
> set it up automatically. Sometimes the value is useful and we may want to 
> know it, so maybe we can expose it in the Web UI.
> By doing this, we can easily know the max parallelism to suggest when users 
> need to scale because of a lack of throughput, and we can know which subtask 
> will process a specific key so we can find the logs quickly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-12607) Introduce a REST API that returns the maxParallelism of a job

2022-01-12 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474400#comment-17474400
 ] 

zlzhang0122 commented on FLINK-12607:
-

_The parameter of ArchivedExecutionConfigBuilder.setMaxParallelism() may be 
wrong, and I have created 
[pr-18337|https://github.com/apache/flink/pull/18337] to fix it._

> Introduce a REST API that returns the maxParallelism of a job
> -
>
> Key: FLINK-12607
> URL: https://issues.apache.org/jira/browse/FLINK-12607
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.6.3
>Reporter: Akshay Kanfade
>Assignee: John Phelan
>Priority: Minor
>  Labels: pull-request-available, starter
> Fix For: 1.13.0
>
>
> Today, Flink does not offer any way to get the maxParallelism for a job and 
> its operators through any of the REST APIs. Since the internal state 
> already tracks maxParallelism for a job and its operators, we should expose 
> this via the REST APIs so that application developers can get more insights 
> into the current state.
> There can be two approaches to doing this -
> Approach 1:
> Modify the existing REST API response model to additionally expose a new 
> field 'maxParallelism'. Some of the REST APIs that would be affected by this:
> |h5. */jobs/:jobid/vertices/:vertexid*|
> |h5. */jobs/:jobid*|
>  
> Approach 2:
> Create a new REST API that would only return maxParallelism for a job and 
> its operators.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24694) Translate "Checkpointing under backpressure" page into Chinese

2021-12-23 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17464557#comment-17464557
 ] 

zlzhang0122 commented on FLINK-24694:
-

@[~pnowojski] WDYT?

> Translate "Checkpointing under backpressure" page into Chinese
> --
>
> Key: FLINK-24694
> URL: https://issues.apache.org/jira/browse/FLINK-24694
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Affects Versions: 1.15.0, 1.14.1
>Reporter: Piotr Nowojski
>Assignee: zlzhang0122
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.15.0
>
>
> Page {{content.zh/docs/ops/state/checkpointing_under_backpressure.md}} needs 
> to be translated.
> Reference English version ticket: FLINK-24670



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2021-11-29 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450809#comment-17450809
 ] 

zlzhang0122 edited comment on FLINK-25068 at 11/30/21, 2:31 AM:


[~trohrmann] Nice advice! +1 for exposing it on a per-operator basis. I think 
we can put it in the Job Overview - Job Graph, just under the Parallelism, 
and also in the Job Detail tab.


was (Author: zlzhang0122):
[~trohrmann] Nice advice! +1 for exposing the maximum parallelism on a 
per-operator basis. I think we can put it in the Job Overview - Job Graph, 
just under the Parallelism, and also in the Job Detail tab.

> Show the maximum parallelism (number of key groups) of a job in Web UI
> --
>
> Key: FLINK-25068
> URL: https://issues.apache.org/jira/browse/FLINK-25068
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Priority: Major
>
> Now, Flink uses the maximum parallelism as the number of key groups to 
> distribute keys. The maximum parallelism can be set manually, or Flink will 
> set it up automatically. Sometimes the value is useful and we may want to 
> know it, so maybe we can expose it in the Web UI.
> By doing this, we can easily know the max parallelism to suggest when users 
> need to scale because of a lack of throughput, and we can know which subtask 
> will process a specific key so we can find the logs quickly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2021-11-29 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450809#comment-17450809
 ] 

zlzhang0122 commented on FLINK-25068:
-

[~trohrmann] Nice advice! +1 for exposing the maximum parallelism on a 
per-operator basis. I think we can put it in the Job Overview - Job Graph, 
just under the Parallelism, and also in the Job Detail tab.

> Show the maximum parallelism (number of key groups) of a job in Web UI
> --
>
> Key: FLINK-25068
> URL: https://issues.apache.org/jira/browse/FLINK-25068
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Priority: Major
>
> Now, Flink uses the maximum parallelism as the number of key groups to 
> distribute keys. The maximum parallelism can be set manually, or Flink will 
> set it up automatically. Sometimes the value is useful and we may want to 
> know it, so maybe we can expose it in the Web UI.
> By doing this, we can easily know the max parallelism to suggest when users 
> need to scale because of a lack of throughput, and we can know which subtask 
> will process a specific key so we can find the logs quickly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2021-11-29 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450242#comment-17450242
 ] 

zlzhang0122 edited comment on FLINK-25068 at 11/29/21, 8:11 AM:


[~Zhanghao Chen] Same as what I thought! We can add the max parallelism just 
under the job parallelism, named e.g. "job max parallelism", and the 
implementation is easy since we can add an attribute to JobConfigInfo and 
ArchivedExecutionConfig. [~trohrmann] [~chesnay] [~sewen] What do you think? 
Any suggestion is much appreciated! If this implementation has no problem, I 
can create a PR for it.


was (Author: zlzhang0122):
[~Zhanghao Chen] Same as what i thought! We can add the max parallelism just 
under the job parallelism, name as job max parallelism.etc, and the implemetion 
is easy since we can add an attribute in JobConfigInfo and 
ArchivedExecutionConfig. [~trohrmann] [~chesnay] [~sewen] what do you think? If 
this implemetion has no problem, I can create a pr for this.

> Show the maximum parallelism (number of key groups) of a job in Web UI
> --
>
> Key: FLINK-25068
> URL: https://issues.apache.org/jira/browse/FLINK-25068
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Priority: Major
>
> Now, Flink uses the maximum parallelism as the number of key groups to 
> distribute keys. The maximum parallelism can be set manually, or Flink will 
> set it up automatically. Sometimes the value is useful and we may want to 
> know it, so maybe we can expose it in the Web UI.
> By doing this, we can easily know the max parallelism to suggest when users 
> need to scale because of a lack of throughput, and we can know which subtask 
> will process a specific key so we can find the logs quickly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2021-11-29 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450242#comment-17450242
 ] 

zlzhang0122 commented on FLINK-25068:
-

[~Zhanghao Chen] Same as what I thought! We can add the max parallelism just 
under the job parallelism, named e.g. "job max parallelism", and the 
implementation is easy since we can add an attribute to JobConfigInfo and 
ArchivedExecutionConfig. [~trohrmann] [~chesnay] [~sewen] What do you think? 
If this implementation has no problem, I can create a PR for it.

> Show the maximum parallelism (number of key groups) of a job in Web UI
> --
>
> Key: FLINK-25068
> URL: https://issues.apache.org/jira/browse/FLINK-25068
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Priority: Major
>
> Now, Flink uses the maximum parallelism as the number of key groups to 
> distribute keys. The maximum parallelism can be set manually, or Flink will 
> set it up automatically. Sometimes the value is useful and we may want to 
> know it, so maybe we can expose it in the Web UI.
> By doing this, we can easily know the max parallelism to suggest when users 
> need to scale because of a lack of throughput, and we can know which subtask 
> will process a specific key so we can find the logs quickly.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25068) Show the maximum parallelism (number of key groups) of a job in Web UI

2021-11-25 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-25068:
---

 Summary: Show the maximum parallelism (number of key groups) of a 
job in Web UI
 Key: FLINK-25068
 URL: https://issues.apache.org/jira/browse/FLINK-25068
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.14.0
Reporter: zlzhang0122


Now, Flink uses the maximum parallelism as the number of key groups to 
distribute keys. The maximum parallelism can be set manually, or Flink will 
set it up automatically. Sometimes the value is useful and we may want to 
know it, so maybe we can expose it in the Web UI.

By doing this, we can easily know the max parallelism to suggest when users 
need to scale because of a lack of throughput, and we can know which subtask 
will process a specific key so we can find the logs quickly.
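
For context, a minimal sketch of how the maximum parallelism is set and how a 
key maps to a key group and a subtask, using Flink's KeyGroupRangeAssignment 
utility (an internal class, used here only for illustration):

{code:java}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(128); // 128 key groups for all keyed state

        // Which key group and subtask handle the key "user-42" when the
        // keyed operator runs at parallelism 8:
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup("user-42", 128);
        int subtask = KeyGroupRangeAssignment
                .computeOperatorIndexForKeyGroup(128, 8, keyGroup);
        System.out.println("key group " + keyGroup + " -> subtask " + subtask);
    }
}
{code}

This mapping is exactly why the value is worth showing in the Web UI: knowing 
the max parallelism and the operator parallelism is enough to locate the 
subtask (and its logs) for a given key.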



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24854) StateHandleSerializationTest unit test error

2021-11-22 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447250#comment-17447250
 ] 

zlzhang0122 commented on FLINK-24854:
-

[~sewen] Hi, what's your opinion? Any suggestion is much appreciated!

> StateHandleSerializationTest unit test error
> 
>
> Key: FLINK-24854
> URL: https://issues.apache.org/jira/browse/FLINK-24854
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> StateHandleSerializationTest.ensureStateHandlesHaveSerialVersionUID() will 
> fail because RocksDBStateDownloaderTest has an anonymous subclass of 
> StreamStateHandle, and this class is a subtype of StateObject; since the 
> class is anonymous, the assertFalse will fail, and so will this unit test.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24694) Translate "Checkpointing under backpressure" page into Chinese

2021-11-10 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441704#comment-17441704
 ] 

zlzhang0122 commented on FLINK-24694:
-

[~pnowojski] I have done the work and created a PR. Could you help review it, 
please? Thanks!

> Translate "Checkpointing under backpressure" page into Chinese
> --
>
> Key: FLINK-24694
> URL: https://issues.apache.org/jira/browse/FLINK-24694
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Affects Versions: 1.15.0, 1.14.1
>Reporter: Piotr Nowojski
>Assignee: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Page {{content.zh/docs/ops/state/checkpointing_under_backpressure.md}} needs 
> to be translated.
> Reference English version ticket: FLINK-24670



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24854) StateHandleSerializationTest unit test error

2021-11-10 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441575#comment-17441575
 ] 

zlzhang0122 commented on FLINK-24854:
-

I think we can just remove the assertFalse statement and add a 
serialVersionUID to the anonymous subclass of StreamStateHandle in 
RocksDBStateDownloaderTest; with this change, everything will be fine.
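
A minimal generic sketch of the idea (plain java.io.Serializable here; the 
actual change would be on the anonymous StreamStateHandle in the test):

{code:java}
import java.io.Serializable;

public class SerialVersionUidExample {
    // Anonymous classes are serialized like any other class, so they can
    // (and here should) pin a serialVersionUID, which is what the proposed
    // fix adds to the anonymous handle in RocksDBStateDownloaderTest.
    static final Serializable HANDLE = new Serializable() {
        private static final long serialVersionUID = 1L;
    };
}
{code}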

> StateHandleSerializationTest unit test error
> 
>
> Key: FLINK-24854
> URL: https://issues.apache.org/jira/browse/FLINK-24854
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.14.0
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
>
> StateHandleSerializationTest.ensureStateHandlesHaveSerialVersionUID() will 
> fail because RocksDBStateDownloaderTest has an anonymous subclass of 
> StreamStateHandle, and this class is a subtype of StateObject; since the 
> class is anonymous, the assertFalse will fail, and so will this unit test.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24854) StateHandleSerializationTest unit test error

2021-11-10 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-24854:
---

 Summary: StateHandleSerializationTest unit test error
 Key: FLINK-24854
 URL: https://issues.apache.org/jira/browse/FLINK-24854
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.14.0
Reporter: zlzhang0122
 Fix For: 1.15.0


StateHandleSerializationTest.ensureStateHandlesHaveSerialVersionUID() will 
fail because RocksDBStateDownloaderTest has an anonymous subclass of 
StreamStateHandle, and this class is a subtype of StateObject; since the 
class is anonymous, the assertFalse will fail, and so will this unit test.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24694) Translate "Checkpointing under backpressure" page into Chinese

2021-10-29 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17435869#comment-17435869
 ] 

zlzhang0122 commented on FLINK-24694:
-

[~pnowojski] I can help with this; could you assign it to me?

> Translate "Checkpointing under backpressure" page into Chinese
> --
>
> Key: FLINK-24694
> URL: https://issues.apache.org/jira/browse/FLINK-24694
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation
>Affects Versions: 1.15.0, 1.14.1
>Reporter: Piotr Nowojski
>Priority: Major
> Fix For: 1.15.0
>
>
> Page {{content.zh/docs/ops/state/checkpointing_under_backpressure.md}} needs 
> to be translated.
> Reference English version ticket: FLINK-24670



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15358) [configuration] the global configuration will trim the rest of value after a `#` comment sign

2021-10-29 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17435864#comment-17435864
 ] 

zlzhang0122 edited comment on FLINK-15358 at 10/29/21, 9:10 AM:


[~chesnay] What is the progress of this ticket? We have encountered a similar 
problem. I think maybe we can choose to use a third-party YAML parser such as 
SnakeYAML, etc.


was (Author: zlzhang0122):
[~chesnay] What is the progress of this ticket? We have encountered a similar 
problem. I think maybe we can choose to use a third-party YAML parser such as 
SnakeYAML, etc.

> [configuration] the global configuration will trim the rest of value after a 
> `#` comment sign
> -
>
> Key: FLINK-15358
> URL: https://issues.apache.org/jira/browse/FLINK-15358
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.9.1
>Reporter: BlaBlabla
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Hello, 
> I have to configure the InfluxDB metrics reporter in _conf/flink-conf.yaml_; 
> however, the password contains a # sign, and Flink then skips the rest of 
> the password after the #, e.g.:
>      *metrics.reporter.influxdb.password: xxpasssxx#blabla*
>   
>  *#blabla* is parsed as an end-of-line comment.
> Can you guys fix it?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15358) [configuration] the global configuration will trim the rest of value after a `#` comment sign

2021-10-29 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17435864#comment-17435864
 ] 

zlzhang0122 commented on FLINK-15358:
-

[~chesnay] What is the progress of this ticket? We have encountered a similar 
problem. I think maybe we can choose to use a third-party YAML parser such as 
SnakeYAML, etc.
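
For illustration, a minimal sketch of how a real YAML parser treats a # inside 
a value (assuming the SnakeYAML library on the classpath):

{code:java}
import org.yaml.snakeyaml.Yaml;

import java.util.Map;

public class YamlHashExample {
    public static void main(String[] args) {
        Yaml yaml = new Yaml();
        // In spec-compliant YAML, '#' starts a comment only at the beginning
        // of a line or after whitespace, so this password survives intact:
        Map<String, Object> conf = yaml.load(
                "metrics.reporter.influxdb.password: xxpasssxx#blabla\n");
        // Prints "xxpasssxx#blabla", not "xxpasssxx":
        System.out.println(conf.get("metrics.reporter.influxdb.password"));
    }
}
{code}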

> [configuration] the global configuration will trim the rest of value after a 
> `#` comment sign
> -
>
> Key: FLINK-15358
> URL: https://issues.apache.org/jira/browse/FLINK-15358
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration
>Affects Versions: 1.9.1
>Reporter: BlaBlabla
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Hello, 
> I have to configure the InfluxDB metrics reporter in _conf/flink-conf.yaml_; 
> however, the password contains a # sign, and Flink then skips the rest of 
> the password after the #, e.g.:
>      *metrics.reporter.influxdb.password: xxpasssxx#blabla*
>   
>  *#blabla* is parsed as an end-of-line comment.
> Can you guys fix it?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24542) Expose the freshness metrics for kafka connector

2021-10-20 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17431068#comment-17431068
 ] 

zlzhang0122 commented on FLINK-24542:
-

[~renqs] Yes, it is a derived metric instead of a standard Kafka metric. IMO 
Kafka just wants to care about the key metrics about itself, but this metric 
cares more about the consumer side, like the Kafka connector.

> Expose the freshness metrics for kafka connector
> 
>
> Key: FLINK-24542
> URL: https://issues.apache.org/jira/browse/FLINK-24542
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2, 1.14.0, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
>
> When we start a Flink job to consume Apache Kafka, we usually use 
> offsetLag, which can be calculated as current-offsets minus 
> committed-offsets, but sometimes the offsetLag is hard to interpret; we can 
> hardly judge whether the value is normal or not. A new metric, freshness, 
> has been proposed for Kafka consumers (see 
> [a-guide-to-kafka-consumer-freshness|https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0]).
> So we can also expose the freshness metric for the Kafka connector to 
> improve the user experience. From this freshness metric, users can easily 
> know whether Kafka messages are backlogged and need to be dealt with.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24542) Expose the freshness metrics for kafka connector

2021-10-14 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-24542:

Description: 
When we start a Flink job to consume Apache Kafka, we usually use offsetLag, 
which can be calculated as current-offsets minus committed-offsets, but 
sometimes the offsetLag is hard to interpret; we can hardly judge whether the 
value is normal or not. A new metric, freshness, has been proposed for Kafka 
consumers (see 
[a-guide-to-kafka-consumer-freshness|https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0]).

So we can also expose the freshness metric for the Kafka connector to improve 
the user experience. From this freshness metric, users can easily know 
whether Kafka messages are backlogged and need to be dealt with.

  was:
When we start a Flink job to consume Apache Kafka, we usually use offsetLag, 
which can be calculated as current-offsets minus committed-offsets, but 
sometimes the offsetLag is hard to interpret; we can hardly judge whether the 
value is normal or not. A new metric, freshness, has been proposed for Kafka 
consumers (see 
[a-guide-to-kafka-consumer-freshness|[https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0])|https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0]),].

So we can also expose the freshness metric for the Kafka connector to improve 
the user experience. From this freshness metric, users can easily know 
whether Kafka messages are backlogged and need to be dealt with.


> Expose the freshness metrics for kafka connector
> 
>
> Key: FLINK-24542
> URL: https://issues.apache.org/jira/browse/FLINK-24542
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2, 1.14.0, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
>
> When we start a Flink job to consume Apache Kafka, we usually use 
> offsetLag, which can be calculated as current-offsets minus 
> committed-offsets, but sometimes the offsetLag is hard to interpret; we can 
> hardly judge whether the value is normal or not. A new metric, freshness, 
> has been proposed for Kafka consumers (see 
> [a-guide-to-kafka-consumer-freshness|https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0]).
> So we can also expose the freshness metric for the Kafka connector to 
> improve the user experience. From this freshness metric, users can easily 
> know whether Kafka messages are backlogged and need to be dealt with.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24542) Expose the freshness metrics for kafka connector

2021-10-14 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-24542:

Description: 
When we start a Flink job to consume Apache Kafka, we usually use offsetLag, 
which can be calculated as current-offsets minus committed-offsets, but 
sometimes the offsetLag is hard to interpret; we can hardly judge whether the 
value is normal or not. A new metric, freshness, has been proposed for Kafka 
consumers (see 
[a-guide-to-kafka-consumer-freshness|[https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0])|https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0]),].

So we can also expose the freshness metric for the Kafka connector to improve 
the user experience. From this freshness metric, users can easily know 
whether Kafka messages are backlogged and need to be dealt with.

  was:
When we start a Flink job to consume Apache Kafka, we usually use offsetLag, 
which can be calculated as current-offsets minus committed-offsets, but 
sometimes the offsetLag is hard to interpret; we can hardly judge whether the 
value is normal or not. A new metric, freshness, has been proposed for Kafka 
consumers (see [a guide to kafka consumer 
freshness|[https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0])|https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0]),].

So we can also expose the freshness metric for the Kafka connector to improve 
the user experience. From this freshness metric, users can easily know 
whether Kafka messages are backlogged and need to be dealt with.


> Expose the freshness metrics for kafka connector
> 
>
> Key: FLINK-24542
> URL: https://issues.apache.org/jira/browse/FLINK-24542
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.12.2, 1.14.0, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
>
> When we start a Flink job to consume Apache Kafka, we usually use 
> offsetLag, which can be calculated as current-offsets minus 
> committed-offsets, but sometimes the offsetLag is hard to interpret; we can 
> hardly judge whether the value is normal or not. A new metric, freshness, 
> has been proposed for Kafka consumers (see 
> [a-guide-to-kafka-consumer-freshness|[https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0])|https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0]),].
> So we can also expose the freshness metric for the Kafka connector to 
> improve the user experience. From this freshness metric, users can easily 
> know whether Kafka messages are backlogged and need to be dealt with.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24542) Expose the freshness metrics for kafka connector

2021-10-14 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-24542:
---

 Summary: Expose the freshness metrics for kafka connector
 Key: FLINK-24542
 URL: https://issues.apache.org/jira/browse/FLINK-24542
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.13.1, 1.14.0, 1.12.2
Reporter: zlzhang0122
 Fix For: 1.15.0


When we start a Flink job to consume Apache Kafka, we usually use offsetLag, 
which can be calculated as current-offsets minus committed-offsets, but 
sometimes the offsetLag is hard to interpret; we can hardly judge whether the 
value is normal or not. A new metric, freshness, has been proposed for Kafka 
consumers (see [a guide to kafka consumer 
freshness|[https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0])|https://www.jesseyates.com/2019/11/04/kafka-consumer-freshness-a-guide.html?trk=article_share_wechat=timeline=0]),].

So we can also expose the freshness metric for the Kafka connector to improve 
the user experience. From this freshness metric, users can easily know 
whether Kafka messages are backlogged and need to be dealt with.
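
For illustration, a minimal sketch of the derived metric, assuming freshness 
is computed as wall-clock time minus the timestamp of the last consumed 
record (per the linked guide); the class and method names are made up:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Tracks how "fresh" the consumer is relative to the records it reads. */
public class FreshnessTracker {
    private volatile long freshnessMs;

    /** Call for every consumed record; record.timestamp() is the broker's
     *  LogAppendTime or the producer's CreateTime, depending on topic config. */
    public void onRecord(ConsumerRecord<?, ?> record) {
        freshnessMs = System.currentTimeMillis() - record.timestamp();
    }

    /** Value to expose as a gauge, e.g. via Flink's metric group. */
    public long getFreshnessMs() {
        return freshnessMs;
    }
}
{code}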



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24122) Add support to do clean in history server

2021-10-11 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17427431#comment-17427431
 ] 

zlzhang0122 commented on FLINK-24122:
-

[~trohrmann] IMO the new option is an addition to the retention number: if 
users configure both the retention number and the retention time, they take 
effect at the same time, but if users configure only one of them, it should 
take effect on its own.
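
A minimal flink-conf.yaml sketch of how the two retention settings could be 
combined; the time-based option is the proposal itself, so its name and 
format below are hypothetical:

{code:java}
# Existing option: keep at most 20 archived jobs.
historyserver.archive.retained-jobs: 20
# Proposed option (hypothetical name): also drop archives older than 7 days.
# Each limit applies on its own if the other one is not configured.
historyserver.archive.retained-time: 7 d
{code}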

> Add support to do clean in history server
> -
>
> Key: FLINK-24122
> URL: https://issues.apache.org/jira/browse/FLINK-24122
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.12.3, 1.13.2
>Reporter: zlzhang0122
>Priority: Minor
> Fix For: 1.14.1
>
>
> Now, the history server can clean history jobs by two means:
>  # if users have configured 
> {code:java}
> historyserver.archive.clean-expired-jobs: true{code}
> , then it compares the files in HDFS across two clean intervals, finds the 
> deleted ones, and cleans the local cache files.
>  # if users have configured 
> {code:java}
> historyserver.archive.retained-jobs:{code}
> with a positive number, then it cleans the oldest files in HDFS and locally.
> But the retained-jobs number is difficult to determine.
> For example, users may want to check yesterday's history jobs while many 
> jobs failed today and exceeded the retained-jobs number; then yesterday's 
> history jobs would be deleted. So what if we add a configuration containing 
> a retention time that indicates the max time a history job is retained?
> Also, it can't clean the job history files which are no longer in HDFS but 
> are still cached in the local filesystem; these files will be stored 
> forever and can't be cleaned unless users delete them manually. Maybe we 
> can add an option and do this cleanup if the option is true.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24122) Add support to do clean in history server

2021-10-09 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17426554#comment-17426554
 ] 

zlzhang0122 commented on FLINK-24122:
-

[~trohrmann] [~pnowojski] [~jark] [~gyfora] What do you think? Any suggestion 
is much appreciated!

> Add support to do clean in history server
> -
>
> Key: FLINK-24122
> URL: https://issues.apache.org/jira/browse/FLINK-24122
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.12.3, 1.13.2
>Reporter: zlzhang0122
>Priority: Minor
> Fix For: 1.14.1
>
>
> Now, the history server can clean history jobs by two means:
>  # if users have configured 
> {code:java}
> historyserver.archive.clean-expired-jobs: true{code}
> , then it compares the files in HDFS across two clean intervals, finds the 
> deleted ones, and cleans the local cache files.
>  # if users have configured 
> {code:java}
> historyserver.archive.retained-jobs:{code}
> with a positive number, then it cleans the oldest files in HDFS and locally.
> But the retained-jobs number is difficult to determine.
> For example, users may want to check yesterday's history jobs while many 
> jobs failed today and exceeded the retained-jobs number; then yesterday's 
> history jobs would be deleted. So what if we add a configuration containing 
> a retention time that indicates the max time a history job is retained?
> Also, it can't clean the job history files which are no longer in HDFS but 
> are still cached in the local filesystem; these files will be stored 
> forever and can't be cleaned unless users delete them manually. Maybe we 
> can add an option and do this cleanup if the option is true.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24122) Add support to do clean in history server

2021-10-09 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-24122:

Description: 
Now, the history server can clean history jobs by two means:
 # if users have configured 
{code:java}
historyserver.archive.clean-expired-jobs: true{code}
, then it compares the files in HDFS across two clean intervals, finds the 
deleted ones, and cleans the local cache files.

 # if users have configured 
{code:java}
historyserver.archive.retained-jobs:{code}
with a positive number, then it cleans the oldest files in HDFS and locally.

But the retained-jobs number is difficult to determine.

For example, users may want to check yesterday's history jobs while many jobs 
failed today and exceeded the retained-jobs number; then yesterday's history 
jobs would be deleted. So what if we add a configuration containing a 
retention time that indicates the max time a history job is retained?

Also, it can't clean the job history files which are no longer in HDFS but 
are still cached in the local filesystem; these files will be stored forever 
and can't be cleaned unless users delete them manually. Maybe we can add an 
option and do this cleanup if the option is true.

  was:
Now, the history server can clean history jobs by two means:
 # if users have configured 
{code:java}
historyserver.archive.clean-expired-jobs: true{code}
, then it compares the files in HDFS across two clean intervals, finds the 
deleted ones, and cleans the local cache files.

 # if users have configured 
{code:java}
historyserver.archive.retained-jobs:{code}
with a positive number, then it cleans the oldest files in HDFS and locally.

But the retained-jobs number is difficult to determine.

For example, users may want to check yesterday's history jobs while many jobs 
failed today and exceeded the retained-jobs number; then yesterday's history 
jobs would be deleted. So what if we add a configuration containing a 
retention time that indicates the max time a history job is retained?

Also, it can't clean the job history which is no longer in HDFS but is still 
cached in the local filesystem; these files will be stored forever and can't 
be cleaned unless users delete them manually. Maybe we can add an option and 
do this cleanup if the option is true.


> Add support to do clean in history server
> -
>
> Key: FLINK-24122
> URL: https://issues.apache.org/jira/browse/FLINK-24122
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.12.3, 1.13.2
>Reporter: zlzhang0122
>Priority: Minor
> Fix For: 1.14.1
>
>
> Now, the history server can clean history jobs by two means:
>  # if users have configured 
> {code:java}
> historyserver.archive.clean-expired-jobs: true{code}
> , then it compares the files in HDFS across two clean intervals, finds the 
> deleted ones, and cleans the local cache files.
>  # if users have configured 
> {code:java}
> historyserver.archive.retained-jobs:{code}
> with a positive number, then it cleans the oldest files in HDFS and locally.
> But the retained-jobs number is difficult to determine.
> For example, users may want to check yesterday's history jobs while many 
> jobs failed today and exceeded the retained-jobs number; then yesterday's 
> history jobs would be deleted. So what if we add a configuration containing 
> a retention time that indicates the max time a history job is retained?
> Also, it can't clean the job history files which are no longer in HDFS but 
> are still cached in the local filesystem; these files will be stored 
> forever and can't be cleaned unless users delete them manually. Maybe we 
> can add an option and do this cleanup if the option is true.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24122) Add support to do clean in history server

2021-10-09 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-24122:

Description: 
Now, the history server can clean history jobs by two means:
 # if users have configured 
{code:java}
historyserver.archive.clean-expired-jobs: true{code}
, then it compares the files in HDFS across two clean intervals, finds the 
deleted ones, and cleans the local cache files.

 # if users have configured 
{code:java}
historyserver.archive.retained-jobs:{code}
with a positive number, then it cleans the oldest files in HDFS and locally.

But the retained-jobs number is difficult to determine.

For example, users may want to check yesterday's history jobs while many jobs 
failed today and exceeded the retained-jobs number; then yesterday's history 
jobs would be deleted. So what if we add a configuration containing a 
retention time that indicates the max time a history job is retained?

Also, it can't clean the job history which is no longer in HDFS but is still 
cached in the local filesystem; these files will be stored forever and can't 
be cleaned unless users delete them manually. Maybe we can add an option and 
do this cleanup if the option is true.

  was:
Now, the history server can clean history jobs by two means:
 # if users have configured 
{code:java}
historyserver.archive.clean-expired-jobs: true{code}
, then it compares the files in HDFS across two clean intervals, finds the 
deleted ones, and cleans the local cache files.

 # if users have configured 
{code:java}
historyserver.archive.retained-jobs:{code}
with a positive number, then it cleans the oldest files in HDFS and locally.

But the retained-jobs number is difficult to determine.

For example, users may want to check yesterday's history jobs while many jobs 
failed today and exceeded the retained-jobs number; then yesterday's history 
jobs would be deleted. So what if we add a configuration containing a 
retention time that indicates the max time a history job is retained?


> Add support to do clean in history server
> -
>
> Key: FLINK-24122
> URL: https://issues.apache.org/jira/browse/FLINK-24122
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.12.3, 1.13.2
>Reporter: zlzhang0122
>Priority: Minor
> Fix For: 1.14.1
>
>
> Now, the history server can clean history jobs by two means:
>  # if users have configured 
> {code:java}
> historyserver.archive.clean-expired-jobs: true{code}
> , then it compares the files in HDFS across two clean intervals, finds the 
> deleted ones, and cleans the local cache files.
>  # if users have configured 
> {code:java}
> historyserver.archive.retained-jobs:{code}
> with a positive number, then it cleans the oldest files in HDFS and locally.
> But the retained-jobs number is difficult to determine.
> For example, users may want to check yesterday's history jobs while many 
> jobs failed today and exceeded the retained-jobs number; then yesterday's 
> history jobs would be deleted. So what if we add a configuration containing 
> a retention time that indicates the max time a history job is retained?
> Also, it can't clean the job history which is no longer in HDFS but is 
> still cached in the local filesystem; these files will be stored forever 
> and can't be cleaned unless users delete them manually. Maybe we can add 
> an option and do this cleanup if the option is true.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24122) Add support to do clean in history server

2021-10-08 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-24122:

Description: 
Now, the history server can clean history jobs by two means:
 # if users have configured 
{code:java}
historyserver.archive.clean-expired-jobs: true{code}
, then compare the files in hdfs over two clean interval and find the delete 
and clean the local cache file.

 # if users have configured the 
{code:java}
historyserver.archive.retained-jobs:{code}
a positive number, then clean the oldest files in hdfs and local.

But the retained-jobs number is difficult to determine.

For example, users may want to check the history jobs yesterday while many jobs 
failed today and exceed the retained-jobs number, then the history jobs of 
yesterday will be delete. So what if add a configuration which contain a 
retained-times that indicate the max time the history job retain?

  was:
Now, the history server can clean history jobs by two means:
 # if users have configured 
{code:java}
historyserver.archive.clean-expired-jobs: true{code}
, then it compares the files in HDFS across two clean intervals, finds the 
deleted ones, and cleans the local cache files.

 # if users have configured 
{code:java}
historyserver.archive.retained-jobs{code}
with a positive number, then it cleans the oldest files in HDFS and locally.

But it can't clean the job history which is no longer in HDFS but is still 
cached in the local filesystem; these files will be stored forever and can't 
be cleaned unless users delete them manually. Maybe we can add an option and 
do this cleanup if the option is true.


> Add support to do clean in history server
> -
>
> Key: FLINK-24122
> URL: https://issues.apache.org/jira/browse/FLINK-24122
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.12.3, 1.13.2
>Reporter: zlzhang0122
>Priority: Minor
> Fix For: 1.14.1
>
>
> Now, the history server can clean history jobs by two means:
>  # if users have configured 
> {code:java}
> historyserver.archive.clean-expired-jobs: true{code}
> , then it compares the files in HDFS across two clean intervals, finds the 
> deleted ones, and cleans the local cache files.
>  # if users have configured 
> {code:java}
> historyserver.archive.retained-jobs:{code}
> with a positive number, then it cleans the oldest files in HDFS and locally.
> But the retained-jobs number is difficult to determine.
> For example, users may want to check yesterday's history jobs while many 
> jobs failed today and exceeded the retained-jobs number; then yesterday's 
> history jobs would be deleted. So what if we add a configuration containing 
> a retention time that indicates the max time a history job is retained?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24122) Add support to do clean in history server

2021-09-28 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-24122:

Description: 
Now, the history server can clean history jobs by two means:
 # if users have configured 
{code:java}
historyserver.archive.clean-expired-jobs: true{code}
, then it compares the files in HDFS across two clean intervals, finds the 
deleted ones, and cleans the local cache files.

 # if users have configured 
{code:java}
historyserver.archive.retained-jobs{code}
with a positive number, then it cleans the oldest files in HDFS and locally.

But it can't clean the job history which is no longer in HDFS but is still 
cached in the local filesystem; these files will be stored forever and can't 
be cleaned unless users delete them manually. Maybe we can add an option and 
do this cleanup if the option is true.

  was:
Now, the history server can clean history jobs by two means:
 # if users have configured 
{code:java}
historyserver.web.ssl.enabled: true{code}
, then it compares the files in HDFS across two clean intervals, finds the 
deleted ones, and cleans the local cache files.
 # if users have configured 
{code:java}
historyserver.archive.retained-jobs{code}
with a positive number, then it cleans the oldest files in HDFS and locally.

But it can't clean the job history which is no longer in HDFS but is still 
cached in the local filesystem; these files will be stored forever and can't 
be cleaned unless users delete them manually. Maybe we can add an option and 
do this cleanup if the option is true.


> Add support to do clean in history server
> -
>
> Key: FLINK-24122
> URL: https://issues.apache.org/jira/browse/FLINK-24122
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.12.3, 1.13.2
>Reporter: zlzhang0122
>Priority: Minor
> Fix For: 1.14.1
>
>
> Now, the history server can clean history jobs by two means:
>  # if users have configured 
> {code:java}
> historyserver.archive.clean-expired-jobs: true{code}
> , then it compares the files in HDFS across two clean intervals, finds the 
> deleted ones, and cleans the local cache files.
>  # if users have configured 
> {code:java}
> historyserver.archive.retained-jobs{code}
> with a positive number, then it cleans the oldest files in HDFS and locally.
> But it can't clean the job history which is no longer in HDFS but is still 
> cached in the local filesystem; these files will be stored forever and 
> can't be cleaned unless users delete them manually. Maybe we can add an 
> option and do this cleanup if the option is true.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24377) TM resource may not be properly released after heartbeat timeout

2021-09-26 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17420282#comment-17420282
 ] 

zlzhang0122 commented on FLINK-24377:
-

We have encountered this situation too.

> TM resource may not be properly released after heartbeat timeout
> 
>
> Key: FLINK-24377
> URL: https://issues.apache.org/jira/browse/FLINK-24377
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes, Deployment / YARN, Runtime / 
> Coordination
>Affects Versions: 1.14.0, 1.13.2
>Reporter: Xintong Song
>Assignee: Xintong Song
>Priority: Major
> Fix For: 1.14.0, 1.13.3, 1.15.0
>
>
> In native k8s and yarn deploy modes, RM disconnects a TM when its heartbeat 
> times out. However, it does not actively release the pod / container of that 
> TM. The releasing of pod / container relies on the TM to terminate itself 
> after failing to re-register to the RM.
> In some rare conditions, the TM process may not terminate and may hang for a 
> long time. In such cases, k8s / yarn sees the process running, thus will not 
> release the pod / container. Neither will Flink's resource manager. 
> Consequently, the resource is leaked until the entire application is 
> terminated.
> To fix this, we should make {{ActiveResourceManager}} actively release the 
> resource to K8s / Yarn after a TM heartbeat timeout.
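
A minimal sketch of the proposed fix, assuming a hypothetical releaseResource hook on the active resource manager (the names are illustrative, not the actual Flink API):
{code:java}
// sketch: on TM heartbeat timeout, don't only disconnect the TM --
// also ask the external provider (K8s / YARN) to delete its pod / container
public void notifyHeartbeatTimeout(ResourceID taskManagerId) {
    closeTaskManagerConnection(
            taskManagerId,
            new TimeoutException("TaskManager heartbeat timed out."));
    // proposed addition: actively release the worker instead of relying
    // on the TM process to terminate itself (hypothetical method)
    releaseResource(taskManagerId);
}
{code}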



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-23189) Count and fail the task when the disk is error on JobManager

2021-09-22 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418602#comment-17418602
 ] 

zlzhang0122 edited comment on FLINK-23189 at 9/22/21, 1:52 PM:
---

[~pnowojski] ok, I've seen the fix and found that it added handling of 
onTriggerFailure when the checkpoint is null. I've noticed this situation, but I 
didn't reproduce it in our production environment, so I didn't change the code 
here; still, we may indeed need this fix for this case.


was (Author: zlzhang0122):
[~pnowojski] ok, I've seen the fix and found that it added handling of 
onTriggerFailure when the checkpoint is null. I've found this situation, but I 
didn't reproduce it in our production environment, so I didn't change the code 
here; still, we may indeed need that fix for some corner cases.

> Count and fail the task when the disk is error on JobManager
> 
>
> Key: FLINK-23189
> URL: https://issues.apache.org/jira/browse/FLINK-23189
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Assignee: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: exception.txt
>
>
> When the JobManager disk has an error, triggerCheckpoint will throw an 
> IOException and fail. This will cause a TRIGGER_CHECKPOINT_FAILURE, but this 
> failure won't cause the job to fail. Users can hardly find this error if they 
> don't look at the JobManager logs. To avoid this case, I propose that we 
> detect these IOException cases and increase the failureCounter, which can 
> finally fail the job.
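
A rough sketch of the proposal (the field and method names are illustrative, not the actual CheckpointCoordinator code):
{code:java}
// sketch: count IOExceptions thrown while triggering a checkpoint so that
// repeated disk errors on the JobManager eventually fail the job
private void onTriggerFailure(Throwable reason) {
    if (reason instanceof IOException) {
        int failures = failureCounter.incrementAndGet();
        if (failures > tolerableCheckpointFailureNumber) {
            failJob(new FlinkRuntimeException(
                    "Exceeded tolerable checkpoint trigger failures", reason));
        }
    }
}
{code}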



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23189) Count and fail the task when the disk is error on JobManager

2021-09-22 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418602#comment-17418602
 ] 

zlzhang0122 commented on FLINK-23189:
-

[~pnowojski] ok, I've seen the fix and found that it added handling of 
onTriggerFailure when the checkpoint is null. I've found this situation, but I 
didn't reproduce it in our production environment, so I didn't change the code 
here; still, we may indeed need that fix for some corner cases.

> Count and fail the task when the disk is error on JobManager
> 
>
> Key: FLINK-23189
> URL: https://issues.apache.org/jira/browse/FLINK-23189
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Assignee: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: exception.txt
>
>
> When the JobManager disk has an error, triggerCheckpoint will throw an 
> IOException and fail. This will cause a TRIGGER_CHECKPOINT_FAILURE, but this 
> failure won't cause the job to fail. Users can hardly find this error if they 
> don't look at the JobManager logs. To avoid this case, I propose that we 
> detect these IOException cases and increase the failureCounter, which can 
> finally fail the job.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23189) Count and fail the task when the disk is error on JobManager

2021-09-22 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17418420#comment-17418420
 ] 

zlzhang0122 commented on FLINK-23189:
-

[~pnowojski] sure, I will check on it.

> Count and fail the task when the disk is error on JobManager
> 
>
> Key: FLINK-23189
> URL: https://issues.apache.org/jira/browse/FLINK-23189
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Assignee: zlzhang0122
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: exception.txt
>
>
> When the JobManager disk has an error, triggerCheckpoint will throw an 
> IOException and fail. This will cause a TRIGGER_CHECKPOINT_FAILURE, but this 
> failure won't cause the job to fail. Users can hardly find this error if they 
> don't look at the JobManager logs. To avoid this case, I propose that we 
> detect these IOException cases and increase the failureCounter, which can 
> finally fail the job.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-24306) group by index throw SqlValidatorException

2021-09-17 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416576#comment-17416576
 ] 

zlzhang0122 edited comment on FLINK-24306 at 9/17/21, 10:09 AM:


[~jark] I have linked the Calcite issue, but they thought that it had been 
solved, while when I tried to set this config in 
FlinkSqlConformance#isGroupByOrdinal(), it didn't work. I'm trying to dig 
deeper into it.


was (Author: zlzhang0122):
[~jark] I have linked the Calcite issue, but they thought that it had been 
solved, while when I tried to set this config in 
FlinkSqlConformance#isGroupByAlias(), it didn't work. I'm trying to dig 
deeper into it.

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> everything will be fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> we will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   
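
For context, Calcite gates this syntax behind SqlConformance#isGroupByOrdinal(). A minimal sketch of a conformance that turns it on (which, per the comments above, did not resolve the issue in practice):
{code:java}
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlDelegatingConformance;

// sketch: a conformance that allows ordinal references in GROUP BY,
// e.g. "group by 1"
public class OrdinalGroupByConformance extends SqlDelegatingConformance {
    public OrdinalGroupByConformance(SqlConformance delegate) {
        super(delegate);
    }

    @Override
    public boolean isGroupByOrdinal() {
        return true;
    }
}
{code}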



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-24306) group by index throw SqlValidatorException

2021-09-17 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416576#comment-17416576
 ] 

zlzhang0122 edited comment on FLINK-24306 at 9/17/21, 9:35 AM:
---

[~jark] I have linked the Calcite issue, but they thought that it had been 
solved, while when I tried to set this config in 
FlinkSqlConformance#isGroupByAlias(), it didn't work. I'm trying to dig 
deeper into it.


was (Author: zlzhang0122):
[~jark] I have linked the Calcite issue, but they thought that it had been 
solved, while when I tried to set this config in 
FlinkSqlConformance#isGroupByAlias(), it didn't work. I'm trying to dig 
deeper into it.

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> everything will be fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> we will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24306) group by index throw SqlValidatorException

2021-09-17 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416576#comment-17416576
 ] 

zlzhang0122 commented on FLINK-24306:
-

[~jark] I have linked the Calcite issue, but they thought that it had been 
solved; however, when I tried to set this config in 
FlinkSqlConformance#isGroupByAlias(), it didn't work. I'm trying to dig deeper into it.

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> everything will be fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> we will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-24306) group by index throw SqlValidatorException

2021-09-17 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416576#comment-17416576
 ] 

zlzhang0122 edited comment on FLINK-24306 at 9/17/21, 9:34 AM:
---

[~jark] I have linked the Calcite issue, but they thought that it had been 
solved, while when I tried to set this config in 
FlinkSqlConformance#isGroupByAlias(), it didn't work. I'm trying to dig 
deeper into it.


was (Author: zlzhang0122):
[~jark] I have linked the Calcite issue, but they thought that it had been 
solved; however, when I tried to set this config in FlinkSqlConformance#isGroupByAlias(), 
it didn't work. I'm trying to dig deeper into it.

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> everything will be fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> we will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24314) Always use memory state backend with RocksDB

2021-09-17 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416572#comment-17416572
 ] 

zlzhang0122 commented on FLINK-24314:
-

[~shiwuliang] You can set it like this:
{code:java}
// keep working state in embedded RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// write checkpoint snapshots to a durable location, e.g. an HDFS path
env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
{code}
This uses embedded RocksDB to store state, and checkpointDir to store 
checkpoints.

> Always use memory state backend with RocksDB
> 
>
> Key: FLINK-24314
> URL: https://issues.apache.org/jira/browse/FLINK-24314
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.13.0
>Reporter: shiwuliang
>Priority: Major
> Attachments: image-2021-09-17-10-59-50-094.png, 
> image-2021-09-17-12-03-58-285.png, image-2021-09-17-12-04-55-009.png, 
> image-2021-09-17-12-16-46-277.png
>
>
> When I configure `RocksDBStateBackend` like this, 
>  
> {code:java}
> // code placeholder
> RocksDBStateBackend rocksDBStateBackend = new 
> RocksDBStateBackend("hdfs://");
> streamEnv.setStateBackend(rocksDBStateBackend);{code}
>  
> there are some exceptions like this:
> !image-2021-09-17-10-59-50-094.png|width=1446,height=420!
>  
> It seems like RocksDBStateBackend will use FsStateBackend to store 
> checkpoints. So it means that I have used FileSystemStateBackend, so why does 
> this exception occur?
>  
> I use Flink 1.13.0 and found a similar question at: 
> [https://stackoverflow.com/questions/68314652/flink-state-backend-config-with-the-state-processor-api]
>  
> I'm not sure if his question is the same as mine.
> I want to know how I can solve this and, if it is indeed a 1.13.0 bug, how I 
> can bypass it besides upgrading?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-24314) Always use memory state backend with RocksDB

2021-09-16 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416426#comment-17416426
 ] 

zlzhang0122 edited comment on FLINK-24314 at 9/17/21, 3:54 AM:
---

In Flink 1.13, the storage of state and checkpoints was separated, so you 
should set the state backend and the checkpoint location at the same time. In 
your case, you only set the state backend, so it uses memory to store the 
checkpoint by default, and since the size is beyond the limit, it throws an 
exception.


was (Author: zlzhang0122):
In Flink 1.13, the storage of state and checkpoints was separated, so you 
should set the state backend and the checkpoint location at the same time. In 
your case, you only set the state backend, so it uses memory to store the 
checkpoint by default, and since the size is beyond the limit, it reports an 
exception.

> Always use memory state backend with RocksDB
> 
>
> Key: FLINK-24314
> URL: https://issues.apache.org/jira/browse/FLINK-24314
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.13.0
>Reporter: shiwuliang
>Priority: Major
> Attachments: image-2021-09-17-10-59-50-094.png
>
>
> When I configure `RocksDBStateBackend` like this, 
>  
> {code:java}
> // code placeholder
> RocksDBStateBackend rocksDBStateBackend = new 
> RocksDBStateBackend("hdfs://");
> streamEnv.setStateBackend(rocksDBStateBackend);{code}
>  
> there are some exceptions like this:
> !image-2021-09-17-10-59-50-094.png|width=1446,height=420!
>  
> It seems like RocksDBStateBackend will use FsStateBackend to store 
> checkpoints. So it means that I have used FileSystemStateBackend, so why does 
> this exception occur?
>  
> I use Flink 1.13.0 and found a similar question at: 
> [https://stackoverflow.com/questions/68314652/flink-state-backend-config-with-the-state-processor-api]
>  
> I'm not sure if his question is the same as mine.
> I want to know how I can solve this and, if it is indeed a 1.13.0 bug, how I 
> can bypass it besides upgrading?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24314) Always use memory state backend with RocksDB

2021-09-16 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416426#comment-17416426
 ] 

zlzhang0122 commented on FLINK-24314:
-

In Flink 1.13, the storage of state and checkpoints was separated, so you 
should set the state backend and the checkpoint location at the same time. In 
your case, you only set the state backend, so it uses memory to store the 
checkpoint by default, and since the size is beyond the limit, it reports an 
exception.

> Always use memory state backend with RocksDB
> 
>
> Key: FLINK-24314
> URL: https://issues.apache.org/jira/browse/FLINK-24314
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.13.0
>Reporter: shiwuliang
>Priority: Major
> Attachments: image-2021-09-17-10-59-50-094.png
>
>
> When I configure `RocksDBStateBackend` like this, 
>  
> {code:java}
> // code placeholder
> RocksDBStateBackend rocksDBStateBackend = new 
> RocksDBStateBackend("hdfs://");
> streamEnv.setStateBackend(rocksDBStateBackend);{code}
>  
> there are some exceptions like this:
> !image-2021-09-17-10-59-50-094.png|width=1446,height=420!
>  
> It seems like RocksDBStateBackend will use FsStateBackend to store 
> checkpoints. So it means that I have used FileSystemStateBackend, so why does 
> this exception occur?
>  
> I use Flink 1.13.0 and found a similar question at: 
> [https://stackoverflow.com/questions/68314652/flink-state-backend-config-with-the-state-processor-api]
>  
> I'm not sure if his question is the same as mine.
> I want to know how I can solve this and, if it is indeed a 1.13.0 bug, how I 
> can bypass it besides upgrading?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-24306) group by index throw SqlValidatorException

2021-09-16 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416000#comment-17416000
 ] 

zlzhang0122 edited comment on FLINK-24306 at 9/16/21, 9:44 AM:
---

I have examined the latest Apache Calcite and found that it also doesn't 
support this syntax. I think this problem is produced by Apache Calcite, and 
I have created a ticket for Calcite.


was (Author: zlzhang0122):
I have examined the latest Apache Calcite and found that it also doesn't 
support this syntax. I think this problem was produced by Apache Calcite, and I 
have created a ticket for Calcite.

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> everything will be fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> we will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-24306) group by index throw SqlValidatorException

2021-09-16 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416000#comment-17416000
 ] 

zlzhang0122 edited comment on FLINK-24306 at 9/16/21, 9:16 AM:
---

I have examined the latest Apache Calcite and found that it also doesn't 
support this syntax. I think this problem was produced by Apache Calcite, and I 
have created a ticket for Calcite.


was (Author: zlzhang0122):
I have examined the latest Apache Calcite and found that it also doesn't 
support this syntax. I think this problem was produced by Apache Calcite, and I 
have created a ticket for Calcite.

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> everything will be fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> we will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-24306) group by index throw SqlValidatorException

2021-09-16 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416000#comment-17416000
 ] 

zlzhang0122 edited comment on FLINK-24306 at 9/16/21, 9:16 AM:
---

I have examined the latest Apache Calcite and found that it also doesn't 
support this syntax. I think this problem was produced by Apache Calcite, and I 
have created a ticket for Calcite.


was (Author: zlzhang0122):
I have examined the latest Apache Calcite and found that it also doesn't 
support this syntax. I think this problem is produced by Apache Calcite, and I 
have created a ticket for Calcite.

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> everything will be fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> we will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24306) group by index throw SqlValidatorException

2021-09-16 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17416000#comment-17416000
 ] 

zlzhang0122 commented on FLINK-24306:
-

I have examined the latest Apache Calcite and found that it also doesn't 
support this syntax. I think this problem is produced by Apache Calcite, and I 
have created a ticket for Calcite.

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> everything will be fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> we will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24306) group by index throw SqlValidatorException

2021-09-16 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-24306:

Attachment: calcite.png

> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: calcite.png, sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> everything will be fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> we will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24306) group by index throw SqlValidatorException

2021-09-16 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-24306:

Description: 
We create a table using the following DDL:
{code:java}
create table if not exists datagen_source ( 
id int,
name string,
sex string,
age int,
birthday string,
proc_time as proctime()
) with (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.id.kind' = 'random',
'fields.id.min' = '1',
'fields.id.max' = '200');{code}
When we run 
{code:java}
select id, count(*) from datagen_source group by id;{code}
everything will be fine. But if we run 
{code:java}
select id, count(*) from datagen_source group by 1;{code}
we will get a SqlValidatorException like this:
 !sql_exception.png!
Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
should support this syntax too.
  

  was:
We create a table using the following DDL:
{code:java}
create table if not exists datagen_source ( 
id int,
name string,
sex string,
age int,
birthday string,
proc_time as proctime()
) with (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.id.kind' = 'random',
'fields.id.min' = '1',
'fields.id.max' = '200');{code}
When we run 
{code:java}
select id, count(*) from datagen_source group by id;{code}
everything will be fine. But if we run 
{code:java}
select id, count(*) from datagen_source group by 1;{code}
we will get a SqlValidatorException like this:
!sql_exception.png!
From the error message we know that Flink SQL can recognize that the index '1' 
represents 'id'. Since MySQL, Hive, Spark SQL, etc. all support group by index, 
I think Flink should support this syntax too.
 


> group by index throw SqlValidatorException
> --
>
> Key: FLINK-24306
> URL: https://issues.apache.org/jira/browse/FLINK-24306
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
> Attachments: sql_exception.png
>
>
> We create a table using the following DDL:
> {code:java}
> create table if not exists datagen_source ( 
> id int,
> name string,
> sex string,
> age int,
> birthday string,
> proc_time as proctime()
> ) with (
> 'connector' = 'datagen',
> 'rows-per-second' = '1',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '200');{code}
> When we run 
> {code:java}
> select id, count(*) from datagen_source group by id;{code}
> everything will be fine. But if we run 
> {code:java}
> select id, count(*) from datagen_source group by 1;{code}
> we will get a SqlValidatorException like this:
>  !sql_exception.png!
> Since MySQL, Hive, Spark SQL, etc. all support group by index, I think Flink 
> should support this syntax too.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24306) group by index throw SqlValidatorException

2021-09-15 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-24306:
---

 Summary: group by index throw SqlValidatorException
 Key: FLINK-24306
 URL: https://issues.apache.org/jira/browse/FLINK-24306
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.13.1, 1.12.2
Reporter: zlzhang0122
 Fix For: 1.15.0
 Attachments: sql_exception.png

We create a table using the following DDL:
{code:java}
create table if not exists datagen_source ( 
id int,
name string,
sex string,
age int,
birthday string,
proc_time as proctime()
) with (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.id.kind' = 'random',
'fields.id.min' = '1',
'fields.id.max' = '200');{code}
When we run 
{code:java}
select id, count(*) from datagen_source group by id;{code}
everything will be fine. But if we run 
{code:java}
select id, count(*) from datagen_source group by 1;{code}
we will get a SqlValidatorException like this:
!sql_exception.png!
From the error message we know that Flink SQL can recognize that the index '1' 
represents 'id'. Since MySQL, Hive, Spark SQL, etc. all support group by index, 
I think Flink should support this syntax too.
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24207) Add support of KeyedState in TwoPhaseCommitSinkFunction

2021-09-08 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-24207:
---

 Summary: Add support of KeyedState in TwoPhaseCommitSinkFunction
 Key: FLINK-24207
 URL: https://issues.apache.org/jira/browse/FLINK-24207
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing
Affects Versions: 1.13.1, 1.12.2
Reporter: zlzhang0122
 Fix For: 1.14.1


Now, the implementation of TwoPhaseCommitSinkFunction is based on operator 
state, but operator state will do a deep copy when taking a checkpoint, so a 
large operator state may produce an OOM error. Adding support for KeyedState in 
TwoPhaseCommitSinkFunction may be a good choice to avoid the OOM error and give 
users more convenience.
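
For illustration, the operator-state pattern the description refers to looks roughly like this (a generic sketch, not the actual TwoPhaseCommitSinkFunction code):
{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// sketch: pending transactions kept in operator ListState; snapshotting
// this state copies it, which is where a very large state can hurt --
// keyed state would be scoped and snapshotted per key instead
public class PendingTxnState implements CheckpointedFunction {
    private transient ListState<String> pendingTxns;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        pendingTxns = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-txns", String.class));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // nothing extra to do; the ListState contents are snapshotted as-is
    }
}
{code}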



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24201) HistoryServer didn't contain TaskManager Logs

2021-09-07 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-24201:
---

 Summary: HistoryServer didn't contain TaskManager Logs
 Key: FLINK-24201
 URL: https://issues.apache.org/jira/browse/FLINK-24201
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.13.1, 1.12.2
Reporter: zlzhang0122
 Fix For: 1.14.1


Now, the HistoryServer doesn't contain TaskManager logs, and users cannot see 
the logs of a TaskManager. We could also archive the log info of the 
TaskManager and show it in the HistoryServer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-24122) Add support to do clean in history server

2021-09-03 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-24122:

Issue Type: Bug  (was: Improvement)

> Add support to do clean in history server
> -
>
> Key: FLINK-24122
> URL: https://issues.apache.org/jira/browse/FLINK-24122
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.12.3, 1.13.2
>Reporter: zlzhang0122
>Priority: Minor
> Fix For: 1.14.1
>
>
> Now, the history server can clean history jobs by two means:
>  # if users have configured 
> {code:java}
> historyserver.web.ssl.enabled: true{code}
> , then compare the files in HDFS across two clean intervals, find the deleted 
> ones, and clean the local cache files.
>  # if users have configured 
> {code:java}
> historyserver.archive.retained-jobs{code}
> to a positive number, then clean the oldest files in HDFS and locally.
> But it can't clean job history that is no longer in HDFS but is still 
> cached in the local filesystem; these files will be stored forever and can't 
> be cleaned unless users do it manually. Maybe we can add an option and do this 
> cleanup if the option is true.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24122) Add support to do clean in history server

2021-09-01 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-24122:
---

 Summary: Add support to do clean in history server
 Key: FLINK-24122
 URL: https://issues.apache.org/jira/browse/FLINK-24122
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Affects Versions: 1.13.2, 1.12.3
Reporter: zlzhang0122
 Fix For: 1.14.1


Now, the history server can clean history jobs by two means:
 # if users have configured 
{code:java}
historyserver.web.ssl.enabled: true{code}
, then compare the files in HDFS across two clean intervals, find the deleted 
ones, and clean the local cache files.
 # if users have configured 
{code:java}
historyserver.archive.retained-jobs{code}
to a positive number, then clean the oldest files in HDFS and locally.

But it can't clean job history that is no longer in HDFS but is still cached 
in the local filesystem; these files will be stored forever and can't be 
cleaned unless users do it manually. Maybe we can add an option and do this 
cleanup if the option is true.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23849) Support react to the node decommissioning change state on yarn and do graceful restart

2021-08-24 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17403761#comment-17403761
 ] 

zlzhang0122 commented on FLINK-23849:
-

[~trohrmann] ok, I see, maybe the community has more concern about some other 
things. But IMO the auto-recover strategy can't guarantee end-to-end exactly 
once if the downstream doesn't support transactional or idempotent writes. And 
supporting reaction to node updates such as decommissioning can bring YARN to 
functional consistency, just like a k8s taint; it's also useful for graceful 
restarts of streaming jobs.

> Support react to the node decommissioning change state on yarn and do 
> graceful restart
> --
>
> Key: FLINK-23849
> URL: https://issues.apache.org/jira/browse/FLINK-23849
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / YARN
>Affects Versions: 1.12.2, 1.13.1, 1.13.2
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
>
> Now we are not interested in node updates in 
> YarnContainerEventHandler.onNodesUpdated, but sometimes we want to evict the 
> running Flink process on one node and do a graceful restart on another node 
> for some unexpected reason, such as the physical machine needing to be 
> recycled or the cloud computing cluster needing to be migrated. Thus, we can 
> react to the node decommissioning state change, call the 
> stopWithSavepoint function, and then restart it.
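
A minimal sketch of reacting to decommissioning nodes in the YARN node-update callback (the savepoint trigger is only indicated by a comment, since the proposed hook does not exist yet):
{code:java}
import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;

// sketch: inspect node updates and pick out decommissioning nodes
public void onNodesUpdated(List<NodeReport> updatedNodes) {
    for (NodeReport report : updatedNodes) {
        if (report.getNodeState() == NodeState.DECOMMISSIONING) {
            // proposed: stop the tasks on this node with a savepoint and
            // restart them on healthy nodes (hypothetical follow-up)
        }
    }
}
{code}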



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23849) Support react to the node decommissioning change state on yarn and do graceful restart

2021-08-18 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-23849:

Description: Now we are not interested in node updates in 
YarnContainerEventHandler.onNodesUpdated, but sometimes we want to evict the 
running Flink process on one node and do a graceful restart on another node 
for some unexpected reason, such as the physical machine needing to be 
recycled or the cloud computing cluster needing to be migrated. Thus, we can 
react to the node decommissioning state change, call the stopWithSavepoint 
function, and then restart it.  (was: Now we are not interested in node updates 
in YarnContainerEventHandler.onNodesUpdated, but sometimes we want to evict the 
running Flink process on the node and do a graceful restart on another node 
for some unexpected reason, such as the physical machine needing to be 
recycled or the cloud computing cluster needing to be migrated. Thus, we can 
react to the node decommissioning state change, call stopWithSavepoint, and 
then restart it.)

> Support react to the node decommissioning change state on yarn and do 
> graceful restart
> --
>
> Key: FLINK-23849
> URL: https://issues.apache.org/jira/browse/FLINK-23849
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.12.2, 1.13.1, 1.13.2
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.15.0
>
>
> Now we are not interested in node updates in 
> YarnContainerEventHandler.onNodesUpdated, but sometimes we want to evict the 
> running Flink process on one node and do a graceful restart on another node 
> for some unexpected reason, such as the physical machine needing to be 
> recycled or the cloud computing cluster needing to be migrated. Thus, we can 
> react to the node decommissioning state change, call the 
> stopWithSavepoint function, and then restart it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23849) Support react to the node decommissioning change state on yarn and do graceful restart

2021-08-17 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-23849:
---

 Summary: Support react to the node decommissioning change state on 
yarn and do graceful restart
 Key: FLINK-23849
 URL: https://issues.apache.org/jira/browse/FLINK-23849
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.13.2, 1.13.1, 1.12.2
Reporter: zlzhang0122
 Fix For: 1.15.0


Now we are not interested in node updates in 
YarnContainerEventHandler.onNodesUpdated, but sometimes we want to evict the 
running Flink process on a node and do a graceful restart on another node 
for some unexpected reason, such as the physical machine needing to be 
recycled or the cloud computing cluster needing to be migrated. Thus, we can 
react to the node decommissioning state change, call stopWithSavepoint, and 
then restart it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-23622) Default filesystem scheme improvement

2021-08-13 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 closed FLINK-23622.
---
Resolution: Not A Problem

> Default filesystem scheme improvement
> -
>
> Key: FLINK-23622
> URL: https://issues.apache.org/jira/browse/FLINK-23622
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.12.2, 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.14.0
>
>
> Now, when Flink tries to get the filesystem scheme (when doing a checkpoint, 
> savepoint, and so on), it proceeds according to the following steps:
> 1. try to get it from the given path; if it is null, then 2
> 2. try to read the Flink configuration "fs.default-scheme"; if it is null, 
> then 3
> 3. default to LocalFileSystem
> Since Flink mostly runs on Hadoop, I think that maybe we should 
> also try to get the scheme from the Hadoop default configuration file 
> core-site.xml and load the value of the key "fs.defaultFS". This step can be 
> added between steps 2 and 3.
> So after this change, the steps would look like this:
> 1. try to get it from the given path; if it is null, then 2
> 2. try to read the Flink configuration "fs.default-scheme"; if it is null, 
> then 3
> 3. try to read the Hadoop configuration "fs.defaultFS"; if it is null, then 
> 4
> 4. default to LocalFileSystem.
> I think this doesn't break any kind of consistency in Flink and can 
> simplify the user behavior.
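
A sketch of the proposed four-step resolution order (a hypothetical helper, not the actual FileSystem initialization code):
{code:java}
// sketch: resolve the default filesystem scheme, adding Hadoop's
// "fs.defaultFS" as a new step 3 before falling back to the local FS
URI resolveDefaultScheme(
        Path path,
        Configuration flinkConf,
        org.apache.hadoop.conf.Configuration hadoopConf) {
    if (path.toUri().getScheme() != null) {          // 1. scheme on the path
        return path.toUri();
    }
    String flinkDefault = flinkConf.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null);
    if (flinkDefault != null) {                      // 2. flink "fs.default-scheme"
        return URI.create(flinkDefault);
    }
    String hadoopDefault = hadoopConf.get("fs.defaultFS");
    if (hadoopDefault != null) {                     // 3. proposed: hadoop "fs.defaultFS"
        return URI.create(hadoopDefault);
    }
    return LocalFileSystem.getLocalFsURI();          // 4. local filesystem
}
{code}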



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23561) Detail the container completed message

2021-08-11 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17397317#comment-17397317
 ] 

zlzhang0122 commented on FLINK-23561:
-

[~trohrmann] sure, I'd like to give this a try.

> Detail the container completed message
> --
>
> Key: FLINK-23561
> URL: https://issues.apache.org/jira/browse/FLINK-23561
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / YARN
>Affects Versions: 1.13.1
>Reporter: zlzhang0122
>Priority: Minor
> Fix For: 1.14.0
>
>
> Use the ContainerStatus to detail the reason a container completed, so that 
> users can explicitly know why the container completed.
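
For illustration, the fields on YARN's ContainerStatus that such a message could draw on (a sketch, not the actual handler code):
{code:java}
import org.apache.hadoop.yarn.api.records.ContainerStatus;

// sketch: build a detailed completion message from the container status
static String describeCompletion(ContainerStatus status) {
    return String.format(
            "Container %s completed with exit status %d: %s",
            status.getContainerId(),
            status.getExitStatus(),
            status.getDiagnostics());
}
{code}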



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-23564) Make taskmanager.out and taskmanager.err rollable

2021-08-06 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393778#comment-17393778
 ] 

zlzhang0122 edited comment on FLINK-23564 at 8/6/21, 11:05 AM:
---

[~wangyang0918] Maybe using log4j2-iostreams is an alternative choice for the 
implementation, but it doesn't flush the buffer and would cause an OutOfMemory 
error when using print; println didn't have this problem. I couldn't solve it, 
and my implementation is mostly like yours. So far I haven't found any of the 
problems that other devs are concerned about.


was (Author: zlzhang0122):
[~wangyang0918] Maybe using log4j2-iostreams is a better choice for the 
implementation, but it doesn't flush the buffer and would cause an OutOfMemory 
error when using print; println didn't have this problem. I couldn't solve it, 
and my implementation is mostly like yours. So far I haven't found any of the 
problems that other devs are concerned about.

> Make taskmanager.out and taskmanager.err rollable
> -
>
> Key: FLINK-23564
> URL: https://issues.apache.org/jira/browse/FLINK-23564
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.14.0
>
>
> Now users can write via 
> System.out.print/System.out.println/System.err.print/System.err.println/e.printStackTrace
>  to taskmanager.out and taskmanager.err as much as they want. This may use a 
> large amount of disk space, cause disk problems, and influence Flink's 
> checkpointing and even the stability of Flink or other applications on the 
> same node. I propose that we make taskmanager.out and 
> taskmanager.err rollable just like taskmanager.log. By doing this, the disk 
> consumption of taskmanager.out and taskmanager.err can be controlled.
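
For reference, the log4j2-iostreams approach mentioned above can be sketched like this, assuming the log4j-iostreams artifact is on the classpath (whether the flush/OutOfMemory problem described above shows up depends on the setup):
{code:java}
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.io.IoBuilder;

// sketch: route System.out / System.err through log4j2 so that stdout and
// stderr are governed by a rolling-file appender, like taskmanager.log
public static void redirectStdStreams() {
    System.setOut(IoBuilder.forLogger(LogManager.getLogger("stdout"))
            .setLevel(Level.INFO)
            .buildPrintStream());
    System.setErr(IoBuilder.forLogger(LogManager.getLogger("stderr"))
            .setLevel(Level.ERROR)
            .buildPrintStream());
}
{code}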



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23564) Make taskmanager.out and taskmanager.err rollable

2021-08-05 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393778#comment-17393778
 ] 

zlzhang0122 commented on FLINK-23564:
-

[~wangyang0918] Maybe using log4j2-iostreams is a better choice for the 
implementation, but it doesn't flush the buffer and would cause an OutOfMemory 
error when using print; println didn't have this problem. I couldn't solve it, 
and my implementation is mostly like yours. So far I haven't found any of the 
problems that other devs are concerned about.

> Make taskmanager.out and taskmanager.err rollable
> -
>
> Key: FLINK-23564
> URL: https://issues.apache.org/jira/browse/FLINK-23564
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.14.0
>
>
> Now users can write via 
> System.out.print/System.out.println/System.err.print/System.err.println/e.printStackTrace
>  to taskmanager.out and taskmanager.err as much as they want. This may use a 
> large amount of disk space, cause disk problems, and influence Flink's 
> checkpointing and even the stability of Flink or other applications on the 
> same node. I propose that we make taskmanager.out and 
> taskmanager.err rollable just like taskmanager.log. By doing this, the disk 
> consumption of taskmanager.out and taskmanager.err can be controlled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23622) Default filesystem scheme improvement

2021-08-04 Thread zlzhang0122 (Jira)
zlzhang0122 created FLINK-23622:
---

 Summary: Default filesystem scheme improvement
 Key: FLINK-23622
 URL: https://issues.apache.org/jira/browse/FLINK-23622
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.13.1, 1.12.2
Reporter: zlzhang0122
 Fix For: 1.14.0


Now, when Flink tries to get the filesystem scheme (when doing a checkpoint, 
savepoint, and so on), it proceeds according to the following steps:

1. try to get it from the given path; if it is null, then 2

2. try to read the Flink configuration "fs.default-scheme"; if it is null, 
then 3

3. default to LocalFileSystem

Since Flink mostly runs on Hadoop, I think that maybe we should also 
try to get the scheme from the Hadoop default configuration file core-site.xml 
and load the value of the key "fs.defaultFS". This step can be added between 
steps 2 and 3.

So after this change, the steps would look like this:

1. try to get it from the given path; if it is null, then 2

2. try to read the Flink configuration "fs.default-scheme"; if it is null, 
then 3

3. try to read the Hadoop configuration "fs.defaultFS"; if it is null, then 4

4. default to LocalFileSystem.

I think this doesn't break any kind of consistency in Flink and can 
simplify the user behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23564) Make taskmanager.out and taskmanager.err rollable

2021-08-02 Thread zlzhang0122 (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17391883#comment-17391883
 ] 

zlzhang0122 commented on FLINK-23564:
-

[~Jiangang] It's easy to implement if we redirect the .out and .err to the 
log and make the log rollable; we have implemented this in our production 
environment, and it's running well.

> Make taskmanager.out and taskmanager.err rollable
> -
>
> Key: FLINK-23564
> URL: https://issues.apache.org/jira/browse/FLINK-23564
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.14.0
>
>
> Now users can write via 
> System.out.print/System.out.println/System.err.print/System.err.println/e.printStackTrace
>  to taskmanager.out and taskmanager.err as much as they want. This may use a 
> large amount of disk space, cause disk problems, and influence Flink's 
> checkpointing and even the stability of Flink or other applications on the 
> same node. I propose that we make taskmanager.out and 
> taskmanager.err rollable just like taskmanager.log. By doing this, the disk 
> consumption of taskmanager.out and taskmanager.err can be controlled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23564) Make taskmanager.out and taskmanager.err rollable

2021-08-02 Thread zlzhang0122 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-23564:

Description: Now users can write via 
System.out.print/System.out.println/System.err.print/System.err.println/e.printStackTrace
 to taskmanager.out and taskmanager.err as much as they want. This may use a 
large amount of disk space, cause disk problems, and influence Flink's 
checkpointing and even the stability of Flink or other applications on the same 
node. I propose that we make taskmanager.out and taskmanager.err 
rollable just like taskmanager.log. By doing this, the disk consumption of 
taskmanager.out and taskmanager.err can be controlled.  (was: Now users can use 
System.out.print/System.out.println/System.err.print/System.err.println/e.printStackTraceto
 taskmanager.out and taskmanager.err as much as they want. This may use a 
large amount of disk space, cause disk problems, and influence Flink's 
checkpointing and even the stability of Flink or other applications on the same 
node. I proposed that we can make taskmanager.out and taskmanager.err 
rollable just like taskmanager.log. By doing this, the disk consumption of 
taskmanager.out and taskmanager.err can be controlled.)

> Make taskmanager.out and taskmanager.err rollable
> -
>
> Key: FLINK-23564
> URL: https://issues.apache.org/jira/browse/FLINK-23564
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.13.1
>Reporter: zlzhang0122
>Priority: Major
> Fix For: 1.14.0
>
>
> Now users can write via 
> System.out.print/System.out.println/System.err.print/System.err.println/e.printStackTrace
>  to taskmanager.out and taskmanager.err as much as they want. This may use a 
> large amount of disk space, cause disk problems, and influence Flink's 
> checkpointing and even the stability of Flink or other applications on the 
> same node. I propose that we make taskmanager.out and 
> taskmanager.err rollable just like taskmanager.log. By doing this, the disk 
> consumption of taskmanager.out and taskmanager.err can be controlled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

