[jira] [Commented] (FLINK-30400) Stop bundling connector-base in externalized connectors

2024-06-11 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853908#comment-17853908
 ] 

Hang Ruan commented on FLINK-30400:
---

[~rmetzger], thanks for the suggestion.

I will add this content to the docs/connectors/datastream/overview page.

> Stop bundling connector-base in externalized connectors
> ---
>
> Key: FLINK-30400
> URL: https://issues.apache.org/jira/browse/FLINK-30400
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Common
>Reporter: Chesnay Schepler
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available
> Fix For: elasticsearch-3.1.0, aws-connector-4.2.0, kafka-3.1.0, 
> rabbitmq-3.1.0, kafka-3.0.2
>
>
> Check that none of the externalized connectors bundle connector-base; if any 
> do, remove the bundling and schedule a new minor release.
> Bundling this module is highly problematic w.r.t. binary compatibility, since 
> bundled classes may rely on internal APIs.
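For illustration, the usual fix for this is to depend on connector-base with 
"provided" scope instead of shading it into the connector jar, so the classes 
are supplied by the Flink distribution at runtime. A minimal POM sketch (the 
coordinates are the real flink-connector-base module; the version property is 
an assumption):
{code:xml}
<!-- Sketch only: "provided" keeps connector-base out of the shaded jar. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
{code}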



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-06-05 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852619#comment-17852619
 ] 

Hang Ruan commented on FLINK-34445:
---

[~mason6345] Sorry for my late response.

I may not have time to work on this issue in the near future. Is there anybody 
else on your side who would like to take it?

Thanks ~

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35296) Flink mysql-cdc connector stops reading data

2024-05-17 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847211#comment-17847211
 ] 

Hang Ruan commented on FLINK-35296:
---

Hi, [~清月].

What is the configuration of this MySQL table, and could you please provide the 
full TaskManager (TM) and JobManager (JM) logs?

I cannot identify the problem from the information provided so far. Have you 
tried to reproduce this problem with a test case?

Thanks ~

> Flink mysql-cdc connector stops reading data
> 
>
> Key: FLINK-35296
> URL: https://issues.apache.org/jira/browse/FLINK-35296
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Gang Yang
>Priority: Major
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-05-06-17-42-19-059.png, 
> image-2024-05-14-11-25-55-565.png
>
>
> *Background:*
> Consume sub-database and sub-table data through regular expressions, 
> scan.startup.mode=initial
> *Problems:*
> 1. The problem occurs during the snapshot data synchronization phase;
> 2. After the task runs normally for a period of time, no more data is read, 
> even though there is still a lot of data in the upstream MySQL table;
> 3. When the task is restarted from state, it reads normally for a period of 
> time and then stops reading again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18

2024-05-13 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846129#comment-17846129
 ] 

Hang Ruan commented on FLINK-35109:
---

[~fpaul] Thanks for your quick reply. I will work on it later.

> Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support 
> for 1.17 and 1.18
> ---
>
> Key: FLINK-35109
> URL: https://issues.apache.org/jira/browse/FLINK-35109
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Hang Ruan
>Priority: Blocker
> Fix For: kafka-4.0.0
>
>
> The Flink Kafka connector currently can't compile against Flink 
> 1.20-SNAPSHOT. An example failure can be found at 
> https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169
> The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, 
> see FLINK-32455. See also specifically the comment in 
> https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785
> Next to that, there's also FLINK-25509 which can only be supported with Flink 
> 1.19 and higher. 
> So we should:
> * Drop support for 1.17 and 1.18
> * Refactor the Flink Kafka connector to use the new 
> {code:java}MigrationTest{code}
> We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; 
> this change will be a new v4.0 version with support for Flink 1.19 and the 
> upcoming Flink 1.20



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18

2024-05-10 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845529#comment-17845529
 ] 

Hang Ruan edited comment on FLINK-35109 at 5/11/24 3:26 AM:


I would like to help with it. 


was (Author: ruanhang1993):
I would like to help. Please assign this to me, thanks~

> Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support 
> for 1.17 and 1.18
> ---
>
> Key: FLINK-35109
> URL: https://issues.apache.org/jira/browse/FLINK-35109
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Fabian Paul
>Priority: Blocker
> Fix For: kafka-4.0.0
>
>
> The Flink Kafka connector currently can't compile against Flink 
> 1.20-SNAPSHOT. An example failure can be found at 
> https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169
> The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, 
> see FLINK-32455. See also specifically the comment in 
> https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785
> Next to that, there's also FLINK-25509 which can only be supported with Flink 
> 1.19 and higher. 
> So we should:
> * Drop support for 1.17 and 1.18
> * Refactor the Flink Kafka connector to use the new 
> {code:java}MigrationTest{code}
> We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; 
> this change will be a new v4.0 version with support for Flink 1.19 and the 
> upcoming Flink 1.20



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35109) Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support for 1.17 and 1.18

2024-05-10 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845529#comment-17845529
 ] 

Hang Ruan commented on FLINK-35109:
---

I would like to help. Please assign this to me, thanks~

> Add support for Flink 1.20-SNAPSHOT in Flink Kafka connector and drop support 
> for 1.17 and 1.18
> ---
>
> Key: FLINK-35109
> URL: https://issues.apache.org/jira/browse/FLINK-35109
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Fabian Paul
>Priority: Blocker
> Fix For: kafka-4.0.0
>
>
> The Flink Kafka connector currently can't compile against Flink 
> 1.20-SNAPSHOT. An example failure can be found at 
> https://github.com/apache/flink-connector-kafka/actions/runs/8659822490/job/23746484721#step:15:169
> The {code:java} TypeSerializerUpgradeTestBase{code} has had issues before, 
> see FLINK-32455. See also specifically the comment in 
> https://issues.apache.org/jira/browse/FLINK-32455?focusedCommentId=17739785&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17739785
> Next to that, there's also FLINK-25509 which can only be supported with Flink 
> 1.19 and higher. 
> So we should:
> * Drop support for 1.17 and 1.18
> * Refactor the Flink Kafka connector to use the new 
> {code:java}MigrationTest{code}
> We will support the Flink Kafka connector for Flink 1.18 via the v3.1 branch; 
> this change will be a new v4.0 version with support for Flink 1.19 and the 
> upcoming Flink 1.20



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-03-26 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831175#comment-17831175
 ] 

Hang Ruan commented on FLINK-34445:
---

Fine, I will take a look at this. Thanks ~

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34445) Integrate new endpoint with Flink UI metrics section

2024-03-26 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831119#comment-17831119
 ] 

Hang Ruan commented on FLINK-34445:
---

Sure, I would like to help with this part. When does this feature need to be 
finished?

> Integrate new endpoint with Flink UI metrics section
> 
>
> Key: FLINK-34445
> URL: https://issues.apache.org/jira/browse/FLINK-34445
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mason Chen
>Assignee: Mason Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34683) "Developer Guide - Contribute to Flink CDC" Page for Flink CDC Documentation

2024-03-15 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827390#comment-17827390
 ] 

Hang Ruan commented on FLINK-34683:
---

I would like to help with this issue. Please assign it to me. Thanks~ [~renqs] 

> "Developer Guide - Contribute to Flink CDC" Page for Flink CDC Documentation
> 
>
> Key: FLINK-34683
> URL: https://issues.apache.org/jira/browse/FLINK-34683
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: cdc-3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34684) "Developer Guide - Licenses" Page for Flink CDC Documentation

2024-03-14 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827329#comment-17827329
 ] 

Hang Ruan commented on FLINK-34684:
---

I would like to help with this issue. Please assign it to me. Thanks~ [~renqs] 

> "Developer Guide - Licenses" Page for Flink CDC Documentation
> -
>
> Key: FLINK-34684
> URL: https://issues.apache.org/jira/browse/FLINK-34684
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: cdc-3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34586) Update the README in Flink CDC

2024-03-06 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-34586:
-

 Summary: Update the README in Flink CDC
 Key: FLINK-34586
 URL: https://issues.apache.org/jira/browse/FLINK-34586
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hang Ruan


We should update the README file in Flink CDC.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34585) [JUnit5 Migration] Module: Flink CDC

2024-03-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-34585:
-

 Summary: [JUnit5 Migration] Module: Flink CDC
 Key: FLINK-34585
 URL: https://issues.apache.org/jira/browse/FLINK-34585
 Project: Flink
  Issue Type: Sub-task
  Components: Flink CDC
Reporter: Hang Ruan


Most tests in Flink CDC are still using JUnit 4. We need to migrate them to 
JUnit 5.
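For illustration, a minimal before/after sketch of such a migration (the test 
class is hypothetical, not taken from the Flink CDC code base):
{code:java}
// JUnit 4 used org.junit.Test, @Before, and org.junit.Assert.
// JUnit 5 (Jupiter) replaces them as follows:
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class MySourceTest { // public modifier is no longer required

    @BeforeEach // replaces JUnit 4's @Before
    void setUp() {
        // set up test fixtures here
    }

    @Test // org.junit.jupiter.api.Test instead of org.junit.Test
    void testSomething() {
        assertEquals(4, 2 + 2); // Assertions replaces Assert
    }
}
{code}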



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34584) Change package name to org.apache.flink.cdc

2024-03-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-34584:
-

 Summary: Change package name to org.apache.flink.cdc
 Key: FLINK-34584
 URL: https://issues.apache.org/jira/browse/FLINK-34584
 Project: Flink
  Issue Type: Sub-task
  Components: Flink CDC
Reporter: Hang Ruan


Flink CDC needs to change its package name to org.apache.flink.cdc.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-12-17 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798000#comment-17798000
 ] 

Hang Ruan commented on FLINK-25509:
---

Hi, [~martijnvisser] & [~lindong].

Flink 1.18 has been released, and I think we can push this feature forward now.

Which Kafka connector version should we put it into? This feature relies on an 
interface introduced in 1.18 and is not compatible with 1.17 or earlier.

> FLIP-208: Add RecordEvaluator to dynamically stop source based on 
> de-serialized records
> ---
>
> Key: FLINK-25509
> URL: https://issues.apache.org/jira/browse/FLINK-25509
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common, Connectors / Kafka
>Reporter: Dong Lin
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> This feature is needed to migrate applications which use 
> KafkaDeserializationSchema::isEndOfStream() from FlinkKafkaConsumer to 
> KafkaSource.
> Please checkout 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records
>  for the motivation and the proposed changes.
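For illustration, a rough sketch of how the FLIP-208 evaluator is wired into 
KafkaSource. The setEofRecordEvaluator builder method and the lambda-friendly 
RecordEvaluator shape follow the FLIP, so treat them as assumptions until 
verified against a released connector version:
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

class RecordEvaluatorSketch {
    static KafkaSource<String> buildSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092") // placeholder address
                .setTopics("input-topic") // placeholder topic
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Stop consuming a split once an end-marker record is seen;
                // this replaces KafkaDeserializationSchema#isEndOfStream().
                .setEofRecordEvaluator(record -> "END".equals(record))
                .build();
    }
}
{code}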



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31615) Fix some parts forgot to translate in "Table API" page of "Table API & SQL"

2023-12-15 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17797117#comment-17797117
 ] 

Hang Ruan commented on FLINK-31615:
---

[~Wencong Liu] Sorry for the late reply. 

I have raised a PR to fix this part. Maybe you could help to review it 
(https://github.com/apache/flink/pull/22272/files). Thanks.

> Fix some parts forgot to translate in "Table API" page of "Table API & SQL" 
> 
>
> Key: FLINK-31615
> URL: https://issues.apache.org/jira/browse/FLINK-31615
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Hang Ruan
>Priority: Not a Priority
>  Labels: auto-deprioritized-minor, chinese-translation
>
> The query_state_warning in the "Table API" page of "Table API & SQL" is still 
> in English, and some comments in the code samples are in English.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32862) Support INIT operation type to be compatible with DTS on Alibaba Cloud

2023-12-14 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan closed FLINK-32862.
-
Resolution: Won't Fix

> Support INIT operation type to be compatible with DTS on Alibaba Cloud
> --
>
> Key: FLINK-32862
> URL: https://issues.apache.org/jira/browse/FLINK-32862
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Hang Ruan
>Assignee: Hang Ruan
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
>
> Canal JSON messages from DTS on Alibaba Cloud may carry a new operation type 
> `INIT`, which we currently cannot handle.
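For illustration, a sketch of such a message; the payload values are invented, 
and only the "type" field is the point:
{code}
{
  "data": [{"id": "1", "name": "flink"}],
  "database": "test_db",
  "table": "test_table",
  "isDdl": false,
  "type": "INIT",
  "es": 1692000000000,
  "ts": 1692000000123
}
{code}
The canal-json format only recognizes the standard operation types (e.g. 
INSERT, UPDATE, DELETE), so a record like this currently cannot be handled.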



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33630) CoordinationResponse should be wrapped by SerializedValue in TaskOperatorEventGateway and JobMasterOperatorEventGateway

2023-11-23 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17789146#comment-17789146
 ] 

Hang Ruan commented on FLINK-33630:
---

Hi, all. 

I would like to help resolve this problem. Please assign this issue to me. 
Thanks.

> CoordinationResponse should be wrapped by SerializedValue in 
> TaskOperatorEventGateway and JobMasterOperatorEventGateway
> ---
>
> Key: FLINK-33630
> URL: https://issues.apache.org/jira/browse/FLINK-33630
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Qingsheng Ren
>Priority: Major
>
> FLINK-26077 introduced a two-way RPC between operator and coordinator, but 
> {{CoordinationResponse}} is not wrapped by {{{}SerializedValue{}}}:
>  
> [https://github.com/apache/flink/blob/c61c09e464073fae430cab2dd56bd608f9d275fd/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/a.java#L254-L255|https://github.com/apache/flink/blob/34620fc5c5698d00e64c6b15f8ce84f807a2e0d7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterOperatorEventGateway.java#L54]
>  
> This might be a problem if the implementation of {{CoordinationResponse}} is 
> provided in user code and loaded by user code classloader, because Pekko RPC 
> handler always uses app classloader for serializing and deserializing RPC 
> parameters and return values, which will lead to {{ClassNotFoundException}} 
> in this case. Similar to what we do for the request, we need to wrap a 
> {{SerializedValue}} around the response to make sure RPC calls won't cause 
> {{{}ClassNotFoundException{}}}.
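A minimal sketch of the proposed wrapping, using the existing 
org.apache.flink.util.SerializedValue utility; the helper methods here are 
illustrative, not the actual gateway signatures:
{code:java}
import org.apache.flink.util.SerializedValue;

import java.io.IOException;

class ResponseWrappingSketch {
    // Sender side: serialize eagerly, so the RPC layer only ever ships bytes
    // and never needs the user-code class on its own classpath.
    static SerializedValue<Object> wrap(Object response) throws IOException {
        return new SerializedValue<>(response);
    }

    // Receiver side: deserialize with the user-code classloader, which is
    // exactly the step that avoids the ClassNotFoundException described above.
    static Object unwrap(SerializedValue<Object> value, ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        return value.deserializeValue(userCodeClassLoader);
    }
}
{code}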



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33013) Shade flink-connector-base into flink-sql-connector-connector

2023-09-13 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764511#comment-17764511
 ] 

Hang Ruan commented on FLINK-33013:
---

Hi, all. I think we should reject this issue and keep the provided scope.

I will check all connectors for this as part of FLINK-30400.

> Shade flink-connector-base into flink-sql-connector-connector
> -
>
> Key: FLINK-33013
> URL: https://issues.apache.org/jira/browse/FLINK-33013
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / MongoDB
>Affects Versions: mongodb-1.0.2
>Reporter: Jiabao Sun
>Priority: Major
>  Labels: pull-request-available
>
> Shade flink-connector-base into flink-sql-connector-connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30400) Stop bundling connector-base in externalized connectors

2023-09-13 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764510#comment-17764510
 ] 

Hang Ruan commented on FLINK-30400:
---

Hi, all. I would like to help with this check. Thanks.

> Stop bundling connector-base in externalized connectors
> ---
>
> Key: FLINK-30400
> URL: https://issues.apache.org/jira/browse/FLINK-30400
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Reporter: Chesnay Schepler
>Priority: Major
>
> Check that none of the externalized connectors bundle connector-base; if any 
> do, remove the bundling and schedule a new minor release.
> Bundling this module is highly problematic w.r.t. binary compatibility, since 
> bundled classes may rely on internal APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32798) Release Testing: Verify FLIP-294: Support Customized Catalog Modification Listener

2023-08-30 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760670#comment-17760670
 ] 

Hang Ruan commented on FLINK-32798:
---

[~renqs] [~zjureel], I think we can close this testing task as complete. Thanks.

> Release Testing: Verify FLIP-294: Support Customized Catalog Modification 
> Listener
> --
>
> Key: FLINK-32798
> URL: https://issues.apache.org/jira/browse/FLINK-32798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: result.png, sqls.png, test.png
>
>
> The document about catalog modification listener is: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32798) Release Testing: Verify FLIP-294: Support Customized Catalog Modification Listener

2023-08-24 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758804#comment-17758804
 ] 

Hang Ruan commented on FLINK-32798:
---

Hi, all. [~renqs] [~zjureel] 

I have modified the code to test reading the options from the configuration.
{code:java}
package org.self.listener;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;

import java.util.HashSet;
import java.util.Set;

public class MyCatalogListenerFactory implements CatalogModificationListenerFactory {

    public static final ConfigOption<String> URL =
            ConfigOptions.key("my.catalog.listener.url")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "JDBC url of the MySQL database to use when connecting to the MySQL database server.");

    public static final ConfigOption<String> USERNAME =
            ConfigOptions.key("my.catalog.listener.username")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Username of the MySQL database to use when connecting to the MySQL database server.");

    public static final ConfigOption<String> PASSWORD =
            ConfigOptions.key("my.catalog.listener.password")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Password to use when connecting to the MySQL database server.");

    @Override
    public CatalogModificationListener createListener(Context context) {
        ReadableConfig config = context.getConfiguration();
        return new MyCatalogListener(
                config.get(URL), config.get(USERNAME), config.get(PASSWORD));
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
        return requiredOptions;
    }

    @Override
    public String factoryIdentifier() {
        return "test";
    }
} {code}
And added the parameters to `flink-conf.yaml`.
{code:java}
table.catalog-modification.listeners: test
my.catalog.listener.url: 
jdbc:mysql://hostname:3306/db?useSSL=false=3
my.catalog.listener.username: username
my.catalog.listener.password: password{code}
It works well.

> Release Testing: Verify FLIP-294: Support Customized Catalog Modification 
> Listener
> --
>
> Key: FLINK-32798
> URL: https://issues.apache.org/jira/browse/FLINK-32798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: result.png, sqls.png, test.png
>
>
> The document about catalog modification listener is: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32798) Release Testing: Verify FLIP-294: Support Customized Catalog Modification Listener

2023-08-24 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758339#comment-17758339
 ] 

Hang Ruan edited comment on FLINK-32798 at 8/24/23 8:54 AM:


Hi, all. [~renqs] [~zjureel] 

I have done some tests about this feature.

1. Create a MySQL table to store the changes.
{code:sql}
CREATE TABLE `listener_test` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `catalog` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `identifier` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `type` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `detail` text CHARACTER SET utf8 COLLATE utf8_general_ci,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC; {code}
2. Develop the Factory and the Listener.
{code:java}
package org.self.listener;

import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;

public class MyCatalogListenerFactory implements CatalogModificationListenerFactory {
    @Override
    public CatalogModificationListener createListener(Context context) {
        return new MyCatalogListener(
                "jdbc:mysql://hostname:3306/db?useSSL=false=3",
                "username", "password");
    }

    @Override
    public String factoryIdentifier() {
        return "test";
    }
} {code}
{code:java}
package org.self.listener;

import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
import org.apache.flink.table.catalog.listener.CreateTableEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
import org.apache.flink.table.catalog.listener.DropTableEvent;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.stream.Collectors;

public class MyCatalogListener implements CatalogModificationListener {
    private final String jdbcUrl;
    private final String username;
    private final String password;

    public MyCatalogListener(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void onEvent(CatalogModificationEvent event) {
        try {
            Class.forName(getDriverClassName());

            try (Connection connection =
                            DriverManager.getConnection(jdbcUrl, username, password);
                    PreparedStatement statement =
                            connection.prepareStatement(
                                    "INSERT INTO `listener_test` "
                                            + "(`catalog`,`identifier`,`type`,`detail`) "
                                            + "VALUES (?,?,?,?);")) {
                String identifier;
                String type;
                String detail;
                String catalog = event.context().getCatalogName();
                if (event instanceof CreateDatabaseEvent) {
                    CreateDatabaseEvent cde = (CreateDatabaseEvent) event;
                    identifier = "DB:" + cde.databaseName();
                    detail =
                            cde.database().getProperties().entrySet().stream()
                                    .map(e -> e.getKey() + ":" + e.getValue())
                                    .collect(Collectors.joining(", "));
                    type = "CREATE DB";
                } else if (event instanceof AlterDatabaseEvent) {
                    AlterDatabaseEvent ade = (AlterDatabaseEvent) event;
                    identifier = "DB:" + ade.databaseName();
                    detail =
                            ade.newDatabase().getProperties().entrySet().stream()
                                    .map(e -> e.getKey() + ":" + e.getValue())
                                    .collect(Collectors.joining(", "));
                    type = "ALTER DB";
                } else if (event instanceof DropDatabaseEvent) {
                    DropDatabaseEvent dde = (DropDatabaseEvent) event;
                    identifier = "DB:" + dde.databaseName();
                    detail = "null";
                    type = "DELETE DB";
                } else if (event instanceof CreateTableEvent) {
                    CreateTableEvent cte = (CreateTableEvent) event;
                    identifier = "TBL:" + cte.identifier().toString();
                    detail = cte.table().toString();
                    type = "CREATE TBL";
                } else if (event instanceof AlterTableEvent) {
                    AlterTableEvent ate = (AlterTableEvent) event;
                    identifier = "TBL:" + ate.identifier().toString();
                    detail = ate.newTable().toString();
                    type = "ALTER TBL";
                } else if (event instanceof DropTableEvent) {
                    DropTableEvent dte = (DropTableEvent) event;
                    identifier = "TBL:"

[jira] [Commented] (FLINK-32798) Release Testing: Verify FLIP-294: Support Customized Catalog Modification Listener

2023-08-23 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758339#comment-17758339
 ] 

Hang Ruan commented on FLINK-32798:
---

Hi, all. I have done some tests about this feature.

1. Create a MySQL table to store the changes.
{code:sql}
CREATE TABLE `listener_test` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `catalog` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `identifier` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `type` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `detail` text CHARACTER SET utf8 COLLATE utf8_general_ci,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC; {code}
2. Develop the Factory and the Listener.
{code:java}
package org.self.listener;

import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory;

public class MyCatalogListenerFactory implements CatalogModificationListenerFactory {
    @Override
    public CatalogModificationListener createListener(Context context) {
        return new MyCatalogListener(
                "jdbc:mysql://hostname:3306/db?useSSL=false=3",
                "username", "password");
    }

    @Override
    public String factoryIdentifier() {
        return "test";
    }
} {code}
{code:java}
package org.self.listener;

import org.apache.flink.table.catalog.listener.AlterDatabaseEvent;
import org.apache.flink.table.catalog.listener.AlterTableEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationEvent;
import org.apache.flink.table.catalog.listener.CatalogModificationListener;
import org.apache.flink.table.catalog.listener.CreateDatabaseEvent;
import org.apache.flink.table.catalog.listener.CreateTableEvent;
import org.apache.flink.table.catalog.listener.DropDatabaseEvent;
import org.apache.flink.table.catalog.listener.DropTableEvent;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.stream.Collectors;

public class MyCatalogListener implements CatalogModificationListener {
    private final String jdbcUrl;
    private final String username;
    private final String password;

    public MyCatalogListener(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void onEvent(CatalogModificationEvent event) {
        try {
            Class.forName(getDriverClassName());

            try (Connection connection =
                            DriverManager.getConnection(jdbcUrl, username, password);
                    PreparedStatement statement =
                            connection.prepareStatement(
                                    "INSERT INTO `listener_test` "
                                            + "(`catalog`,`identifier`,`type`,`detail`) "
                                            + "VALUES (?,?,?,?);")) {
                String identifier;
                String type;
                String detail;
                String catalog = event.context().getCatalogName();
                if (event instanceof CreateDatabaseEvent) {
                    CreateDatabaseEvent cde = (CreateDatabaseEvent) event;
                    identifier = "DB:" + cde.databaseName();
                    detail =
                            cde.database().getProperties().entrySet().stream()
                                    .map(e -> e.getKey() + ":" + e.getValue())
                                    .collect(Collectors.joining(", "));
                    type = "CREATE DB";
                } else if (event instanceof AlterDatabaseEvent) {
                    AlterDatabaseEvent ade = (AlterDatabaseEvent) event;
                    identifier = "DB:" + ade.databaseName();
                    detail =
                            ade.newDatabase().getProperties().entrySet().stream()
                                    .map(e -> e.getKey() + ":" + e.getValue())
                                    .collect(Collectors.joining(", "));
                    type = "ALTER DB";
                } else if (event instanceof DropDatabaseEvent) {
                    DropDatabaseEvent dde = (DropDatabaseEvent) event;
                    identifier = "DB:" + dde.databaseName();
                    detail = "null";
                    type = "DELETE DB";
                } else if (event instanceof CreateTableEvent) {
                    CreateTableEvent cte = (CreateTableEvent) event;
                    identifier = "TBL:" + cte.identifier().toString();
                    detail = cte.table().toString();
                    type = "CREATE TBL";
                } else if (event instanceof AlterTableEvent) {
                    AlterTableEvent ate = (AlterTableEvent) event;
                    identifier = "TBL:" + ate.identifier().toString();
                    detail = ate.newTable().toString();
                    type = "ALTER TBL";
                } else if (event instanceof DropTableEvent) {
                    DropTableEvent dte = (DropTableEvent) event;
                    identifier = "TBL:" + dte.identifier().toString();
                    detail =

[jira] [Updated] (FLINK-32798) Release Testing: Verify FLIP-294: Support Customized Catalog Modification Listener

2023-08-23 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32798:
--
Attachment: result.png

> Release Testing: Verify FLIP-294: Support Customized Catalog Modification 
> Listener
> --
>
> Key: FLINK-32798
> URL: https://issues.apache.org/jira/browse/FLINK-32798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: result.png, sqls.png, test.png
>
>
> The document about catalog modification listener is: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32798) Release Testing: Verify FLIP-294: Support Customized Catalog Modification Listener

2023-08-23 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32798:
--
Attachment: sqls.png

> Release Testing: Verify FLIP-294: Support Customized Catalog Modification 
> Listener
> --
>
> Key: FLINK-32798
> URL: https://issues.apache.org/jira/browse/FLINK-32798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: result.png, sqls.png, test.png
>
>
> The document about catalog modification listener is: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32798) Release Testing: Verify FLIP-294: Support Customized Catalog Modification Listener

2023-08-23 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17757995#comment-17757995
 ] 

Hang Ruan commented on FLINK-32798:
---

[~zjureel] Hi, where should users put their jar? I cannot find any content 
about this in the docs.

I tried this with the SQL client, using a SET statement.

It seems that I can set it to any random string and the SQL client will not 
validate it. Is this the expected behavior?

!test.png!

 

> Release Testing: Verify FLIP-294: Support Customized Catalog Modification 
> Listener
> --
>
> Key: FLINK-32798
> URL: https://issues.apache.org/jira/browse/FLINK-32798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: test.png
>
>
> The document about catalog modification listener is: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32798) Release Testing: Verify FLIP-294: Support Customized Catalog Modification Listener

2023-08-23 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32798:
--
Attachment: test.png

> Release Testing: Verify FLIP-294: Support Customized Catalog Modification 
> Listener
> --
>
> Key: FLINK-32798
> URL: https://issues.apache.org/jira/browse/FLINK-32798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: test.png
>
>
> The document about catalog modification listener is: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/#catalog-modification-listener



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32799) Release Testing: Verify FLINK-26603 -Decouple Hive with Flink planner

2023-08-22 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17757791#comment-17757791
 ] 

Hang Ruan commented on FLINK-32799:
---

I have also tested an INSERT and a SELECT. They look good.

> Release Testing: Verify FLINK-26603 -Decouple Hive with Flink planner
> -
>
> Key: FLINK-32799
> URL: https://issues.apache.org/jira/browse/FLINK-32799
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: hive.png, lib.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32799) Release Testing: Verify FLINK-26603 -Decouple Hive with Flink planner

2023-08-21 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32799:
--
Attachment: lib.png

> Release Testing: Verify FLINK-26603 -Decouple Hive with Flink planner
> -
>
> Key: FLINK-32799
> URL: https://issues.apache.org/jira/browse/FLINK-32799
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: hive.png, lib.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32799) Release Testing: Verify FLINK-26603 -Decouple Hive with Flink planner

2023-08-21 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17756874#comment-17756874
 ] 

Hang Ruan commented on FLINK-32799:
---

Hi, all.

I tested this with a Hive ALTER statement. It looks good.

!hive.png!

My dependencies are as follows.

!lib.png!

> Release Testing: Verify FLINK-26603 -Decouple Hive with Flink planner
> -
>
> Key: FLINK-32799
> URL: https://issues.apache.org/jira/browse/FLINK-32799
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: hive.png, lib.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32799) Release Testing: Verify FLINK-26603 -Decouple Hive with Flink planner

2023-08-21 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32799:
--
Attachment: hive.png

> Release Testing: Verify FLINK-26603 -Decouple Hive with Flink planner
> -
>
> Key: FLINK-32799
> URL: https://issues.apache.org/jira/browse/FLINK-32799
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: hive.png, lib.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29039) RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow copies produced data, thus result will always be the last row

2023-08-14 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754376#comment-17754376
 ] 

Hang Ruan commented on FLINK-29039:
---

This bug is the same as https://issues.apache.org/jira/browse/FLINK-25132.

We cannot get the right result when the deserializer reuses a single object and 
the connector puts the deserialized records into a collection: the collection 
then ends up containing the same object multiple times.
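A self-contained sketch of the pitfall, independent of the Flink classes 
involved: reusing one mutable record object while collecting references makes 
every element of the collection reflect the last value.
{code:java}
import java.util.ArrayList;
import java.util.List;

class ObjectReusePitfall {
    static final class Row {
        String value; // mutable field, overwritten on every "deserialization"
    }

    public static void main(String[] args) {
        Row reused = new Row(); // the single reused instance
        List<Row> rows = new ArrayList<>();
        for (String v : new String[] {"one", "two", "ten"}) {
            reused.value = v; // the deserializer overwrites the same object
            rows.add(reused); // the collection stores a reference, not a copy
        }
        // Prints "ten" three times: every element is the same object.
        rows.forEach(r -> System.out.println(r.value));
    }
}
{code}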

> RowData produced by LineBytesInputFormat is reused, but 
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus 
> result will always be the last row value
> 
>
> Key: FLINK-29039
> URL: https://issues.apache.org/jira/browse/FLINK-29039
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.1
> Environment: This issue was discovered on MacOS Big Sur.
>Reporter: Marco A. Villalobos
>Assignee: Hang Ruan
>Priority: Major
>
> RowData produced by LineBytesInputFormat is reused, but 
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus 
> result will always be the last row value.
>  
> Given this program:
> {code:java}
> package mvillalobos.bug;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static org.apache.flink.table.api.Expressions.$;
> public class IsThisABatchSQLBug {  
>public static void main(String[] args) {
>      final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>      env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>      final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
>      tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
>            "        `file.path`              STRING NOT NULL METADATA,\n" +
>            "        `file.name`              STRING NOT NULL METADATA,\n" +
>            "        `file.size`              BIGINT NOT NULL METADATA,\n" +
>            "        `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL 
> METADATA,\n" +
>            "        line                    STRING\n" +
>            "      ) WITH (\n" +
>            "        'connector' = 'filesystem', \n" +
>            "        'format' = 'raw'\n" +
>            "      );");
>      tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
>            "      WITH (\n" +
>            "        'path' = 
> '/Users/minmay/dev/mvillalobos/historical/data'\n" +
>            "      ) LIKE historical_raw_source_template;");     final 
> TableResult output = 
> tableEnv.from("historical_raw_source").select($("line")).execute();
>      output.print();
>   }
> } {code}
> and this sample.csv file in the 
> '/Users/minmay/dev/mvillalobos/historical/data' directory:
> {code:java}
> one
> two
> three
> four
> five
> six
> seven
> eight
> nine
> ten {code}
> {{The print results are:}}
> {code:java}
> +++
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> +++
> 10 rows in set {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32862) Support INIT operation type to be compatible with DTS on Alibaba Cloud

2023-08-14 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-32862:
-

 Summary: Support INIT operation type to be compatible with DTS on 
Alibaba Cloud
 Key: FLINK-32862
 URL: https://issues.apache.org/jira/browse/FLINK-32862
 Project: Flink
  Issue Type: Improvement
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Hang Ruan


The operation type of Canal JSON messages from DTS on Alibaba Cloud may contain 
a new type `INIT`, which we cannot handle yet.
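
As an illustration only (this is not the actual flink-json code), the fix could 
map the new type onto an insert, since DTS uses `INIT` for the records of the 
initial full snapshot. `RowKind` is Flink's real change-kind enum; the helper 
class and method names are made up:

{code:java}
import org.apache.flink.types.RowKind;

public class CanalOpTypeMapper {

    /** Maps a Canal JSON "type" field to a RowKind; INIT is the new DTS type. */
    static RowKind toRowKind(String canalType) {
        switch (canalType) {
            case "INSERT":
            case "INIT": // DTS full-snapshot records, treated as plain inserts here
                return RowKind.INSERT;
            case "UPDATE":
                return RowKind.UPDATE_AFTER; // the "old" field yields UPDATE_BEFORE
            case "DELETE":
                return RowKind.DELETE;
            default:
                throw new IllegalArgumentException("Unknown Canal type: " + canalType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toRowKind("INIT")); // INSERT
    }
}
{code}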



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32752) Release Testing: Verify FLINK-24909 SQL syntax highlighting in SQL Client

2023-08-13 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753887#comment-17753887
 ] 

Hang Ruan commented on FLINK-32752:
---

Hi, [~Sergey Nuyanzin].

I have raised a PR (https://github.com/apache/flink/pull/23205) to fix some 
missing parts for this feature in the docs. Please take a look at it. Thanks.

 

> Release Testing: Verify FLINK-24909 SQL syntax highlighting in SQL Client
> -
>
> Key: FLINK-32752
> URL: https://issues.apache.org/jira/browse/FLINK-32752
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: release-testing
> Attachments: part1.png, part2.png
>
>
> After FLINK-24909 it is possible to specify the syntax highlighting color 
> scheme mentioned in the docs via the 
> {{sql-client.display.color-schema}} config option:
> {code:sql}
> SET 'sql-client.display.color-schema' = ...
> {code}
> Possible values are {{chester}}, {{dracula}}, {{solarized}}, {{vs2010}}, 
> {{obsidian}}, {{geshi}}, {{default}}.
> It allows highlighting of keywords, quoted text, quoted SQL identifiers 
> (backticks for the default dialect and double quotes for Hive), comments (both 
> one-line and block comments), and hints



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32752) Release Testing: Verify FLINK-24909 SQL syntax highlighting in SQL Client

2023-08-13 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32752:
--
Attachment: part1.png
part2.png

> Release Testing: Verify FLINK-24909 SQL syntax highlighting in SQL Client
> -
>
> Key: FLINK-32752
> URL: https://issues.apache.org/jira/browse/FLINK-32752
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: release-testing
> Attachments: part1.png, part2.png
>
>
> After FLINK-24909 it is possible to specify the syntax highlighting color 
> scheme mentioned in the docs via the 
> {{sql-client.display.color-schema}} config option:
> {code:sql}
> SET 'sql-client.display.color-schema' = ...
> {code}
> Possible values are {{chester}}, {{dracula}}, {{solarized}}, {{vs2010}}, 
> {{obsidian}}, {{geshi}}, {{default}}.
> It allows highlighting of keywords, quoted text, quoted SQL identifiers 
> (backticks for the default dialect and double quotes for Hive), comments (both 
> one-line and block comments), and hints



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32752) Release Testing: Verify FLINK-24909 SQL syntax highlighting in SQL Client

2023-08-13 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753884#comment-17753884
 ] 

Hang Ruan commented on FLINK-32752:
---

Hi, all.

I have tested this feature and it looks good.

!part1.png!!part2.png!

> Release Testing: Verify FLINK-24909 SQL syntax highlighting in SQL Client
> -
>
> Key: FLINK-32752
> URL: https://issues.apache.org/jira/browse/FLINK-32752
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: release-testing
> Attachments: part1.png, part2.png
>
>
> After FLINK-24909 it is possible to specify the syntax highlighting color 
> scheme mentioned in the docs via the 
> {{sql-client.display.color-schema}} config option:
> {code:sql}
> SET 'sql-client.display.color-schema' = ...
> {code}
> Possible values are {{chester}}, {{dracula}}, {{solarized}}, {{vs2010}}, 
> {{obsidian}}, {{geshi}}, {{default}}.
> It allows highlighting of keywords, quoted text, quoted SQL identifiers 
> (backticks for the default dialect and double quotes for Hive), comments (both 
> one-line and block comments), and hints



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32786) Release Testing: Verify FLIP-302: Support TRUNCATE TABLE statement in batch mode

2023-08-13 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753875#comment-17753875
 ] 

Hang Ruan edited comment on FLINK-32786 at 8/14/23 3:04 AM:


I have implemented the interface in the JDBC connector for MySQL. See my commit 
(https://github.com/ruanhang1993/flink-connector-jdbc/commit/06470ce4f0c899b9369bb40ee9bb8f410f8b4db2).

Before truncate:

!before.png!

After truncate:

!after.png!

The test sql:

!truncate.png!

[~luoyuxia] All tests passed.
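
For readers following along: FLIP-302 exposes truncation to connectors through 
the `SupportsTruncate` ability interface. Below is a minimal sketch of the 
shape of such an implementation, assuming FLIP-302's `executeTruncation()` 
signature (the class, URL, and field names are illustrative, not the commit 
above; the real sink also implements `DynamicTableSink`):

{code:java}
import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;

/** Illustrative sketch of a MySQL sink that handles TRUNCATE TABLE natively. */
public class TruncatableMySqlSink implements SupportsTruncate {

    private final String jdbcUrl; // e.g. "jdbc:mysql://localhost:3306/db"
    private final String tableName;

    TruncatableMySqlSink(String jdbcUrl, String tableName) {
        this.jdbcUrl = jdbcUrl;
        this.tableName = tableName;
    }

    @Override
    public void executeTruncation() {
        // Forward TRUNCATE TABLE to MySQL as a native statement.
        try (java.sql.Connection conn = java.sql.DriverManager.getConnection(jdbcUrl);
                java.sql.Statement stmt = conn.createStatement()) {
            stmt.execute("TRUNCATE TABLE " + tableName);
        } catch (java.sql.SQLException e) {
            throw new IllegalStateException("Failed to truncate " + tableName, e);
        }
    }
}
{code}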


was (Author: ruanhang1993):
I have implemented the interface in JDBC connector for MySQL. See [my 
commit|[https://github.com/ruanhang1993/flink-connector-jdbc/commit/06470ce4f0c899b9369bb40ee9bb8f410f8b4db2]].

Before truncate:

!before.png!

After truncate:

!after.png!

The test sql:

!truncate.png!

[~luoyuxia] All tests passed.

> Release Testing: Verify FLIP-302: Support TRUNCATE TABLE statement in batch 
> mode
> 
>
> Key: FLINK-32786
> URL: https://issues.apache.org/jira/browse/FLINK-32786
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: after.png, before.png, truncate.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32786) Release Testing: Verify FLIP-302: Support TRUNCATE TABLE statement in batch mode

2023-08-13 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32786:
--
Attachment: after.png
before.png
truncate.png

> Release Testing: Verify FLIP-302: Support TRUNCATE TABLE statement in batch 
> mode
> 
>
> Key: FLINK-32786
> URL: https://issues.apache.org/jira/browse/FLINK-32786
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: after.png, before.png, truncate.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32786) Release Testing: Verify FLIP-302: Support TRUNCATE TABLE statement in batch mode

2023-08-13 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753875#comment-17753875
 ] 

Hang Ruan commented on FLINK-32786:
---

I have implemented the interface in the JDBC connector for MySQL. See [my 
commit|https://github.com/ruanhang1993/flink-connector-jdbc/commit/06470ce4f0c899b9369bb40ee9bb8f410f8b4db2].

Before truncate:

!before.png!

After truncate:

!after.png!

The test sql:

!truncate.png!

[~luoyuxia] All tests passed.

> Release Testing: Verify FLIP-302: Support TRUNCATE TABLE statement in batch 
> mode
> 
>
> Key: FLINK-32786
> URL: https://issues.apache.org/jira/browse/FLINK-32786
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32792) Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement

2023-08-11 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753125#comment-17753125
 ] 

Hang Ruan commented on FLINK-32792:
---

`drop partition`:
 * Dropping a single partition succeeded.
 * Dropping two partitions in one statement failed. Two observations here:
 ** The first partition is actually dropped.
 ** The partitions can still be dropped one by one. I am not sure whether the 
failure comes from a problem in my tests.

!drop.png|width=879,height=500!
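
A minimal sketch of the statements exercised above, assuming a Hive-style 
partition spec on a hypothetical table `t` partitioned by `dt`:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DropPartitionProbe {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Assumes a catalog table `t` partitioned by `dt` already exists.
        // Dropping a single partition succeeded in the test above.
        tEnv.executeSql("ALTER TABLE t DROP PARTITION (dt = '2023-08-10')");
        // Dropping two partitions in one statement was the failing case;
        // as a workaround they can still be dropped one by one.
        tEnv.executeSql(
                "ALTER TABLE t DROP PARTITION (dt = '2023-08-11'), PARTITION (dt = '2023-08-12')");
    }
}
{code}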

> Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement
> -
>
> Key: FLINK-32792
> URL: https://issues.apache.org/jira/browse/FLINK-32792
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: add.png, drop.png, show.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32792) Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement

2023-08-11 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32792:
--
Attachment: drop.png

> Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement
> -
>
> Key: FLINK-32792
> URL: https://issues.apache.org/jira/browse/FLINK-32792
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: add.png, drop.png, show.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32792) Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement

2023-08-11 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753119#comment-17753119
 ] 

Hang Ruan commented on FLINK-32792:
---

`show partitions` succeeded.

!show.png|width=642,height=344!

`alter add partition` succeeded.

!add.png|width=635,height=226!
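
For reference, a minimal sketch of the two statements verified above, assuming 
a hypothetical table `t` partitioned by a column `dt`:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionStatementsProbe {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Assumes a partitioned catalog table `t` already exists.
        tEnv.executeSql("ALTER TABLE t ADD PARTITION (dt = '2023-08-10')");
        // Lists all partitions of the table.
        tEnv.executeSql("SHOW PARTITIONS t").print();
    }
}
{code}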

> Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement
> -
>
> Key: FLINK-32792
> URL: https://issues.apache.org/jira/browse/FLINK-32792
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: add.png, show.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32792) Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement

2023-08-11 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32792:
--
Attachment: show.png

> Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement
> -
>
> Key: FLINK-32792
> URL: https://issues.apache.org/jira/browse/FLINK-32792
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: add.png, show.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32792) Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement

2023-08-11 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32792:
--
Attachment: add.png

> Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement
> -
>
> Key: FLINK-32792
> URL: https://issues.apache.org/jira/browse/FLINK-32792
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: add.png, show.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32792) Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement

2023-08-11 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32792:
--
Attachment: show-partitions.png

> Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement
> -
>
> Key: FLINK-32792
> URL: https://issues.apache.org/jira/browse/FLINK-32792
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32792) Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement

2023-08-11 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-32792:
--
Attachment: (was: show-partitions.png)

> Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement
> -
>
> Key: FLINK-32792
> URL: https://issues.apache.org/jira/browse/FLINK-32792
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-32792) Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement

2023-08-11 Thread Hang Ruan (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32792 ]


Hang Ruan deleted comment on FLINK-32792:
---

was (Author: ruanhang1993):
!show-partitions.png|width=469,height=542! `Show partitions` succeed. 

> Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement
> -
>
> Key: FLINK-32792
> URL: https://issues.apache.org/jira/browse/FLINK-32792
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32792) Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement

2023-08-11 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753093#comment-17753093
 ] 

Hang Ruan commented on FLINK-32792:
---

!show-partitions.png|width=469,height=542! `Show partitions` succeed. 

> Release Testing: Verify FLINK-27237 - Partitioned table statement enhancement
> -
>
> Key: FLINK-32792
> URL: https://issues.apache.org/jira/browse/FLINK-32792
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32754) Using SplitEnumeratorContext.metricGroup() in restoreEnumerator causes NPE

2023-08-06 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751437#comment-17751437
 ] 

Hang Ruan commented on FLINK-32754:
---

[~Yu Chen] [~yunta], thanks for the issue.

I have opened a duplicate issue, FLINK-31268, about it, and I have raised a 
[PR|https://github.com/apache/flink/pull/22048] to fix it.

> Using SplitEnumeratorContext.metricGroup() in restoreEnumerator causes NPE
> --
>
> Key: FLINK-32754
> URL: https://issues.apache.org/jira/browse/FLINK-32754
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.17.0, 1.17.1
>Reporter: Yu Chen
>Priority: Major
> Attachments: image-2023-08-04-18-28-05-897.png
>
>
> We registered some metrics in the `enumerator` of the flip-27 source via 
> `SplitEnumerator.metricGroup()`, but found that the task prints NPE logs in 
> JM when restoring, suggesting that `SplitEnumerator.metricGroup()` is null.
> {*}Meanwhile, the task does not experience failover, and the Checkpoints 
> cannot be successfully created even after the task is in running state{*}.
> We found that the implementation class of `SplitEnumerator` is 
> `LazyInitializedCoordinatorContext`, however, the metricGroup() is 
> initialized after calling lazyInitialize(). By reviewing the code, we found 
> that at the time of SourceCoordinator.resetToCheckpoint(), lazyInitialize() 
> has not been called yet, so NPE is thrown.
> *Q: Why does this bug prevent the task from creating the Checkpoint?*
> `SourceCoordinator.resetToCheckpoint()` throws an NPE which results in the 
> member variable `enumerator` in `SourceCoordinator` being null. 
> Unfortunately, all Checkpoint-related calls in `SourceCoordinator` are called 
> via `runInEventLoop()`.
> In `runInEventLoop()`, if the enumerator is null, it will return directly.
> *Q: Why this bug doesn't trigger a task failover?*
> In `RecreateOnResetOperatorCoordinator.resetAndStart()`, if 
> `internalCoordinator.resetToCheckpoint` throws an exception, then it will 
> catch the exception and call `cleanAndFailJob ` to try to fail the job.
> However, `globalFailureHandler` is also initialized in `lazyInitialize()`, 
> while `schedulerExecutor.execute` will ignore the NPE triggered by 
> `globalFailureHandler.handleGlobalFailure(e)`.
> Thus it appears that the task did not failover.
> !image-2023-08-04-18-28-05-897.png|width=963,height=443!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32287) Add doc for truncate table statement

2023-07-21 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17745524#comment-17745524
 ] 

Hang Ruan commented on FLINK-32287:
---

I would like to help. Please assign this to me. Thanks.

> Add doc for truncate table statement
> 
>
> Key: FLINK-32287
> URL: https://issues.apache.org/jira/browse/FLINK-32287
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29039) RowData produced by LineBytesInputFormat is reused, but DeserializationSchemaAdapter#Reader only shallow copies produced data, thus result will always be the last row

2023-07-18 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744421#comment-17744421
 ] 

Hang Ruan commented on FLINK-29039:
---

Hi, all.

I would like to help. Please assign this issue to me. Thanks.

> RowData produced by LineBytesInputFormat is reused, but 
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus 
> result will always be the last row value
> 
>
> Key: FLINK-29039
> URL: https://issues.apache.org/jira/browse/FLINK-29039
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.1
> Environment: This issue was discovered on MacOS Big Sur.
>Reporter: Marco A. Villalobos
>Priority: Major
>
> RowData produced by LineBytesInputFormat is reused, but 
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus 
> result will always be the last row value.
>  
> Given this program:
> {code:java}
> package mvillalobos.bug;
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import static org.apache.flink.table.api.Expressions.$;
> public class IsThisABatchSQLBug {  
>public static void main(String[] args) {
>      final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>      env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>      final StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(env);
>      tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
>            "        `file.path`              STRING NOT NULL METADATA,\n" +
>            "        `file.name`              STRING NOT NULL METADATA,\n" +
>            "        `file.size`              BIGINT NOT NULL METADATA,\n" +
>            "        `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL 
> METADATA,\n" +
>            "        line                    STRING\n" +
>            "      ) WITH (\n" +
>            "        'connector' = 'filesystem', \n" +
>            "        'format' = 'raw'\n" +
>            "      );");
>      tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
>            "      WITH (\n" +
>            "        'path' = 
> '/Users/minmay/dev/mvillalobos/historical/data'\n" +
>            "      ) LIKE historical_raw_source_template;");     final 
> TableResult output = 
> tableEnv.from("historical_raw_source").select($("line")).execute();
>      output.print();
>   }
> } {code}
> and this sample.csv file in the 
> '/Users/minmay/dev/mvillalobos/historical/data' directory:
> {code:java}
> one
> two
> three
> four
> five
> six
> seven
> eight
> nine
> ten {code}
> {{The print results are:}}
> {code:java}
> +++
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> | +I |                            ten |
> +++
> 10 rows in set {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32555) Add a page to show SIGMOD System Awards for Apache Flink

2023-07-06 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740865#comment-17740865
 ] 

Hang Ruan commented on FLINK-32555:
---

I'd like to help. Please assign this to me. Thanks.

> Add a page to show SIGMOD System Awards for Apache Flink
> 
>
> Key: FLINK-32555
> URL: https://issues.apache.org/jira/browse/FLINK-32555
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Leonard Xu
>Priority: Major
> Fix For: 1.18.0
>
>
> As you all know, Apache Flink has won the 2023 SIGMOD Systems Award [1].
> It will help promote Apache Flink if we can add a page to the community 
> documentation to let our users know, similar to what Spark did [2].
> [1] https://sigmod.org/2023-sigmod-systems-award/
> [2] https://spark.apache.org/news/sigmod-system-award.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32397) Add doc for add/drop/show partition

2023-06-20 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735266#comment-17735266
 ] 

Hang Ruan commented on FLINK-32397:
---

I would like to help. Please assign this to me. Thanks.

> Add doc for add/drop/show partition
> ---
>
> Key: FLINK-32397
> URL: https://issues.apache.org/jira/browse/FLINK-32397
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: luoyuxia
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31699) JDBC nightly CI failure

2023-04-04 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708640#comment-17708640
 ] 

Hang Ruan commented on FLINK-31699:
---

Thanks for the suggestions. Actually I applied the changes from PR#31 and 
PR#30, and this test error may still occur.

I will use Martijn's fixed version to do more tests. Thanks ~

> JDBC nightly CI failure
> ---
>
> Key: FLINK-31699
> URL: https://issues.apache.org/jira/browse/FLINK-31699
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / JDBC
>Reporter: Danny Cranmer
>Assignee: Martijn Visser
>Priority: Major
>  Labels: pull-request-available
> Fix For: jdbc-3.1.0
>
>
> Investigate and fix the nightly CI failure. Example 
> [https://github.com/apache/flink-connector-jdbc/actions/runs/4585903259]
>  
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) 
> on project flink-connector-jdbc: Execution default-test of goal 
> org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test failed: 
> org.junit.platform.commons.JUnitException: TestEngine with ID 'archunit' 
> failed to discover tests: 
> com.tngtech.archunit.lang.syntax.elements.MethodsThat.areAnnotatedWith(Ljava/lang/Class;)Ljava/lang/Object;
>  -> [Help 1]{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31699) JDBC nightly CI failure

2023-04-03 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707916#comment-17707916
 ] 

Hang Ruan commented on FLINK-31699:
---

Hi, Danny, I would like to help. Please assign this to me, thanks ~

> JDBC nightly CI failure
> ---
>
> Key: FLINK-31699
> URL: https://issues.apache.org/jira/browse/FLINK-31699
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / JDBC
>Reporter: Danny Cranmer
>Priority: Major
>
> Investigate and fix the nightly CI failure. Example 
> [https://github.com/apache/flink-connector-jdbc/actions/runs/4585903259]
>  
> {code:java}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) 
> on project flink-connector-jdbc: Execution default-test of goal 
> org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test failed: 
> org.junit.platform.commons.JUnitException: TestEngine with ID 'archunit' 
> failed to discover tests: 
> com.tngtech.archunit.lang.syntax.elements.MethodsThat.areAnnotatedWith(Ljava/lang/Class;)Ljava/lang/Object;
>  -> [Help 1]{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31615) Fix some parts forgot to translate in "Table API" page of "Table API & SQL"

2023-03-26 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-31615:
--
Description: The query_state_warning in the "Table API" page of "Table API & 
SQL" is still in English, and some comments in the code samples are in 
English.  (was: The query_state_warning in )

> Fix some parts forgot to translate in "Table API" page of "Table API & SQL" 
> 
>
> Key: FLINK-31615
> URL: https://issues.apache.org/jira/browse/FLINK-31615
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Hang Ruan
>Priority: Minor
>  Labels: chinese-translation
>
> The query_state_warning in the "Table API" page of "Table API & SQL" is still 
> in English, and some comments in the code samples are in English.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31615) Fix some parts forgot to translate in "Table API" page of "Table API & SQL"

2023-03-26 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-31615:
--
Description: The query_state_warning in 

> Fix some parts forgot to translate in "Table API" page of "Table API & SQL" 
> 
>
> Key: FLINK-31615
> URL: https://issues.apache.org/jira/browse/FLINK-31615
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Hang Ruan
>Priority: Minor
>  Labels: chinese-translation
>
> The query_state_warning in 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31615) Fix some parts forgot to translate in "Table API" page of "Table API & SQL"

2023-03-26 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-31615:
-

 Summary: Fix some parts forgot to translate in "Table API" page of 
"Table API & SQL" 
 Key: FLINK-31615
 URL: https://issues.apache.org/jira/browse/FLINK-31615
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31559) Update the flink version to 1.18-SNAPSHOT in flink-connector-kafka

2023-03-22 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703547#comment-17703547
 ] 

Hang Ruan commented on FLINK-31559:
---

[~martijnvisser] , WDYT?   Thanks~

> Update the flink version to 1.18-SNAPSHOT in flink-connector-kafka
> --
>
> Key: FLINK-31559
> URL: https://issues.apache.org/jira/browse/FLINK-31559
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Hang Ruan
>Priority: Minor
>
> For FLIP-208, there are some changes in flink-connector-base on which the 
> later changes in flink-connector-kafka depend.
> So we need to update the Flink version to 1.18-SNAPSHOT.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31559) Update the flink version to 1.18-SNAPSHOT in flink-connector-kafka

2023-03-22 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-31559:
-

 Summary: Update the flink version to 1.18-SNAPSHOT in 
flink-connector-kafka
 Key: FLINK-31559
 URL: https://issues.apache.org/jira/browse/FLINK-31559
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Hang Ruan


For FLIP-208, there are some changes in flink-connector-base on which the later 
changes in flink-connector-kafka depend.

So we need to update the Flink version to 1.18-SNAPSHOT.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31483) Implement Split Deletion Support in Flink Kafka Connector

2023-03-21 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703064#comment-17703064
 ] 

Hang Ruan edited comment on FLINK-31483 at 3/21/23 7:08 AM:


Hi, [~ruibin] , 

I found the mails about this part. Hope it is helpful.

 

 [1] [https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt]

 [2] [https://lists.apache.org/thread/3wxfr39t2rz1wvxw2vsz5hsrp9t8qrwx]


was (Author: ruanhang1993):
Hi, [~ruibin] , 

I found the mails about this part. Hope it is helpful.

 

 [1] [https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt]

 [2] [https://lists.apache.org/thread/4h3xl25zlys8wzhtlrrmlcpht62oglrm]

> Implement Split Deletion Support in Flink Kafka Connector
> -
>
> Key: FLINK-31483
> URL: https://issues.apache.org/jira/browse/FLINK-31483
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka, Connectors / Parent
>Reporter: Ruibin Xing
>Priority: Major
>
> Currently, the Flink Kafka Connector does not support split deletion and is 
> left as a 
> [TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
>  I want to add this feature by doing these steps:
> 1. Add SplitsDeletion event to flink-connector-base, which currently only has 
> SplitsAddition.
> 2. Add a `deleteSplits` method in SplitEnumeratorContext, so it can send a 
> SplitsDeletion event to the source operator. To maintain compatibility, a 
> default empty implementation for this method will be added.
> 3. Make SourceOperator handle the SplitsDeletion event, notifying the 
> SourceReader to delete splits.
> 4. Create a deleteSplits method in SourceReader to remove splits, including 
> removing them from the split state and stopping the SourceReader from reading 
> the deleted splits.
> As an alternative, without modifying the flink-connector-base, 
> KafkaSplitsEnumerator could send a custom SourceEvent to SourceOperator for 
> splits deletion and deal with it in the kafka-connector-specific code. But I 
> think it's better to have SplitsDeletion in flink-connector-base, so other 
> connectors can use it too.
> Let me know if you have any thoughts or ideas. Thanks!
> Related Issues: FLINK-30490



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31483) Implement Split Deletion Support in Flink Kafka Connector

2023-03-21 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703064#comment-17703064
 ] 

Hang Ruan edited comment on FLINK-31483 at 3/21/23 7:06 AM:


Hi, [~ruibin] , 

I found the mails about this part. Hope it is helpful.

 

 [1] [https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt]

 [2] [https://lists.apache.org/thread/4h3xl25zlys8wzhtlrrmlcpht62oglrm]


was (Author: ruanhang1993):
Hi, [~ruibin] , 

I found the mails about this part. Hope it is helpful.

 

 [1] [https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt]

 [2] [https://lists.apache.org/thread/4h3xl25zlys8wzhtlrrmlcpht62oglrm]

 [3] [https://lists.apache.org/thread/6cv801kp3r4y6tytf82p45zvoxfo3p07]

> Implement Split Deletion Support in Flink Kafka Connector
> -
>
> Key: FLINK-31483
> URL: https://issues.apache.org/jira/browse/FLINK-31483
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka, Connectors / Parent
>Reporter: Ruibin Xing
>Priority: Major
>
> Currently, the Flink Kafka Connector does not support split deletion and is 
> left as a 
> [TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
>  I want to add this feature by doing these steps:
> 1. Add SplitsDeletion event to flink-connector-base, which currently only has 
> SplitsAddition.
> 2. Add a `deleteSplits` method in SplitEnumeratorContext, so it can send a 
> SplitsDeletion event to the source operator. To maintain compatibility, a 
> default empty implementation for this method will be added.
> 3. Make SourceOperator handle the SplitsDeletion event, notifying the 
> SourceReader to delete splits.
> 4. Create a deleteSplits method in SourceReader to remove splits, including 
> removing them from the split state and stopping the SourceReader from reading 
> the deleted splits.
> As an alternative, without modifying the flink-connector-base, 
> KafkaSplitsEnumerator could send a custom SourceEvent to SourceOperator for 
> splits deletion and deal with it in the kafka-connector-specific code. But I 
> think it's better to have SplitsDeletion in flink-connector-base, so other 
> connectors can use it too.
> Let me know if you have any thoughts or ideas. Thanks!
> Related Issues: FLINK-30490



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31483) Implement Split Deletion Support in Flink Kafka Connector

2023-03-21 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703064#comment-17703064
 ] 

Hang Ruan edited comment on FLINK-31483 at 3/21/23 7:05 AM:


Hi, [~ruibin] , 

I found the mails about this part. Hope it is helpful.

 

 [1] [https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt]

 [2] [https://lists.apache.org/thread/4h3xl25zlys8wzhtlrrmlcpht62oglrm]

 [3] [https://lists.apache.org/thread/6cv801kp3r4y6tytf82p45zvoxfo3p07]


was (Author: ruanhang1993):
Hi, [~ruibin] , 

I have found the mails about this part. Hope it is helpful.

 

 [1] [https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt]

 [2] [https://lists.apache.org/thread/4h3xl25zlys8wzhtlrrmlcpht62oglrm]

 [3] [https://lists.apache.org/thread/6cv801kp3r4y6tytf82p45zvoxfo3p07]

> Implement Split Deletion Support in Flink Kafka Connector
> -
>
> Key: FLINK-31483
> URL: https://issues.apache.org/jira/browse/FLINK-31483
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka, Connectors / Parent
>Reporter: Ruibin Xing
>Priority: Major
>
> Currently, the Flink Kafka Connector does not support split deletion and is 
> left as a 
> [TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
>  I want to add this feature by doing these steps:
> 1. Add SplitsDeletion event to flink-connector-base, which currently only has 
> SplitsAddition.
> 2. Add a `deleteSplits` method in SplitEnumeratorContext, so it can send a 
> SplitsDeletion event to the source operator. To maintain compatibility, a 
> default empty implementation for this method will be added.
> 3. Make SourceOperator handle the SplitsDeletion event, notifying the 
> SourceReader to delete splits.
> 4. Create a deleteSplits method in SourceReader to remove splits, including 
> removing them from the split state and stopping the SourceReader from reading 
> the deleted splits.
> As an alternative, without modifying the flink-connector-base, 
> KafkaSplitsEnumerator could send a custom SourceEvent to SourceOperator for 
> splits deletion and deal with it in the kafka-connector-specific code. But I 
> think it's better to have SplitsDeletion in flink-connector-base, so other 
> connectors can use it too.
> Let me know if you have any thoughts or ideas. Thanks!
> Related Issues: FLINK-30490



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31483) Implement Split Deletion Support in Flink Kafka Connector

2023-03-21 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17703064#comment-17703064
 ] 

Hang Ruan commented on FLINK-31483:
---

Hi, [~ruibin] , 

I have found the mails about this part. Hope it is helpful.

 

 [1] [https://lists.apache.org/thread/7r4h7v5k281w9cnbfw9lb8tp56r30lwt]

 [2] [https://lists.apache.org/thread/4h3xl25zlys8wzhtlrrmlcpht62oglrm]

 [3] [https://lists.apache.org/thread/6cv801kp3r4y6tytf82p45zvoxfo3p07]

> Implement Split Deletion Support in Flink Kafka Connector
> -
>
> Key: FLINK-31483
> URL: https://issues.apache.org/jira/browse/FLINK-31483
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka, Connectors / Parent
>Reporter: Ruibin Xing
>Priority: Major
>
> Currently, the Flink Kafka Connector does not support split deletion and is 
> left as a 
> [TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
>  I want to add this feature by doing these steps:
> 1. Add SplitsDeletion event to flink-connector-base, which currently only has 
> SplitsAddition.
> 2. Add a `deleteSplits` method in SplitEnumeratorContext, so it can send a 
> SplitsDeletion event to the source operator. To maintain compatibility, a 
> default empty implementation for this method will be added.
> 3. Make SourceOperator handle the SplitsDeletion event, notifying the 
> SourceReader to delete splits.
> 4. Create a deleteSplits method in SourceReader to remove splits, including 
> removing them from the split state and stopping the SourceReader from reading 
> the deleted splits.
> As an alternative, without modifying the flink-connector-base, 
> KafkaSplitsEnumerator could send a custom SourceEvent to SourceOperator for 
> splits deletion and deal with it in the kafka-connector-specific code. But I 
> think it's better to have SplitsDeletion in flink-connector-base, so other 
> connectors can use it too.
> Let me know if you have any thoughts or ideas. Thanks!
> Related Issues: FLINK-30490



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31511) Translate documentation sql_functions_zh.yml to the latest version

2023-03-18 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-31511:
-

 Summary: Translate documentation sql_functions_zh.yml  to the 
latest version
 Key: FLINK-31511
 URL: https://issues.apache.org/jira/browse/FLINK-31511
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Hang Ruan


Some content of these functions in sql_functions_zh.yml is outdated.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31511) Translate documentation sql_functions_zh.yml to the latest version

2023-03-18 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17702109#comment-17702109
 ] 

Hang Ruan commented on FLINK-31511:
---

Hi, all, 

I would like to help update this document, as some content is outdated.

> Translate documentation sql_functions_zh.yml  to the latest version
> ---
>
> Key: FLINK-31511
> URL: https://issues.apache.org/jira/browse/FLINK-31511
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Hang Ruan
>Priority: Minor
>
> Some content of these functions in sql_functions_zh.yml is outdated.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31483) Implement Split Deletion Support in Flink Kafka Connector

2023-03-16 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17701005#comment-17701005
 ] 

Hang Ruan commented on FLINK-31483:
---

Hi, Ruibin,

Actually a deletion event (SplitsRemoval) is planned to be added for 
FLIP-208 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records).
Here is the PR: https://github.com/apache/flink/pull/21589.

But this PR doesn't aim at adding split deletion for the enumerator; the 
deletion event is only passed within the reader itself.

Maybe some parts of this issue will be covered after this FLIP is finished.

Best, Hang
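
For reference, the shape of such a removal event in flink-connector-base would 
mirror the existing `SplitsAddition`. A sketch only, under the assumption that 
it lives next to `SplitsChange` in the same package (the merged PR may name or 
place things differently):

{code:java}
package org.apache.flink.connector.base.source.reader.splitreader;

import java.util.List;

/** Sketch of a removal counterpart to SplitsAddition; not the final API. */
public class SplitsRemoval<SplitT> extends SplitsChange<SplitT> {

    public SplitsRemoval(List<SplitT> splits) {
        super(splits);
    }

    @Override
    public String toString() {
        return String.format("SplitsRemoval:[%s]", splits());
    }
}
{code}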

> Implement Split Deletion Support in Flink Kafka Connector
> -
>
> Key: FLINK-31483
> URL: https://issues.apache.org/jira/browse/FLINK-31483
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Kafka, Connectors / Parent
>Reporter: Ruibin Xing
>Priority: Major
>
> Currently, the Flink Kafka Connector does not support split deletion and is 
> left as a 
> [TODO|https://github.com/apache/flink-connector-kafka/blob/9f72be91f8abdfc9b5e8fa46d15dee3f83e71332/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L305].
>  I want to add this feature by doing these steps:
> 1. Add SplitsDeletion event to flink-connector-base, which currently only has 
> SplitsAddition.
> 2. Add a `deleteSplits` method in SplitEnumeratorContext, so it can send a 
> SplitsDeletion event to the source operator. To maintain compatibility, a 
> default empty implementation for this method will be added.
> 3. Make SourceOperator handle the SplitsDeletion event, notifying the 
> SourceReader to delete splits.
> 4. Create a deleteSplits method in SourceReader to remove splits, including 
> removing them from the split state and stopping the SourceReader from reading 
> the deleted splits.
> As an alternative, without modifying the flink-connector-base, 
> KafkaSplitsEnumerator could send a custom SourceEvent to SourceOperator for 
> splits deletion and deal with it in the kafka-connector-specific code. But I 
> think it's better to have SplitsDeletion in flink-connector-base, so other 
> connectors can use it too.
> Let me know if you have any thoughts or ideas. Thanks!
> Related Issues: FLINK-30490



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30959) Improve the documentation of UNIX_TIMESTAMP for different argument format

2023-03-12 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17699319#comment-17699319
 ] 

Hang Ruan commented on FLINK-30959:
---

Hi, all,

I would like to help to improve the document. Please assign this to me if 
possible. Thanks~

> Improve the documentation of UNIX_TIMESTAMP for different argument format
> -
>
> Key: FLINK-30959
> URL: https://issues.apache.org/jira/browse/FLINK-30959
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table SQL / API
>Affects Versions: 1.16.1
>Reporter: Yunfeng Zhou
>Priority: Major
> Fix For: 1.17.0, 1.16.2
>
>
> When running the following pyflink program
>  
> {code:python}
> import pandas as pd
> from pyflink.datastream import StreamExecutionEnvironment, HashMapStateBackend
> from pyflink.table import StreamTableEnvironment
> if __name__ == "__main__":
> input_data = pd.DataFrame(
> [
> ["Alex", 100.0, "2022-01-01 08:00:00.001 +0800"],
> ["Emma", 400.0, "2022-01-01 00:00:00.003 +"],
> ["Alex", 200.0, "2022-01-01 08:00:00.005 +0800"],
> ["Emma", 300.0, "2022-01-01 00:00:00.007 +"],
> ["Jack", 500.0, "2022-01-01 08:00:00.009 +0800"],
> ["Alex", 450.0, "2022-01-01 00:00:00.011 +"],
> ],
> columns=["name", "avg_cost", "time"],
> )
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_state_backend(HashMapStateBackend())
> t_env = StreamTableEnvironment.create(env)
> input_table = t_env.from_pandas(input_data)
> t_env.create_temporary_view("input_table", input_table)
> time_format = "-MM-dd HH:mm:ss.SSS X"
> output_table = t_env.sql_query(
> f"SELECT *, UNIX_TIMESTAMP(`time`, '{time_format}') AS unix_time FROM 
> input_table"
> )
> output_table.execute().print()
> {code}
> The actual output is 
> {code}
> +++++--+
> | op |   name |   avg_cost |  
>  time |unix_time |
> +++++--+
> | +I |   Alex |  100.0 |  
> 2022-01-01 08:00:00.001 +0800 |   1640995200 |
> | +I |   Emma |  400.0 |  
> 2022-01-01 00:00:00.003 + |   1640995200 |
> | +I |   Alex |  200.0 |  
> 2022-01-01 08:00:00.005 +0800 |   1640995200 |
> | +I |   Emma |  300.0 |  
> 2022-01-01 00:00:00.007 + |   1640995200 |
> | +I |   Jack |  500.0 |  
> 2022-01-01 08:00:00.009 +0800 |   1640995200 |
> | +I |   Alex |  450.0 |  
> 2022-01-01 00:00:00.011 + |   1640995200 |
> +++++--+
> {code}
> While the expected result is
> {code:java}
> +++++--+
> | op |   name |   avg_cost |  
>  time |unix_time |
> +++++--+
> | +I |   Alex |  100.0 |  
> 2022-01-01 08:00:00.001 +0800 |   1640995200 |
> | +I |   Emma |  400.0 |  
> 2022-01-01 00:00:00.003 + |   1640966400 |
> | +I |   Alex |  200.0 |  
> 2022-01-01 08:00:00.005 +0800 |   1640995200 |
> | +I |   Emma |  300.0 |  
> 2022-01-01 00:00:00.007 + |   1640966400 |
> | +I |   Jack |  500.0 |  
> 2022-01-01 08:00:00.009 +0800 |   1640995200 |
> | +I |   Alex |  450.0 |  
> 2022-01-01 00:00:00.011 + |   1640966400 |
> +++++--+
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31268) OperatorCoordinator.Context#metricGroup will return null when restore from a savepoint

2023-02-28 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-31268:
-

 Summary: OperatorCoordinator.Context#metricGroup will return null 
when restore from a savepoint
 Key: FLINK-31268
 URL: https://issues.apache.org/jira/browse/FLINK-31268
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: Hang Ruan


The `metricGroup` is initialized lazily in the method 
`OperatorCoordinatorHandler#initializeOperatorCoordinators`.

This causes a NullPointerException when we use it in a method like 
`Source#restoreEnumerator`, which is invoked through 
`SchedulerBase#createAndRestoreExecutionGraph` before 
`OperatorCoordinatorHandler#initializeOperatorCoordinators` runs in 
`SchedulerBase`.
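
A minimal helper sketch showing how user code runs into this, and a defensive 
workaround until the initialization order is fixed (the helper class is made 
up; `SplitEnumeratorContext#metricGroup()` is the real accessor):

{code:java}
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;

public class EnumeratorMetricsHelper {

    /**
     * Registers enumerator metrics, tolerating the lazy initialization described
     * above: during Source#restoreEnumerator the metric group may still be null.
     */
    public static <SplitT> void registerMetricsSafely(
            SplitEnumeratorContext<SplitT> context) {
        SplitEnumeratorMetricGroup group = context.metricGroup(); // null before lazyInitialize()
        if (group == null) {
            return; // defer registration, e.g. to SplitEnumerator#start()
        }
        group.setUnassignedSplitsGauge(() -> 0L);
    }
}
{code}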



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2022-12-20 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17650103#comment-17650103
 ] 

Hang Ruan commented on FLINK-25509:
---

Hi, [~lindong] ,

I have read the FLIP and I am willing to help implement this feature. Would 
you mind assigning this ticket to me?

Thanks~

 

> FLIP-208: Add RecordEvaluator to dynamically stop source based on 
> de-serialized records
> ---
>
> Key: FLINK-25509
> URL: https://issues.apache.org/jira/browse/FLINK-25509
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common, Connectors / Kafka
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> This feature is needed to migrate applications which uses 
> KafkaDeserializationSchema::isEndOfStream() from using FlinkKafkaConsumer to 
> using KafkaSource.
> Please checkout 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records
>  for the motivation and the proposed changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-30 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641728#comment-17641728
 ] 

Hang Ruan edited comment on FLINK-29801 at 12/1/22 7:50 AM:


Hi, [~stevenz3wu] ,

For question 1,  we want to provide some common metrics in the 
`OperatorCoordinatorMetricGroup` like other metric groups, such as 
`SinkWriterMetricGroup`. So we provide the `numEventsIn` for it.

For question 2, when invoking 
`SourceCoordinatorProvider#getCoordinator(OperatorCoordinator.Context 
context)`, we could get the `OperatorCoordinatorMetricGroup` by the method 
`metricGroup()` in `OperatorCoordinator.Context`. The detail will be contained 
in the other issue https://issues.apache.org/jira/browse/FLINK-21000 after this 
issue.

The discussion will be sent this week or later. Thanks for the reply.


was (Author: ruanhang1993):
Hi, [~stevenz3wu] ,

For question 1,  we want to provide some common metrics in the 
`OperatorCoordinatorMetricGroup` like other metric groups, such as 
`SinkWriterMetricGroup`. So we provide the `numEventsIn` for it.

For question 2, when invoking 
`SourceCoordinatorProvider#getCoordinator(OperatorCoordinator.Context 
context)`, we could get the `OperatorCoordinatorMetricGroup` by the method 
`metricGroup()` in `OperatorCoordinator.Context`.

The discussion will be sent this week or later. Thanks for the reply.

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator.
> In some cases we may want to report metrics from the OperatorCoordinator, e.g. 
> in the Flink/Hudi integration scenario, where some metadata is sent to the 
> operator coordinator to be committed to HDFS or HMS,
> but we also need to report some metrics in the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-30 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641728#comment-17641728
 ] 

Hang Ruan commented on FLINK-29801:
---

Hi, [~stevenz3wu] ,

For question 1,  we want to provide some common metrics in the 
`OperatorCoordinatorMetricGroup` like other metric groups, such as 
`SinkWriterMetricGroup`. So we provide the `numEventsIn` for it.

For question 2, when invoking 
`SourceCoordinatorProvider#getCoordinator(OperatorCoordinator.Context 
context)`, we could get the `OperatorCoordinatorMetricGroup` by the method 
`metricGroup()` in `OperatorCoordinator.Context`.

The discussion will be sent this week or later. Thanks for the reply.
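
As a rough illustration of that answer (a sketch of the proposed usage, not API 
that existed at the time of this comment; the factory class and counter name 
are made up):

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorCoordinatorMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

public class MetricReportingCoordinatorFactory {

    /** Sketch: registering a custom metric when the coordinator is created. */
    public static Counter createCommitCounter(OperatorCoordinator.Context context) {
        OperatorCoordinatorMetricGroup group = context.metricGroup();
        // `numEventsIn` would be a standard metric maintained by the framework;
        // user coordinators can add their own, e.g. commits to HDFS/HMS.
        return group.counter("numCommits");
    }
}
{code}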

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator.
> In some cases we may want to report metrics from the OperatorCoordinator, e.g. 
> in the Flink/Hudi integration scenario, where some metadata is sent to the 
> operator coordinator to be committed to HDFS or HMS,
> but we also need to report some metrics in the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-24 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638199#comment-17638199
 ] 

Hang Ruan commented on FLINK-29801:
---

Hi, [~MengYue], what is your e-mail? I will send an e-mail to you. Thanks.

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator.
> In some cases we may want to report metrics from the OperatorCoordinator, e.g. 
> in the Flink/Hudi integration scenario, where some metadata is sent to the 
> operator coordinator to be committed to HDFS or HMS,
> but we also need to report some metrics in the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-23 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638076#comment-17638076
 ] 

Hang Ruan edited comment on FLINK-29801 at 11/24/22 3:53 AM:
-

Hi, [~MengYue] ,

I have created a 
FLIP([https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator])
 about this issue. Could you help to review this FLIP when you get time? Any 
comments are appreciated.

Maybe we could discuss through Slack or Dingding. Here is my 
e-mail(ruanhang1...@hotmail.com).

Thanks~


was (Author: ruanhang1993):
Hi, [~MengYue] ,

I have created a 
FLIP([https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator])
 about this issue. Could you help to review this FLIP when you get time? Any 
comments are appreciated.

Maybe we could discuss through Slack or Dingding. Here is my 
e-mail(ruanhang1...@hotmail.com).

Thanks~

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator. In some cases, we may need to report metrics from an 
> OperatorCoordinator, e.g. in the Flink-Hudi integration, where some metadata 
> is sent to the operator coordinator to be committed to HDFS or HMS, and we 
> also need to report some metrics from the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-23 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638076#comment-17638076
 ] 

Hang Ruan edited comment on FLINK-29801 at 11/24/22 3:48 AM:
-

Hi, [~MengYue] ,

I have created a 
FLIP([https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator])
 about this issue. Could you help to review this FLIP when you get time? Any 
comments are appreciated.

Maybe we could discuss through Slack or Dingding. Here is my 
e-mail(ruanhang1...@hotmail.com)

Thanks~


was (Author: ruanhang1993):
Hi, [~MengYue] ,

I have created a 
FLIP(https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator)
 about this issue. Could you help to review this FLIP when you get time? Any 
comments are appreciated.

Maybe we could discuss at Slack or Dingding.

Thanks~

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator. In some cases, we may need to report metrics from an 
> OperatorCoordinator, e.g. in the Flink-Hudi integration, where some metadata 
> is sent to the operator coordinator to be committed to HDFS or HMS, and we 
> also need to report some metrics from the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-23 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638076#comment-17638076
 ] 

Hang Ruan edited comment on FLINK-29801 at 11/24/22 3:48 AM:
-

Hi, [~MengYue] ,

I have created a 
FLIP([https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator])
 about this issue. Could you help to review this FLIP when you get time? Any 
comments are appreciated.

Maybe we could discuss through Slack or Dingding. Here is my 
e-mail(ruanhang1...@hotmail.com).

Thanks~


was (Author: ruanhang1993):
Hi, [~MengYue] ,

I have created a 
FLIP([https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator])
 about this issue. Could you help to review this FLIP when you get time? Any 
comments are appreciated.

Maybe we could discuss through Slack or Dingding. Here is my 
e-mail(ruanhang1...@hotmail.com)

Thanks~

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator. In some cases, we may need to report metrics from an 
> OperatorCoordinator, e.g. in the Flink-Hudi integration, where some metadata 
> is sent to the operator coordinator to be committed to HDFS or HMS, and we 
> also need to report some metrics from the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-23 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638076#comment-17638076
 ] 

Hang Ruan edited comment on FLINK-29801 at 11/24/22 3:42 AM:
-

Hi, [~MengYue] ,

I have created a 
FLIP(https://cwiki.apache.org/confluence/display/FLINK/FLIP-274%3A+Introduce+metric+group+for+OperatorCoordinator)
 about this issue. Could you help to review this FLIP when you get time? Any 
comments are appreciated.

Maybe we could discuss on Slack or Dingding.

Thanks~


was (Author: ruanhang1993):
Hi, [~MengYue] ,

I have created a FLIP about this issue. Could you help to review this FLIP when 
you get time? Any comments are appreciated.

Maybe we could discuss at Slack or Dingding.

Thanks~

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator. In some cases, we may need to report metrics from an 
> OperatorCoordinator, e.g. in the Flink-Hudi integration, where some metadata 
> is sent to the operator coordinator to be committed to HDFS or HMS, and we 
> also need to report some metrics from the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-23 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17638076#comment-17638076
 ] 

Hang Ruan commented on FLINK-29801:
---

Hi, [~MengYue] ,

I have created a FLIP about this issue. Could you help to review this FLIP when 
you get time? Any comments are appreciated.

Maybe we could discuss on Slack or Dingding.

Thanks~

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator. In some cases, we may need to report metrics from an 
> OperatorCoordinator, e.g. in the Flink-Hudi integration, where some metadata 
> is sent to the operator coordinator to be committed to HDFS or HMS, and we 
> also need to report some metrics from the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-20 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636421#comment-17636421
 ] 

Hang Ruan edited comment on FLINK-29801 at 11/21/22 3:52 AM:
-

Hi, [~MengYue] ,

For question 1, what I mean is that it is better to expose the metric group 
through `OperatorCoordinator.Context` instead of accessing it by invoking the 
`registerMetric` method provided in the PR. 

For question 2, it seems that every `OperatorCoordinator` implementation has 
to distinguish its metric group name by itself, which should rather be done 
by Flink.

IMO, we need a new metric group for the `OperatorCoordinator`, which requires 
a FLIP and a discussion in the community.

This will take some time to discuss in the community.  


was (Author: ruanhang1993):
Hi, [~MengYue] ,

For the question 1, what I means is better to pass the metric group in the 
`OperatorCoordinator.Context` instead of using it by invoking the method 
`registeMetric` provided in the PR. 

For the question 2, it seems that the implementations of `OperatorCoordinator` 
need to distinguish the metric group name by self, which may need to be done by 
Flink.

IMO, I think we need a new metric group for the `OperatorCoordinator`. And we 
need the FLIP for the change and a discuss it in the community.

This needs to spend some time in the discussion it in the community.  

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator. In some cases, we may need to report metrics from an 
> OperatorCoordinator, e.g. in the Flink-Hudi integration, where some metadata 
> is sent to the operator coordinator to be committed to HDFS or HMS, and we 
> also need to report some metrics from the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-20 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17636421#comment-17636421
 ] 

Hang Ruan commented on FLINK-29801:
---

Hi, [~MengYue] ,

For question 1, what I mean is that it is better to expose the metric group 
through `OperatorCoordinator.Context` instead of accessing it by invoking the 
`registerMetric` method provided in the PR. 

For question 2, it seems that every `OperatorCoordinator` implementation has 
to distinguish its metric group name by itself, which should rather be done 
by Flink.

IMO, we need a new metric group for the `OperatorCoordinator`, which requires 
a FLIP and a discussion in the community.

This will take some time to discuss in the community.  

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator. In some cases, we may need to report metrics from an 
> OperatorCoordinator, e.g. in the Flink-Hudi integration, where some metadata 
> is sent to the operator coordinator to be committed to HDFS or HMS, and we 
> also need to report some metrics from the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-17 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635287#comment-17635287
 ] 

Hang Ruan edited comment on FLINK-29801 at 11/17/22 11:02 AM:
--

Hi, [~MengYue] ,

I may rely on this feature to accomplish 
https://issues.apache.org/jira/browse/FLINK-21000, where I provide a POC that 
contains some parts about the metric group in OperatorCoordinator. 

Based on this research, I have some questions about this PR:
 # How should we use the operator coordinator metric group when implementing 
the OperatorCoordinator? For example, we always get the metric group from a 
context in the SourceCoordinator. Why not pass the metric group in its 
context?
 # Are there any problems with using the JobManagerJobMetricGroup? There may 
be several operator coordinators in the same job, and it seems that every 
operator coordinator would have to make sure that its metric group does not 
conflict with the others (see the sketch below). 
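
A small illustration of the naming concern, using `UnregisteredMetricsGroup` 
as a stand-in for the shared JobManagerJobMetricGroup; the 
`coordinator`/operator-id group layout is my assumption, not Flink's actual 
hierarchy:

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

public class CoordinatorScopeDemo {
    public static void main(String[] args) {
        // Stand-in for the job-level group shared by all coordinators.
        MetricGroup jobGroup = new UnregisteredMetricsGroup();

        // Registering "numEventsIn" directly on the shared group would make
        // two coordinators collide; a per-operator sub-group keeps the
        // metric identifiers distinct.
        Counter op1 = jobGroup.addGroup("coordinator").addGroup("op-1").counter("numEventsIn");
        Counter op2 = jobGroup.addGroup("coordinator").addGroup("op-2").counter("numEventsIn");
        op1.inc();
        op2.inc();
    }
}
{code}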

ps: It is my 
POC(https://github.com/ruanhang1993/flink/commit/dc41d3ab465d03e2829e5f1d10137ea34621ad87).
  I am very glad to help if you need.

Best,  Hang


was (Author: ruanhang1993):
Hi, [~MengYue] ,

I may rely on this feature to accomplish 
https://issues.apache.org/jira/browse/FLINK-21000. In the feature 
https://issues.apache.org/jira/browse/FLINK-21000, I provide a POC which 
contains some parts about the metric group in OperatorCoordinator. 

With these research, I have some question about this PR:
 # How should we use the operator coordinator metric group when implement the 
OperatorCoordinator? For example, we always get the metric group from one 
context in the SourceCoordinator. Why not pass the metric group in its context?
 # Is there some problems to use the JobManagerJobMetricGroup? There may be 
serval operator coordinators in the same job. It seems that every operator 
coordinator should make sure that its metric group does not conflict with 
others. 

Thanks,  Hang

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator. In some cases, we may need to report metrics from an 
> OperatorCoordinator, e.g. in the Flink-Hudi integration, where some metadata 
> is sent to the operator coordinator to be committed to HDFS or HMS, and we 
> also need to report some metrics from the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-21000) set MetricGroup in SourceCoordinatorContext

2022-11-17 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635288#comment-17635288
 ] 

Hang Ruan commented on FLINK-21000:
---

Hi, [~kbendick] ,

Sorry for my late response. I will continue to complete this issue.

There is a new issue that provides the metric group for the operator 
coordinator. I will participate in that discussion and continue developing 
this issue after it is merged.

 

Best, Hang

> set MetricGroup in SourceCoordinatorContext
> ---
>
> Key: FLINK-21000
> URL: https://issues.apache.org/jira/browse/FLINK-21000
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Steven Zhen Wu
>Assignee: Hang Ruan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> stale-assigned
>
> Right now, SourceCoordinatorContext returns null. We need a valid metric 
> group to publish source metrics (e.g. number of pending splits) in the 
> source enumerator.
> cc [~sewen] [~jqin]
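
Once the context exposes a real group, a hedged sketch of the intended usage; 
the helper class and the queue are illustrative stand-ins, while 
`setUnassignedSplitsGauge` is the standard source-metrics hook:

{code:java}
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

import java.util.Queue;

public final class EnumeratorMetrics {

    /** Publishes the number of pending splits from a split enumerator. */
    public static void wire(SplitEnumeratorContext<?> context, Queue<?> pendingSplits) {
        // Reports the queue size as the unassigned-splits gauge.
        context.metricGroup().setUnassignedSplitsGauge(() -> (long) pendingSplits.size());
    }

    private EnumeratorMetrics() {}
}
{code}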



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-21000) set MetricGroup in SourceCoordinatorContext

2022-11-17 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635288#comment-17635288
 ] 

Hang Ruan edited comment on FLINK-21000 at 11/17/22 10:59 AM:
--

Hi, [~kbendick] ,

Sorry for my late response. I will continue to complete this issue.

There is a new issue that provides the metric group for the operator 
coordinator (https://issues.apache.org/jira/browse/FLINK-29801). I will 
participate in that discussion and continue developing this issue after it is 
merged.

 

Best, Hang


was (Author: ruanhang1993):
Hi, [~kbendick] ,

Sorry for my late response. I will continue to complete this issue.

There is a new issue that provides the metric group for the operator 
coordinator. I will participate in the discussion and continue to develop this 
issue after that issue is merged.

 

Best, Hang

> set MetricGroup in SourceCoordinatorContext
> ---
>
> Key: FLINK-21000
> URL: https://issues.apache.org/jira/browse/FLINK-21000
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.12.0
>Reporter: Steven Zhen Wu
>Assignee: Hang Ruan
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> stale-assigned
>
> Right now, SourceCoordinatorContext returns null. We need a valid metric 
> group to publish source metrics (e.g. number of pending splits) in the 
> source enumerator.
> cc [~sewen] [~jqin]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29801) OperatorCoordinator need open the way to operate metricGroup interface

2022-11-17 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635287#comment-17635287
 ] 

Hang Ruan commented on FLINK-29801:
---

Hi, [~MengYue] ,

I may rely on this feature to accomplish 
https://issues.apache.org/jira/browse/FLINK-21000, where I provide a POC that 
contains some parts about the metric group in OperatorCoordinator. 

Based on this research, I have some questions about this PR:
 # How should we use the operator coordinator metric group when implementing 
the OperatorCoordinator? For example, we always get the metric group from a 
context in the SourceCoordinator. Why not pass the metric group in its 
context?
 # Are there any problems with using the JobManagerJobMetricGroup? There may 
be several operator coordinators in the same job, and it seems that every 
operator coordinator would have to make sure that its metric group does not 
conflict with the others. 

Thanks,  Hang

> OperatorCoordinator need open the way to operate metricGroup interface
> --
>
> Key: FLINK-29801
> URL: https://issues.apache.org/jira/browse/FLINK-29801
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: yuemeng
>Assignee: yuemeng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we have no way to get a metric group instance in an 
> OperatorCoordinator. In some cases, we may need to report metrics from an 
> OperatorCoordinator, e.g. in the Flink-Hudi integration, where some metadata 
> is sent to the operator coordinator to be committed to HDFS or HMS, and we 
> also need to report some metrics from the operator coordinator for 
> monitoring purposes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29492) Kafka exactly-once sink causes OutOfMemoryError

2022-11-06 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17629566#comment-17629566
 ] 

Hang Ruan commented on FLINK-29492:
---

[~fpaul] , sorry for my late response.

I think this fix should not be included in the 1.15.3 release, because I 
changed some methods in a PublicEvolving class.

> Kafka exactly-once sink causes OutOfMemoryError
> ---
>
> Key: FLINK-29492
> URL: https://issues.apache.org/jira/browse/FLINK-29492
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.2
>Reporter: Robert Metzger
>Assignee: Hang Ruan
>Priority: Critical
>
> My Kafka exactly-once sinks are periodically failing with a 
> {{OutOfMemoryError: Java heap space}}.
> This looks very similar to FLINK-28250. But I am running 1.15.2, which 
> contains a fix for FLINK-28250.
> Exception:
> {code:java}
> java.io.IOException: Could not perform checkpoint 2281 for operator 
> http_events[3]: Writer (1/1)#1.
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>   at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
> complete snapshot 2281 for operator http_events[3]: Writer (1/1)#1. Failure 
> reason: Checkpoint was declined.
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
>   at 
> 

[jira] [Commented] (FLINK-29492) Kafka exactly-once sink causes OutOfMemoryError

2022-10-11 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17616169#comment-17616169
 ] 

Hang Ruan commented on FLINK-29492:
---

I am interested in this bug. Maybe I could help to take a look at it.

> Kafka exactly-once sink causes OutOfMemoryError
> ---
>
> Key: FLINK-29492
> URL: https://issues.apache.org/jira/browse/FLINK-29492
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.2
>Reporter: Robert Metzger
>Priority: Critical
>
> My Kafka exactly-once sinks are periodically failing with a 
> {{OutOfMemoryError: Java heap space}}.
> This looks very similar to FLINK-28250. But I am running 1.15.2, which 
> contains a fix for FLINK-28250.
> Exception:
> {code:java}
> java.io.IOException: Could not perform checkpoint 2281 for operator 
> http_events[3]: Writer (1/1)#1.
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
>   at 
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
>   at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>   at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
> complete snapshot 2281 for operator http_events[3]: Writer (1/1)#1. Failure 
> reason: Checkpoint was declined.
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
>   at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
>   at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
>   at 
> 

[jira] [Commented] (FLINK-25545) [JUnit5 Migration] Module: flink-clients

2022-07-11 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565253#comment-17565253
 ] 

Hang Ruan commented on FLINK-25545:
---

[~RocMarshal] , thanks for your work. Please feel free to take this ticket ~

> [JUnit5 Migration] Module: flink-clients
> 
>
> Key: FLINK-25545
> URL: https://issues.apache.org/jira/browse/FLINK-25545
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Hang Ruan
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25546) [JUnit5 Migration] Module: flink-connector-base

2022-07-11 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565252#comment-17565252
 ] 

Hang Ruan commented on FLINK-25546:
---

[~Sergey Nuyanzin] , Thanks for your work.

[~renqs], could you help to assign this ticket to [~Sergey Nuyanzin] ?

> [JUnit5 Migration] Module: flink-connector-base
> ---
>
> Key: FLINK-25546
> URL: https://issues.apache.org/jira/browse/FLINK-25546
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Hang Ruan
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28370) Add close method for KafkaRecordSerializationSchema

2022-07-04 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17562038#comment-17562038
 ] 

Hang Ruan commented on FLINK-28370:
---

I am interested in this ticket and I could help to resolve this issue.

> Add close method for KafkaRecordSerializationSchema
> ---
>
> Key: FLINK-28370
> URL: https://issues.apache.org/jira/browse/FLINK-28370
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.15.0, 1.14.5
>Reporter: Leonard Xu
>Priority: Major
>
> KafkaRecordSerializationSchema only offers an open() method; we should 
> offer a close() method to release the resources acquired in open().
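
A minimal sketch of what the proposed hook could look like; the released API 
only defines open() and serialize(), so the close() method and the pooled 
resource below are assumptions for illustration:

{code:java}
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

public class PooledSerializationSchema implements KafkaRecordSerializationSchema<String> {

    /** Hypothetical resource holder standing in for e.g. a schema registry client. */
    private static final class PooledClient implements AutoCloseable {
        @Override
        public void close() {}
    }

    private transient PooledClient client;

    @Override
    public void open(SerializationSchema.InitializationContext context,
                     KafkaSinkContext sinkContext) {
        client = new PooledClient(); // acquired here, never released today
    }

    // The addition this ticket asks for: release what open() acquired.
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element,
                                                    KafkaSinkContext context,
                                                    Long timestamp) {
        return new ProducerRecord<>("topic", element.getBytes(StandardCharsets.UTF_8));
    }
}
{code}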



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-25315) Add new extensions and classes to help the JUnit5 migration

2022-06-29 Thread Hang Ruan (Jira)


Hang Ruan commented on FLINK-25315:
---

Hi, Yun Tang. Sorry for my late reply. Recently I may not have enough time to 
work on this ticket. I am very glad that someone else can help. Please feel 
free to take this ticket. Thanks ~



--
This message was sent by Atlassian Jira
(v8.20.10#820010-sha1:ace47f9)


[jira] [Commented] (FLINK-24050) Support primary keys on metadata columns

2022-05-17 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537970#comment-17537970
 ] 

Hang Ruan commented on FLINK-24050:
---

cc [~martijnvisser]  [~twalthr] 

> Support primary keys on metadata columns
> 
>
> Key: FLINK-24050
> URL: https://issues.apache.org/jira/browse/FLINK-24050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Ingo Bürk
>Priority: Major
>
> Currently, primary keys are required to consist solely of physical columns. 
> However, there might be scenarios where the actual payload/records do not 
> contain a suitable primary key, but a unique identifier is available through 
> metadata. In this case it would make sense to define the primary key on such 
> a metadata column:
> {code:java}
> CREATE TABLE T (
>   uid STRING METADATA,
>   content STRING
>   PRIMARY KEY (uid) NOT ENFORCED
> ) WITH (…)
> {code}
> A simple example for this would be IMAP: there is nothing unique about any 
> single email as a record, but each email in a specific folder on an IMAP 
> server has a unique UID (I'm excluding some irrelevant technical details 
> here).
> See FLINK-24512 for another (probably better) use case.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-24050) Support primary keys on metadata columns

2022-05-13 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536499#comment-17536499
 ] 

Hang Ruan commented on FLINK-24050:
---

I am interested in this issue. Maybe I can help to improve this part. But there 
are still some details to discuss.

I think there should be no limitation for source tables. But when the table 
is used as a sink table, we need to discuss how to deal with a metadata 
primary key, which may be virtual or not.
 * Virtual metadata: virtual metadata cannot be persisted in the target 
storage. As a result, the metadata read back from the table may differ from 
the value at the time we wrote it.
 * Writable metadata (not virtual): there should be no limitation on 
writable metadata.

IMO, the strategy for metadata primary keys should be as follows (see the 
sketch after this list):
 * There should be no limitation on using metadata columns as primary keys 
in a source table;
 * For a sink table:
 ** Using virtual metadata as a primary key is meaningless. If virtual 
metadata is used in this way, we need to ignore the primary key and warn the 
user. 
 *** If the primary key only contains virtual metadata, just ignore the 
primary key.
 *** If the primary key contains virtual metadata and other columns, throw a 
validation exception.
 ** Using writable metadata as a primary key is allowed. How this metadata is 
written to the target storage depends on the connector.
 *** Take upsert-kafka tables as an example: upsert-kafka writes the primary 
key columns to the key of the Kafka record. If the upsert-kafka connector 
supports metadata as primary keys, whether the metadata is written to the key 
depends on the connector's implementation.
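
A hypothetical validation sketch for the sink-side strategy above; the 
`Column` type and the method are illustrative stand-ins, not Flink's actual 
planner code:

{code:java}
import java.util.Collections;
import java.util.List;

public final class MetadataPkValidation {

    /** Minimal stand-in for a catalog column. */
    public static final class Column {
        final String name;
        final boolean virtualMetadata;

        public Column(String name, boolean virtualMetadata) {
            this.name = name;
            this.virtualMetadata = virtualMetadata;
        }
    }

    /** Returns the primary key columns the sink should actually use. */
    public static List<Column> validateForSink(List<Column> primaryKey) {
        boolean allVirtual = primaryKey.stream().allMatch(c -> c.virtualMetadata);
        boolean anyVirtual = primaryKey.stream().anyMatch(c -> c.virtualMetadata);

        if (!primaryKey.isEmpty() && allVirtual) {
            // Only virtual metadata: ignore the primary key and warn the user.
            System.err.println("WARN: primary key only contains virtual metadata; ignored.");
            return Collections.emptyList();
        }
        if (anyVirtual) {
            // Virtual metadata mixed with other columns: reject.
            throw new IllegalArgumentException(
                    "Primary key must not mix virtual metadata with other columns.");
        }
        // Physical or writable metadata columns: accepted; how writable
        // metadata is persisted is up to the connector.
        return primaryKey;
    }

    private MetadataPkValidation() {}
}
{code}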

> Support primary keys on metadata columns
> 
>
> Key: FLINK-24050
> URL: https://issues.apache.org/jira/browse/FLINK-24050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Ingo Bürk
>Priority: Major
>
> Currently, primary keys are required to consist solely of physical columns. 
> However, there might be scenarios where the actual payload/records do not 
> contain a suitable primary key, but a unique identifier is available through 
> metadata. In this case it would make sense to define the primary key on such 
> a metadata column:
> {code:java}
> CREATE TABLE T (
>   uid STRING METADATA,
>   content STRING
>   PRIMARY KEY (uid) NOT ENFORCED
> ) WITH (…)
> {code}
> A simple example for this would be IMAP: there is nothing unique about any 
> single email as a record, but each email in a specific folder on an IMAP 
> server has a unique UID (I'm excluding some irrelevant technical details 
> here).
> See FLINK-24512 for another (probably better) use case.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-26177) PulsarSourceITCase.testScaleDown fails with timeout

2022-02-16 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17493613#comment-17493613
 ] 

Hang Ruan commented on FLINK-26177:
---

LGTM.

The test case `testSavepoint` uses the same code as `testScaleUp/Down`, so 
the failure is weird. We will take a look later. Thanks ~

> PulsarSourceITCase.testScaleDown fails with timeout
> ---
>
> Key: FLINK-26177
> URL: https://issues.apache.org/jira/browse/FLINK-26177
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.15.0
>Reporter: Matthias Pohl
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> We observed a [build 
> failure|https://dev.azure.com/mapohl/flink/_build/results?buildId=742=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d=26553]
>  caused by {{PulsarSourceITCase.testScaleDown}}:
> {code}
> Feb 15 20:56:02 [ERROR] Tests run: 16, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 431.023 s <<< FAILURE! - in 
> org.apache.flink.connector.pulsar.source.PulsarSourceITCase
> Feb 15 20:56:02 [ERROR] 
> org.apache.flink.connector.pulsar.source.PulsarSourceITCase.testScaleDown(TestEnvironment,
>  DataStreamSourceExternalContext, CheckpointingMode)[2]  Time elapsed: 
> 138.444 s  <<< FAILURE!
> Feb 15 20:56:02 java.lang.AssertionError: 
> Feb 15 20:56:02 
> Feb 15 20:56:02 Expecting
> Feb 15 20:56:02   
> Feb 15 20:56:02 to be completed within 2M.
> Feb 15 20:56:02 
> Feb 15 20:56:02 exception caught while trying to get the future result: 
> java.util.concurrent.TimeoutException
> Feb 15 20:56:02   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25290) Add table source and sink test suite in connector testing framework

2022-02-16 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17493045#comment-17493045
 ] 

Hang Ruan edited comment on FLINK-25290 at 2/16/22, 8:00 AM:
-

Hi, [~syhily]

The feature ([FLINK-25289|https://issues.apache.org/jira/browse/FLINK-25289]) 
about the DataStream sink test suite has already been merged. Please take a 
look.

This ticket still has some details to discuss and will not be included in 
the 1.15 release.

The PR only contains some simple SQL tests now, which is not enough.


was (Author: ruanhang1993):
[~syhily] The feature about the dataStream sink test suite has already been 
merged. Please take a look.

This ticket still have some details to discuss and will not be contained in the 
release 1.15.

The RP only contains some easy sql tests now, and it is not enough.

> Add table source and sink test suite in connector testing framework
> ---
>
> Key: FLINK-25290
> URL: https://issues.apache.org/jira/browse/FLINK-25290
> Project: Flink
>  Issue Type: Improvement
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25290) Add table source and sink test suite in connector testing framework

2022-02-15 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17493045#comment-17493045
 ] 

Hang Ruan edited comment on FLINK-25290 at 2/16/22, 7:57 AM:
-

[~syhily] The feature about the DataStream sink test suite has already been 
merged. Please take a look.

This ticket still has some details to discuss and will not be included in 
the 1.15 release.

The PR only contains some simple SQL tests now, which is not enough.


was (Author: ruanhang1993):
[~syhily] The feature about the dataStream sink test suite has already been 
merged. Please take a look.

This ticket still have some details to discuss and will not be contained in the 
release 1.15.

The RP only contains some easy sql test now, and it is not enough.

> Add table source and sink test suite in connector testing framework
> ---
>
> Key: FLINK-25290
> URL: https://issues.apache.org/jira/browse/FLINK-25290
> Project: Flink
>  Issue Type: Improvement
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25290) Add table source and sink test suite in connector testing framework

2022-02-15 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17493045#comment-17493045
 ] 

Hang Ruan commented on FLINK-25290:
---

[~syhily] The feature about the DataStream sink test suite has already been 
merged. Please take a look.

This ticket still has some details to discuss and will not be included in 
the 1.15 release.

The PR only contains some simple SQL tests now, which is not enough.

> Add table source and sink test suite in connector testing framework
> ---
>
> Key: FLINK-25290
> URL: https://issues.apache.org/jira/browse/FLINK-25290
> Project: Flink
>  Issue Type: Improvement
>Reporter: Qingsheng Ren
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26126) Sink V2 will cause error numRecordsOut metric

2022-02-14 Thread Hang Ruan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hang Ruan updated FLINK-26126:
--
Component/s: Connectors / Common

> Sink V2 will cause error numRecordsOut metric
> -
>
> Key: FLINK-26126
> URL: https://issues.apache.org/jira/browse/FLINK-26126
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common, Connectors / Kafka
>Affects Versions: 1.15.0
>Reporter: Hang Ruan
>Priority: Blocker
>
> We found that the new Sink V2 interface produces a wrong numRecordsOut 
> metric for the sink writers. We send a fixed number of records to the 
> source, but the numRecordsOut of the sink keeps increasing over time.
> The problem lies in the method `emitCommittables` in the class 
> `SinkWriterOperator`. The field `output` in its parent class 
> `AbstractStreamOperator` uses the same counter object as the `KafkaWriter`, 
> which causes numRecordsOut to increase whenever a checkpoint is taken. 
> I found this problem while implementing the metric test in the testing 
> framework; for now I have disabled this metric test in the 
> [PR|https://github.com/apache/flink/pull/18496]. We can re-enable this test 
> case after the fix.
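
A toy illustration of the double counting, assuming nothing about Flink's 
internals beyond a shared `Counter` (`SimpleCounter` is the standard 
in-memory implementation); the class names in the comments refer to the 
description above:

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;

public class SharedCounterDemo {
    public static void main(String[] args) {
        Counter numRecordsOut = new SimpleCounter();

        // The writer (e.g. KafkaWriter) counts each record it sends out.
        for (int i = 0; i < 100; i++) {
            numRecordsOut.inc();
        }

        // On checkpoint, the operator emits committables downstream through
        // an output that shares the same counter, inflating the metric.
        numRecordsOut.inc(); // the SinkWriterOperator#emitCommittables side

        System.out.println(numRecordsOut.getCount()); // prints 101, not 100
    }
}
{code}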



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

