[jira] [Commented] (FLINK-34508) Migrate S3-related ITCases and e2e tests to Minio

2024-04-17 Thread Muhammet Orazov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838471#comment-17838471
 ] 

Muhammet Orazov commented on FLINK-34508:
-

The 
[YarnFileStageTestS3ITCase|https://github.com/apache/flink/blob/master/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java#L131]
 test is skipped because of the assumption that the {{UUID.randomUUID()}} path does not 
exist.
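
For illustration, a minimal sketch (not the actual test code; everything except {{UUID.randomUUID()}} is assumed) of the kind of JUnit assumption that makes such a test show up as skipped:

{code:java}
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import java.io.File;
import java.util.UUID;

class AssumptionSkipSketch {
    @org.junit.jupiter.api.Test
    void stagesFilesOnlyIfRandomPathIsFree() {
        // The test only proceeds when a freshly generated random path does not exist yet;
        // if the assumption does not hold, JUnit reports the test as skipped rather than failed.
        File randomDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
        assumeTrue(!randomDir.exists(), "randomly generated path already exists, skipping");
        // ... the real ITCase would stage files against the S3/Minio path here ...
    }
}
{code}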

> Migrate S3-related ITCases and e2e tests to Minio 
> --
>
> Key: FLINK-34508
> URL: https://issues.apache.org/jira/browse/FLINK-34508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System / CI
>Affects Versions: 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Assignee: Muhammet Orazov
>Priority: Major
>  Labels: github-actions, pull-request-available
>
> Anything that uses {{org.apache.flink.testutils.s3.S3TestCredentials}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite loop [flink-cdc]

2024-04-17 Thread via GitHub


yuxiqian commented on PR #3235:
URL: https://github.com/apache/flink-cdc/pull/3235#issuecomment-2063041386

   cc @lvyanquan


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35027) Implement checkpoint drain in AsyncExecutionController

2024-04-17 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei reassigned FLINK-35027:
--

Assignee: Yanfei Lei

> Implement checkpoint drain in AsyncExecutionController
> --
>
> Key: FLINK-35027
> URL: https://issues.apache.org/jira/browse/FLINK-35027
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35026) Introduce async execution configurations

2024-04-17 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei reassigned FLINK-35026:
--

Assignee: Yanfei Lei

> Introduce async execution configurations
> 
>
> Key: FLINK-35026
> URL: https://issues.apache.org/jira/browse/FLINK-35026
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration, Runtime / Task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController [flink]

2024-04-17 Thread via GitHub


flinkbot commented on PR #24676:
URL: https://github.com/apache/flink/pull/24676#issuecomment-2062993344

   
   ## CI report:
   
   * bc18939af1ffea146d6f2214d061cca4dc136ca6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35027) Implement checkpoint drain in AsyncExecutionController

2024-04-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35027:
---
Labels: pull-request-available  (was: )

> Implement checkpoint drain in AsyncExecutionController
> --
>
> Key: FLINK-35027
> URL: https://issues.apache.org/jira/browse/FLINK-35027
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Task
>Reporter: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35027][runtime/checkpoint] Implement checkpoint drain in AsyncExecutionController [flink]

2024-04-17 Thread via GitHub


fredia opened a new pull request, #24676:
URL: https://github.com/apache/flink/pull/24676

   
   
   ## What is the purpose of the change
   
This PR implements checkpoint drain in `AsyncExecutionController`, and 
wires it to 
`AbstractAsyncStateStreamOperator/AbstractAsyncStateStreamOperatorV2`. 
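
   Not part of the original PR text: a rough sketch of what "draining in-flight records before a snapshot" can look like (everything except the names listed in the change log below is assumed):

   ```
   // Sketch only: block until every record that has entered the operator has been
   // fully processed, so the snapshot does not observe half-processed state.
   class AsyncExecutionControllerSketch {
       private final java.util.concurrent.atomic.AtomicInteger inFlightRecords =
               new java.util.concurrent.atomic.AtomicInteger(0);

       void drainInflightRecords() {
           while (inFlightRecords.get() > 0) {
               Thread.onSpinWait(); // the real controller would flush/trigger pending state requests here
           }
       }
   }

   // Hypothetical operator hook wiring the drain into the snapshot path:
   // public void snapshotState(StateSnapshotContext context) throws Exception {
   //     asyncExecutionController.drainInflightRecords();
   //     super.snapshotState(context);
   // }
   ```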
   
   ## Brief change log
   
 - Add `AsyncExecutionController#drainInflightRecords`
 - Override 
`AbstractAsyncStateStreamOperator/AbstractAsyncStateStreamOperatorV2#snapshotState()`.
 
   
   ## Verifying this change
   
   
   This change is already covered by existing tests:
   - `AsyncExecutionControllerTest#testInFlightRecordControl()`
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes, checkpointing)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full

2024-04-17 Thread Xin Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838452#comment-17838452
 ] 

Xin Gong edited comment on FLINK-35151 at 4/18/24 4:46 AM:
---

I get an idea to address this issue by setting 

"currentTaskRunning || queue.remainingCapacity() == 0" for 
BinlogSplitReader#pollSplitRecords. [~Leonard] PTAL


was (Author: JIRAUSER292212):
I get an idea to address this issue by set 

"currentTaskRunning || queue.remainingCapacity() == 0" for 
BinlogSplitReader#pollSplitRecords. [~Leonard] PTAL

> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full
> --
>
> Key: FLINK-35151
> URL: https://issues.apache.org/jira/browse/FLINK-35151
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: I use master branch reproduce it.
>Reporter: Xin Gong
>Priority: Major
> Attachments: dumpstack.txt
>
>
> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full.
> Reason is that producing binlog is too fast.  
> MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
> BinlogSplitReader#stopBinlogReadTask to set 
> currentTaskRunning to be false after MysqSourceReader receives binlog split 
> update event.
> MySqlSplitReader#pollSplitRecords is executed and 
> dataIt is null to execute closeBinlogReader when currentReader is 
> BinlogSplitReader. closeBinlogReader will execute 
> statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
> Because BinaryLogClient#connectLock is not release  when 
> MySqlStreamingChangeEventSource add element to full queue.
>  
> You can set StatefulTaskContext#queue to be 1 and run UT 
> NewlyAddedTableITCase#testRemoveAndAddNewTable.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full

2024-04-17 Thread Xin Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838452#comment-17838452
 ] 

Xin Gong edited comment on FLINK-35151 at 4/18/24 4:44 AM:
---

I get an idea to address this issue by set 

"currentTaskRunning || queue.remainingCapacity() == 0" for 
BinlogSplitReader#pollSplitRecords. [~Leonard] PTAL


was (Author: JIRAUSER292212):
I get an idea to address this issue by set 

currentTaskRunning || queue.remainingCapacity() == 0 for 
BinlogSplitReader#pollSplitRecords. [~Leonard] PTAL

> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full
> --
>
> Key: FLINK-35151
> URL: https://issues.apache.org/jira/browse/FLINK-35151
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: I use master branch reproduce it.
>Reporter: Xin Gong
>Priority: Major
> Attachments: dumpstack.txt
>
>
> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full.
> Reason is that producing binlog is too fast.  
> MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
> BinlogSplitReader#stopBinlogReadTask to set 
> currentTaskRunning to be false after MysqSourceReader receives binlog split 
> update event.
> MySqlSplitReader#pollSplitRecords is executed and 
> dataIt is null to execute closeBinlogReader when currentReader is 
> BinlogSplitReader. closeBinlogReader will execute 
> statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
> Because BinaryLogClient#connectLock is not release  when 
> MySqlStreamingChangeEventSource add element to full queue.
>  
> You can set StatefulTaskContext#queue to be 1 and run UT 
> NewlyAddedTableITCase#testRemoveAndAddNewTable.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33460) Support property authentication connection.

2024-04-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33460:
---
Labels: pull-request-available  (was: )

> Support property authentication connection.
> ---
>
> Key: FLINK-33460
> URL: https://issues.apache.org/jira/browse/FLINK-33460
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: RocMarshal
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33460][Connector/JDBC] Support property authentication connection. [flink-connector-jdbc]

2024-04-17 Thread via GitHub


RocMarshal opened a new pull request, #115:
URL: https://github.com/apache/flink-connector-jdbc/pull/115

   - Support property authentication connection.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33460) Support property authentication connection.

2024-04-17 Thread RocMarshal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RocMarshal updated FLINK-33460:
---
Summary: Support property authentication connection.  (was: Support more 
authentication connection types such as the secret.)

> Support property authentication connection.
> ---
>
> Key: FLINK-33460
> URL: https://issues.apache.org/jira/browse/FLINK-33460
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Reporter: RocMarshal
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full

2024-04-17 Thread Xin Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838452#comment-17838452
 ] 

Xin Gong edited comment on FLINK-35151 at 4/18/24 4:43 AM:
---

I get an idea to address this issue by set 

currentTaskRunning || queue.remainingCapacity() == 0 for 
BinlogSplitReader#pollSplitRecords. [~Leonard] PTAL


was (Author: JIRAUSER292212):
I get an idea to address this issue by set 

currentTaskRunning || queue.remainingCapacity() == 0 for 
BinlogSplitReader#pollSplitRecords.

> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full
> --
>
> Key: FLINK-35151
> URL: https://issues.apache.org/jira/browse/FLINK-35151
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: I use master branch reproduce it.
>Reporter: Xin Gong
>Priority: Major
> Attachments: dumpstack.txt
>
>
> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full.
> Reason is that producing binlog is too fast.  
> MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
> BinlogSplitReader#stopBinlogReadTask to set 
> currentTaskRunning to be false after MysqSourceReader receives binlog split 
> update event.
> MySqlSplitReader#pollSplitRecords is executed and 
> dataIt is null to execute closeBinlogReader when currentReader is 
> BinlogSplitReader. closeBinlogReader will execute 
> statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
> Because BinaryLogClient#connectLock is not release  when 
> MySqlStreamingChangeEventSource add element to full queue.
>  
> You can set StatefulTaskContext#queue to be 1 and run UT 
> NewlyAddedTableITCase#testRemoveAndAddNewTable.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full

2024-04-17 Thread Xin Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838452#comment-17838452
 ] 

Xin Gong commented on FLINK-35151:
--

I get an idea to address this issue by set 

currentTaskRunning || queue.remainingCapacity() == 0 for 
BinlogSplitReader#pollSplitRecords.
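
To make the proposal concrete, a sketch (assumed shape only, not the actual connector code) of the changed polling condition. The point is that even after the binlog read task has been stopped, the reader keeps draining a full queue, so the producer blocked inside MySqlStreamingChangeEventSource can release BinaryLogClient#connectLock before disconnect() is called:

{code:java}
import java.util.concurrent.BlockingQueue;

class PollConditionSketch {
    // Old condition: poll only while the task is running. If the task is stopped while the
    // queue is full, the producer stays blocked holding the connect lock and disconnect()
    // dead-locks.
    static boolean shouldPollOld(boolean currentTaskRunning, BlockingQueue<?> queue) {
        return currentTaskRunning;
    }

    // Proposed condition: also keep polling when the queue is full, so the blocked producer
    // can make progress and release the lock before the binlog reader is closed.
    static boolean shouldPollProposed(boolean currentTaskRunning, BlockingQueue<?> queue) {
        return currentTaskRunning || queue.remainingCapacity() == 0;
    }
}
{code}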

> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full
> --
>
> Key: FLINK-35151
> URL: https://issues.apache.org/jira/browse/FLINK-35151
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: I use master branch reproduce it.
>Reporter: Xin Gong
>Priority: Major
> Attachments: dumpstack.txt
>
>
> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full.
> Reason is that producing binlog is too fast.  
> MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
> BinlogSplitReader#stopBinlogReadTask to set 
> currentTaskRunning to be false after MysqSourceReader receives binlog split 
> update event.
> MySqlSplitReader#pollSplitRecords is executed and 
> dataIt is null to execute closeBinlogReader when currentReader is 
> BinlogSplitReader. closeBinlogReader will execute 
> statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
> Because BinaryLogClient#connectLock is not release  when 
> MySqlStreamingChangeEventSource add element to full queue.
>  
> You can set StatefulTaskContext#queue to be 1 and run UT 
> NewlyAddedTableITCase#testRemoveAndAddNewTable.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full

2024-04-17 Thread Xin Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Gong updated FLINK-35151:
-
Description: 
Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
full.

Reason is that producing binlog is too fast.  
MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
BinlogSplitReader#stopBinlogReadTask to set 

currentTaskRunning to be false after MysqSourceReader receives binlog split 
update event.

MySqlSplitReader#pollSplitRecords is executed and 

dataIt is null to execute closeBinlogReader when currentReader is 
BinlogSplitReader. closeBinlogReader will execute 
statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
Because BinaryLogClient#connectLock is not release  when 
MySqlStreamingChangeEventSource add element to full queue.

 

You can set StatefulTaskContext#queue to be 1 and run UT 
NewlyAddedTableITCase#testRemoveAndAddNewTable.

 

  was:
Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
full.

Reason is that producing binlog is too fast.  
MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
BinlogSplitReader#stopBinlogReadTask to set 

currentTaskRunning to be false after MysqSourceReader receives binlog split 
update event.

MySqlSplitReader#pollSplitRecords is executed and 

dataIt is null to execute closeBinlogReader when currentReader is 
BinlogSplitReader. closeBinlogReader will execute 
statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
Because BinaryLogClient#connectLock is not release  when 
MySqlStreamingChangeEventSource add element to full queue.

 


> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full
> --
>
> Key: FLINK-35151
> URL: https://issues.apache.org/jira/browse/FLINK-35151
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: I use master branch reproduce it.
>Reporter: Xin Gong
>Priority: Major
> Attachments: dumpstack.txt
>
>
> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full.
> Reason is that producing binlog is too fast.  
> MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
> BinlogSplitReader#stopBinlogReadTask to set 
> currentTaskRunning to be false after MysqSourceReader receives binlog split 
> update event.
> MySqlSplitReader#pollSplitRecords is executed and 
> dataIt is null to execute closeBinlogReader when currentReader is 
> BinlogSplitReader. closeBinlogReader will execute 
> statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
> Because BinaryLogClient#connectLock is not release  when 
> MySqlStreamingChangeEventSource add element to full queue.
>  
> You can set StatefulTaskContext#queue to be 1 and run UT 
> NewlyAddedTableITCase#testRemoveAndAddNewTable.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full

2024-04-17 Thread Xin Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Gong updated FLINK-35151:
-
Description: 
Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
full.

Reason is that producing binlog is too fast.  
MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
BinlogSplitReader#stopBinlogReadTask to set 

currentTaskRunning to be false after MysqSourceReader receives binlog split 
update event.

MySqlSplitReader#pollSplitRecords is executed and 

dataIt is null to execute closeBinlogReader when currentReader is 
BinlogSplitReader. closeBinlogReader will execute 
statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
Because BinaryLogClient#connectLock is not release  when 
MySqlStreamingChangeEventSource add element to full queue.

 

  was:
Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
full.

 


> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full
> --
>
> Key: FLINK-35151
> URL: https://issues.apache.org/jira/browse/FLINK-35151
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: I use master branch reproduce it.
> Reason is that producing binlog is too fast.  
> MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
> BinlogSplitReader#stopBinlogReadTask to set 
> currentTaskRunning to be false after MysqSourceReader receives binlog split 
> update event.
> MySqlSplitReader#pollSplitRecords is executed and 
> dataIt is null to execute closeBinlogReader when currentReader is 
> BinlogSplitReader. closeBinlogReader will execute 
> statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
> Because BinaryLogClient#connectLock is not release  when 
> MySqlStreamingChangeEventSource add element to full queue.
>Reporter: Xin Gong
>Priority: Major
> Attachments: dumpstack.txt
>
>
> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full.
> Reason is that producing binlog is too fast.  
> MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
> BinlogSplitReader#stopBinlogReadTask to set 
> currentTaskRunning to be false after MysqSourceReader receives binlog split 
> update event.
> MySqlSplitReader#pollSplitRecords is executed and 
> dataIt is null to execute closeBinlogReader when currentReader is 
> BinlogSplitReader. closeBinlogReader will execute 
> statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
> Because BinaryLogClient#connectLock is not release  when 
> MySqlStreamingChangeEventSource add element to full queue.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full

2024-04-17 Thread Xin Gong (Jira)
Xin Gong created FLINK-35151:


 Summary: Flink mysql cdc will  stuck when suspend binlog split and 
ChangeEventQueue is full
 Key: FLINK-35151
 URL: https://issues.apache.org/jira/browse/FLINK-35151
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
 Environment: I use master branch reproduce it.

Reason is that producing binlog is too fast.  
MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
BinlogSplitReader#stopBinlogReadTask to set 

currentTaskRunning to be false after MysqSourceReader receives binlog split 
update event.

MySqlSplitReader#pollSplitRecords is executed and 

dataIt is null to execute closeBinlogReader when currentReader is 
BinlogSplitReader. closeBinlogReader will execute 
statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
Because BinaryLogClient#connectLock is not release  when 
MySqlStreamingChangeEventSource add element to full queue.
Reporter: Xin Gong
 Attachments: dumpstack.txt

Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
full.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35151) Flink mysql cdc will stuck when suspend binlog split and ChangeEventQueue is full

2024-04-17 Thread Xin Gong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xin Gong updated FLINK-35151:
-
Environment: I use master branch reproduce it.  (was: I use master branch 
reproduce it.

Reason is that producing binlog is too fast.  
MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
BinlogSplitReader#stopBinlogReadTask to set 

currentTaskRunning to be false after MysqSourceReader receives binlog split 
update event.

MySqlSplitReader#pollSplitRecords is executed and 

dataIt is null to execute closeBinlogReader when currentReader is 
BinlogSplitReader. closeBinlogReader will execute 
statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
Because BinaryLogClient#connectLock is not release  when 
MySqlStreamingChangeEventSource add element to full queue.)

> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full
> --
>
> Key: FLINK-35151
> URL: https://issues.apache.org/jira/browse/FLINK-35151
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: I use master branch reproduce it.
>Reporter: Xin Gong
>Priority: Major
> Attachments: dumpstack.txt
>
>
> Flink mysql cdc will  stuck when suspend binlog split and ChangeEventQueue is 
> full.
> Reason is that producing binlog is too fast.  
> MySqlSplitReader#suspendBinlogReaderIfNeed will execute 
> BinlogSplitReader#stopBinlogReadTask to set 
> currentTaskRunning to be false after MysqSourceReader receives binlog split 
> update event.
> MySqlSplitReader#pollSplitRecords is executed and 
> dataIt is null to execute closeBinlogReader when currentReader is 
> BinlogSplitReader. closeBinlogReader will execute 
> statefulTaskContext.getBinaryLogClient().disconnect(), it could dead lock. 
> Because BinaryLogClient#connectLock is not release  when 
> MySqlStreamingChangeEventSource add element to full queue.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]

2024-04-17 Thread via GitHub


Zakelly commented on code in PR #24672:
URL: https://github.com/apache/flink/pull/24672#discussion_r1569904139


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java:
##
@@ -32,6 +32,8 @@
  */
 public class KeyAccountingUnit {
 
+public static final Object EMPTY_RECORD = "EMPTY_RECORD";

Review Comment:
   IMO, this should be under `RecordContext` instead of `KeyAccountingUnit`. 
And how about using `aec.buildContext(null, key)` everywhere instead of importing 
`EMPTY_RECORD` everywhere?
   
   Furthermore, it might be better for `EMPTY_RECORD` to be `new Object()` to 
avoid a coincidental collision with user input.
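   
   A sketch of the suggested shape (class names taken from this PR, the rest assumed):
   
   ```
   // Sentinel that cannot collide with any user record, hosted on RecordContext instead of
   // KeyAccountingUnit. Using new Object() avoids an accidental match with a user-provided
   // "EMPTY_RECORD" string.
   public final class RecordContextSketch {
       public static final Object EMPTY_RECORD = new Object();
   }

   // Call sites would then not import the sentinel at all, e.g.:
   // RecordContext<K> ctx = aec.buildContext(null, key);
   ```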



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import static 
org.apache.flink.runtime.asyncprocessing.KeyAccountingUnit.EMPTY_RECORD;
+
+/**
+ * An implementation of {@link InternalTimerService} that is used by {@link
+ * 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator}.
+ * The timer service will set {@link RecordContext} for the timers before 
invoking action to
+ * preserve the execution order between timer firing and records processing.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Timers">FLIP-425
+ * timers section</a>.
+ * @param <K> Type of timer's key.
+ * @param <N> Type of the namespace to which timers are scoped.
+ */
+public class InternalTimerServiceAsyncImpl<K, N> extends 
InternalTimerServiceImpl<K, N> {
+
+private AsyncExecutionController asyncExecutionController;
+
+InternalTimerServiceAsyncImpl(
+TaskIOMetricGroup taskIOMetricGroup,
+KeyGroupRange localKeyGroupRange,
+KeyContext keyContext,
+ProcessingTimeService processingTimeService,
+KeyGroupedInternalPriorityQueue processingTimeTimersQueue,
+KeyGroupedInternalPriorityQueue eventTimeTimersQueue,

Review Comment:
   How about adding `>` as type parameter here?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -133,6 +140,48 @@ public final  ThrowingConsumer, 
Exception> getRecordProcessor
 getClass().getName(), inputId));
 }
 
+/**
+ * Returns a {@link InternalTimerService} that can be used to query 
current processing time and
+ * event time and to set timers. An operator can have several timer 
services, where each has its
+ * own namespace serializer. Timer services are differentiated by the 
string key that is given
+ * when requesting them, if you call this method with the same key 
multiple times you will get
+ * the same timer service instance in subsequent requests.
+ *
+ * Timers are always scoped to a key, the currently active key of a 
keyed stream operation.
+ * When a timer fires, this key will also be set as the currently active 
key.
+ *
+ * Each timer has attached metadata, the namespace. Different timer 
services can have a
+ * different namespace type. If you don't need namespace differentiation 
you can use {@link
+ * org.apache.flink.runtime.state.VoidNamespaceSerializer} as the 
namespace serializer.
+ *
+ * @param name The name of the requested timer service. If no service 

Re: [PR] [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite… [flink-cdc]

2024-04-17 Thread via GitHub


yuxiqian closed pull request #3235: [hotfix][values] Temporary fix for 
ValuesDataSource stuck in infinite…
URL: https://github.com/apache/flink-cdc/pull/3235


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix][values] Temporary fix for ValuesDataSource stuck in infinite… [flink-cdc]

2024-04-17 Thread via GitHub


yuxiqian opened a new pull request, #3235:
URL: https://github.com/apache/flink-cdc/pull/3235

   … loop


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]

2024-04-17 Thread via GitHub


yunfengzhou-hub commented on code in PR #24672:
URL: https://github.com/apache/flink/pull/24672#discussion_r1569902836


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java:
##
@@ -117,6 +120,7 @@ public class InternalTimerServiceImpl implements 
InternalTimerService {
 startIdx = Math.min(keyGroupIdx, startIdx);
 }
 this.localKeyGroupRangeStartIdx = startIdx;
+this.processingTimeCallback = this::onProcessingTime;

Review Comment:
   How about making `onProcessingTime` a protected method? This way we won't 
need to introduce processingTimeCallback.
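   
   A toy sketch of the two options (not the Flink classes; names assumed):
   
   ```
   // Option A (current diff): keep a processingTimeCallback field initialized to this::onProcessingTime.
   // Option B (suggestion): make the hook protected so the async subclass simply overrides it.
   class BaseTimerServiceSketch {
       protected void onProcessingTime(long time) {
           System.out.println("firing timers at " + time);
       }
   }

   class AsyncTimerServiceSketch extends BaseTimerServiceSketch {
       @Override
       protected void onProcessingTime(long time) {
           // e.g. set up the async record context first, then fire
           super.onProcessingTime(time);
       }
   }
   ```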



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java:
##
@@ -179,11 +180,60 @@  InternalTimerServiceImpl 
registerOrGetTimerService(
 return timerService;
 }
 
+@Override
+public  InternalTimerService getAsyncInternalTimerService(
+String name,
+TypeSerializer keySerializer,
+TypeSerializer namespaceSerializer,
+Triggerable triggerable,
+AsyncExecutionController asyncExecutionController) {
+checkNotNull(keySerializer, "Timers can only be used on keyed 
operators.");
+
+// the following casting is to overcome type restrictions.
+TimerSerializer timerSerializer =
+new TimerSerializer<>(keySerializer, namespaceSerializer);
+
+InternalTimerServiceAsyncImpl timerService =
+registerOrGetAsyncTimerService(name, timerSerializer, 
asyncExecutionController);
+
+timerService.startTimerService(
+timerSerializer.getKeySerializer(),
+timerSerializer.getNamespaceSerializer(),
+triggerable);
+
+return timerService;
+}
+
+ InternalTimerServiceAsyncImpl registerOrGetAsyncTimerService(
+String name,
+TimerSerializer timerSerializer,
+AsyncExecutionController asyncExecutionController) {
+InternalTimerServiceAsyncImpl timerService =
+(InternalTimerServiceAsyncImpl) timerServices.get(name);
+if (timerService == null) {
+
+timerService =
+new InternalTimerServiceAsyncImpl<>(
+taskIOMetricGroup,
+localKeyGroupRange,
+keyContext,
+processingTimeService,
+createTimerPriorityQueue(
+PROCESSING_TIMER_PREFIX + name, 
timerSerializer),
+createTimerPriorityQueue(EVENT_TIMER_PREFIX + 
name, timerSerializer),
+cancellationContext,
+asyncExecutionController);
+
+timerServices.put(name, timerService);
+}
+return timerService;
+}
+
 Map> getRegisteredTimerServices() {
 return Collections.unmodifiableMap(timerServices);
 }
 
-private 
+protected 
 KeyGroupedInternalPriorityQueue> 
createTimerPriorityQueue(

Review Comment:
   This method does not seem to be used in subclasses, so `private` might be enough. Same 
for `restoreStateForKeyGroup`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35045][state] Introduce ForStFlinkFileSystem to support reading and writing with ByteBuffer [flink]

2024-04-17 Thread via GitHub


Zakelly commented on code in PR #24632:
URL: https://github.com/apache/flink/pull/24632#discussion_r1569850955


##
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:
##
@@ -140,4 +144,43 @@ public void skipFully(long bytes) throws IOException {
 bytes -= fsDataInputStream.skip(bytes);
 }
 }
+
+@Override
+public int read(ByteBuffer byteBuffer) throws IOException {
+// Not all internal stream supports ByteBufferReadable
+if 
(fsDataInputStream.hasCapability(StreamCapabilities.READBYTEBUFFER)) {
+return fsDataInputStream.read(byteBuffer);
+} else {
+// Fallback to read byte then put
+int c = read();
+if (c == -1) {
+return -1;
+}
+byteBuffer.put((byte) c);
+
+int n = 1, len = byteBuffer.remaining() + 1;
+for (; n < len; n++) {
+c = read();
+if (c == -1) {
+break;
+}
+byteBuffer.put((byte) c);
+}
+return n;
+}
+}
+
+@Override
+public int read(long position, ByteBuffer byteBuffer) throws IOException {
+// Not all internal stream supports ByteBufferPositionedReadable
+if 
(fsDataInputStream.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
+return fsDataInputStream.read(position, byteBuffer);
+} else {
+// Fallback to positionable read bytes then put
+byte[] tmp = new byte[byteBuffer.remaining()];

Review Comment:
   Even for this fallback code path, there is still a possible way to optimize 
a little bit, e.g.:
   ```
   if (byteBuffer.hasArray()) {
   fsDataInputStream.readFully(position, byteBuffer.array(), 
byteBuffer.arrayOffset(), byteBuffer.remaining());
   }
   
   ```
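   
   For completeness, a sketch of how the fallback branch could cover both heap and direct buffers (readFully semantics assumed from the diff above; this is not the PR's final code):
   
   ```
   private int readFallback(org.apache.hadoop.fs.FSDataInputStream in, long position, java.nio.ByteBuffer bb)
           throws java.io.IOException {
       int len = bb.remaining();
       if (bb.hasArray()) {
           // Heap buffer: read straight into the backing array, then advance the buffer position.
           in.readFully(position, bb.array(), bb.arrayOffset() + bb.position(), len);
           bb.position(bb.position() + len);
       } else {
           // Direct buffer: stage through a temporary array and copy it in.
           byte[] tmp = new byte[len];
           in.readFully(position, tmp, 0, len);
           bb.put(tmp);
       }
       return len;
   }
   ```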



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java:
##
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.PositionedReadable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A {@link FSDataInputStream} that delegates requests to another one and supports 
reading data with {@link
+ * ByteBuffer}.
+ *
+ * <p>All methods in this class may be used by ForSt; please start a discussion 
first if any of them has to
+ * be modified.
+ */
+public class ByteBufferReadableFSDataInputStream extends FSDataInputStream {
+
+private final FSDataInputStream originalInputStream;
+
+/**
+ * InputStream Pool which provides multiple input streams to random read 
concurrently. An input
+ * stream should only be used by a thread at a point in time.
+ */
+private final Queue readInputStreamPool;
+
+private final Callable inputStreamBuilder;
+
+public ByteBufferReadableFSDataInputStream(
+FSDataInputStream originalInputStream,
+Callable inputStreamBuilder,
+int inputStreamCapacity) {
+this.originalInputStream = originalInputStream;
+this.inputStreamBuilder = inputStreamBuilder;
+this.readInputStreamPool = new 
LinkedBlockingQueue<>(inputStreamCapacity);
+}
+
+/**
+ * Reads up to ByteBuffer#remaining bytes of data from the 
input stream into a
+ * ByteBuffer. Not Thread-safe yet since the interface of sequential read 
of ForSt only be
+ * accessed by one thread at a time.
+ *
+ * @param bb the buffer into which the data is read.
+ * @return the total number of bytes read into the buffer.
+ * @exception IOException If the first byte cannot be read for any reason 
other than end of
+ * file, or if the input stream has been closed, or if some other I/O 
error occurs.
+ * @exception NullPointerException If bb is null.
+ */
+public int readFully(ByteBuffer bb) throws 

[jira] [Updated] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid

2024-04-17 Thread qyw (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qyw updated FLINK-35150:

Attachment: (was: image-2024-04-18-11-20-43-126.png)

> The specified upload does not exist. The upload ID may be invalid
> -
>
> Key: FLINK-35150
> URL: https://issues.apache.org/jira/browse/FLINK-35150
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: qyw
>Priority: Major
> Attachments: image-2024-04-18-10-51-05-071.png, 
> image-2024-04-18-11-03-08-998.png, image-2024-04-18-11-20-25-583.png
>
>
> Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But 
> I don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
> class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
> here, Calling close later to upload the rest of the stream will inevitably 
> result in an error.   The part corresponding to uploadID has been merged.
> Therefore, when the message in csv is larger than 
> "S3_MULTIPART_MIN_PART_SIZE", the uploadPart will be started when switching 
> files, then when BulkPartWriter performs closeForCommit, Due to the sync 
> S3RecoverableFsDataOutputStream method call completeMultipartUpload, So 
> S3RecoverableFsDataOutputStream "closeForCommit" method due to the 
> uploadPart, at this time will lead to errors.
>  
> BulkPartWriter:
> !image-2024-04-18-11-03-08-998.png!
> CsvBulkWriter:
> !image-2024-04-18-11-20-43-126.png!
> S3RecoverableFsDataOutputStream:
> !image-2024-04-18-10-51-05-071.png!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid

2024-04-17 Thread qyw (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qyw updated FLINK-35150:

Description: 
Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But I 
don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
here, Calling close later to upload the rest of the stream will inevitably 
result in an error.   The part corresponding to uploadID has been merged.
Therefore, when the message in csv is larger than "S3_MULTIPART_MIN_PART_SIZE", 
the uploadPart will be started when switching files, then when BulkPartWriter 
performs closeForCommit, Due to the sync S3RecoverableFsDataOutputStream method 
call completeMultipartUpload, So S3RecoverableFsDataOutputStream 
"closeForCommit" method due to the uploadPart, at this time will lead to errors.

 

BulkPartWriter:

!image-2024-04-18-11-03-08-998.png!

CsvBulkWriter:

!image-2024-04-18-11-20-25-583.png!
S3RecoverableFsDataOutputStream:
!image-2024-04-18-10-51-05-071.png!
 

 

 

  was:
Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But I 
don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
here, Calling close later to upload the rest of the stream will inevitably 
result in an error.   The part corresponding to uploadID has been merged.
Therefore, when the message in csv is larger than "S3_MULTIPART_MIN_PART_SIZE", 
the uploadPart will be started when switching files, then when BulkPartWriter 
performs closeForCommit, Due to the sync S3RecoverableFsDataOutputStream method 
call completeMultipartUpload, So S3RecoverableFsDataOutputStream 
"closeForCommit" method due to the uploadPart, at this time will lead to errors.

 

BulkPartWriter:

!image-2024-04-18-11-03-08-998.png!

CsvBulkWriter:

!image-2024-04-18-11-20-43-126.png!
S3RecoverableFsDataOutputStream:
!image-2024-04-18-10-51-05-071.png!
 

 

 


> The specified upload does not exist. The upload ID may be invalid
> -
>
> Key: FLINK-35150
> URL: https://issues.apache.org/jira/browse/FLINK-35150
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: qyw
>Priority: Major
> Attachments: image-2024-04-18-10-51-05-071.png, 
> image-2024-04-18-11-03-08-998.png, image-2024-04-18-11-20-25-583.png
>
>
> Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But 
> I don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
> class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
> here, Calling close later to upload the rest of the stream will inevitably 
> result in an error.   The part corresponding to uploadID has been merged.
> Therefore, when the message in csv is larger than 
> "S3_MULTIPART_MIN_PART_SIZE", the uploadPart will be started when switching 
> files, then when BulkPartWriter performs closeForCommit, Due to the sync 
> S3RecoverableFsDataOutputStream method call completeMultipartUpload, So 
> S3RecoverableFsDataOutputStream "closeForCommit" method due to the 
> uploadPart, at this time will lead to errors.
>  
> BulkPartWriter:
> !image-2024-04-18-11-03-08-998.png!
> CsvBulkWriter:
> !image-2024-04-18-11-20-25-583.png!
> S3RecoverableFsDataOutputStream:
> !image-2024-04-18-10-51-05-071.png!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid

2024-04-17 Thread qyw (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qyw updated FLINK-35150:

Description: 
Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But I 
don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
here, Calling close later to upload the rest of the stream will inevitably 
result in an error.   The part corresponding to uploadID has been merged.
Therefore, when the message in csv is larger than "S3_MULTIPART_MIN_PART_SIZE", 
the uploadPart will be started when switching files, then when BulkPartWriter 
performs closeForCommit, Due to the sync S3RecoverableFsDataOutputStream method 
call completeMultipartUpload, So S3RecoverableFsDataOutputStream 
"closeForCommit" method due to the uploadPart, at this time will lead to errors.

 

BulkPartWriter:

!image-2024-04-18-11-03-08-998.png!

CsvBulkWriter:

!image-2024-04-18-11-20-43-126.png!
S3RecoverableFsDataOutputStream:
!image-2024-04-18-10-51-05-071.png!
 

 

 

  was:
Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But I 
don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
here, Calling close later to upload the rest of the stream will inevitably 
result in an error.   The part corresponding to uploadID has been merged.
Therefore, when the message in csv is larger than "S3_MULTIPART_MIN_PART_SIZE", 
the uploadPart will be started when switching files, then when BulkPartWriter 
performs closeForCommit, Due to the sync S3RecoverableFsDataOutputStream method 
call completeMultipartUpload, So S3RecoverableFsDataOutputStream 
"closeForCommit" method due to the uploadPart, at this time will lead to errors.

 

!image-2024-04-18-11-07-15-555.png!

!image-2024-04-18-10-51-05-071.png!


> The specified upload does not exist. The upload ID may be invalid
> -
>
> Key: FLINK-35150
> URL: https://issues.apache.org/jira/browse/FLINK-35150
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: qyw
>Priority: Major
> Attachments: image-2024-04-18-10-51-05-071.png, 
> image-2024-04-18-11-03-08-998.png, image-2024-04-18-11-20-25-583.png
>
>
> Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But 
> I don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
> class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
> here, Calling close later to upload the rest of the stream will inevitably 
> result in an error.   The part corresponding to uploadID has been merged.
> Therefore, when the message in csv is larger than 
> "S3_MULTIPART_MIN_PART_SIZE", the uploadPart will be started when switching 
> files, then when BulkPartWriter performs closeForCommit, Due to the sync 
> S3RecoverableFsDataOutputStream method call completeMultipartUpload, So 
> S3RecoverableFsDataOutputStream "closeForCommit" method due to the 
> uploadPart, at this time will lead to errors.
>  
> BulkPartWriter:
> !image-2024-04-18-11-03-08-998.png!
> CsvBulkWriter:
> !image-2024-04-18-11-20-43-126.png!
> S3RecoverableFsDataOutputStream:
> !image-2024-04-18-10-51-05-071.png!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid

2024-04-17 Thread qyw (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qyw updated FLINK-35150:

Attachment: image-2024-04-18-11-20-43-126.png

> The specified upload does not exist. The upload ID may be invalid
> -
>
> Key: FLINK-35150
> URL: https://issues.apache.org/jira/browse/FLINK-35150
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: qyw
>Priority: Major
> Attachments: image-2024-04-18-10-51-05-071.png, 
> image-2024-04-18-11-03-08-998.png, image-2024-04-18-11-20-25-583.png, 
> image-2024-04-18-11-20-43-126.png
>
>
> Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But 
> I don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
> class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
> here, Calling close later to upload the rest of the stream will inevitably 
> result in an error.   The part corresponding to uploadID has been merged.
> Therefore, when the message in csv is larger than 
> "S3_MULTIPART_MIN_PART_SIZE", the uploadPart will be started when switching 
> files, then when BulkPartWriter performs closeForCommit, Due to the sync 
> S3RecoverableFsDataOutputStream method call completeMultipartUpload, So 
> S3RecoverableFsDataOutputStream "closeForCommit" method due to the 
> uploadPart, at this time will lead to errors.
>  
> !image-2024-04-18-11-07-15-555.png!
> !image-2024-04-18-10-51-05-071.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid

2024-04-17 Thread qyw (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qyw updated FLINK-35150:

Attachment: image-2024-04-18-11-20-25-583.png

> The specified upload does not exist. The upload ID may be invalid
> -
>
> Key: FLINK-35150
> URL: https://issues.apache.org/jira/browse/FLINK-35150
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: qyw
>Priority: Major
> Attachments: image-2024-04-18-10-51-05-071.png, 
> image-2024-04-18-11-03-08-998.png, image-2024-04-18-11-20-25-583.png, 
> image-2024-04-18-11-20-43-126.png
>
>
> Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But 
> I don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
> class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
> here, Calling close later to upload the rest of the stream will inevitably 
> result in an error.   The part corresponding to uploadID has been merged.
> Therefore, when the message in csv is larger than 
> "S3_MULTIPART_MIN_PART_SIZE", the uploadPart will be started when switching 
> files, then when BulkPartWriter performs closeForCommit, Due to the sync 
> S3RecoverableFsDataOutputStream method call completeMultipartUpload, So 
> S3RecoverableFsDataOutputStream "closeForCommit" method due to the 
> uploadPart, at this time will lead to errors.
>  
> !image-2024-04-18-11-07-15-555.png!
> !image-2024-04-18-10-51-05-071.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid

2024-04-17 Thread qyw (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qyw updated FLINK-35150:

Attachment: (was: image-2024-04-18-11-07-15-555.png)

> The specified upload does not exist. The upload ID may be invalid
> -
>
> Key: FLINK-35150
> URL: https://issues.apache.org/jira/browse/FLINK-35150
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: qyw
>Priority: Major
> Attachments: image-2024-04-18-10-51-05-071.png, 
> image-2024-04-18-11-03-08-998.png
>
>
> Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But 
> I don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
> class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
> here, Calling close later to upload the rest of the stream will inevitably 
> result in an error.   The part corresponding to uploadID has been merged.
> Therefore, when the message in csv is larger than 
> "S3_MULTIPART_MIN_PART_SIZE", the uploadPart will be started when switching 
> files, then when BulkPartWriter performs closeForCommit, Due to the sync 
> S3RecoverableFsDataOutputStream method call completeMultipartUpload, So 
> S3RecoverableFsDataOutputStream "closeForCommit" method due to the 
> uploadPart, at this time will lead to errors.
>  
> !image-2024-04-18-11-07-15-555.png!
> !image-2024-04-18-10-51-05-071.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid

2024-04-17 Thread qyw (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

qyw updated FLINK-35150:

Description: 
Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But I 
don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
here, Calling close later to upload the rest of the stream will inevitably 
result in an error.   The part corresponding to uploadID has been merged.
Therefore, when the message in csv is larger than "S3_MULTIPART_MIN_PART_SIZE", 
the uploadPart will be started when switching files, then when BulkPartWriter 
performs closeForCommit, Due to the sync S3RecoverableFsDataOutputStream method 
call completeMultipartUpload, So S3RecoverableFsDataOutputStream 
"closeForCommit" method due to the uploadPart, at this time will lead to errors.

 

!image-2024-04-18-11-07-15-555.png!

!image-2024-04-18-10-51-05-071.png!

  was:
Flink S3 hadoop, write S3 in csv mode, I used this patch  
[FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513] .   But I don't 
understand why S3RecoverableFsDataOutputStream "sync" method of this class to 
be "completeMultipartUpload" operation, if "completeMultipartUpload" here, 
Calling close later to upload the rest of the stream will inevitably result in 
an error.   The part corresponding to uploadID has been merged.
Therefore, when the message in csv is larger than "S3_MULTIPART_MIN_PART_SIZE", 
the uploadPart will be started when switching files, then when BulkPartWriter 
performs closeForCommit, Due to the sync S3RecoverableFsDataOutputStream method 
call completeMultipartUpload, So S3RecoverableFsDataOutputStream 
"closeForCommit" method due to the uploadPart, at this time will lead to errors.


> The specified upload does not exist. The upload ID may be invalid
> -
>
> Key: FLINK-35150
> URL: https://issues.apache.org/jira/browse/FLINK-35150
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.0
>Reporter: qyw
>Priority: Major
> Attachments: image-2024-04-18-10-51-05-071.png, 
> image-2024-04-18-11-03-08-998.png, image-2024-04-18-11-07-15-555.png
>
>
> Flink S3 hadoop, write S3 in csv mode, I used this patch  FLINK-28513 .   But 
> I don't understand why S3RecoverableFsDataOutputStream "sync" method of this 
> class to be "completeMultipartUpload" operation, if "completeMultipartUpload" 
> here, Calling close later to upload the rest of the stream will inevitably 
> result in an error.   The part corresponding to uploadID has been merged.
> Therefore, when the message in csv is larger than 
> "S3_MULTIPART_MIN_PART_SIZE", the uploadPart will be started when switching 
> files, then when BulkPartWriter performs closeForCommit, Due to the sync 
> S3RecoverableFsDataOutputStream method call completeMultipartUpload, So 
> S3RecoverableFsDataOutputStream "closeForCommit" method due to the 
> uploadPart, at this time will lead to errors.
>  
> !image-2024-04-18-11-07-15-555.png!
> !image-2024-04-18-10-51-05-071.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35150) The specified upload does not exist. The upload ID may be invalid

2024-04-17 Thread qyw (Jira)
qyw created FLINK-35150:
---

 Summary: The specified upload does not exist. The upload ID may be 
invalid
 Key: FLINK-35150
 URL: https://issues.apache.org/jira/browse/FLINK-35150
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.15.0
Reporter: qyw
 Attachments: image-2024-04-18-10-51-05-071.png, 
image-2024-04-18-11-03-08-998.png, image-2024-04-18-11-07-15-555.png

Flink S3 Hadoop filesystem, writing to S3 in CSV mode: I used the patch from 
[FLINK-28513|https://issues.apache.org/jira/browse/FLINK-28513]. But I don't 
understand why the "sync" method of S3RecoverableFsDataOutputStream performs a 
"completeMultipartUpload" operation. If "completeMultipartUpload" is executed there, 
a later call to close, which uploads the rest of the stream, will inevitably fail, 
because the parts belonging to that uploadID have already been merged.
Therefore, when the data in the CSV file is larger than "S3_MULTIPART_MIN_PART_SIZE", 
an uploadPart is started when switching files. When BulkPartWriter then performs 
closeForCommit, the sync method of S3RecoverableFsDataOutputStream has already called 
completeMultipartUpload, so the uploadPart issued from the "closeForCommit" method of 
S3RecoverableFsDataOutputStream fails at this point.

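For illustration, here is a minimal, self-contained sketch of the failure sequence 
described above. The class and method names (FakeMultipartUpload, uploadPart, complete) 
are made up for this example and are not the real Flink or AWS SDK classes:

{code:java}
/**
 * Sketch: completing the multipart upload in sync() invalidates the upload ID,
 * so the uploadPart triggered later by closeForCommit() can only fail.
 */
public class MultipartUploadSketch {

    /** Stand-in for an S3 multipart upload identified by an upload ID. */
    static class FakeMultipartUpload {
        private boolean completed;

        void uploadPart(String data) {
            if (completed) {
                throw new IllegalStateException(
                        "The specified upload does not exist. The upload ID may be invalid");
            }
            // in the real client this would upload one part under the upload ID
        }

        void complete() {
            // all uploaded parts are merged; the upload ID becomes invalid
            completed = true;
        }
    }

    public static void main(String[] args) {
        FakeMultipartUpload upload = new FakeMultipartUpload();

        upload.uploadPart("part-1"); // started because data exceeded S3_MULTIPART_MIN_PART_SIZE
        upload.complete();           // what sync() effectively does via completeMultipartUpload

        // closeForCommit() later tries to upload the remaining buffered data:
        upload.uploadPart("part-2"); // fails: the upload ID no longer exists
    }
}
{code}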


--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569864750


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
 + " operators. NOTE: It takes effect only 
in the BATCH runtime mode and requires sorted inputs"
 + SORT_INPUTS.key()
 + " to be enabled.");
+
+/**
+ * A flag to enable or disable async mode related components when tasks 
initialize. As long as
+ * this option is enabled, the state access of Async state APIs will be 
executed asynchronously.
+ * Otherwise, the state access of Async state APIs will be executed 
synchronously. For Sync
+ * state APIs, the state access is always executed synchronously, enable 
this option would bring
+ * some overhead.
+ *
+ * Note: This is an experimental feature(FLIP-425) under evaluation.
+ */
+@Experimental
+public static final ConfigOption ASYNC_STATE_ENABLED =
+ConfigOptions.key("execution.async-mode.enabled")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"A flag to enable or disable async mode related 
components when tasks initialize."
++ " As long as this option is enabled, the 
state access of Async state APIs will be executed asynchronously."
++ " Otherwise, the state access of Async 
state APIs will be executed synchronously."
++ " For Sync state APIs, the state access 
is always executed synchronously, enable this option would bring some 
overhead.\n"
++ " Note: This is an experimental feature 
under evaluation.");
+
+/**
+ * The max limit of in-flight records number in async execution mode, 
'in-flight' refers to the
+ * records that have entered the operator but have not yet been processed 
and emitted to the
+ * downstream. If the in-flight records number exceeds the limit, the 
newly records entering
+ * will be blocked until the in-flight records number drops below the 
limit.
+ */
+@Experimental
+public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT =
+ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+.intType()
+.defaultValue(6000)
+.withDescription(
+"The max limit of in-flight records number in 
async execution mode, 'in-flight' refers"
++ " to the records that have entered the 
operator but have not yet been processed and"
++ " emitted to the downstream. If the 
in-flight records number exceeds the limit,"
++ " the newly records entering will be 
blocked until the in-flight records number drops below the limit.");
+
+/**
+ * The size of buffer under async execution mode. Async execution mode 
provides a buffer
+ * mechanism to reduce state access. When the number of state requests in 
the buffer exceeds the
+ * batch size, a batched state execution would be triggered. Larger batch 
sizes will bring
+ * higher end-to-end latency, this option works with {@link 
#ASYNC_BUFFER_TIMEOUT} to control
+ * the frequency of triggering.
+ */
+@Experimental
+public static final ConfigOption ASYNC_BUFFER_SIZE =
+ConfigOptions.key("execution.async-mode.buffer-size")
+.intType()
+.defaultValue(1000)
+.withDescription(
+"The size of buffer under async execution mode. 
Async execution mode provides a buffer mechanism to reduce state access."
++ " When the number of state requests in 
the active buffer exceeds the batch size,"
++ " a batched state execution would be 
triggered. Larger batch sizes will bring higher end-to-end latency,"
++ " this option works with 
'execution.async-state.buffer-timeout' to control the frequency of 
triggering.");
+
+/**
+ * The timeout of buffer triggering in milliseconds. If the buffer has not 
reached the {@link
+ * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger 
will perform actively.
+ */
+@Experimental
+public static final ConfigOption ASYNC_BUFFER_TIMEOUT =
+ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   Good point, changed it to `execution.async-mode.state-buffer-timeout`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to 

Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569863724


##
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##
@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean 
useSnapshotCompression) {
 configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, 
useSnapshotCompression);
 }
 
+// 

+//  Asynchronous execution configurations
+// 

+
+@Internal

Review Comment:
   Ah, I misread the comment, and changed them from `@Experimental` to 
`@Internal`, I have corrected them now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35128][cdc-connector][cdc-base] Re-calculate the starting changelog offset after the new table added [flink-cdc]

2024-04-17 Thread via GitHub


morazow commented on code in PR #3230:
URL: https://github.com/apache/flink-cdc/pull/3230#discussion_r1569863282


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java:
##
@@ -163,10 +163,18 @@ public String toString() {
 // ---
 public static StreamSplit appendFinishedSplitInfos(
 StreamSplit streamSplit, List 
splitInfos) {
+// re-calculate the starting changelog offset after the new table added
+Offset startingOffset = streamSplit.getStartingOffset();
+for (FinishedSnapshotSplitInfo splitInfo : splitInfos) {
+if (splitInfo.getHighWatermark().isBefore(startingOffset)) {
+startingOffset = splitInfo.getHighWatermark();
+}
+}

Review Comment:
   Got it, it will always be the min value.
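
   To illustrate with made-up numbers (a sketch only, not the connector code): if the stream 
   split currently starts at offset 100 and the newly added tables' finished snapshot splits 
   have high watermarks 80 and 90, the loop keeps the minimum, so the stream split restarts 
   from 80 and no changelog needed by the new tables is skipped.

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class StartingOffsetExample {
       public static void main(String[] args) {
           long startingOffset = 100L;                          // current stream split start (hypothetical)
           List<Long> highWatermarks = Arrays.asList(80L, 90L); // new tables' snapshot high watermarks

           for (long hw : highWatermarks) {
               if (hw < startingOffset) { // plays the role of Offset#isBefore in the diff above
                   startingOffset = hw;
               }
           }
           System.out.println(startingOffset); // prints 80: the minimum wins
       }
   }
   ```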



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35128][cdc-connector][cdc-base] Re-calculate the starting changelog offset after the new table added [flink-cdc]

2024-04-17 Thread via GitHub


morazow commented on code in PR #3230:
URL: https://github.com/apache/flink-cdc/pull/3230#discussion_r1569850914


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java:
##
@@ -163,10 +163,18 @@ public String toString() {
 // ---
 public static StreamSplit appendFinishedSplitInfos(
 StreamSplit streamSplit, List 
splitInfos) {
+// re-calculate the starting changelog offset after the new table added
+Offset startingOffset = streamSplit.getStartingOffset();
+for (FinishedSnapshotSplitInfo splitInfo : splitInfos) {
+if (splitInfo.getHighWatermark().isBefore(startingOffset)) {
+startingOffset = splitInfo.getHighWatermark();
+}
+}

Review Comment:
   Do we have to distinguish between the high watermarks before the startingOffset? For 
example, if there are multiple high watermarks before startingOffset, which one 
should we take? Should it be the latest of those?
   
   Or is taking any highWatermark, as long as it is before the startingOffset, 
all right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34915][table] Complete `DESCRIBE CATALOG` syntax [flink]

2024-04-17 Thread via GitHub


liyubin117 commented on PR #24630:
URL: https://github.com/apache/flink/pull/24630#issuecomment-2062885502

   @LadyForest Hi, Could you please take a review? thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink

2024-04-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35149:
---
Labels: pull-request-available  (was: )

> Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not 
> TwoPhaseCommittingSink
> ---
>
> Key: FLINK-35149
> URL: https://issues.apache.org/jira/browse/FLINK-35149
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 3.1.0
>
>
> Currently, when the sink is not an instance of TwoPhaseCommittingSink, input.transform is 
> used rather than stream. This means that the pre-write topology will be 
> ignored.
> {code:java}
> private void sinkTo(
> DataStream input,
> Sink sink,
> String sinkName,
> OperatorID schemaOperatorID) {
> DataStream stream = input;
> // Pre write topology
> if (sink instanceof WithPreWriteTopology) {
> stream = ((WithPreWriteTopology) 
> sink).addPreWriteTopology(stream);
> }
> if (sink instanceof TwoPhaseCommittingSink) {
> addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
> } else {
> input.transform(
> SINK_WRITER_PREFIX + sinkName,
> CommittableMessageTypeInfo.noOutput(),
> new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
> }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-04-17 Thread via GitHub


loserwang1024 commented on PR #3233:
URL: https://github.com/apache/flink-cdc/pull/3233#issuecomment-2062883633

   @PatrickRen , CC


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink [flink-cdc]

2024-04-17 Thread via GitHub


loserwang1024 opened a new pull request, #3233:
URL: https://github.com/apache/flink-cdc/pull/3233

   Currently, when the sink is not an instance of TwoPhaseCommittingSink, input.transform is 
used rather than stream. This means that the pre-write topology will be 
ignored.
   
   ```
   private void sinkTo(
   DataStream input,
   Sink sink,
   String sinkName,
   OperatorID schemaOperatorID) {
   DataStream stream = input;
   // Pre write topology
   if (sink instanceof WithPreWriteTopology) {
   stream = ((WithPreWriteTopology) 
sink).addPreWriteTopology(stream);
   }
   
   if (sink instanceof TwoPhaseCommittingSink) {
   addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
   } else {
   input.transform(
   SINK_WRITER_PREFIX + sinkName,
   CommittableMessageTypeInfo.noOutput(),
   new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
   }
   } 
   ```
   
   (ps: the modification of StarRocksUtils just applies spotless formatting)
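
   A minimal sketch of the intended fix (a fragment of the sinkTo method quoted above, 
   assuming the only change needed is in the else branch; the merged PR may differ): route 
   the non-two-phase-commit branch through `stream`, which may already carry the pre-write 
   topology, instead of the raw `input`.

   ```java
   // Sketch: identical to the snippet above except that the else branch
   // calls stream.transform(...) instead of input.transform(...).
   if (sink instanceof TwoPhaseCommittingSink) {
       addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
   } else {
       stream.transform(
               SINK_WRITER_PREFIX + sinkName,
               CommittableMessageTypeInfo.noOutput(),
               new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
   }
   ```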


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-34633) Support unnesting array constants

2024-04-17 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan resolved FLINK-34633.
---
Resolution: Fixed

> Support unnesting array constants
> -
>
> Key: FLINK-34633
> URL: https://issues.apache.org/jira/browse/FLINK-34633
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> It seems that the current planner doesn't support using UNNEST on array 
> constants.(x)
> {code:java}
> SELECT * FROM UNNEST(ARRAY[1,2,3]);{code}
>  
> The following query can't be compiled.(x)
> {code:java}
> SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code}
>  
> The rewritten version works. (/)
> {code:java}
> SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN 
> UNNEST(A){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34633) Support unnesting array constants

2024-04-17 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan closed FLINK-34633.
-

> Support unnesting array constants
> -
>
> Key: FLINK-34633
> URL: https://issues.apache.org/jira/browse/FLINK-34633
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> It seems that the current planner doesn't support using UNNEST on array 
> constants.(x)
> {code:java}
> SELECT * FROM UNNEST(ARRAY[1,2,3]);{code}
>  
> The following query can't be compiled.(x)
> {code:java}
> SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code}
>  
> The rewritten version works. (/)
> {code:java}
> SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN 
> UNNEST(A){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34633) Support unnesting array constants

2024-04-17 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838429#comment-17838429
 ] 

Jane Chan commented on FLINK-34633:
---

Fixed in master 43a3d50ce3982b9abf04b81407fed46c5c25f819

> Support unnesting array constants
> -
>
> Key: FLINK-34633
> URL: https://issues.apache.org/jira/browse/FLINK-34633
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: pull-request-available
>
> It seems that the current planner doesn't support using UNNEST on array 
> constants.(x)
> {code:java}
> SELECT * FROM UNNEST(ARRAY[1,2,3]);{code}
>  
> The following query can't be compiled.(x)
> {code:java}
> SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code}
>  
> The rewritten version works. (/)
> {code:java}
> SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN 
> UNNEST(A){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34633) Support unnesting array constants

2024-04-17 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-34633:
--
Fix Version/s: 1.19.1

> Support unnesting array constants
> -
>
> Key: FLINK-34633
> URL: https://issues.apache.org/jira/browse/FLINK-34633
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.19.1
>
>
> It seems that the current planner doesn't support using UNNEST on array 
> constants.(x)
> {code:java}
> SELECT * FROM UNNEST(ARRAY[1,2,3]);{code}
>  
> The following query can't be compiled.(x)
> {code:java}
> SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code}
>  
> The rewritten version works. (/)
> {code:java}
> SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN 
> UNNEST(A){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35143) Expose newly added tables capture in mysql pipeline connector

2024-04-17 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838428#comment-17838428
 ] 

Hongshun Wang commented on FLINK-35143:
---

I'd like to do it

> Expose newly added tables capture in mysql pipeline connector
> -
>
> Key: FLINK-35143
> URL: https://issues.apache.org/jira/browse/FLINK-35143
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Priority: Major
>
> Currently, the MySQL pipeline connector still does not allow capturing newly 
> added tables.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35129) Postgres source commits the offset after every multiple checkpoint cycles.

2024-04-17 Thread Muhammet Orazov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838427#comment-17838427
 ] 

Muhammet Orazov commented on FLINK-35129:
-

Hey [~loserwang1024] , I would be happy to work on it! Yes, could you please 
assign it to me?

> Postgres source commits the offset after every multiple checkpoint cycles.
> --
>
> Key: FLINK-35129
> URL: https://issues.apache.org/jira/browse/FLINK-35129
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Hongshun Wang
>Priority: Major
>
> After entering the stream phase, the offset consumed by the global slot is 
> committed upon the completion of each checkpoint, so that WAL files can be 
> recycled continuously instead of accumulating, which could otherwise lead to insufficient 
> disk space.
> However, the job can then only restart from the latest checkpoint or savepoint; if it is 
> restored from an earlier state, the WAL may already have been recycled.
>  
> The way to solve this is to commit the offset only after every multiple checkpoint 
> cycles. The number of checkpoint cycles is determined by a connector option, and 
> the default value is 3.
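
A minimal sketch of how such a counter could look (hypothetical class name and wiring; the 
actual change would live in the Postgres source's offset-commit path), assuming the offset is 
only committed once every N completed checkpoints:

{code:java}
/** Sketch: commit the replication-slot offset only every N completed checkpoints. */
public class PeriodicOffsetCommitter {

    private final int commitEveryNCheckpoints; // from a connector option, default 3
    private int completedSinceLastCommit = 0;

    public PeriodicOffsetCommitter(int commitEveryNCheckpoints) {
        this.commitEveryNCheckpoints = commitEveryNCheckpoints;
    }

    /** Intended to be called from notifyCheckpointComplete(checkpointId). */
    public void onCheckpointComplete(Runnable commitOffsetAction) {
        completedSinceLastCommit++;
        if (completedSinceLastCommit >= commitEveryNCheckpoints) {
            commitOffsetAction.run();     // commit the confirmed LSN to the global slot
            completedSinceLastCommit = 0; // start the next cycle
        }
    }
}
{code}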



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34633][table] Support unnesting array constants [flink]

2024-04-17 Thread via GitHub


LadyForest merged PR #24510:
URL: https://github.com/apache/flink/pull/24510


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35149) Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink

2024-04-17 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-35149:
-

 Summary: Fix DataSinkTranslator#sinkTo ignoring pre-write topology 
if not TwoPhaseCommittingSink
 Key: FLINK-35149
 URL: https://issues.apache.org/jira/browse/FLINK-35149
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Hongshun Wang
 Fix For: 3.1.0


Currently, when the sink is not an instance of TwoPhaseCommittingSink, input.transform is 
used rather than stream. This means that the pre-write topology will be 
ignored.
{code:java}
private void sinkTo(
DataStream input,
Sink sink,
String sinkName,
OperatorID schemaOperatorID) {
DataStream stream = input;
// Pre write topology
if (sink instanceof WithPreWriteTopology) {
stream = ((WithPreWriteTopology) 
sink).addPreWriteTopology(stream);
}

if (sink instanceof TwoPhaseCommittingSink) {
addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
} else {
input.transform(
SINK_WRITER_PREFIX + sinkName,
CommittableMessageTypeInfo.noOutput(),
new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
}
} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [docs]Problem with Case Document Format in Quickstart [flink-cdc]

2024-04-17 Thread via GitHub


GOODBOY008 commented on PR #3229:
URL: https://github.com/apache/flink-cdc/pull/3229#issuecomment-2062859948

   @ZmmBigdata @Jiabao-Sun Issue was fixed in pr 
https://github.com/apache/flink-cdc/pull/3217 and merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28693][table] Fix janino compile failed because the code generated refers the class in table-planner [flink]

2024-04-17 Thread via GitHub


xuyangzhong commented on PR #24280:
URL: https://github.com/apache/flink/pull/24280#issuecomment-2062858561

   Thanks very much for the bp @snuyanzin  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35081) CompileException when watermark definition contains coalesce and to_timestamp built-in functions

2024-04-17 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838424#comment-17838424
 ] 

xuyang commented on FLINK-35081:


I think this bug is the same as FLINK-28693.

> CompileException when watermark definition contains coalesce and to_timestamp 
> built-in functions
> 
>
> Key: FLINK-35081
> URL: https://issues.apache.org/jira/browse/FLINK-35081
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.1
>Reporter: Grzegorz Kołakowski
>Priority: Major
>
> I have a data stream in which event-time column can have two data formats. To 
> be able to define watermark on the table, I used coalesce and to_timestamp 
> built-in functions as shown below:
> {code:sql}
> create table test (
>     `@timestamp` VARCHAR,
>     __rowtime AS coalesce(
> to_timestamp(`@timestamp`, '-MM-dd''T''HH:mm:ss'),
> to_timestamp(`@timestamp`, '-MM-dd''T''HH:mm:ss.SSS')
> ),
>     watermark for __rowtime as __rowtime - INTERVAL '30' SECOND,
>     ...
> ) with ( ... )
> {code}
> The job failed with the following stacktrace:
> {noformat}
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown 
> Source)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source)
>     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>     at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:547)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>     at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
> Source)
>     at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
> Caused 

[jira] [Commented] (FLINK-35144) Support various sources sync for FlinkCDC in one pipeline

2024-04-17 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838422#comment-17838422
 ] 

Hongshun Wang commented on FLINK-35144:
---

Do you mean syncing multiple sources to the same sink?

> Support various sources sync for FlinkCDC in one pipeline
> -
>
> Key: FLINK-35144
> URL: https://issues.apache.org/jira/browse/FLINK-35144
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Congxian Qiu
>Priority: Major
>
> Currently, a FlinkCDC pipeline can only support a single source, so we need to start 
> multiple pipelines when there are various sources. 
> For upstreams that use sharding, we need to sync multiple sources in one 
> pipeline; the current pipeline can't do this because it only supports a 
> single source.
> This issue proposes supporting the sync of multiple sources in one pipeline.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35128][cdc-connector][cdc-base] Re-calculate the starting changelog offset after the new table added [flink-cdc]

2024-04-17 Thread via GitHub


loserwang1024 commented on code in PR #3230:
URL: https://github.com/apache/flink-cdc/pull/3230#discussion_r1569794515


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/StreamSplit.java:
##
@@ -159,7 +159,15 @@ public String toString() {
 // ---
 public static StreamSplit appendFinishedSplitInfos(
 StreamSplit streamSplit, List 
splitInfos) {
+// re-calculate the starting changelog offset after the new table added
+Offset startingOffset = streamSplit.getStartingOffset();
+for (FinishedSnapshotSplitInfo splitInfo : splitInfos) {
+if (splitInfo.getHighWatermark().isBefore(startingOffset)) {
+startingOffset = splitInfo.getHighWatermark();
+}
+}
 splitInfos.addAll(streamSplit.getFinishedSnapshotSplitInfos());
+
 return new StreamSplit(
 streamSplit.splitId,
 streamSplit.getStartingOffset(),

Review Comment:
   done it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [docs]Problem with Case Document Format in Quickstart [flink-cdc]

2024-04-17 Thread via GitHub


Jiabao-Sun commented on PR #3229:
URL: https://github.com/apache/flink-cdc/pull/3229#issuecomment-2062840911

   Thanks @ZmmBigdata for this contribution.
   Could you rebase master and resolve conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569768663


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,19 +103,26 @@ public class AsyncExecutionController {
 final AtomicInteger inFlightRecordNum;
 
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(
+mailboxExecutor,
+stateExecutor,
+DEFAULT_BATCH_SIZE,
+DEFAULT_BUFFER_TIMEOUT,
+DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);

Review Comment:
   Given that #24657 has been merged, it might be better to verify that the 
introduced configurations can pass the configured values into AEC through 
operator setups now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-17 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569768663


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,19 +103,26 @@ public class AsyncExecutionController {
 final AtomicInteger inFlightRecordNum;
 
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(
+mailboxExecutor,
+stateExecutor,
+DEFAULT_BATCH_SIZE,
+DEFAULT_BUFFER_TIMEOUT,
+DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);

Review Comment:
   Given that #24657 has been merged, it might be better to verify that the 
introduced configurations can pass the configured values into AEC through 
operator setups now.



##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,73 @@ public class ExecutionOptions {
 + " operators. NOTE: It takes effect only 
in the BATCH runtime mode and requires sorted inputs"
 + SORT_INPUTS.key()
 + " to be enabled.");
+
+/**
+ * A flag to enable or disable async mode related components when tasks 
initialize. As long as
+ * this option is enabled, the state access of Async state APIs will be 
executed asynchronously.
+ * Otherwise, the state access of Async state APIs will be executed 
synchronously. For Sync
+ * state APIs, the state access is always executed synchronously, enable 
this option would bring
+ * some overhead.
+ *
+ * Note: This is an experimental feature(FLIP-425) under evaluation.
+ */
+@Experimental
+public static final ConfigOption ASYNC_STATE_ENABLED =
+ConfigOptions.key("execution.async-mode.enabled")
+.booleanType()
+.defaultValue(false)
+.withDescription(
+"A flag to enable or disable async mode related 
components when tasks initialize."
++ " As long as this option is enabled, the 
state access of Async state APIs will be executed asynchronously."
++ " Otherwise, the state access of Async 
state APIs will be executed synchronously."
++ " For Sync state APIs, the state access 
is always executed synchronously, enable this option would bring some 
overhead.\n"
++ " Note: This is an experimental feature 
under evaluation.");
+
+/**
+ * The max limit of in-flight records number in async execution mode, 
'in-flight' refers to the
+ * records that have entered the operator but have not yet been processed 
and emitted to the
+ * downstream. If the in-flight records number exceeds the limit, the 
newly records entering
+ * will be blocked until the in-flight records number drops below the 
limit.
+ */
+@Experimental
+public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT =
+ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+.intType()
+.defaultValue(6000)
+.withDescription(
+"The max limit of in-flight records number in 
async execution mode, 'in-flight' refers"
++ " to the records that have entered the 
operator but have not yet been processed and"
++ " emitted to the downstream. If the 
in-flight records number exceeds the limit,"
++ " the newly records entering will be 
blocked until the in-flight records number drops below the limit.");
+
+/**
+ * The size of buffer under async execution mode. Async execution mode 
provides a buffer
+ * mechanism to reduce state access. When the number of state requests in 
the buffer exceeds the
+ * batch size, a batched state execution would be triggered. Larger batch 
sizes will bring
+ * higher end-to-end latency, this option works with {@link 
#ASYNC_BUFFER_TIMEOUT} to control
+ * the frequency of triggering.
+ */
+@Experimental
+public static final ConfigOption ASYNC_BUFFER_SIZE =
+ConfigOptions.key("execution.async-mode.buffer-size")
+.intType()
+.defaultValue(1000)
+.withDescription(
+"The size of buffer under async execution mode. 
Async execution mode provides a buffer mechanism to reduce state access."
++ " When the number of state requests in 
the active buffer exceeds the batch size,"
+  

Re: [PR] [FLINK-34549][API] Introduce config, context and processingTimerService for DataStream API V2 [flink]

2024-04-17 Thread via GitHub


reswqa commented on PR #24541:
URL: https://github.com/apache/flink/pull/24541#issuecomment-2062837816

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-34738) "Deployment - YARN" Page for Flink CDC Chinese Documentation

2024-04-17 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun reassigned FLINK-34738:
--

Assignee: Vincent Woo

> "Deployment - YARN" Page for Flink CDC Chinese Documentation
> 
>
> Key: FLINK-34738
> URL: https://issues.apache.org/jira/browse/FLINK-34738
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation, Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: LvYanquan
>Assignee: Vincent Woo
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.1.0
>
>
> Translate 
> [https://github.com/apache/flink-cdc/blob/master/docs/content/docs/deployment/yarn.md]
>  into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-3.0][minor][cdc][docs] Optimize markdown styles in quickstart doc [flink-cdc]

2024-04-17 Thread via GitHub


Jiabao-Sun merged PR #3223:
URL: https://github.com/apache/flink-cdc/pull/3223


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [minor][docs] Optimize markdown styles in quickstart doc [flink-cdc]

2024-04-17 Thread via GitHub


Jiabao-Sun merged PR #3217:
URL: https://github.com/apache/flink-cdc/pull/3217


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35148][core] Improve InstantiationUtil for checking nullary pu… [flink]

2024-04-17 Thread via GitHub


flinkbot commented on PR #24675:
URL: https://github.com/apache/flink/pull/24675#issuecomment-2062593439

   
   ## CI report:
   
   * fc79b8051c60e2628b2098e82bc945f79b08a3d3 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35148][core] Improve InstantiationUtil for checking nullary pu… [flink]

2024-04-17 Thread via GitHub


liuml07 commented on PR #24675:
URL: https://github.com/apache/flink/pull/24675#issuecomment-2062592367

   @snuyanzin could you take a look? Thanks,


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35148][core] Improve InstantiationUtil for checking nullary pu… [flink]

2024-04-17 Thread via GitHub


liuml07 opened a new pull request, #24675:
URL: https://github.com/apache/flink/pull/24675

   …blic constructor
   
   
   
   ## What is the purpose of the change
   
   https://issues.apache.org/jira/browse/FLINK-35148
   
   InstantiationUtil#hasPublicNullaryConstructor checks whether the given class 
has a public nullary constructor. The implementation can be improved a bit: the 
`Modifier#isPublic` check within the for-loop can be skipped as the 
Class#getConstructors() only returns public constructors.
   
   We can also add a negative unit test for this.
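
   For illustration, a minimal self-contained sketch of the simplified check and a negative 
   case (the class and method names below are made up for the example; the actual change is 
   in `InstantiationUtil` and its test):

   ```java
   import java.util.Arrays;

   public final class NullaryConstructorCheckSketch {

       /** True if clazz has a public no-argument constructor. */
       static boolean hasPublicNullaryConstructor(Class<?> clazz) {
           // Class#getConstructors() only returns public constructors,
           // so no extra Modifier.isPublic(...) check is needed.
           return Arrays.stream(clazz.getConstructors())
                   .anyMatch(c -> c.getParameterCount() == 0);
       }

       /** Negative case: only a private no-argument constructor. */
       private static final class PrivateNullary {
           private PrivateNullary() {}
       }

       public static void main(String[] args) {
           System.out.println(hasPublicNullaryConstructor(StringBuilder.class));  // true
           System.out.println(hasPublicNullaryConstructor(PrivateNullary.class)); // false
       }
   }
   ```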
   
   
   ## Brief change log
   
   - Skip the `Modifier#isPublic` check
   - Use Java stream
   - Add negative unit test
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`InstantiationUtilTest`. This change added a new negative test in that class.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35148) Improve InstantiationUtil for checking nullary public constructor

2024-04-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35148:
---
Labels: pull-request-available  (was: )

> Improve InstantiationUtil for checking nullary public constructor
> -
>
> Key: FLINK-35148
> URL: https://issues.apache.org/jira/browse/FLINK-35148
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Mingliang Liu
>Priority: Minor
>  Labels: pull-request-available
>
> {{InstantiationUtil#hasPublicNullaryConstructor}} checks whether the given 
> class has a public nullary constructor. The implementation can be improved a 
> bit: the `Modifier#isPublic` check within the for-loop can be skipped as the 
> {{Class#getConstructors()}} only returns public constructors.
> We can also add a negative unit test for this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35148) Improve InstantiationUtil for checking nullary public constructor

2024-04-17 Thread Mingliang Liu (Jira)
Mingliang Liu created FLINK-35148:
-

 Summary: Improve InstantiationUtil for checking nullary public 
constructor
 Key: FLINK-35148
 URL: https://issues.apache.org/jira/browse/FLINK-35148
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Affects Versions: 1.18.1, 1.19.0
Reporter: Mingliang Liu


{{InstantiationUtil#hasPublicNullaryConstructor}} checks whether the given 
class has a public nullary constructor. The implementation can be improved a 
bit: the `Modifier#isPublic` check within the for-loop can be skipped as the 
{{Class#getConstructors()}} only returns public constructors.

We can also add a negative unit test for this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35147) SinkMaterializer throws StateMigrationException when widening the field type in the output table

2024-04-17 Thread Sharon Xie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sharon Xie updated FLINK-35147:
---
Description: 
When a field type in the output table is changed from int -> bigint or 
timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
This is unexpected as the change is backward compatible. The new type should be 
able to "accept" all the old values that had narrower type. 

Note that the planner works fine and would accept such change. 

To reproduce

1. run the below SQL 
{code:sql}
CREATE TABLE ltable (
`id` integer primary key,
`num` int
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test1'
);

CREATE TABLE rtable (
`id` integer primary key,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test2'
);

CREATE TABLE output (
`id` integer primary key,
`num` int,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);

insert into
`output`
select
ltable.id,
num,
ts
from
ltable
join rtable on ltable.id = rtable.id
 {code}
 

2. Stop with a savepoint, then update output table with 
{code:sql}
CREATE TABLE output (
`id` integer primary key,
-- change one of the type below would cause the issue
`num` bigint,
`ts` timestamp(6)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);
{code}
3. Restart the job with the savepoint created 

Sample screenshots

!image-2024-04-17-14-15-35-297.png|width=911,height=352!

!image-2024-04-17-14-15-21-647.png|width=1172,height=458!

  was:
When a field type in the output table is changed from int -> bigint or 
timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
This is unexpected as the change is backward compatible. The new type should be 
able to "accept" all the old values that had narrower type. 

Note that the planner works fine and would accept such change. 

To reproduce

1. run the below SQL 
{code:sql}
CREATE TABLE ltable (
`id` integer primary key,
`num` int
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test1'
);

CREATE TABLE rtable (
`id` integer primary key,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test2'
);

CREATE TABLE output (
`id` integer primary key,
`num` int,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);

insert into
`output`
select
ltable.id,
num,
ts
from
ltable
join rtable on ltable.id = rtable.id
 {code}
 

2. Stop with a savepoint, then update output table with 
{code:sql}
CREATE TABLE output (
`id` integer primary key,
-- change one of the type below would cause the issue
`num` bigint,
`ts` timestamp(6)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);
{code}
3. Restart the job with the savepoint created 

Sample screenshots

!image-2024-04-17-14-15-35-297.png!

!image-2024-04-17-14-15-21-647.png!


> SinkMaterializer throws StateMigrationException when widening the field type 
> in the output table
> 
>
> Key: FLINK-35147
> URL: https://issues.apache.org/jira/browse/FLINK-35147
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Sharon Xie
>Priority: Major
> Attachments: image-2024-04-17-14-15-21-647.png, 
> image-2024-04-17-14-15-35-297.png
>
>
> When a field type in the output table is changed from int -> bigint or 
> timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
> This is unexpected as the change is backward compatible. The new type should 
> be able to "accept" all the old values that had narrower type. 
> Note that the planner works fine and would accept such change. 
> To reproduce
> 1. run the below SQL 
> {code:sql}
> CREATE TABLE ltable (
> `id` integer 

[jira] [Updated] (FLINK-35147) SinkMaterializer throws StateMigrationException when widening the field type in the output table

2024-04-17 Thread Sharon Xie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sharon Xie updated FLINK-35147:
---
Description: 
When a field type in the output table is changed from int -> bigint or 
timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
This is unexpected as the change is backward compatible. The new type should be 
able to "accept" all the old values that had narrower type. 

Note that the planner works fine and would accept such change. 

To reproduce

1. run the below SQL 
{code:sql}
CREATE TABLE ltable (
`id` integer primary key,
`num` int
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test1'
);

CREATE TABLE rtable (
`id` integer primary key,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test2'
);

CREATE TABLE output (
`id` integer primary key,
`num` int,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);

insert into
`output`
select
ltable.id,
num,
ts
from
ltable
join rtable on ltable.id = rtable.id
 {code}
 

2. Stop with a savepoint, then update output table with 
{code:sql}
CREATE TABLE output (
`id` integer primary key,
-- change one of the type below would cause the issue
`num` bigint,
`ts` timestamp(6)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);
{code}
3. Restart the job with the savepoint created 

Sample screenshots

!image-2024-04-17-14-15-35-297.png!

!image-2024-04-17-14-15-21-647.png!

  was:
When a field type in the output table is changed from int -> bigint or 
timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
This is unexpected as the change is backward compatible. The new type should be 
able to "accept" all the old values that had narrower type. 

Note that the planner works fine and would accept such change. 

To reproduce

 
{code:sql}
CREATE TABLE ltable (
`id` integer primary key,
`num` int
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test1'
);

CREATE TABLE rtable (
`id` integer primary key,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test2'
);

CREATE TABLE output (
`id` integer primary key,
`num` int,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);

insert into
`output`
select
ltable.id,
num,
ts
from
ltable
join rtable on ltable.id = rtable.id
 {code}
 

 

 

Run it, stop with a savepoint, then update output table with 
{code:sql}
CREATE TABLE output (
`id` integer primary key,
-- change one of the type below would cause the issue
`num` bigint,
`ts` timestamp(6)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);
{code}

Restart the job with the savepoint created 

Sample screenshots

!image-2024-04-17-14-15-35-297.png!

!image-2024-04-17-14-15-21-647.png!


> SinkMaterializer throws StateMigrationException when widening the field type 
> in the output table
> 
>
> Key: FLINK-35147
> URL: https://issues.apache.org/jira/browse/FLINK-35147
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Sharon Xie
>Priority: Major
> Attachments: image-2024-04-17-14-15-21-647.png, 
> image-2024-04-17-14-15-35-297.png
>
>
> When a field type in the output table is changed from int -> bigint or 
> timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
> This is unexpected as the change is backward compatible. The new type should 
> be able to "accept" all the old values that had narrower type. 
> Note that the planner works fine and would accept such change. 
> To reproduce
> 1. run the below SQL 
> {code:sql}
> CREATE TABLE ltable (
> `id` integer primary key,
> `num` int
> ) WITH (
> 

[jira] [Updated] (FLINK-35147) SinkMaterializer throws StateMigrationException when widening the field type in the output table

2024-04-17 Thread Sharon Xie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sharon Xie updated FLINK-35147:
---
Description: 
When a field type in the output table is changed from int -> bigint or 
timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
This is unexpected as the change is backward compatible. The new type should be 
able to "accept" all the old values that had narrower type. 

Note that the planner works fine and would accept such change. 

To reproduce

 
{code:sql}
CREATE TABLE ltable (
`id` integer primary key,
`num` int
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test1'
);

CREATE TABLE rtable (
`id` integer primary key,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test2'
);

CREATE TABLE output (
`id` integer primary key,
`num` int,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);

insert into
`output`
select
ltable.id,
num,
ts
from
ltable
join rtable on ltable.id = rtable.id
 {code}
 

 

 

Run it, stop with a savepoint, then update output table with 
{code:sql}
CREATE TABLE output (
`id` integer primary key,
-- change one of the type below would cause the issue
`num` bigint,
`ts` timestamp(6)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);
{code}

Restart the job with the savepoint created 

Sample screenshots

!image-2024-04-17-14-15-35-297.png!

!image-2024-04-17-14-15-21-647.png!

  was:
When a field type in the output table is changed from int -> bigint or 
timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
This is unexpected as the change is backward compatible. The new type should be 
able to "accept" all the old values that had narrower type. 

Note that the planner works fine and would accept such change. 



To reproduce

```
CREATE TABLE ltable (
`id` integer primary key,
`num` int
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test1'
);

CREATE TABLE rtable (
`id` integer primary key,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test2'
);

CREATE TABLE output (
`id` integer primary key,
`num` int,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);

insert into
`output`
select
ltable.id,
num,
ts
from
ltable
join rtable on ltable.id = rtable.id
```

Run it, stop with a savepoint, then update output table with 

```
CREATE TABLE output (
`id` integer primary key,
   -- changing one of the types below causes the issue
`num` bigint,
`ts` timestamp(6)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);
```
Restart the job with the savepoint created 

Sample screenshots

!image-2024-04-17-14-15-35-297.png!

!image-2024-04-17-14-15-21-647.png!


> SinkMaterializer throws StateMigrationException when widening the field type 
> in the output table
> 
>
> Key: FLINK-35147
> URL: https://issues.apache.org/jira/browse/FLINK-35147
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Sharon Xie
>Priority: Major
> Attachments: image-2024-04-17-14-15-21-647.png, 
> image-2024-04-17-14-15-35-297.png
>
>
> When a field type in the output table is changed from int -> bigint or 
> timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
> This is unexpected as the change is backward compatible. The new type should 
> be able to "accept" all the old values that had narrower type. 
> Note that the planner works fine and would accept such change. 
> To reproduce
>  
> {code:sql}
> CREATE TABLE ltable (
> `id` integer primary key,
> `num` int
> ) WITH (
> 'connector' = 'upsert-kafka',
> 'properties.bootstrap.servers' = 

[jira] [Created] (FLINK-35147) SinkMaterializer throws StateMigrationException when widening the field type in the output table

2024-04-17 Thread Sharon Xie (Jira)
Sharon Xie created FLINK-35147:
--

 Summary: SinkMaterializer throws StateMigrationException when 
widening the field type in the output table
 Key: FLINK-35147
 URL: https://issues.apache.org/jira/browse/FLINK-35147
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Sharon Xie
 Attachments: image-2024-04-17-14-15-21-647.png, 
image-2024-04-17-14-15-35-297.png

When a field type in the output table is changed from int -> bigint or 
timestamp(3) -> timestamp(6), SinkMaterializer would fail to restore state. 
This is unexpected as the change is backward compatible. The new type should be 
able to "accept" all the old values that had narrower type. 

Note that the planner works fine and would accept such change. 



To reproduce

```
CREATE TABLE ltable (
`id` integer primary key,
`num` int
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test1'
);

CREATE TABLE rtable (
`id` integer primary key,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test2'
);

CREATE TABLE output (
`id` integer primary key,
`num` int,
`ts` timestamp(3)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);

insert into
`output`
select
ltable.id,
num,
ts
from
ltable
join rtable on ltable.id = rtable.id
```

Run it, stop with a savepoint, then update output table with 

```
CREATE TABLE output (
`id` integer primary key,
   -- changing one of the types below causes the issue
`num` bigint,
`ts` timestamp(6)
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = 'kafka.test:9092',
'key.format' = 'json',
'value.format' = 'json',
'topic' = 'test3'
);
```
Restart the job with the savepoint created 

Sample screenshots

!image-2024-04-17-14-15-35-297.png!

!image-2024-04-17-14-15-21-647.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31860] FlinkDeployments never finalize when namespace is deleted [flink-kubernetes-operator]

2024-04-17 Thread via GitHub


jiangzho commented on PR #817:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/817#issuecomment-2062344434

   Thanks @gyfora for the quick turnaround!
   
   The commit has been updated with style fix. This patch is validated via 
minikube setup.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34583) Bug for dynamic table option hints with multiple CTEs

2024-04-17 Thread Xingcan Cui (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838379#comment-17838379
 ] 

Xingcan Cui commented on FLINK-34583:
-

Hi [~xuyangzhong], thanks for looking into this. I hit the issue when using the 
Paimon table source. The execution plan looks good; however, the options don't take 
effect. It could be a runtime issue or a Paimon source implementation bug. I can't 
remember clearly whether Flink generates multiple table sources and then merges them 
at runtime; if it does, the options may not be merged properly.

!image-2024-04-17-16-48-49-073.png!

 

> Bug for dynamic table option hints with multiple CTEs
> -
>
> Key: FLINK-34583
> URL: https://issues.apache.org/jira/browse/FLINK-34583
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Priority: Major
> Attachments: image-2024-04-17-16-35-06-153.png, 
> image-2024-04-17-16-48-49-073.png
>
>
> The table options hints don't work well with multiple WITH clauses referring 
> to the same table. Please see the following example.
>  
> The following query with hints works well.
> {code:java}
> SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...;{code}
> The following query with multiple WITH clauses also works well.
> {code:java}
> WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...),
> T3 AS (SELECT ... FROM T2 WHERE...)
> SELECT * FROM T3;{code}
> The following query with multiple WITH clauses referring to the same original 
> table failed to recognize the hints.
> {code:java}
> WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...),
> T3 AS (SELECT ... FROM T2 WHERE...),
> T4 AS (SELECT ... FROM T2 WHERE...),
> T5 AS (SELECT ... FROM T3 JOIN T4 ON...)
> SELECT * FROM T5;{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34583) Bug for dynamic table option hints with multiple CTEs

2024-04-17 Thread Xingcan Cui (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-34583:

Attachment: image-2024-04-17-16-48-49-073.png

> Bug for dynamic table option hints with multiple CTEs
> -
>
> Key: FLINK-34583
> URL: https://issues.apache.org/jira/browse/FLINK-34583
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Priority: Major
> Attachments: image-2024-04-17-16-35-06-153.png, 
> image-2024-04-17-16-48-49-073.png
>
>
> The table options hints don't work well with multiple WITH clauses referring 
> to the same table. Please see the following example.
>  
> The following query with hints works well.
> {code:java}
> SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...;{code}
> The following query with multiple WITH clauses also works well.
> {code:java}
> WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...),
> T3 AS (SELECT ... FROM T2 WHERE...)
> SELECT * FROM T3;{code}
> The following query with multiple WITH clauses referring to the same original 
> table failed to recognize the hints.
> {code:java}
> WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...),
> T3 AS (SELECT ... FROM T2 WHERE...),
> T4 AS (SELECT ... FROM T2 WHERE...),
> T5 AS (SELECT ... FROM T3 JOIN T4 ON...)
> SELECT * FROM T5;{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34583) Bug for dynamic table option hints with multiple CTEs

2024-04-17 Thread Xingcan Cui (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-34583:

Attachment: image-2024-04-17-16-35-06-153.png

> Bug for dynamic table option hints with multiple CTEs
> -
>
> Key: FLINK-34583
> URL: https://issues.apache.org/jira/browse/FLINK-34583
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Priority: Major
> Attachments: image-2024-04-17-16-35-06-153.png
>
>
> The table options hints don't work well with multiple WITH clauses referring 
> to the same table. Please see the following example.
>  
> The following query with hints works well.
> {code:java}
> SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...;{code}
> The following query with multiple WITH clauses also works well.
> {code:java}
> WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...),
> T3 AS (SELECT ... FROM T2 WHERE...)
> SELECT * FROM T3;{code}
> The following query with multiple WITH clauses referring to the same original 
> table failed to recognize the hints.
> {code:java}
> WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...),
> T3 AS (SELECT ... FROM T2 WHERE...),
> T4 AS (SELECT ... FROM T2 WHERE...),
> T5 AS (SELECT ... FROM T3 JOIN T4 ON...)
> SELECT * FROM T5;{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838358#comment-17838358
 ] 

Aleksandr Pilipenko commented on FLINK-35115:
-

The snapshotState method in FlinkKinesisConsumer skips saving state if the operator 
has been cancelled:
2024-04-17 14:05:52,645 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
snapshotState() called on closed source; returning null.
This leads to the state not being updated in the state backend during the 
stop-with-savepoint workflow. Created a PR to resolve this.
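
A minimal sketch of the behaviour described above (illustrative names only, not the 
actual FlinkKinesisConsumer code; the volatile running flag and the map-based state 
stand in for the connector's internals):
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class GuardedKinesisSource {
    private volatile boolean running = true; // set to false by cancel()
    private final Map<String, String> lastReadSequenceNumbers = new ConcurrentHashMap<>();
    private final Map<String, String> checkpointedState = new ConcurrentHashMap<>();

    void cancel() {
        running = false;
    }

    // Current behaviour: once the source is cancelled the snapshot is skipped, so a
    // stop-with-savepoint keeps the sequence numbers of the previous checkpoint.
    void snapshotStateCurrent() {
        if (!running) {
            return; // "snapshotState() called on closed source; returning null."
        }
        checkpointedState.clear();
        checkpointedState.putAll(lastReadSequenceNumbers);
    }

    // Proposed behaviour: persist the latest sequence numbers even after cancel(), so
    // the savepoint reflects the last records actually emitted downstream.
    void snapshotStateProposed() {
        checkpointedState.clear();
        checkpointedState.putAll(lastReadSequenceNumbers);
    }
}
{code}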

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a 
> stop-with-savepoint, Flink duplicates in Kafka all the records between the 
> last checkpoint and the savepoint at resume:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint and it's the one from the checkpoint before 
> the savepoint  instead of being the one of the last record committed to Kafka.
>  * I've tested exactly the save job with Kafka as source instead of Kinesis 
> as source and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Aleksandr Pilipenko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838358#comment-17838358
 ] 

Aleksandr Pilipenko edited comment on FLINK-35115 at 4/17/24 7:01 PM:
--

The snapshotState method in FlinkKinesisConsumer skips saving state if the operator 
has been cancelled:
{noformat}
2024-04-17 14:05:52,645 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
snapshotState() called on closed source; returning null.{noformat}
This leads to the state not being updated in the state backend during the 
stop-with-savepoint workflow. Created a PR to resolve this.


was (Author: a.pilipenko):
snapshotState method in FlinkKinesisConsumer skip saving state if operator had 
been cancelled:
2024-04-17 14:05:52,645 DEBUG 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - 
snapshotState() called on closed source; returning null.
This leads to state not being updated in state backend during 
stop-with-savepoint workflow. Created a PR to resolve this.

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a 
> stop-with-savepoint, Flink duplicates in Kafka all the records between the 
> last checkpoint and the savepoint at resume:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint and it's the one from the checkpoint before 
> the savepoint  instead of being the one of the last record committed to Kafka.
>  * I've tested exactly the save job with Kafka as source instead of Kinesis 
> as source and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-31860] FlinkDeployments never finalize when namespace is deleted [flink-kubernetes-operator]

2024-04-17 Thread via GitHub


jiangzho opened a new pull request, #817:
URL: https://github.com/apache/flink-kubernetes-operator/pull/817

   
   
   ## What is the purpose of the change
   
   This patch tackles the corner case where `cleanup` of a CustomResource constantly 
fails due to an event publish failure, causing the `cleanup` to hang forever.
   
   The operator attempts to publish events when reconciling changes or cleaning up a 
CustomResource. This patch allows the reconcile & cleanup logic to proceed if and 
only if the event publishing fails with 403 Forbidden, which happens when the 
namespace is being deleted (in terminating state); k8s rejects event / resource 
creation in a terminating namespace.
   
   In this way, at-least-once event delivery is still guaranteed in all other cases. 
Event consumers need to account for the "namespace deleted" scenario, in which all 
resources within the namespace are implicitly deleted.
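   
   A rough sketch of the error-handling pattern described above (hypothetical helper, 
not the operator's actual code; it assumes fabric8's `KubernetesClientException`, 
whose `getCode()` returns the HTTP status):
   
   ```java
   import io.fabric8.kubernetes.client.KubernetesClientException;
   
   final class EventPublishGuard {
   
       interface EventPublisher {
           void publish() throws KubernetesClientException;
       }
   
       // Publishes the event but swallows only a 403 (namespace terminating), so that
       // reconcile/cleanup can still proceed; any other failure is rethrown, keeping
       // at-least-once delivery in the normal case.
       static void publishBestEffort(EventPublisher publisher) {
           try {
               publisher.publish();
           } catch (KubernetesClientException e) {
               if (e.getCode() != 403) {
                   throw e;
               }
               // A 403 here means the namespace is terminating; all resources in it
               // are implicitly deleted, so dropping the event is acceptable.
           }
       }
   }
   ```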
   
   ## Brief change log
   
 - Fix event publish blocking reconcile & cleanup upon 403
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
 - Added additional scenario in EventUtilsTest and expect it to succeed
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: yes
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32622][table-planner] Optimize mini-batch assignment [flink]

2024-04-17 Thread via GitHub


jeyhunkarimov commented on PR #23470:
URL: https://github.com/apache/flink/pull/23470#issuecomment-2061766848

   Thanks a lot @xuyangzhong for your review. I addressed your comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35115] Allow kinesis consumer to snapshotState after operator had been cancelled [flink-connector-aws]

2024-04-17 Thread via GitHub


z3d1k commented on PR #138:
URL: 
https://github.com/apache/flink-connector-aws/pull/138#issuecomment-2061752988

   @dannycranmer, @hlteoh37 please take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35115] Allow kinesis consumer to snapshotState after operator had been cancelled [flink-connector-aws]

2024-04-17 Thread via GitHub


z3d1k opened a new pull request, #138:
URL: https://github.com/apache/flink-connector-aws/pull/138

   ## Purpose of the change
   
   Perform the state snapshot even after the source operator has been cancelled.
   This solves the issue where operator state is not saved during the 
stop-with-savepoint workflow.
   
   ## Verifying this change
   
   - *Added unit tests*
   - *Manually verified by running the Kinesis connector on a local Flink 
cluster.*
   
   ## Significant changes
   *(Please check any boxes [x] if the answer is "yes". You can first publish 
the PR and check them afterwards, for convenience.)*
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
 - If yes, how is this documented? (not applicable / docs / JavaDocs / not 
documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35115:
---
Labels: kinesis pull-request-available  (was: kinesis)

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a 
> stop-with-savepoint, Flink duplicates in Kafka all the records between the 
> last checkpoint and the savepoint at resume:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint and it's the one from the checkpoint before 
> the savepoint  instead of being the one of the last record committed to Kafka.
>  * I've tested exactly the save job with Kafka as source instead of Kinesis 
> as source and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35116] Bump operator sdk version to 4.8.3 [flink-kubernetes-operator]

2024-04-17 Thread via GitHub


mbalassi merged PR #816:
URL: https://github.com/apache/flink-kubernetes-operator/pull/816


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35124) Connector Release Fails to run Checkstyle

2024-04-17 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838277#comment-17838277
 ] 

Etienne Chauchot commented on FLINK-35124:
--

It was failing on Cassandra before the _utils.sh change; at the time I did a 
quick workaround by copying the suppressions.xml file to /tools.

> Connector Release Fails to run Checkstyle
> -
>
> Key: FLINK-35124
> URL: https://issues.apache.org/jira/browse/FLINK-35124
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Danny Cranmer
>Priority: Major
>
> During a release of the AWS connectors the build was failing at the 
> \{{./tools/releasing/shared/stage_jars.sh}} step due to a checkstyle error.
>  
> {code:java}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-checkstyle-plugin:3.1.2:check (validate) on 
> project flink-connector-aws: Failed during checkstyle execution: Unable to 
> find suppressions file at location: /tools/maven/suppressions.xml: Could not 
> find resource '/tools/maven/suppressions.xml'. -> [Help 1] {code}
>  
> Looks like it is caused by this 
> [https://github.com/apache/flink-connector-shared-utils/commit/a75b89ee3f8c9a03e97ead2d0bd9d5b7bb02b51a]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35124) Connector Release Fails to run Checkstyle

2024-04-17 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838277#comment-17838277
 ] 

Etienne Chauchot edited comment on FLINK-35124 at 4/17/24 4:21 PM:
---

It was failing on Cassandra before the _utils.sh change; at the time I did a 
quick workaround by copying the suppressions.xml file to /tools.

I think it was also failing for the other connectors.


was (Author: echauchot):
It was failing on cassandra before the _utils.sh change, at the time I did a 
quick workaround by copying the suppression.xml file to /tools

> Connector Release Fails to run Checkstyle
> -
>
> Key: FLINK-35124
> URL: https://issues.apache.org/jira/browse/FLINK-35124
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Danny Cranmer
>Priority: Major
>
> During a release of the AWS connectors the build was failing at the 
> \{{./tools/releasing/shared/stage_jars.sh}} step due to a checkstyle error.
>  
> {code:java}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-checkstyle-plugin:3.1.2:check (validate) on 
> project flink-connector-aws: Failed during checkstyle execution: Unable to 
> find suppressions file at location: /tools/maven/suppressions.xml: Could not 
> find resource '/tools/maven/suppressions.xml'. -> [Help 1] {code}
>  
> Looks like it is caused by this 
> [https://github.com/apache/flink-connector-shared-utils/commit/a75b89ee3f8c9a03e97ead2d0bd9d5b7bb02b51a]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-04-17 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838238#comment-17838238
 ] 

Ryan Skraba commented on FLINK-35041:
-

1.20 test_ci_core 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58969=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8933
1.20 test_ci_core 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58971=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=8897

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Blocker
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35146) CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan

2024-04-17 Thread Ryan Skraba (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Skraba updated FLINK-35146:

Labels: test-stability  (was: )

> CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan
> ---
>
> Key: FLINK-35146
> URL: https://issues.apache.org/jira/browse/FLINK-35146
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.19.1
>Reporter: Ryan Skraba
>Priority: Critical
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58960=logs=fb37c667-81b7-5c22-dd91-846535e99a97=011e961e-597c-5c96-04fe-7941c8b83f23=16690
> {code}
> Apr 17 06:27:47 06:27:47.363 [ERROR] Tests run: 2, Failures: 1, Errors: 0, 
> Skipped: 1, Time elapsed: 64.51 s <<< FAILURE! -- in 
> org.apache.flink.table.sql.CompileAndExecuteRemotePlanITCase
> Apr 17 06:27:47 06:27:47.364 [ERROR] 
> org.apache.flink.table.sql.CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan[executionMode]
>  -- Time elapsed: 56.55 s <<< FAILURE!
> Apr 17 06:27:47 org.opentest4j.AssertionFailedError: Did not get expected 
> results before timeout, actual result: null. ==> expected:  but was: 
> 
> Apr 17 06:27:47   at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> Apr 17 06:27:47   at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> Apr 17 06:27:47   at 
> org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> Apr 17 06:27:47   at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> Apr 17 06:27:47   at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
> Apr 17 06:27:47   at 
> org.apache.flink.table.sql.SqlITCaseBase.checkResultFile(SqlITCaseBase.java:216)
> Apr 17 06:27:47   at 
> org.apache.flink.table.sql.SqlITCaseBase.runAndCheckSQL(SqlITCaseBase.java:149)
> Apr 17 06:27:47   at 
> org.apache.flink.table.sql.SqlITCaseBase.runAndCheckSQL(SqlITCaseBase.java:133)
> Apr 17 06:27:47   at 
> org.apache.flink.table.sql.CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan(CompileAndExecuteRemotePlanITCase.java:70)
> Apr 17 06:27:47   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 17 06:27:47   at 
> org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
> Apr 17 06:27:47   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Apr 17 06:27:47 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35146) CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan

2024-04-17 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-35146:
---

 Summary: 
CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan
 Key: FLINK-35146
 URL: https://issues.apache.org/jira/browse/FLINK-35146
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.19.1
Reporter: Ryan Skraba


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58960=logs=fb37c667-81b7-5c22-dd91-846535e99a97=011e961e-597c-5c96-04fe-7941c8b83f23=16690

{code}
Apr 17 06:27:47 06:27:47.363 [ERROR] Tests run: 2, Failures: 1, Errors: 0, 
Skipped: 1, Time elapsed: 64.51 s <<< FAILURE! -- in 
org.apache.flink.table.sql.CompileAndExecuteRemotePlanITCase
Apr 17 06:27:47 06:27:47.364 [ERROR] 
org.apache.flink.table.sql.CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan[executionMode]
 -- Time elapsed: 56.55 s <<< FAILURE!
Apr 17 06:27:47 org.opentest4j.AssertionFailedError: Did not get expected 
results before timeout, actual result: null. ==> expected:  but was: 

Apr 17 06:27:47 at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
Apr 17 06:27:47 at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
Apr 17 06:27:47 at 
org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
Apr 17 06:27:47 at 
org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
Apr 17 06:27:47 at 
org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
Apr 17 06:27:47 at 
org.apache.flink.table.sql.SqlITCaseBase.checkResultFile(SqlITCaseBase.java:216)
Apr 17 06:27:47 at 
org.apache.flink.table.sql.SqlITCaseBase.runAndCheckSQL(SqlITCaseBase.java:149)
Apr 17 06:27:47 at 
org.apache.flink.table.sql.SqlITCaseBase.runAndCheckSQL(SqlITCaseBase.java:133)
Apr 17 06:27:47 at 
org.apache.flink.table.sql.CompileAndExecuteRemotePlanITCase.testCompileAndExecutePlan(CompileAndExecuteRemotePlanITCase.java:70)
Apr 17 06:27:47 at java.lang.reflect.Method.invoke(Method.java:498)
Apr 17 06:27:47 at 
org.apache.flink.util.ExternalResource$1.evaluate(ExternalResource.java:48)
Apr 17 06:27:47 at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
Apr 17 06:27:47 

{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33186) CheckpointAfterAllTasksFinishedITCase.testRestoreAfterSomeTasksFinished fails on AZP

2024-04-17 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838235#comment-17838235
 ] 

Ryan Skraba commented on FLINK-33186:
-

1.20 test_cron_hadoop313 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58958=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8314
1.20 Java 8: Test (module: tests) 
https://github.com/apache/flink/actions/runs/8719280474/job/23918749100#step:10:8028


>  CheckpointAfterAllTasksFinishedITCase.testRestoreAfterSomeTasksFinished 
> fails on AZP
> -
>
> Key: FLINK-33186
> URL: https://issues.apache.org/jira/browse/FLINK-33186
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Sergey Nuyanzin
>Assignee: Jiang Xin
>Priority: Critical
>  Labels: test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=53509=logs=baf26b34-3c6a-54e8-f93f-cf269b32f802=8c9d126d-57d2-5a9e-a8c8-ff53f7b35cd9=8762
> fails as
> {noformat}
> Sep 28 01:23:43 Caused by: 
> org.apache.flink.runtime.checkpoint.CheckpointException: Task local 
> checkpoint failure.
> Sep 28 01:23:43   at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:550)
> Sep 28 01:23:43   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2248)
> Sep 28 01:23:43   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2235)
> Sep 28 01:23:43   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$null$9(CheckpointCoordinator.java:817)
> Sep 28 01:23:43   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> Sep 28 01:23:43   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Sep 28 01:23:43   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> Sep 28 01:23:43   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> Sep 28 01:23:43   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> Sep 28 01:23:43   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> Sep 28 01:23:43   at java.lang.Thread.run(Thread.java:748)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31421) DataGeneratorSourceITCase.testGatedRateLimiter failed on CI

2024-04-17 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838234#comment-17838234
 ] 

Ryan Skraba commented on FLINK-31421:
-

1.19 Hadoop 3.1.3: Test (module: misc) 
https://github.com/apache/flink/actions/runs/8715237951/job/23907182643#step:10:23996

> DataGeneratorSourceITCase.testGatedRateLimiter failed on CI
> ---
>
> Key: FLINK-31421
> URL: https://issues.apache.org/jira/browse/FLINK-31421
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.18.0
>Reporter: Qingsheng Ren
>Priority: Major
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47044=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=5acec1b4-945b-59ca-34f8-168928ce5199=23109]
> {code:java}
> [ERROR] 
> org.apache.flink.connector.datagen.source.DataGeneratorSourceITCase.testGatedRateLimiter
>   Time elapsed: 1.557 s  <<< FAILURE!
> Mar 11 04:43:12 java.lang.AssertionError: 
> Mar 11 04:43:12 
> Mar 11 04:43:12 Expected size: 8 but was: 7 in:
> Mar 11 04:43:12 [1L, 1L, 1L, 1L, 1L, 1L, 1L]
> Mar 11 04:43:12   at 
> org.apache.flink.connector.datagen.source.DataGeneratorSourceITCase.testGatedRateLimiter(DataGeneratorSourceITCase.java:200)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34644) RestServerEndpointITCase.testShouldWaitForHandlersWhenClosing failed with ConnectionClosedException

2024-04-17 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838232#comment-17838232
 ] 

Ryan Skraba commented on FLINK-34644:
-

1.20 Java 21: Test (module: core) 
https://github.com/apache/flink/actions/runs/8715237422/job/23907067308#step:10:9495

> RestServerEndpointITCase.testShouldWaitForHandlersWhenClosing failed with 
> ConnectionClosedException
> ---
>
> Key: FLINK-34644
> URL: https://issues.apache.org/jira/browse/FLINK-34644
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.20.0
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> https://github.com/apache/flink/actions/runs/8189958608/job/22396362238#step:10:9215
> {code}
> Error: 15:13:33 15:13:33.779 [ERROR] Tests run: 68, Failures: 0, Errors: 1, 
> Skipped: 4, Time elapsed: 17.81 s <<< FAILURE! -- in 
> org.apache.flink.runtime.rest.RestServerEndpointITCase
> Error: 15:13:33 15:13:33.779 [ERROR] 
> org.apache.flink.runtime.rest.RestServerEndpointITCase.testShouldWaitForHandlersWhenClosing
>  -- Time elapsed: 0.329 s <<< ERROR!
> Mar 07 15:13:33 java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.rest.ConnectionClosedException: Channel became 
> inactive.
> Mar 07 15:13:33   at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Mar 07 15:13:33   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> Mar 07 15:13:33   at 
> org.apache.flink.runtime.rest.RestServerEndpointITCase.testShouldWaitForHandlersWhenClosing(RestServerEndpointITCase.java:592)
> Mar 07 15:13:33   at java.lang.reflect.Method.invoke(Method.java:498)
> Mar 07 15:13:33   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> Mar 07 15:13:33   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Mar 07 15:13:33   at 
> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
> Mar 07 15:13:33   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Mar 07 15:13:33   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
> Mar 07 15:13:33   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Mar 07 15:13:33   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> Mar 07 15:13:33   at 
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
> Mar 07 15:13:33   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> Mar 07 15:13:33   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> Mar 07 15:13:33   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> Mar 07 15:13:33   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> Mar 07 15:13:33   at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> Mar 07 15:13:33   at 
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> Mar 07 15:13:33   at 
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
> Mar 07 15:13:33   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
> Mar 07 15:13:33   at 
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> Mar 07 15:13:33   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> Mar 07 15:13:33   at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
> Mar 07 15:13:33   at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
> Mar 07 15:13:33   at 
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> Mar 07 15:13:33   at 
> java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
> Mar 07 15:13:33   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Mar 07 15:13:33   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Mar 07 15:13:33   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Mar 07 15:13:33   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Mar 07 15:13:33   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Mar 07 15:13:33 Caused by: 
> org.apache.flink.runtime.rest.ConnectionClosedException: Channel became 
> inactive.
> Mar 07 15:13:33   at 
> 

[jira] [Commented] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-04-17 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838233#comment-17838233
 ] 

Ryan Skraba commented on FLINK-35041:
-

1.20 Java 8: Test (module: core) 
https://github.com/apache/flink/actions/runs/8715237422/job/23907053858#step:10:9074

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Blocker
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(K.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34646) AggregateITCase.testDistinctWithRetract timed out

2024-04-17 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838231#comment-17838231
 ] 

Ryan Skraba commented on FLINK-34646:
-

1.18 AdaptiveScheduler: Test (module: table) 
https://github.com/apache/flink/actions/runs/8715237382/job/23907050959#step:10:12476

This is the exact same timeout but it's happening on 
{{AggregateITCase.testMinMaxWithBinaryString}}

> AggregateITCase.testDistinctWithRetract timed out
> -
>
> Key: FLINK-34646
> URL: https://issues.apache.org/jira/browse/FLINK-34646
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> https://github.com/apache/flink/actions/runs/8211401561/job/22460442229#step:10:17161
> {code}
> "main" #1 prio=5 os_prio=0 tid=0x7f70abeb7000 nid=0x4cff3 waiting on 
> condition [0x7f70ac3f6000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xcd24c690> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.AggregateITCase.testDistinctWithRetract(AggregateITCase.scala:345)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-35115:
--
Affects Version/s: aws-connector-4.2.0
   aws-connector-4.1.0
   aws-connector-4.0.0

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis
> Fix For: aws-connector-4.3.0
>
>
> Having an exactly-once Kinesis -> Flink -> Kafka job and triggering a 
> stop-with-savepoint, Flink duplicates in Kafka all the records between the 
> last checkpoint and the savepoint at resume:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint and it's the one from the checkpoint before 
> the savepoint  instead of being the one of the last record committed to Kafka.
>  * I've tested exactly the save job with Kafka as source instead of Kinesis 
> as source and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-35115:
--
Fix Version/s: aws-connector-4.3.0
   (was: aws-connector-4.2.0)

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis
> Fix For: aws-connector-4.3.0
>
>
> With an exactly-once Kinesis -> Flink -> Kafka job, triggering a 
> stop-with-savepoint makes Flink duplicate in Kafka, on resume, all the records 
> between the last checkpoint and the savepoint:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint, and it's the one from the checkpoint before 
> the savepoint instead of the one for the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as the source instead of 
> Kinesis, and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-35115:
--
Fix Version/s: aws-connector-4.2.0

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis
> Fix For: aws-connector-4.2.0
>
>
> With an exactly-once Kinesis -> Flink -> Kafka job, triggering a 
> stop-with-savepoint makes Flink duplicate in Kafka, on resume, all the records 
> between the last checkpoint and the savepoint:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint, and it's the one from the checkpoint before 
> the savepoint instead of the one for the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as the source instead of 
> Kinesis, and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-35115:
--
Affects Version/s: 1.17.2
   1.15.4

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.15.4, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis
>
> With an exactly-once Kinesis -> Flink -> Kafka job, triggering a 
> stop-with-savepoint makes Flink duplicate in Kafka, on resume, all the records 
> between the last checkpoint and the savepoint:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint, and it's the one from the checkpoint before 
> the savepoint instead of the one for the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as the source instead of 
> Kinesis, and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint

2024-04-17 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-35115:
--
Priority: Blocker  (was: Major)

> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.16.3, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* 
> exactly-once setup with:
>  * Flink versions checked 1.16.3 and 1.18.1
>  * Kinesis connector checked 1.16.3 and 4.2.0-1.18
>  * checkpointing configured at 1 minute with EXACTLY_ONCE mode: 
> {code:java}
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment (); 
> execEnv.enableCheckpointing (6,EXACTLY_ONCE); execEnv.getCheckpointConfig 
> ().setCheckpointTimeout (9); execEnv.getCheckpointConfig 
> ().setCheckpointStorage (CHECKPOINTS_PATH); {code}
>  * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties ();
> sinkConfig.put ("transaction.timeout.ms", 48);
> KafkaSink sink = KafkaSink.builder ()
> .setBootstrapServers ("localhost:9092")
> .setTransactionalIdPrefix ("test-prefix")
> .setDeliverGuarantee (EXACTLY_ONCE)
> .setKafkaProducerConfig (sinkConfig)
> .setRecordSerializer (
> (KafkaRecordSerializationSchema) (element, context, 
> timestamp) -> new ProducerRecord<> (
> "test-output-topic", null, element.getBytes ()))
> .build (); {code}
>  * Kinesis consumer defined as: 
> {code:java}
> FlinkKinesisConsumer flinkKinesisConsumer = new
> FlinkKinesisConsumer<> ("test-stream",
> new AbstractDeserializationSchema<> () {
> @Override
> public ByteBuffer deserialize (byte[] bytes) {
> // Return
> return ByteBuffer.wrap (bytes);
> }
> }, props); {code}
>  
>Reporter: Vadim Vararu
>Assignee: Aleksandr Pilipenko
>Priority: Blocker
>  Labels: kinesis
>
> With an exactly-once Kinesis -> Flink -> Kafka job, triggering a 
> stop-with-savepoint makes Flink duplicate in Kafka, on resume, all the records 
> between the last checkpoint and the savepoint:
>  * Event1 is written to Kinesis
>  * Event1 is processed by Flink 
>  * Event1 is committed to Kafka at the checkpoint
>  * 
> 
>  * Event2 is written to Kinesis
>  * Event2 is processed by Flink
>  * Stop with savepoint is triggered manually
>  * Event2 is committed to Kafka
>  * 
> 
>  * Job is resumed from the savepoint
>  * *{color:#FF}Event2 is written again to Kafka at the first 
> checkpoint{color}*
>  
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 
> reasons:{color}
>  * I've checked the actual Kinesis sequence number in the _metadata file 
> generated at stop-with-savepoint, and it's the one from the checkpoint before 
> the savepoint instead of the one for the last record committed to Kafka.
>  * I've tested exactly the same job with Kafka as the source instead of 
> Kinesis, and the behaviour does not reproduce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35124) Connector Release Fails to run Checkstyle

2024-04-17 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838224#comment-17838224
 ] 

Danny Cranmer commented on FLINK-35124:
---

Actually, I still think this is the issue: the release tooling runs the maven 
commands on the "pristine source", which is missing the maven config directory: 
https://github.com/apache/flink-connector-shared-utils/blob/release_utils/stage_jars.sh#L54.

If the suppressions path were simply invalid, checkstyle would fail on all 
builds, not just during the release.

> Connector Release Fails to run Checkstyle
> -
>
> Key: FLINK-35124
> URL: https://issues.apache.org/jira/browse/FLINK-35124
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Danny Cranmer
>Priority: Major
>
> During a release of the AWS connectors the build was failing at the 
> \{{./tools/releasing/shared/stage_jars.sh}} step due to a checkstyle error.
>  
> {code:java}
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-checkstyle-plugin:3.1.2:check (validate) on 
> project flink-connector-aws: Failed during checkstyle execution: Unable to 
> find suppressions file at location: /tools/maven/suppressions.xml: Could not 
> find resource '/tools/maven/suppressions.xml'. -> [Help 1] {code}
>  
> Looks like it is caused by this 
> [https://github.com/apache/flink-connector-shared-utils/commit/a75b89ee3f8c9a03e97ead2d0bd9d5b7bb02b51a]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35125) Implement ValueState for ForStStateBackend

2024-04-17 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu reassigned FLINK-35125:


Assignee: Jinzhong Li

> Implement ValueState for ForStStateBackend
> --
>
> Key: FLINK-35125
> URL: https://issues.apache.org/jira/browse/FLINK-35125
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Jinzhong Li
>Assignee: Jinzhong Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34987) Introduce Internal State Interface for Async State API

2024-04-17 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-34987.
--
Fix Version/s: 1.20.0
   Resolution: Fixed

merged 7699a0d2...2c5078bc into master

> Introduce Internal State Interface for Async State API
> --
>
> Key: FLINK-34987
> URL: https://issues.apache.org/jira/browse/FLINK-34987
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Hangxiang Yu
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33394) DataGeneratorSourceITCase.testGatedRateLimiter fails on AZP

2024-04-17 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838223#comment-17838223
 ] 

Ryan Skraba commented on FLINK-33394:
-

This should be closed as a duplicate of FLINK-31421. (Weirdly enough, I can't 
link two Jira issues ... unless another link already exists!)

> DataGeneratorSourceITCase.testGatedRateLimiter fails on AZP
> ---
>
> Key: FLINK-33394
> URL: https://issues.apache.org/jira/browse/FLINK-33394
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.17.2
>Reporter: Sergey Nuyanzin
>Priority: Major
>  Labels: test-stability
>
> This build 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=54054=logs=5cae8624-c7eb-5c51-92d3-4d2dacedd221=5acec1b4-945b-59ca-34f8-168928ce5199=22927
> fails on AZP as
> {noformat}
> Oct 26 07:37:41 [1L, 1L, 1L, 1L, 1L, 1L]
> Oct 26 07:37:41   at 
> org.apache.flink.connector.datagen.source.DataGeneratorSourceITCase.testGatedRateLimiter(DataGeneratorSourceITCase.java:200)
> Oct 26 07:37:41   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Oct 26 07:37:41   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Oct 26 07:37:41   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Oct 26 07:37:41   at java.lang.reflect.Method.invoke(Method.java:498)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34987][state] Introduce Internal State for Async State API [flink]

2024-04-17 Thread via GitHub


masteryhx closed pull request #24651: [FLINK-34987][state] Introduce Internal 
State for Async State API
URL: https://github.com/apache/flink/pull/24651


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


