[jira] [Created] (FLINK-35031) Event timer firing under async execution model

2024-04-06 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35031:
--

 Summary: Event timer firing under async execution model
 Key: FLINK-35031
 URL: https://issues.apache.org/jira/browse/FLINK-35031
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Yanfei Lei








[jira] [Created] (FLINK-35030) Introduce Epoch Manager for watermark under async execution

2024-04-06 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35030:
--

 Summary: Introduce Epoch Manager for watermark under async 
execution
 Key: FLINK-35030
 URL: https://issues.apache.org/jira/browse/FLINK-35030
 Project: Flink
  Issue Type: Sub-task
Reporter: Yanfei Lei








[jira] [Updated] (FLINK-35026) Introduce async execution configurations

2024-04-06 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei updated FLINK-35026:
---
Summary: Introduce async execution configurations  (was: Introduce 
async-state configurations)

> Introduce async execution configurations
> 
>
> Key: FLINK-35026
> URL: https://issues.apache.org/jira/browse/FLINK-35026
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration, Runtime / Task
>Reporter: Yanfei Lei
>Priority: Major
>






[jira] [Created] (FLINK-35029) Store timer in JVM heap when async execution enabled

2024-04-06 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35029:
--

 Summary: Store timer in JVM heap when async execution enabled
 Key: FLINK-35029
 URL: https://issues.apache.org/jira/browse/FLINK-35029
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Runtime / Task
Reporter: Yanfei Lei








[jira] [Updated] (FLINK-24939) Support 'SHOW CREATE CATALOG' syntax

2024-04-06 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-24939:
-
Summary: Support 'SHOW CREATE CATALOG' syntax  (was: Complete 'SHOW CREATE 
CATALOG' syntax)

> Support 'SHOW CREATE CATALOG' syntax
> 
>
> Key: FLINK-24939
> URL: https://issues.apache.org/jira/browse/FLINK-24939
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> SHOW CREATE CATALOG <catalog_name>;
>  
> `Catalog` is playing a more important role in Flink; it would be great to be 
> able to get detailed information about an existing catalog.





[jira] [Updated] (FLINK-34915) Complete `DESCRIBE CATALOG` syntax

2024-04-06 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-34915:
-
Summary: Complete `DESCRIBE CATALOG` syntax  (was: Support `DESCRIBE 
CATALOG` syntax)

> Complete `DESCRIBE CATALOG` syntax
> --
>
> Key: FLINK-34915
> URL: https://issues.apache.org/jira/browse/FLINK-34915
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
> Attachments: image-2024-03-22-18-29-00-454.png
>
>
> Describe the metadata of an existing catalog. The metadata information 
> includes the catalog’s name, type, and comment. If the optional {{EXTENDED}} 
> option is specified, catalog properties are also returned.
> NOTICE: The parser part of this syntax was implemented in FLIP-69, but it is 
> not actually available yet. We can complete the syntax in this FLIP. 
> !image-2024-03-22-18-29-00-454.png|width=561,height=374!





[jira] [Updated] (FLINK-24939) Complete 'SHOW CREATE CATALOG' syntax

2024-04-06 Thread Yubin Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yubin Li updated FLINK-24939:
-
Summary: Complete 'SHOW CREATE CATALOG' syntax  (was: Support 'SHOW CREATE 
CATALOG' syntax)

> Complete 'SHOW CREATE CATALOG' syntax
> -
>
> Key: FLINK-24939
> URL: https://issues.apache.org/jira/browse/FLINK-24939
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> SHOW CREATE CATALOG <catalog_name>;
>  
> `Catalog` is playing a more important role in Flink; it would be great to be 
> able to get detailed information about an existing catalog.





Re: [PR] [FLINK-35006] Use try with-resource for StandaloneAutoscalerExecutor [flink-kubernetes-operator]

2024-04-06 Thread via GitHub


1996fanrui commented on code in PR #811:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/811#discussion_r1554828200


##
flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java:
##
@@ -60,9 +60,11 @@ public static > void main(String[
 
 var autoScaler = createJobAutoscaler(eventHandler, stateStore);
 
-var autoscalerExecutor =
-new StandaloneAutoscalerExecutor<>(conf, jobListFetcher, 
eventHandler, autoScaler);
-autoscalerExecutor.start();
+try (var autoscalerExecutor =
+new StandaloneAutoscalerExecutor<>(
+conf, jobListFetcher, eventHandler, autoScaler)) {
+autoscalerExecutor.start();
+}

Review Comment:
   Hey @plugatarev, thanks for the PR!
   
   IIUC, we cannot close `StandaloneAutoscalerExecutor` here. We expect 
`StandaloneAutoscalerExecutor` to keep scaling jobs periodically, and it will 
stop working if we close the executor.
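   A minimal, self-contained sketch of the pitfall (a hypothetical toy class, not the operator's actual code): try-with-resources calls close() as soon as the block exits, shutting the periodic scheduler down right after start() returns.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Toy stand-in for StandaloneAutoscalerExecutor: start() only schedules work.
class PeriodicExecutor implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    void start() {
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("scaling jobs..."), 0, 1, TimeUnit.SECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow(); // cancels the periodic task
    }

    public static void main(String[] args) {
        try (PeriodicExecutor executor = new PeriodicExecutor()) {
            executor.start();
        } // close() runs here immediately -- the periodic scaling never happens
    }
}
```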






[jira] [Comment Edited] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-04-06 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834609#comment-17834609
 ] 

xiaogang zhou edited comment on FLINK-32070 at 4/7/24 6:36 AM:
---

Is there any branch I can compile to do a POC? Also, if you are busy with the 
Flink 2.0 state work, I can help with some of the work on FLIP-306. [~Zakelly] 


was (Author: zhoujira86):
Is there any Branch I can view do a POC? And I think if you are busy on flink 
2.0 state, I can also help do some work on this issue?[~Zakelly] 

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.20.0
>
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation from the file system's perspective 
> (see [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack a systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.





[jira] [Created] (FLINK-35028) Processing timer firing under async execution model

2024-04-06 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35028:
--

 Summary: Processing timer firing under async execution model
 Key: FLINK-35028
 URL: https://issues.apache.org/jira/browse/FLINK-35028
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends, Runtime / Task
Reporter: Yanfei Lei








[jira] [Commented] (FLINK-35010) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink Mongodb connector

2024-04-06 Thread Zhongqiang Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834610#comment-17834610
 ] 

Zhongqiang Gong commented on FLINK-35010:
-

+1, it's better to bump the commons-compress version to 1.26.1.

> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink 
> Mongodb connector
> --
>
> Key: FLINK-35010
> URL: https://issues.apache.org/jira/browse/FLINK-35010
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / MongoDB
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Minor
>  Labels: pull-request-available
>






[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-04-06 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834609#comment-17834609
 ] 

xiaogang zhou commented on FLINK-32070:
---

Is there any branch I can view to do a POC? Also, if you are busy with the 
Flink 2.0 state work, I can help with some of the work on this issue. [~Zakelly] 

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.20.0
>
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation from the file system's perspective 
> (see [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack a systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.





[jira] [Assigned] (FLINK-35006) Use try with-resource for StandaloneAutoscalerExecutor

2024-04-06 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-35006:
---

Assignee: Kirill Plugatarev

> Use try with-resource for StandaloneAutoscalerExecutor
> --
>
> Key: FLINK-35006
> URL: https://issues.apache.org/jira/browse/FLINK-35006
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Kirill Plugatarev
>Assignee: Kirill Plugatarev
>Priority: Minor
>  Labels: pull-request-available
>






[jira] [Updated] (FLINK-35027) Implement checkpoint drain in AsyncExecutionController

2024-04-06 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei updated FLINK-35027:
---
Component/s: Runtime / Checkpointing

> Implement checkpoint drain in AsyncExecutionController
> --
>
> Key: FLINK-35027
> URL: https://issues.apache.org/jira/browse/FLINK-35027
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Task
>Reporter: Yanfei Lei
>Priority: Major
>






[jira] [Updated] (FLINK-35025) Wire AsyncExecutionController to AbstractStreamOperator

2024-04-06 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei updated FLINK-35025:
---
Component/s: Runtime / Task

> Wire AsyncExecutionController to AbstractStreamOperator
> ---
>
> Key: FLINK-35025
> URL: https://issues.apache.org/jira/browse/FLINK-35025
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Task
>Reporter: Yanfei Lei
>Priority: Major
>






[jira] [Updated] (FLINK-35026) Introduce async-state configurations

2024-04-06 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei updated FLINK-35026:
---
Component/s: Runtime / Configuration
 Runtime / Task

> Introduce async-state configurations
> 
>
> Key: FLINK-35026
> URL: https://issues.apache.org/jira/browse/FLINK-35026
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration, Runtime / Task
>Reporter: Yanfei Lei
>Priority: Major
>






[jira] [Created] (FLINK-35027) Implement checkpoint drain in AsyncExecutionController

2024-04-06 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35027:
--

 Summary: Implement checkpoint drain in AsyncExecutionController
 Key: FLINK-35027
 URL: https://issues.apache.org/jira/browse/FLINK-35027
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Yanfei Lei








Re: [PR] [FLINK-34908][pipeline-connector][doris] Fix Mysql pipeline to doris will lost precision for timestamp [flink-cdc]

2024-04-06 Thread via GitHub


gong commented on PR #3207:
URL: https://github.com/apache/flink-cdc/pull/3207#issuecomment-2041325325

   @leonardBang PTAL





[jira] [Assigned] (FLINK-35010) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink Mongodb connector

2024-04-06 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun reassigned FLINK-35010:
--

Assignee: Zhongqiang Gong

> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink 
> Mongodb connector
> --
>
> Key: FLINK-35010
> URL: https://issues.apache.org/jira/browse/FLINK-35010
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / MongoDB
>Reporter: Zhongqiang Gong
>Assignee: Zhongqiang Gong
>Priority: Minor
>  Labels: pull-request-available
>






[jira] [Commented] (FLINK-35010) Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink Mongodb connector

2024-04-06 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834607#comment-17834607
 ] 

Jiabao Sun commented on FLINK-35010:


I think we should bump the commons-compress version to 1.26.1 due to 
https://issues.apache.org/jira/browse/COMPRESS-659.

> Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink 
> Mongodb connector
> --
>
> Key: FLINK-35010
> URL: https://issues.apache.org/jira/browse/FLINK-35010
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / MongoDB
>Reporter: Zhongqiang Gong
>Priority: Minor
>  Labels: pull-request-available
>






Re: [PR] [minor][cdc][docs] Optimize styles of the Flink CDC website docs home… [flink-cdc]

2024-04-06 Thread via GitHub


leonardBang commented on PR #3208:
URL: https://github.com/apache/flink-cdc/pull/3208#issuecomment-2041324990

   @GOODBOY008 Would you like to help review this PR? 





[jira] [Commented] (FLINK-35008) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.0 for Flink Kafka connector

2024-04-06 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834606#comment-17834606
 ] 

Jiabao Sun commented on FLINK-35008:


TarArchiveOutputStream in commons-compress 1.26.0 has an incorrect dependency 
on the Charsets class from the commons-codec package, so the commons-codec 
dependency must be included to avoid NoClassDefFoundError failures like the one 
below. This issue has been fixed in commons-compress 1.26.1.

https://github.com/GOODBOY008/flink-connector-mongodb/actions/runs/8557577952/job/23450146047#step:15:11104
{code}
Caused by: java.lang.RuntimeException: Failed to build JobManager image
at 
org.apache.flink.connector.testframe.container.FlinkTestcontainersConfigurator.configureJobManagerContainer(FlinkTestcontainersConfigurator.java:67)
at 
org.apache.flink.connector.testframe.container.FlinkTestcontainersConfigurator.configure(FlinkTestcontainersConfigurator.java:147)
at 
org.apache.flink.connector.testframe.container.FlinkContainers$Builder.build(FlinkContainers.java:197)
at 
org.apache.flink.tests.util.mongodb.MongoE2ECase.(MongoE2ECase.java:90)
... 56 more
Caused by: org.apache.flink.connector.testframe.container.ImageBuildException: 
Failed to build image "flink-configured-jobmanager"
at 
org.apache.flink.connector.testframe.container.FlinkImageBuilder.build(FlinkImageBuilder.java:234)
at 
org.apache.flink.connector.testframe.container.FlinkTestcontainersConfigurator.configureJobManagerContainer(FlinkTestcontainersConfigurator.java:65)
... 59 more
Caused by: java.lang.RuntimeException: java.lang.NoClassDefFoundError: 
org/apache/commons/codec/Charsets
at org.rnorth.ducttape.timeouts.Timeouts.callFuture(Timeouts.java:68)
at 
org.rnorth.ducttape.timeouts.Timeouts.getWithTimeout(Timeouts.java:43)
at org.testcontainers.utility.LazyFuture.get(LazyFuture.java:45)
at 
org.apache.flink.connector.testframe.container.FlinkImageBuilder.buildBaseImage(FlinkImageBuilder.java:255)
at 
org.apache.flink.connector.testframe.container.FlinkImageBuilder.build(FlinkImageBuilder.java:206)
... 60 more
Caused by: java.lang.NoClassDefFoundError: org/apache/commons/codec/Charsets
at 
org.apache.commons.compress.archivers.tar.TarArchiveOutputStream.(TarArchiveOutputStream.java:212)
at 
org.apache.commons.compress.archivers.tar.TarArchiveOutputStream.(TarArchiveOutputStream.java:157)
at 
org.apache.commons.compress.archivers.tar.TarArchiveOutputStream.(TarArchiveOutputStream.java:147)
at 
org.testcontainers.images.builder.ImageFromDockerfile.resolve(ImageFromDockerfile.java:129)
at 
org.testcontainers.images.builder.ImageFromDockerfile.resolve(ImageFromDockerfile.java:40)
at 
org.testcontainers.utility.LazyFuture.getResolvedValue(LazyFuture.java:17)
at org.testcontainers.utility.LazyFuture.get(LazyFuture.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.codec.Charsets
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 11 more
{code}
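
For illustration, a minimal reproduction sketch consistent with the stack trace above (assuming commons-compress 1.26.0 is on the classpath without commons-codec; this is not taken from the CI setup itself):

{code:java}
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;

import java.io.ByteArrayOutputStream;

public class TarRepro {
    public static void main(String[] args) throws Exception {
        // With commons-compress 1.26.0 and no commons-codec on the classpath,
        // the constructor fails at class initialization with
        // NoClassDefFoundError: org/apache/commons/codec/Charsets (fixed in 1.26.1).
        try (TarArchiveOutputStream out =
                new TarArchiveOutputStream(new ByteArrayOutputStream())) {
            out.finish();
        }
    }
}
{code}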


> Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.0 for Flink 
> Kafka connector
> 
>
> Key: FLINK-35008
> URL: https://issues.apache.org/jira/browse/FLINK-35008
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>






[jira] [Commented] (FLINK-35008) Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.0 for Flink Kafka connector

2024-04-06 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834604#comment-17834604
 ] 

Jiabao Sun commented on FLINK-35008:


Hi [~martijnvisser], maybe we should bump the commons-compress version to 
1.26.1.
The dependency on commons-codec in version 1.26.0 was unintended.

see: https://issues.apache.org/jira/browse/COMPRESS-659

> Bump org.apache.commons:commons-compress from 1.25.0 to 1.26.0 for Flink 
> Kafka connector
> 
>
> Key: FLINK-35008
> URL: https://issues.apache.org/jira/browse/FLINK-35008
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / Kafka
>Reporter: Martijn Visser
>Assignee: Martijn Visser
>Priority: Major
>






[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle

2024-04-06 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834603#comment-17834603
 ] 

Zakelly Lan commented on FLINK-33734:
-

I'd +1 for this optimization.

> Merge unaligned checkpoint state handle
> ---
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Assignee: Feifan Wang
>Priority: Major
>  Labels: pull-request-available
> Attachments: flamegraph.control-group.html, 
> flamegraph.merge-handle-and-serialize-on-tm.html, 
> flamegraph.only-merge-handle.html, image-2024-02-18-15-12-20-665.png
>
>
> h3. Background
> Unaligned checkpoint will write the inflight-data of all InputChannel and 
> ResultSubpartition of the same subtask to the same file during checkpoint. 
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of inflight-data at the channel granularity, which causes the file 
> name to be repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints will bloat. This 
> will result in:
>  # The amount of data reported by taskmanager to jobmanager increases, and 
> jobmanager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize and write it to dfs.
> Both of the above points ultimately lead to longer checkpoint duration.
> h3. A Production example
> Take our production job with a parallelism of 4800 as an example:
>  # When there is no back pressure, checkpoint end-to-end duration is within 7 
> seconds.
>  # When under pressure: checkpoint end-to-end duration often exceeds 1 
> minute. We found that jobmanager took more than 40 seconds to process rpc 
> requests, and serialized metadata took more than 20 seconds.Some checkpoint 
> statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoint on this job and hoped that the checkpoint 
> could be completed within 30 seconds. This problem made it difficult to 
> achieve this goal.
> h3. Propose changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated 
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle 
> before reporting. When recovering from a checkpoint, the jobmanager converts 
> each MergedInputChannelStateHandle back to a collection of 
> InputChannelStateHandles before assigning state handles, and the rest of the 
> process does not need to be changed. 
> Structure of MergedInputChannelStateHandle :
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": 
> "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?
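
To make the quoted proposal concrete, here is a minimal sketch of the merging step (class names and shapes are hypothetical toys, not Flink's actual handle classes): group a subtask's per-channel handles by their delegated file handle, so each file path is stored once.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-ins for the real state-handle classes.
record ChannelHandle(String delegatePath, int gateIdx, int channelIdx, List<Long> offsets) {}

record MergedHandle(String delegatePath, List<ChannelHandle> channels) {}

public class HandleMergerSketch {
    // Merge all channel handles that share the same delegated file into one
    // handle, so the (long) file path is stored once instead of once per channel.
    static List<MergedHandle> merge(List<ChannelHandle> handles) {
        Map<String, List<ChannelHandle>> byDelegate = new HashMap<>();
        for (ChannelHandle h : handles) {
            byDelegate.computeIfAbsent(h.delegatePath(), k -> new ArrayList<>()).add(h);
        }
        List<MergedHandle> merged = new ArrayList<>();
        byDelegate.forEach((path, group) -> merged.add(new MergedHandle(path, group)));
        return merged;
    }

    public static void main(String[] args) {
        List<ChannelHandle> handles = List.of(
                new ChannelHandle("dfs://chk-31/file-1", 0, 0, List.of(100L, 200L)),
                new ChannelHandle("dfs://chk-31/file-1", 0, 1, List.of(500L, 600L)));
        System.out.println(merge(handles)); // one MergedHandle; the path appears once
    }
}
{code}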





Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


lvyanquan commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1554820774


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##
@@ -85,5 +89,12 @@ public class PipelineOptions {
 .withDescription(
 "The unique ID for schema operator. This ID will 
be used for inter-operator communications and must be unique across 
operators.");
 
+public static final ConfigOption 
PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT =
+ConfigOptions.key("schema-operator-rpc-timeout")

Review Comment:
   Addressed it.






Re: [PR] [FLINK-35010][connectors/mongodb] Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink Mongodb connector [flink-connector-mongodb]

2024-04-06 Thread via GitHub


Jiabao-Sun commented on code in PR #32:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/32#discussion_r1554818610


##
flink-connector-mongodb-e2e-tests/pom.xml:
##
@@ -65,6 +65,11 @@ under the License.
mongodb
test

+
+   <dependency>
+   <groupId>commons-codec</groupId>
+   <artifactId>commons-codec</artifactId>

Review Comment:
   @GOODBOY008 that seems a mistake, see 
https://issues.apache.org/jira/browse/COMPRESS-659.
   I think we should bump commons-compress version to 1.26.1.






Re: [PR] [FLINK-34529][table-planner] Introduce FlinkProjectWindowTransposeRule. [flink]

2024-04-06 Thread via GitHub


libenchao commented on code in PR #24567:
URL: https://github.com/apache/flink/pull/24567#discussion_r1554809730


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##
@@ -438,9 +438,36 @@ class FlinkRelMdUniqueKeys private extends 
MetadataHandler[BuiltInMetadata.Uniqu
   rel: Window,
   mq: RelMetadataQuery,
   ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
+if (rel.groups.length == 1) {
+  val group = rel.groups.get(0)
+  getUniqueKeysOfWindowGroup(group, rel) match {
+case Some(uniqueKeys) =>
+  val retSet = new JHashSet[ImmutableBitSet]
+  retSet.add(uniqueKeys)
+  val inputKeys = mq.getUniqueKeys(rel.getInput, ignoreNulls)
+  if (inputKeys != null && inputKeys.nonEmpty) {
+inputKeys.foreach(uniqueKey => retSet.add(uniqueKey))
+  }
+  return retSet
+case _ =>
+  }
+}
 getUniqueKeysOfOverAgg(rel, mq, ignoreNulls)
   }
 
+  def getUniqueKeysOfWindowGroup(group: Window.Group, rel: Window): 
Option[ImmutableBitSet] = {
+// If it's a ROW_NUMBER window, then the unique keys are partition by key 
and row number.
+val aggCalls = group.aggCalls
+if (
+  aggCalls.length == 1 && 
aggCalls.get(0).getOperator.equals(SqlStdOperatorTable.ROW_NUMBER)

Review Comment:
   There could be more than one entry in `aggCalls`, and any aggCall that is 
`ROW_NUMBER` would make it a unique key?



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##
@@ -438,9 +438,36 @@ class FlinkRelMdUniqueKeys private extends 
MetadataHandler[BuiltInMetadata.Uniqu
   rel: Window,
   mq: RelMetadataQuery,
   ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
+if (rel.groups.length == 1) {

Review Comment:
   There can be more than one entry in `groups`, any of the `groups` could 
contain `ROW_NUMBER`, and all of them are valid unique-key candidates?



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##
@@ -186,6 +187,25 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] 
= {
+// If it's a ROW_NUMBER Rank, then the upsert keys are partition by key 
and row number.
+if (rel.groups.length == 1) {

Review Comment:
   Again, limiting the number of `groups` to 1 is odd. If, in streaming, we can 
only support one `group` for now (I'm not 100 percent sure about that), you 
should at least add a comment about it for future maintainability.



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala:
##
@@ -172,6 +172,8 @@ class FlinkRelMdUniqueKeysTest extends 
FlinkRelMdHandlerTestBase {
 
   @Test
   def testGetUniqueKeysOnRank(): Unit = {
+assertEquals(uniqueKeys(Array(7), Array(0)), 
mq.getUniqueKeys(logicalWindow).toSet)

Review Comment:
   Isn't this being tested below?



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala:
##
@@ -328,6 +329,11 @@ object RankUtil {
 }
   }
 
+  def getRankNumberColumnIndex(window: Window): Int = {

Review Comment:
   It would be good to check if the window function is `ROW_NUMBER` in case of 
misusage.



##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala:
##
@@ -193,26 +193,26 @@ class FlinkRelMdColumnUniquenessTest extends 
FlinkRelMdHandlerTestBase {
 
   @Test
   def testAreColumnsUniqueOnRank(): Unit = {
-Array(
-  logicalRank,
-  flinkLogicalRank,
-  batchLocalRank,
-  batchGlobalRank,
-  streamRank,
-  logicalRankWithVariableRange,
-  flinkLogicalRankWithVariableRange,
-  streamRankWithVariableRange
-)
-  .foreach {
-rank =>
-  assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0)))
-  (1 until rank.getRowType.getFieldCount).foreach {
-idx => assertFalse(mq.areColumnsUnique(rank, 
ImmutableBitSet.of(idx)))
-  }
-  assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0, 1)))
-  assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0, 2)))
-  assertFalse(mq.areColumnsUnique(rank, ImmutableBitSet.of(1, 2)))
-  }
+//Array(

Review Comment:
   Why is this commented out?



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##
@@ -186,6 +187,25 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Window, mq: RelMetadataQuery): JSet[ImmutableBitSet] 
= {
+// If it's a ROW_NUMBER Rank

Re: [PR] [FLINK-34529][table-planner] Introduce FlinkProjectWindowTransposeRule. [flink]

2024-04-06 Thread via GitHub


libenchao commented on code in PR #24567:
URL: https://github.com/apache/flink/pull/24567#discussion_r1554798121


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala:
##
@@ -193,26 +193,26 @@ class FlinkRelMdColumnUniquenessTest extends 
FlinkRelMdHandlerTestBase {
 
   @Test
   def testAreColumnsUniqueOnRank(): Unit = {
-Array(
-  logicalRank,
-  flinkLogicalRank,
-  batchLocalRank,
-  batchGlobalRank,
-  streamRank,
-  logicalRankWithVariableRange,
-  flinkLogicalRankWithVariableRange,
-  streamRankWithVariableRange
-)
-  .foreach {
-rank =>
-  assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0)))
-  (1 until rank.getRowType.getFieldCount).foreach {
-idx => assertFalse(mq.areColumnsUnique(rank, 
ImmutableBitSet.of(idx)))
-  }
-  assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0, 1)))
-  assertTrue(mq.areColumnsUnique(rank, ImmutableBitSet.of(0, 2)))
-  assertFalse(mq.areColumnsUnique(rank, ImmutableBitSet.of(1, 2)))
-  }
+//Array(

Review Comment:
   Why is this commented out?






[jira] [Created] (FLINK-35026) Introduce async-state configurations

2024-04-06 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35026:
--

 Summary: Introduce async-state configurations
 Key: FLINK-35026
 URL: https://issues.apache.org/jira/browse/FLINK-35026
 Project: Flink
  Issue Type: Sub-task
Reporter: Yanfei Lei








[jira] [Created] (FLINK-35025) Wire AsyncExecutionController to AbstractStreamOperator

2024-04-06 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35025:
--

 Summary: Wire AsyncExecutionController to AbstractStreamOperator
 Key: FLINK-35025
 URL: https://issues.apache.org/jira/browse/FLINK-35025
 Project: Flink
  Issue Type: Sub-task
Reporter: Yanfei Lei








[jira] [Created] (FLINK-35024) Implement record order preservation and buffering of AsyncExecutionController

2024-04-06 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-35024:
--

 Summary: Implement record order preservation and buffering of 
AsyncExecutionController
 Key: FLINK-35024
 URL: https://issues.apache.org/jira/browse/FLINK-35024
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Yanfei Lei








Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-06 Thread via GitHub


fredia commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1554815099


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param  Type of partitioned key.
+ * @param  Type of input of this request.
+ * @param  Type of value that request will return.
+ */
+public class StateRequest {
+
+/** The type of processing request. */
+public enum RequestType {
+/** Process one record without state access. */
+SYNC,
+/** Get from one {@link State}. */
+GET,
+/** Put to one {@link State}. */
+PUT,
+/** Merge value to an existing key in {@link State}. Mainly used for 
listState. */
+MERGE,
+/** Delete from one {@link State}. */
+DELETE
+}
+
+/** The underlying state to be accessed, can be empty for {@link 
RequestType#SYNC}. */
+@Nullable private final State state;

Review Comment:
   Thanks for the explanation, agreed👍.






Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-06 Thread via GitHub


fredia commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1554814874


##
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImplTest.java:
##
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.core.state.StateFutureUtils;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ContextStateFutureImpl}. */
+public class ContextStateFutureImplTest {
+
+@Test
+public void testThenApply() {
+SingleStepRunner runner = new SingleStepRunner();
+KeyAccountingUnit keyAccountingUnit = new 
KeyAccountingUnit<>();
+RecordContext recordContext =
+new RecordContext<>(keyAccountingUnit, "a", "b");
+
+// validate
+ContextStateFutureImpl future =
+new ContextStateFutureImpl<>(runner::submit, recordContext);
+assertThat(recordContext.getReferenceCount()).isEqualTo(1);
+future.thenApply((v) -> 1L);
+future.complete(null);
+assertThat(runner.runThrough()).isTrue();
+assertThat(recordContext.getReferenceCount()).isEqualTo(0);
+
+// validate completion before callback
+future = new ContextStateFutureImpl<>(runner::submit, recordContext);
+assertThat(recordContext.getReferenceCount()).isEqualTo(1);
+future.complete(null);
+assertThat(recordContext.getReferenceCount()).isEqualTo(0);
+future.thenApply((v) -> 1L);
+assertThat(recordContext.getReferenceCount()).isEqualTo(0);
+assertThat(runner.runThrough()).isFalse();
+}
+
+@Test
+public void testThenAccept() {
+SingleStepRunner runner = new SingleStepRunner();
+KeyAccountingUnit keyAccountingUnit = new 
KeyAccountingUnit<>();
+RecordContext recordContext =
+new RecordContext<>(keyAccountingUnit, "a", "b");
+
+// validate
+ContextStateFutureImpl future =
+new ContextStateFutureImpl<>(runner::submit, recordContext);
+assertThat(recordContext.getReferenceCount()).isEqualTo(1);
+future.thenAccept((v) -> {});
+future.complete(null);
+assertThat(runner.runThrough()).isTrue();
+assertThat(recordContext.getReferenceCount()).isEqualTo(0);
+
+// validate completion before callback
+future = new ContextStateFutureImpl<>(runner::submit, recordContext);
+assertThat(recordContext.getReferenceCount()).isEqualTo(1);
+future.complete(null);
+assertThat(recordContext.getReferenceCount()).isEqualTo(0);
+future.thenAccept((v) -> {});
+assertThat(recordContext.getReferenceCount()).isEqualTo(0);
+assertThat(runner.runThrough()).isFalse();
+}
+
+@Test
+public void testThenCompose() {
+SingleStepRunner runner = new SingleStepRunner();
+KeyAccountingUnit keyAccountingUnit = new 
KeyAccountingUnit<>();
+RecordContext recordContext =
+new RecordContext<>(keyAccountingUnit, "a", "b");
+
+// validate
+ContextStateFutureImpl future =
+new ContextStateFutureImpl<>(runner::submit, recordContext);
+assertThat(recordContext.getReferenceCount()).isEqualTo(1);
+future.thenCompose((v) -> StateFutureUtils.completedFuture(1L));
+future.complete(null);
+assertThat(runner.runThrough()).isTrue();
+assertThat(recordContext.getReferenceCount()).isEqualTo(0);
+
+// validate completion before callback
+future = new ContextStateFutureImpl<>(runner::submit, recordContext);
+assertThat(recordContext.getReferenceCount()).isEqualTo(1);
+future.complete(null);
+assertThat(recordContext.getReferenceCount()).isEqualTo(0);
+future.thenCompose((v) -> StateFutureUtils.completedFuture(1L));
+assertThat(recordContext.getReferenceCount()).isEqualTo(0);
+assertThat(runner.runThrough()).isFalse();
+}
+
+

[PR] [minor][cdc][docs] Optimize styles of the Flink CDC website docs home… [flink-cdc]

2024-04-06 Thread via GitHub


Laffery opened a new pull request, #3208:
URL: https://github.com/apache/flink-cdc/pull/3208

   Optimize styles of the Flink CDC website documentation homepage:
   
   - Support responsive styles to improve the reading experience on mobile 
devices.
   - Optimize the background image to avoid large blank areas and increase the 
amount of information on the first screen.
   - Highlight the `Quick Start` button so users can find it easily.
   
   Take the iPhone 14 Pro Max (430px × 932px) as an example.
   
   **Before:**
   https://github.com/apache/flink-cdc/assets/49607541/65005938-f4f3-4d9b-8740-24ab63fc7013";>
   https://github.com/apache/flink-cdc/assets/49607541/3a2ec46f-4226-4e73-a92e-b7945bc8f37f";>
   https://github.com/apache/flink-cdc/assets/49607541/b964b230-7460-4e92-a27c-78a07b9fcd4b";>
   
   
   **After:**
   https://github.com/apache/flink-cdc/assets/49607541/a558175a-23fb-4677-b31a-fc090da60f33";>
   https://github.com/apache/flink-cdc/assets/49607541/b36f0491-7250-4f4c-9869-02140d95794e";>
   https://github.com/apache/flink-cdc/assets/49607541/c1f64d82-9cd0-44f8-983b-be3fae4b19a6";>
   





[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-04-06 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834595#comment-17834595
 ] 

Zakelly Lan commented on FLINK-32070:
-

[~zhoujira86] Yes, this helps the scenario where too many files are created and 
deleted and the NameNode of HDFS is under high pressure. This feature is not 
available yet; it will be available in Flink 1.20.

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.20.0
>
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation in the file system's perspective of 
> view (See [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.





Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-06 Thread via GitHub


Zakelly commented on PR #24614:
URL: https://github.com/apache/flink/pull/24614#issuecomment-2041303349

   @fredia thanks for your suggestions!





Re: [PR] [FLINK-34405][tests] Fix unstable test RightOuterJoinTaskTest#testCancelOuterJoinTaskWhileSort1 and testCancelOuterJoinTaskWhileSort2 [flink]

2024-04-06 Thread via GitHub


flinkbot commented on PR #24628:
URL: https://github.com/apache/flink/pull/24628#issuecomment-2041303328

   
   ## CI report:
   
   * 5348510da80f6f12061ae0c9ad7e5e38792b188a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-06 Thread via GitHub


Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1554811623


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param  Type of partitioned key.
+ * @param  Type of input of this request.
+ * @param  Type of value that request will return.
+ */
+public class StateRequest {
+
+/** The type of processing request. */
+public enum RequestType {
+/** Process one record without state access. */
+SYNC,
+/** Get from one {@link State}. */
+GET,
+/** Put to one {@link State}. */
+PUT,
+/** Merge value to an existing key in {@link State}. Mainly used for 
listState. */
+MERGE,
+/** Delete from one {@link State}. */
+DELETE
+}
+
+/** The underlying state to be accessed, can be empty for {@link 
RequestType#SYNC}. */
+@Nullable private final State state;

Review Comment:
   The reason we keep it as `org.apache.flink.api.common.state.v2.State` here 
is that we want the direct implementation of KV (aka the `internal key-value` 
layer) to be a unified, lightweight one, which is only a simple bridge between 
the user-facing interface and the `AEC` (see 
`AsyncExecutionControllerTest#TestValueState` as an example). I think we could 
make it an internal KV interface or a class parameter in the future, but for 
now I suggest we keep it simple.
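   
   A minimal, self-contained sketch of that thin-bridge idea (all names here are hypothetical, not Flink's actual API): the state object keeps no logic of its own and forwards every access as a request to the async execution controller.
   
```java
import java.util.concurrent.CompletableFuture;

// Toy stand-in for the AsyncExecutionController (AEC).
interface Controller {
    <V> CompletableFuture<V> submit(String stateName, String requestType, Object key, Object payload);
}

// The bridge: each state access becomes a request handed to the controller,
// which owns ordering, batching, and actual execution.
final class BridgeValueState<V> {
    private final String name;
    private final Controller controller;
    private final Object currentKey;

    BridgeValueState(String name, Controller controller, Object currentKey) {
        this.name = name;
        this.controller = controller;
        this.currentKey = currentKey;
    }

    CompletableFuture<V> asyncValue() {            // a GET request
        return controller.submit(name, "GET", currentKey, null);
    }

    CompletableFuture<Void> asyncUpdate(V value) { // a PUT request
        return controller.submit(name, "PUT", currentKey, value);
    }
}
```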






[jira] [Updated] (FLINK-34405) RightOuterJoinTaskTest#testCancelOuterJoinTaskWhileSort2 fails due to an interruption of the RightOuterJoinDriver#prepare method

2024-04-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34405:
---
Labels: pull-request-available starter test-stability  (was: starter 
test-stability)

> RightOuterJoinTaskTest#testCancelOuterJoinTaskWhileSort2 fails due to an 
> interruption of the RightOuterJoinDriver#prepare method
> 
>
> Key: FLINK-34405
> URL: https://issues.apache.org/jira/browse/FLINK-34405
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.17.2, 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: pull-request-available, starter, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57357&view=logs&j=d89de3df-4600-5585-dadc-9bbc9a5e661c&t=be5a4b15-4b23-56b1-7582-795f58a645a2&l=9027
> {code}
> Feb 07 03:20:16 03:20:16.223 [ERROR] Failures: 
> Feb 07 03:20:16 03:20:16.223 [ERROR] 
> org.apache.flink.runtime.operators.RightOuterJoinTaskTest.testCancelOuterJoinTaskWhileSort2
> Feb 07 03:20:16 03:20:16.223 [ERROR]   Run 1: 
> RightOuterJoinTaskTest>AbstractOuterJoinTaskTest.testCancelOuterJoinTaskWhileSort2:435
>  
> Feb 07 03:20:16 expected: 
> Feb 07 03:20:16   null
> Feb 07 03:20:16  but was: 
> Feb 07 03:20:16   java.lang.Exception: The data preparation caused an error: 
> Interrupted
> Feb 07 03:20:16   at 
> org.apache.flink.runtime.operators.testutils.BinaryOperatorTestBase.testDriverInternal(BinaryOperatorTestBase.java:209)
> Feb 07 03:20:16   at 
> org.apache.flink.runtime.operators.testutils.BinaryOperatorTestBase.testDriver(BinaryOperatorTestBase.java:189)
> Feb 07 03:20:16   at 
> org.apache.flink.runtime.operators.AbstractOuterJoinTaskTest.access$100(AbstractOuterJoinTaskTest.java:48)
> Feb 07 03:20:16   ...(1 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)
> {code}





[PR] [FLINK-34405][tests] Fix unstable test RightOuterJoinTaskTest#testCancelOuterJoinTaskWhileSort1 and testCancelOuterJoinTaskWhileSort2 [flink]

2024-04-06 Thread via GitHub


Jiabao-Sun opened a new pull request, #24628:
URL: https://github.com/apache/flink/pull/24628

   
   
   ## What is the purpose of the change
   
   [FLINK-34405][tests] Fix unstable test 
RightOuterJoinTaskTest#testCancelOuterJoinTaskWhileSort1 and 
testCancelOuterJoinTaskWhileSort2
   
   ## Brief change log
   
   The taskRunner thread goes testDriver() -> AbstractOuterJoinDriver#prepare():101 
-> WAITING on ExternalSorter#getIterator().
   
   The InterruptedException is always thrown at BinaryOperatorTestBase:209.
   It is dropped once the cancel() method has been called; see 
BinaryOperatorTestBase:260.
   
   We can call the cancel method only after the taskRunner thread's state is 
WAITING to prevent this problem.
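   
   A hedged sketch of that approach (a hypothetical helper, not the actual test code): poll the worker thread's state and only trigger cancellation once it is WAITING, so the interrupt can no longer hit the data-preparation phase.
   
```java
public class CancelAfterWaiting {
    // Wait until the worker thread parks in WAITING before cancelling.
    static void cancelWhenWaiting(Thread taskRunner, Runnable cancel) throws InterruptedException {
        while (taskRunner.isAlive() && taskRunner.getState() != Thread.State.WAITING) {
            Thread.sleep(10); // back off briefly between state checks
        }
        cancel.run();
    }
}
```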
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34405) RightOuterJoinTaskTest#testCancelOuterJoinTaskWhileSort2 fails due to an interruption of the RightOuterJoinDriver#prepare method

2024-04-06 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834591#comment-17834591
 ] 

Jiabao Sun commented on FLINK-34405:


taskRunner thread: testDriver() -> AbstractOuterJoinDriver#prepare():101 -> 
WAITING on ExternalSorter#getIterator().

The InterruptedException is always thrown by BinaryOperatorTestBase:209.
It is dropped once the cancel() method has been called, see BinaryOperatorTestBase:260.

> RightOuterJoinTaskTest#testCancelOuterJoinTaskWhileSort2 fails due to an 
> interruption of the RightOuterJoinDriver#prepare method
> 
>
> Key: FLINK-34405
> URL: https://issues.apache.org/jira/browse/FLINK-34405
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.17.2, 1.19.0, 1.18.1, 1.20.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: starter, test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57357&view=logs&j=d89de3df-4600-5585-dadc-9bbc9a5e661c&t=be5a4b15-4b23-56b1-7582-795f58a645a2&l=9027
> {code}
> Feb 07 03:20:16 03:20:16.223 [ERROR] Failures: 
> Feb 07 03:20:16 03:20:16.223 [ERROR] 
> org.apache.flink.runtime.operators.RightOuterJoinTaskTest.testCancelOuterJoinTaskWhileSort2
> Feb 07 03:20:16 03:20:16.223 [ERROR]   Run 1: 
> RightOuterJoinTaskTest>AbstractOuterJoinTaskTest.testCancelOuterJoinTaskWhileSort2:435
>  
> Feb 07 03:20:16 expected: 
> Feb 07 03:20:16   null
> Feb 07 03:20:16  but was: 
> Feb 07 03:20:16   java.lang.Exception: The data preparation caused an error: 
> Interrupted
> Feb 07 03:20:16   at 
> org.apache.flink.runtime.operators.testutils.BinaryOperatorTestBase.testDriverInternal(BinaryOperatorTestBase.java:209)
> Feb 07 03:20:16   at 
> org.apache.flink.runtime.operators.testutils.BinaryOperatorTestBase.testDriver(BinaryOperatorTestBase.java:189)
> Feb 07 03:20:16   at 
> org.apache.flink.runtime.operators.AbstractOuterJoinTaskTest.access$100(AbstractOuterJoinTaskTest.java:48)
> Feb 07 03:20:16   ...(1 remaining lines not displayed - this can be 
> changed with Assertions.setMaxStackTraceElementsDisplayed)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35010][connectors/mongodb] Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink Mongodb connector [flink-connector-mongodb]

2024-04-06 Thread via GitHub


GOODBOY008 commented on code in PR #32:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/32#discussion_r1554810987


##
flink-connector-mongodb-e2e-tests/pom.xml:
##
@@ -65,6 +65,11 @@ under the License.
 			<artifactId>mongodb</artifactId>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>commons-codec</groupId>
+			<artifactId>commons-codec</artifactId>

Review Comment:
   Yes, before adding this dependency, CI failed with an error: 
https://github.com/GOODBOY008/flink-connector-mongodb/actions/runs/8557577952/job/23450146047#step:15:11104



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-06 Thread via GitHub


Zakelly commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1554810305


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.core.state.InternalStateFuture;
+
+/**
+ * An internal factory for {@link InternalStateFuture} that builds futures with the
+ * necessary context switch, wired with the mailbox executor.
+ */
+public class StateFutureFactory {

Review Comment:
   I'd rather keep it with no annotation, since IIUC only interfaces for 
internal usage should be annotated. The package `org.apache.flink.runtime` is 
not for APIs but for implementations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp

2024-04-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34908:
---
Labels: pull-request-available  (was: )

> [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision 
> for timestamp
> ---
>
> Key: FLINK-34908
> URL: https://issues.apache.org/jira/browse/FLINK-34908
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.1.0
>
>
> The Flink CDC pipeline decides the timestamp zone by the pipeline config. I found 
> that mysql2doris and mysql2starrocks specify the fixed datetime format
> yyyy-MM-dd HH:mm:ss, which causes a loss of datetime precision. I think we 
> shouldn't set a fixed datetime format but just return the LocalDateTime object.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34908) [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision for timestamp

2024-04-06 Thread Xin Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834589#comment-17834589
 ] 

Xin Gong commented on FLINK-34908:
--

StarRocks only supports second precision: 
[https://docs.starrocks.io/zh/docs/sql-reference/data-types/date-types/DATETIME/]
So I will only handle Doris.
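
As a quick illustration (a generic Java sketch, not the actual connector code), formatting with a fixed second-level pattern drops the fractional part that the LocalDateTime object still carries:

{code:java}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DatetimePrecisionDemo {
    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.parse("2024-04-06T12:34:56.123456");
        // A fixed second-precision pattern silently drops the fraction:
        String formatted = ts.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println(formatted); // 2024-04-06 12:34:56 (precision lost)
        System.out.println(ts);        // 2024-04-06T12:34:56.123456 (precision kept)
    }
}
{code}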

> [Feature][Pipeline] Mysql pipeline to doris and starrocks will lost precision 
> for timestamp
> ---
>
> Key: FLINK-34908
> URL: https://issues.apache.org/jira/browse/FLINK-34908
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Minor
> Fix For: cdc-3.1.0
>
>
> The Flink CDC pipeline decides the timestamp zone by the pipeline config. I found 
> that mysql2doris and mysql2starrocks specify the fixed datetime format
> yyyy-MM-dd HH:mm:ss, which causes a loss of datetime precision. I think we 
> shouldn't set a fixed datetime format but just return the LocalDateTime object.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34712][release] Generate reference data for state migration tests based on release-1.19.0 [flink]

2024-04-06 Thread via GitHub


lincoln-lil commented on PR #24517:
URL: https://github.com/apache/flink/pull/24517#issuecomment-2041294975

   Rebased onto the latest 1.19 and triggered a new CI run before merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-06 Thread via GitHub


fredia commented on code in PR #24614:
URL: https://github.com/apache/flink/pull/24614#discussion_r1554800173


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.core.state.StateFutureImpl;
+
+/**
+ * A state future that holds the {@link RecordContext} and maintains its reference
+ * count. The reason why we maintain the reference here is that a
+ * ContextStateFutureImpl can be created multiple times, since users may chain their
+ * code arbitrarily, and some of the created futures are only for internal usage
+ * (see {@link StateFutureImpl}). So maintaining the reference count by the
+ * lifecycle of the state future is relatively simple and less error-prone.
+ *
+ * Reference counting added on {@link RecordContext} follows:
+ * 1. +1 when this future is created.
+ * 2. -1 when this future is completed.
+ * 3. +1 when a callback is registered.
+ * 4. -1 when the callback finishes.
+ */
+public class ContextStateFutureImpl extends StateFutureImpl {
+
+private final RecordContext recordContext;
+
+ContextStateFutureImpl(CallbackRunner callbackRunner, RecordContext 
recordContext) {
+super(callbackRunner);
+this.recordContext = recordContext;
+// When state request submitted, ref count +1, as described in 
FLIP-425:
+// To cover the statements without a callback, in addition to the 
reference count marked
+// in Fig.5, each state request itself is also protected by a paired 
reference count.
+recordContext.retain();
+}
+
+@Override
+public  StateFutureImpl makeNewStateFuture() {
+return new ContextStateFutureImpl<>(callbackRunner, recordContext);
+}
+
+@Override
+public void callbackRegistered() {
+// When a callback registered, as shown in Fig.5 of FLIP-425, at the 
point of 3 and 5, the
+// ref count -1.

Review Comment:
   +1?



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.core.state.InternalStateFuture;
+
+import javax.annotation.Nullable;
+
+/**
+ * A request encapsulates the necessary data to perform a state request.
+ *
+ * @param  Type of partitioned key.
+ * @param  Type of input of this request.
+ * @param  Type of value that request will return.
+ */
+public class StateRequest {
+
+/** The type of processing request. */
+public enum RequestType {
+/** Process one record without state access. */
+SYNC,
+/** Get from one {@link State}. */
+GET,
+/** Put to one {@link State}. */
+PUT,
+/** Merge value to an exist key in {@link State}. Mainly used for 
listState. */
+MERGE,
+/** Delete from one {@link State}. */
+DELETE
+}
+
+/** The underlying state to be accessed, can be empty for {@link 
RequestType#SYNC}. */
+@Nullable private final State state;

Review Comment:
   Should the type of `state` be restricted to 
`org.apache.flink.api.common.state.v2.State`?
   IIUC, this underlying state is a lower-level state that directly interacts 
with the KV engine.

Re: [PR] [FLINK-34712][release] Generate reference data for state migration tests based on release-1.19.0 [flink]

2024-04-06 Thread via GitHub


lincoln-lil commented on PR #24517:
URL: https://github.com/apache/flink/pull/24517#issuecomment-2041294668

   @masteryhx Thanks for reviewing this!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34959] Update old flink-cdc-connectors artifactId [flink-cdc]

2024-04-06 Thread via GitHub


xleoken commented on PR #3200:
URL: https://github.com/apache/flink-cdc/pull/3200#issuecomment-2041294135

   cc @Jiabao-Sun @PatrickRen


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Correct the option key for sortPartition's java doc [flink]

2024-04-06 Thread via GitHub


flinkbot commented on PR #24627:
URL: https://github.com/apache/flink/pull/24627#issuecomment-2041291845

   
   ## CI report:
   
   * 7435124358660a306a82811a6a94a0799cbddcab UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34529][table-planner] Introduce FlinkProjectWindowTransposeRule. [flink]

2024-04-06 Thread via GitHub


RubyChou commented on PR #24567:
URL: https://github.com/apache/flink/pull/24567#issuecomment-2041291550

   @flinkbot  run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix] Correct the option key for sortPartition's java doc [flink]

2024-04-06 Thread via GitHub


reswqa opened a new pull request, #24627:
URL: https://github.com/apache/flink/pull/24627

   ## What is the purpose of the change
   
   *Correct the option key for sortPartition's java doc*
   
   
   ## Brief change log
   
 - *Correct the option key for sortPartition's java doc.*
 - *Document that field indices start from 1 for sortPartition*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34959] Update old flink-cdc-connectors artifactId [flink-cdc]

2024-04-06 Thread via GitHub


xleoken commented on PR #3200:
URL: https://github.com/apache/flink-cdc/pull/3200#issuecomment-2041289392

   cc @Jiabao-Sun 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34690] Cast decimal to VARCHAR as primary key in starrocks sink [flink-cdc]

2024-04-06 Thread via GitHub


loserwang1024 commented on code in PR #3150:
URL: https://github.com/apache/flink-cdc/pull/3150#discussion_r1554803332


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/CdcDataTypeTransformerTest.java:
##
@@ -75,6 +76,50 @@ public void testCharTypeForPrimaryKey() {
 assertTrue(smallLengthColumn.isNullable());
 }
 
+@Test
+public void testDecimalForPrimaryKey() {
+// Map to DECIMAL of StarRocks if column is DECIMAL type and not 
primary key.
+StarRocksColumn.Builder noPrimaryKeyBuilder =
+new 
StarRocksColumn.Builder().setColumnName("no_primary_key").setOrdinalPosition(0);
+new DecimalType(20, 1)
+.accept(new StarRocksUtils.CdcDataTypeTransformer(false, 
noPrimaryKeyBuilder));
+StarRocksColumn noPrimaryKeyColumn = noPrimaryKeyBuilder.build();
+assertEquals("no_primary_key", noPrimaryKeyColumn.getColumnName());
+assertEquals(0, noPrimaryKeyColumn.getOrdinalPosition());
+assertEquals(StarRocksUtils.DECIMAL, noPrimaryKeyColumn.getDataType());
+assertEquals(Integer.valueOf(20), 
noPrimaryKeyColumn.getColumnSize().orElse(null));
+assertEquals(Integer.valueOf(1), 
noPrimaryKeyColumn.getDecimalDigits().get());
+assertTrue(noPrimaryKeyColumn.isNullable());
+
+// Map to VARCHAR of StarRocks if column is DECIMAL type and primary 
key.
+StarRocksColumn.Builder primaryKeyBuilder =
+new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(1);
+new DecimalType(20, 1)
+.notNull()
+.accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
primaryKeyBuilder));
+StarRocksColumn primaryKeyColumn = primaryKeyBuilder.build();
+assertEquals("primary_key", primaryKeyColumn.getColumnName());
+assertEquals(1, primaryKeyColumn.getOrdinalPosition());
+assertEquals(StarRocksUtils.VARCHAR, primaryKeyColumn.getDataType());
+assertEquals(Integer.valueOf(22), 
primaryKeyColumn.getColumnSize().orElse(null));
+assertTrue(!primaryKeyColumn.isNullable());
+
+// Map to VARCHAR of StarRocks if column is DECIMAL type and primary 
key
+// DECIMAL(20,0) is common in cdc pipeline, for example, the upstream 
cdc source is unsigned
+// BIGINT.
+StarRocksColumn.Builder unsignedBigIntKeyBuilder =
+new 
StarRocksColumn.Builder().setColumnName("primary_key").setOrdinalPosition(1);
+new DecimalType(20, 0)
+.notNull()
+.accept(new StarRocksUtils.CdcDataTypeTransformer(true, 
unsignedBigIntKeyBuilder));
+StarRocksColumn unsignedBigIntColumn = 
unsignedBigIntKeyBuilder.build();
+assertEquals("primary_key", unsignedBigIntColumn.getColumnName());
+assertEquals(1, unsignedBigIntColumn.getOrdinalPosition());
+assertEquals(StarRocksUtils.VARCHAR, 
unsignedBigIntColumn.getDataType());
+assertEquals(Integer.valueOf(22), 
unsignedBigIntColumn.getColumnSize().orElse(null));

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34952][cdc-composer][sink] Flink CDC pipeline supports SinkFunction [flink-cdc]

2024-04-06 Thread via GitHub


loserwang1024 commented on code in PR #3204:
URL: https://github.com/apache/flink-cdc/pull/3204#discussion_r1554801483


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java:
##
@@ -35,4 +35,10 @@ public class ValuesDataSinkOptions {
 .booleanType()
 .defaultValue(true)
 .withDescription("True if the Event should be print to 
console.");
+
+public static final ConfigOption LEGACY_ENABLED =
+ConfigOptions.key("legacy.enabled")

Review Comment:
   Add `SinkType`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34952][cdc-composer][sink] Flink CDC pipeline supports SinkFunction [flink-cdc]

2024-04-06 Thread via GitHub


loserwang1024 commented on code in PR #3204:
URL: https://github.com/apache/flink-cdc/pull/3204#discussion_r1554801419


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperator.java:
##
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.sink;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * An operator that processes records to be written into a {@link
+ * org.apache.flink.streaming.api.functions.sink.SinkFunction}.
+ *
+ * The operator is a proxy of {@link 
org.apache.flink.streaming.api.operators.StreamSink} in
+ * Flink.
+ *
+ * The operator is always part of a sink pipeline and is the first operator.
+ */
+@Internal
+public class DataSinkOperator extends StreamSink {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-24939) Support 'SHOW CREATE CATALOG' syntax

2024-04-06 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834588#comment-17834588
 ] 

Jane Chan commented on FLINK-24939:
---

Fixed in master 2747a5814bcc5cd45f15c023beba9b0644fe1ead

> Support 'SHOW CREATE CATALOG' syntax
> 
>
> Key: FLINK-24939
> URL: https://issues.apache.org/jira/browse/FLINK-24939
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
>
> SHOW CREATE CATALOG <catalog_name>;
>  
> `Catalog` is playing a more important role in Flink; it would be great to get 
> an existing catalog's detail information
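
For illustration, a minimal sketch of exercising the new syntax through the Table API on Flink 1.20, where this syntax lands (the catalog name and properties below are made up for the example):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShowCreateCatalogDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory')");
        // Prints the DDL that recreates the catalog, e.g.
        // CREATE CATALOG `my_catalog` WITH ('type' = 'generic_in_memory')
        tEnv.executeSql("SHOW CREATE CATALOG my_catalog").print();
    }
}
{code}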



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-24939) Support 'SHOW CREATE CATALOG' syntax

2024-04-06 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan closed FLINK-24939.
-

> Support 'SHOW CREATE CATALOG' syntax
> 
>
> Key: FLINK-24939
> URL: https://issues.apache.org/jira/browse/FLINK-24939
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> SHOW CREATE CATALOG <catalog_name>;
>  
> `Catalog` is playing a more important role in Flink; it would be great to get 
> an existing catalog's detail information



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-24939) Support 'SHOW CREATE CATALOG' syntax

2024-04-06 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan resolved FLINK-24939.
---
Resolution: Fixed

> Support 'SHOW CREATE CATALOG' syntax
> 
>
> Key: FLINK-24939
> URL: https://issues.apache.org/jira/browse/FLINK-24939
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> SHOW CREATE CATALOG <catalog_name>;
>  
> `Catalog` is playing a more important role in Flink; it would be great to get 
> an existing catalog's detail information



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-24939) Support 'SHOW CREATE CATALOG' syntax

2024-04-06 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-24939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-24939:
--
Fix Version/s: 1.20.0

> Support 'SHOW CREATE CATALOG' syntax
> 
>
> Key: FLINK-24939
> URL: https://issues.apache.org/jira/browse/FLINK-24939
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Yubin Li
>Assignee: Yubin Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> SHOW CREATE CATALOG <catalog_name>;
>  
> `Catalog` is playing a more important role in Flink; it would be great to get 
> an existing catalog's detail information



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax [flink]

2024-04-06 Thread via GitHub


LadyForest merged PR #24555:
URL: https://github.com/apache/flink/pull/24555


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-32079) Read/write checkpoint metadata of merged files

2024-04-06 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-32079.
--
Resolution: Fixed

merged ea4e4981 and 0b7f0fde into master

> Read/write checkpoint metadata of merged files
> --
>
> Key: FLINK-32079
> URL: https://issues.apache.org/jira/browse/FLINK-32079
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Hangxiang Yu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32079][checkpoint] Support to read/write checkpoint metadata of merged files [flink]

2024-04-06 Thread via GitHub


masteryhx closed pull request #24480: [FLINK-32079][checkpoint] Support to 
read/write checkpoint metadata of merged files
URL: https://github.com/apache/flink/pull/24480


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34979][State] Implement State Future and related utilities [flink]

2024-04-06 Thread via GitHub


masteryhx closed pull request #24597: [FLINK-34979][State] Implement State 
Future and related utilities
URL: https://github.com/apache/flink/pull/24597


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails

2024-04-06 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834584#comment-17834584
 ] 

Weijie Guo commented on FLINK-18476:


1.20 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58755&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=22270

> PythonEnvUtilsTest#testStartPythonProcess fails
> ---
>
> Key: FLINK-18476
> URL: https://issues.apache.org/jira/browse/FLINK-18476
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> test-stability
>
> The 
> {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} 
> failed in my local environment as it assumes the environment has 
> {{/usr/bin/python}}. 
> I don't know exactly how I got Python on Ubuntu 20.04, but I only have an 
> alias for {{python = python3}}. Therefore the test fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-06 Thread via GitHub


Zakelly commented on PR #24614:
URL: https://github.com/apache/flink/pull/24614#issuecomment-2041280495

   @fredia @masteryhx would you please take a look? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34273) git fetch fails

2024-04-06 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834585#comment-17834585
 ] 

Weijie Guo commented on FLINK-34273:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58755&view=logs&j=26b84117-e436-5720-913e-3e280ce55cae&t=114e1a26-135e-5832-d3ea-5b589cffc172&l=387

> git fetch fails
> ---
>
> Key: FLINK-34273
> URL: https://issues.apache.org/jira/browse/FLINK-34273
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> We've seen multiple {{git fetch}} failures. I assume this to be an 
> infrastructure issue. This Jira issue is for documentation purposes.
> {code:java}
> error: RPC failed; curl 18 transfer closed with outstanding read data 
> remaining
> error: 5211 bytes of body are still expected
> fetch-pack: unexpected disconnect while reading sideband packet
> fatal: early EOF
> fatal: fetch-pack: invalid index-pack output {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=5d6dc3d3-393d-5111-3a40-c6a5a36202e6&l=667



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails

2024-04-06 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834584#comment-17834584
 ] 

Weijie Guo edited comment on FLINK-18476 at 4/7/24 2:25 AM:


1.20 jdk17 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58755&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=22270


was (Author: weijie guo):
1.20 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58755&view=logs&j=d871f0ce-7328-5d00-023b-e7391f5801c8&t=77cbea27-feb9-5cf5-53f7-3267f9f9c6b6&l=22270

> PythonEnvUtilsTest#testStartPythonProcess fails
> ---
>
> Key: FLINK-18476
> URL: https://issues.apache.org/jira/browse/FLINK-18476
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0
>Reporter: Dawid Wysakowicz
>Priority: Major
>  Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> test-stability
>
> The 
> {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} 
> failed in my local environment as it assumes the environment has 
> {{/usr/bin/python}}. 
> I don't know exactly how I got Python on Ubuntu 20.04, but I only have an 
> alias for {{python = python3}}. Therefore the test fails.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34273) git fetch fails

2024-04-06 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834583#comment-17834583
 ] 

Weijie Guo commented on FLINK-34273:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58754&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=61c73713-1b77-5132-1d22-4d746b4b06d8&l=277

> git fetch fails
> ---
>
> Key: FLINK-34273
> URL: https://issues.apache.org/jira/browse/FLINK-34273
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI, Test Infrastructure
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Priority: Major
>  Labels: test-stability
>
> We've seen multiple {{git fetch}} failures. I assume this to be an 
> infrastructure issue. This Jira issue is for documentation purposes.
> {code:java}
> error: RPC failed; curl 18 transfer closed with outstanding read data 
> remaining
> error: 5211 bytes of body are still expected
> fetch-pack: unexpected disconnect while reading sideband packet
> fatal: early EOF
> fatal: fetch-pack: invalid index-pack output {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57080&view=logs&j=0e7be18f-84f2-53f0-a32d-4a5e4a174679&t=5d6dc3d3-393d-5111-3a40-c6a5a36202e6&l=667



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34986][Runtime/State] Basic framework of async execution for state [flink]

2024-04-06 Thread via GitHub


Zakelly commented on PR #24614:
URL: https://github.com/apache/flink/pull/24614#issuecomment-2041280097

   Rebased and pushed again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35023) YARNApplicationITCase failed on Azure

2024-04-06 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35023:
--

 Summary: YARNApplicationITCase failed on Azure
 Key: FLINK-35023
 URL: https://issues.apache.org/jira/browse/FLINK-35023
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo


1. 
YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion

{code:java}
Apr 06 02:19:44 02:19:44.063 [ERROR] 
org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion
 -- Time elapsed: 9.727 s <<< FAILURE!
Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
while expecting FINISHED
Apr 06 02:19:44 at 
org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion$1(YARNApplicationITCase.java:72)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndDisableUserJarInclusion(YARNApplicationITCase.java:70)
Apr 06 02:19:44 at 
java.base/java.lang.reflect.Method.invoke(Method.java:568)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

{code}


2. YARNApplicationITCase.testApplicationClusterWithRemoteUserJar


{code:java}
Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
while expecting FINISHED
Apr 06 02:19:44 at 
org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithRemoteUserJar$2(YARNApplicationITCase.java:86)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithRemoteUserJar(YARNApplicationITCase.java:84)
Apr 06 02:19:44 at 
java.base/java.lang.reflect.Method.invoke(Method.java:568)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
{code}



3. 
YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion


{code:java}
Apr 06 02:19:44 java.lang.AssertionError: Application became FAILED or KILLED 
while expecting FINISHED
Apr 06 02:19:44 at 
org.apache.flink.yarn.YarnTestBase.waitApplicationFinishedElseKillIt(YarnTestBase.java:1282)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YARNApplicationITCase.deployApplication(YARNApplicationITCase.java:116)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YARNApplicationITCase.lambda$testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion$0(YARNApplicationITCase.java:62)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YarnTestBase.runTest(YarnTestBase.java:303)
Apr 06 02:19:44 at 
org.apache.flink.yarn.YARNApplicationITCase.testApplicationClusterWithLocalUserJarAndFirstUserJarInclusion(YARNApplicationITCase.java:60)
Apr 06 02:19:44 at 
java.base/java.lang.reflect.Method.invoke(Method.java:568)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:194)
Apr 06 02:19:44 at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
Apr 06 02:19:44 at 
java.base/java.u

[jira] [Updated] (FLINK-34979) Implement State Future and related utilities

2024-04-06 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-34979:
-
Component/s: Runtime / State Backends

> Implement State Future and related utilities
> 
>
> Key: FLINK-34979
> URL: https://issues.apache.org/jira/browse/FLINK-34979
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> Implement the StateFuture.
> In the very first version, we leverage CompletableFuture to provide the 
> functionality. Although this is not optimal in performance, it lets us swiftly 
> build up the whole framework.
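
A minimal sketch of that approach (illustrative names, not the actual Flink classes): a thin future type that delegates completion and chaining to an underlying CompletableFuture:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class StateFutureSketch<T> {
    private final CompletableFuture<T> delegate = new CompletableFuture<>();

    /** Called by the state backend when the requested value is ready. */
    public void complete(T value) {
        delegate.complete(value);
    }

    /** User-facing chaining, backed by CompletableFuture's machinery. */
    public <U> StateFutureSketch<U> thenApply(Function<? super T, ? extends U> fn) {
        StateFutureSketch<U> next = new StateFutureSketch<>();
        delegate.thenApply(fn)
                .whenComplete(
                        (value, error) -> {
                            if (error != null) {
                                next.delegate.completeExceptionally(error);
                            } else {
                                next.complete(value);
                            }
                        });
        return next;
    }
}
{code}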



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34979) Implement State Future and related utilities

2024-04-06 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu resolved FLINK-34979.
--
Resolution: Fixed

merged 739bd337 into master

> Implement State Future and related utilities
> 
>
> Key: FLINK-34979
> URL: https://issues.apache.org/jira/browse/FLINK-34979
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> Implement the StateFuture.
> In the very first version, we leverage CompletableFuture to provide the 
> functionality. Although this is not optimal in performance, it lets us swiftly 
> build up the whole framework.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34979) Implement State Future and related utilities

2024-04-06 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-34979:
-
Fix Version/s: 1.20.0

> Implement State Future and related utilities
> 
>
> Key: FLINK-34979
> URL: https://issues.apache.org/jira/browse/FLINK-34979
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Implement the StateFuture.
> In the very first version, we leverage CompletableFuture to provide the 
> functionality. Although this is not optimal in performance, it lets us swiftly 
> build up the whole framework.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34932) Translate concepts of Flink-Kubernetes-Operator documentation

2024-04-06 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-34932:
---

Assignee: Caican Cai

> Translate concepts of Flink-Kubernetes-Operator documentation
> -
>
> Key: FLINK-34932
> URL: https://issues.apache.org/jira/browse/FLINK-34932
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Kubernetes Operator
>Affects Versions: 1.9.0
>Reporter: Caican Cai
>Assignee: Caican Cai
>Priority: Minor
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34970) Translate architecture documents into Chinese

2024-04-06 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-34970:
---

Assignee: Caican Cai

> Translate architecture documents into Chinese
> -
>
> Key: FLINK-34970
> URL: https://issues.apache.org/jira/browse/FLINK-34970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Affects Versions: 1.9.0
>Reporter: Caican Cai
>Assignee: Caican Cai
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.1
>
>
> Translate architecture documents into Chinese



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34980) Translate overview document into Chinese

2024-04-06 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-34980:
---

Assignee: Caican Cai

> Translate overview document into Chinese
> 
>
> Key: FLINK-34980
> URL: https://issues.apache.org/jira/browse/FLINK-34980
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Caican Cai
>Assignee: Caican Cai
>Priority: Minor
>  Labels: pull-request-available
>
> Translate overview document into Chinese



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Fix YARN ContainerId.getId Deprecated Used. [flink]

2024-04-06 Thread via GitHub


1996fanrui merged PR #24601:
URL: https://github.com/apache/flink/pull/24601


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34995) flink kafka connector source stuck when partition leader invalid

2024-04-06 Thread yansuopeng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834581#comment-17834581
 ] 

yansuopeng commented on FLINK-34995:


[~martijnvisser]  Apologies for the confusion; the issue described is not a bug 
within Flink itself. Instead, it pertains to Flink's implementation when 
utilizing Kafka's blocking API, which may lead to the problem mentioned. This 
can be addressed by filtering out partitions with an invalid leader and relying 
on the partition discovery interval.

> flink kafka connector source stuck when partition leader invalid
> 
>
> Key: FLINK-34995
> URL: https://issues.apache.org/jira/browse/FLINK-34995
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.19.0, 1.18.1
>Reporter: yansuopeng
>Priority: Major
>
> When the partition leader is invalid (leader=-1), a Flink streaming job using 
> KafkaSource can't restart or start a new instance with a new group id; it 
> will get stuck with the following exception:
> "{*}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
> expired before the position for partition aaa-1 could be determined{*}"
> When leader=-1, Kafka APIs like KafkaConsumer.position() will block until 
> either the position can be determined or an unrecoverable error is 
> encountered.
> In fact, leader=-1 is not easy to avoid: even with replica=3, three disks going 
> offline together will trigger the problem, especially when the cluster size is 
> relatively large. It relies on the Kafka administrator to fix it in time, but that 
> is risky during Kafka cluster peak periods.
> I have solved this problem and want to create a PR.
>  
>  
>  
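
One way to surface the problem instead of blocking indefinitely (a sketch of the general idea, not the proposed PR): use the position(TopicPartition, Duration) overload so an invalid leader shows up as a TimeoutException the caller can handle:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class PositionWithTimeoutDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("group.id", "demo");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("aaa", 1);
            consumer.assign(Collections.singletonList(tp));
            try {
                long pos = consumer.position(tp, Duration.ofSeconds(5));
                System.out.println("position=" + pos);
            } catch (TimeoutException e) {
                // Leader may be invalid (leader=-1); skip this partition for now
                // and retry after the next partition discovery instead of
                // blocking the whole source startup.
            }
        }
    }
}
{code}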



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [cdc-connector][cdc-base] Shade guava31 to avoid dependency conflict with flink below 1.18 [flink-cdc]

2024-04-06 Thread via GitHub


loserwang1024 commented on code in PR #3083:
URL: https://github.com/apache/flink-cdc/pull/3083#discussion_r1554797030


##
pom.xml:
##
@@ -462,8 +462,15 @@ under the License.
 submodules, ${flink.version} will be 
resolved as the actual Flink version.
 -->
 
org.apache.flink:flink-shaded-force-shading
+
org.apache.flink:flink-shaded-guava
 
 
+
+
+flink.shaded.guava

Review Comment:
   Done. (By the way, I used to use `com.google.guava` rather than Flink's 
shaded guava, but there are lots of limits in `checkstyle.xml`):
   ```xml
   



   
   ```
   or
   
   ```xml




   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35010][connectors/mongodb] Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.0 for Flink Mongodb connector [flink-connector-mongodb]

2024-04-06 Thread via GitHub


Jiabao-Sun commented on code in PR #32:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/32#discussion_r1554796798


##
flink-connector-mongodb-e2e-tests/pom.xml:
##
@@ -65,6 +65,11 @@ under the License.
 			<artifactId>mongodb</artifactId>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>commons-codec</groupId>
+			<artifactId>commons-codec</artifactId>

Review Comment:
   Hi @GOODBOY008, why do we need the `commons-codec` dependency?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-04-06 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834580#comment-17834580
 ] 

xiaogang zhou commented on FLINK-32070:
---

[~Zakelly] Hi, we met a problem where Flink checkpoints with too many SST files 
cause heavy IOPS on HDFS. Can this issue help in that scenario?

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.20.0
>
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation from the file system's point of 
> view (See [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-34973) FLIP-425: Asynchronous Execution Model

2024-04-06 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei reassigned FLINK-34973:
--

Assignee: Yanfei Lei

> FLIP-425: Asynchronous Execution Model
> --
>
> Key: FLINK-34973
> URL: https://issues.apache.org/jira/browse/FLINK-34973
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends, 
> Runtime / Task
>Reporter: Zakelly Lan
>Assignee: Yanfei Lei
>Priority: Major
> Fix For: 2.0.0
>
>
> This is a sub-FLIP for the disaggregated state management and its related 
> work, please read the FLIP-423 first to know the whole story.
> FLIP-424 introduces asynchronous state APIs with callbacks allowing state 
> access to be executed in threads separate from the task thread, making better 
> usage of I/O bandwidth and enhancing throughput. This FLIP proposes an 
> execution framework for asynchronous state APIs. The execution code path for 
> the new API is completely independent from the original one, where many 
> runtime components are redesigned. We intend to delve into the challenges 
> associated with asynchronous execution and provide an in-depth design 
> analysis for each module. Furthermore, we will conduct a performance analysis 
> of the new framework relative to the current implementation and examine how 
> it measures up against other potential alternatives.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


leonardBang commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1554795627


##
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java:
##
@@ -85,5 +89,12 @@ public class PipelineOptions {
 .withDescription(
 "The unique ID for schema operator. This ID will 
be used for inter-operator communications and must be unique across 
operators.");
 
+public static final ConfigOption 
PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT =
+ConfigOptions.key("schema-operator-rpc-timeout")

Review Comment:
   ```suggestion
   ConfigOptions.key("schema-operator.rpc-timeout")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


leonardBang commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1554795438


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##
@@ -127,4 +136,13 @@ RESPONSE sendRequestToCoordinator(REQUEST request) {
 "Failed to send request to coordinator: " + 
request.toString(), e);
 }
 }
+
+@Override
+public void initializeState(StateInitializationContext context) throws 
Exception {
+if (context.isRestored()) {
+if (getRuntimeContext().getIndexOfThisSubtask() == 0) {

Review Comment:
   Got it, it would be better to add the explanation as a code note



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [cdc-cli] support recover from a specific savepoint file [flink-cdc]

2024-04-06 Thread via GitHub


lvyanquan commented on code in PR #2959:
URL: https://github.com/apache/flink-cdc/pull/2959#discussion_r1554795369


##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java:
##
@@ -53,12 +53,46 @@ public class CliFrontendOptions {
 .desc("Use Flink MiniCluster to run the pipeline")
 .build();
 
+public static final Option SAVEPOINT_PATH_OPTION =
+Option.builder("s")
+.longOpt("fromSavepoint")

Review Comment:
   Addressed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34995) flink kafka connector source stuck when partition leader invalid

2024-04-06 Thread yansuopeng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yansuopeng updated FLINK-34995:
---
Issue Type: Improvement  (was: Bug)

> flink kafka connector source stuck when partition leader invalid
> 
>
> Key: FLINK-34995
> URL: https://issues.apache.org/jira/browse/FLINK-34995
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.17.0, 1.19.0, 1.18.1
>Reporter: yansuopeng
>Priority: Major
>
> When a partition leader is invalid (leader=-1), a Flink streaming job using 
> KafkaSource can't restart, nor can a new instance with a new group id be 
> started; it gets stuck with the following exception:
> "{*}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
> expired before the position for partition aaa-1 could be determined{*}"
> When leader=-1, Kafka APIs like KafkaConsumer.position() block until either 
> the position can be determined or an unrecoverable error is encountered.
> In fact, leader=-1 is not easy to avoid: even with replica=3, three disks 
> going offline together will trigger the problem, especially when the cluster 
> is relatively large. Fixing it relies on the Kafka administrator acting in 
> time, which is risky during the Kafka cluster's peak period.
> I have solved this problem and want to create a PR.
>  
>  
>  
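For reference, a sketch of one way to avoid blocking indefinitely, using the timed variant of the public Kafka consumer API (not necessarily the fix intended by this ticket):

{code}
import java.time.Duration;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

class PositionProbe {

    /**
     * Returns the consumer position, or -1 if the leader is unavailable within
     * the timeout, instead of blocking the whole job start-up on the partition.
     */
    static long tryPosition(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
        try {
            return consumer.position(tp, Duration.ofSeconds(5));
        } catch (TimeoutException e) {
            return -1L; // leader=-1: defer this partition and retry later
        }
    }
}
{code}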



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35022) Add TypeInformed Element Converter for DynamoDbSink

2024-04-06 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834533#comment-17834533
 ] 

Ahmed Hamdy commented on FLINK-35022:
-

[~danny.cranmer] Wdyt about this proposal? 
Additionally, could you please assign it to me?

> Add TypeInformed Element Converter for DynamoDbSink
> ---
>
> Key: FLINK-35022
> URL: https://issues.apache.org/jira/browse/FLINK-35022
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / DynamoDB
>Affects Versions: aws-connector-4.3.0
>Reporter: Ahmed Hamdy
>Priority: Minor
>
> h2. Context
> {{DynamoDbSink}}, as an extension of {{AsyncSinkBase}}, depends on 
> {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert 
> Flink stream objects to DynamoDb write requests, where an item is represented 
> as {{Map<String, AttributeValue>}}.
> {{AttributeValue}} is the wrapper for a DynamoDb-comprehensible object, in a 
> format with type identification properties, as in
> {"M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}}}.
> Since TypeInformation is already natively supported in Flink, many 
> implementations of the DynamoDb ElementConverter are just boilerplate. 
> For example 
> {code:title="Simple POJO Element Conversion"}
>  public class Order {
> String id;
> int quantity;
> double total;
> }
> {code}
> The implementation of the converter must be 
> {code:title="Simple POJO DDB Element Converter"}
> public static class SimplePojoElementConverter
>         implements ElementConverter<Order, DynamoDbWriteRequest> {
>
>     @Override
>     public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
>         Map<String, AttributeValue> itemMap = new HashMap<>();
>         itemMap.put("id", AttributeValue.builder().s(order.id).build());
>         itemMap.put("quantity",
>                 AttributeValue.builder().n(String.valueOf(order.quantity)).build());
>         itemMap.put("total",
>                 AttributeValue.builder().n(String.valueOf(order.total)).build());
>         return DynamoDbWriteRequest.builder()
>                 .setType(DynamoDbWriteRequestType.PUT)
>                 .setItem(itemMap)
>                 .build();
>     }
>
>     @Override
>     public void open(Sink.InitContext context) {}
> }
> {code}
> While this might not be too much work, it is a fairly common case in Flink, 
> and the implementation requires fair knowledge of the DDB model from new 
> users.
> h2. Proposal 
> Introduce {{DynamoDbTypeInformedElementConverter}} as follows:
> {code:title="TypeInformedElementconverter"} 
> public class DynamoDbTypeInformedElementConverter implements 
> ElementConverter {
> DynamoDbTypeInformedElementConverter(CompositeType typeInfo);
> public DynamoDbWriteRequest convertElement(input) {
> switch this.typeInfo{
> case: BasicTypeInfo.STRING_TYPE_INFO: return input -> 
> AttributeValue.fromS(o.toString())
> case: BasicTypeInfo.SHORT_TYPE_INFO: 
> case: BasicTypeInfo.INTEGER_TYPE_INFO: input -> 
> AttributeValue.fromN(o.toString())
>case: TupleTypeInfo: input -> AttributeValue.fromL(converTuple(input))
>   .
> }
> }
> }
> // User Code
> public static void main(String []args) {
>   DynamoDbTypeInformedElementConverter elementConverter = new 
> DynamoDbTypeInformedElementConverter(TypeInformation.of(Order.class));
> DdbSink.setElementConverter(elementConverter); 
> }
> {code}
> We will start by supporting all POJO / basic / Tuple / Array type 
> information, which should be enough to cover all DDB-supported types 
> (s, n, bool, b, ss, ns, bs, bools, m, l).
> 1- 
> https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35022) Add TypeInformed Element Converter for DynamoDbSink

2024-04-06 Thread Ahmed Hamdy (Jira)
Ahmed Hamdy created FLINK-35022:
---

 Summary: Add TypeInformed Element Converter for DynamoDbSink
 Key: FLINK-35022
 URL: https://issues.apache.org/jira/browse/FLINK-35022
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / DynamoDB
Affects Versions: aws-connector-4.3.0
Reporter: Ahmed Hamdy


h2. Context
{{DynamoDbSink}}, as an extension of {{AsyncSinkBase}}, depends on 
{{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert 
Flink stream objects to DynamoDb write requests, where an item is represented 
as {{Map<String, AttributeValue>}}.

{{AttributeValue}} is the wrapper for a DynamoDb-comprehensible object, in a 
format with type identification properties, as in
{"M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}}}.

Since TypeInformation is already natively supported in Flink, many 
implementations of the DynamoDb ElementConverter are just boilerplate. 
For example 
{code:title="Simple POJO Element Conversion"}
 public class Order {
String id;
int quantity;
double total;
}
{code}

The implementation of the converter must be 

{code:title="Simple POJO DDB Element Converter"}
public static class SimplePojoElementConverter
        implements ElementConverter<Order, DynamoDbWriteRequest> {

    @Override
    public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
        Map<String, AttributeValue> itemMap = new HashMap<>();
        itemMap.put("id", AttributeValue.builder().s(order.id).build());
        itemMap.put("quantity",
                AttributeValue.builder().n(String.valueOf(order.quantity)).build());
        itemMap.put("total",
                AttributeValue.builder().n(String.valueOf(order.total)).build());
        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(itemMap)
                .build();
    }

    @Override
    public void open(Sink.InitContext context) {}
}
{code}

While this might not be too much work, it is a fairly common case in Flink, 
and the implementation requires fair knowledge of the DDB model from new 
users.

h2. Proposal 

Introduce {{DynamoDbTypeInformedElementConverter}} as follows:

{code:title="TypeInformedElementconverter"} 
public class DynamoDbTypeInformedElementConverter implements 
ElementConverter {
DynamoDbTypeInformedElementConverter(CompositeType typeInfo);
public DynamoDbWriteRequest convertElement(input) {
switch this.typeInfo{
case: BasicTypeInfo.STRING_TYPE_INFO: return input -> 
AttributeValue.fromS(o.toString())
case: BasicTypeInfo.SHORT_TYPE_INFO: 
case: BasicTypeInfo.INTEGER_TYPE_INFO: input -> 
AttributeValue.fromN(o.toString())
   case: TupleTypeInfo: input -> AttributeValue.fromL(converTuple(input))
  .
}
}
}

// User Code
public static void main(String []args) {
  DynamoDbTypeInformedElementConverter elementConverter = new 
DynamoDbTypeInformedElementConverter(TypeInformation.of(Order.class));
DdbSink.setElementConverter(elementConverter); 
}

{code}

We will start by supporting all POJO / basic / Tuple / Array type information, 
which should be enough to cover all DDB-supported types 
(s, n, bool, b, ss, ns, bs, bools, m, l).
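For POJO types, the conversion could recurse over the fields exposed by the type information. A hedged sketch that would slot into the converter class above (it assumes the {{toAttributeValue}} dispatch sketched earlier, which is not existing API):

{code}
private Map<String, AttributeValue> convertPojo(PojoTypeInfo<T> pojoInfo, T element)
        throws IllegalAccessException {
    Map<String, AttributeValue> item = new HashMap<>();
    for (int i = 0; i < pojoInfo.getArity(); i++) {
        PojoField field = pojoInfo.getPojoFieldAt(i);
        field.getField().setAccessible(true);
        // Convert each POJO field with the same per-type dispatch.
        item.put(field.getField().getName(),
                toAttributeValue(field.getTypeInformation(), field.getField().get(element)));
    }
    return item;
}
{code}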

1- 
https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34969][cdc-cli]Add support for both new and old Flink config files in Flink… [flink-cdc]

2024-04-06 Thread via GitHub


skymilong commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1552023129


##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:
##
@@ -20,17 +20,24 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
 private static final String FLINK_CONF_DIR = "conf";
-private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+private static final String NEW_FLINK_CONF_FILENAME = "config.yaml";
 
 public static Configuration loadFlinkConfiguration(Path flinkHome) throws 
Exception {
-Path flinkConfPath = 
flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
+Path flinkConfPath = 
flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME);
+// If the old version of the configuration file does not exist, then 
attempt to use the new
+// version of the file name.
+if (!Files.exists(flinkConfPath)) {
+flinkConfPath = 
flinkHome.resolve(FLINK_CONF_DIR).resolve(NEW_FLINK_CONF_FILENAME);
+}
 return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);

Review Comment:
   Thank you for your feedback. I have rechecked the code and I agree with you. 
The new config.yaml is not in a map format anymore, so rewriting the parsing 
logic would be necessary. I am willing to take on this task and submit the 
necessary changes.
   
   In addition, I think it might be beneficial to handle the old version of the 
configuration file through exception handling. Here is a possible way to do so:
   
   ```java
   private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
   private static final String FLINK_CONF_FILENAME = "config.yaml";
   public static Configuration loadFlinkConfiguration(Path flinkHome) throws 
Exception {
   Path flinkConfPath = 
flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
   try {
   return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
   } catch (FileNotFoundException e) {
   LOG.warn(
   "Failed to load the new configuration file:{}. Trying to 
load the old configuration file:{}.",
   FLINK_CONF_FILENAME,
   OLD_FLINK_CONF_FILENAME);
   return ConfigurationUtils.loadMapFormattedConfig(
   
flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME));
   }
   }
   ```
   This is my idea for exception handling: we first attempt to load the new 
configuration file, and if that fails (which could be due to the absence of 
the new configuration file), we then try to load the old one. This approach 
not only ensures backward compatibility and allows for a smoother transition 
between Flink versions, but also produces a log message for each attempt from 
the `loadMapFormattedConfig` method, which I think makes the notifications 
more user-friendly. Do you think this approach is feasible? Or do you have any 
better suggestions or ideas? I look forward to your feedback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35011) The change in visibility of MockDeserializationSchema cause compilation failure in kafka connector

2024-04-06 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun resolved FLINK-35011.

Resolution: Fixed

Fixed via master: 3590c2d86f4186771ffcd64712f756d31306eb88

> The change in visibility of MockDeserializationSchema cause compilation 
> failure in kafka connector
> --
>
> Key: FLINK-35011
> URL: https://issues.apache.org/jira/browse/FLINK-35011
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.20.0
>Reporter: Jiabao Sun
>Assignee: Jiabao Sun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Flink Kafka connector can't compile with 1.20-SNAPSHOT, see 
> https://github.com/apache/flink-connector-kafka/actions/runs/8553981349/job/23438292087?pr=90#step:15:165
> Error message is:
> {code}
> Error:  Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
> (default-testCompile) on project flink-connector-kafka: Compilation failure
> Error:  
> /home/runner/work/flink-connector-kafka/flink-connector-kafka/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java:[60,39]
>  org.apache.flink.streaming.util.MockDeserializationSchema is not public in 
> org.apache.flink.streaming.util; cannot be accessed from outside package
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35011][tests] Change MockDeserializationSchema's visibility to public to prevent compilation failure in kafka connector [flink]

2024-04-06 Thread via GitHub


Jiabao-Sun merged PR #24622:
URL: https://github.com/apache/flink/pull/24622


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax [flink]

2024-04-06 Thread via GitHub


liyubin117 commented on code in PR #24555:
URL: https://github.com/apache/flink/pull/24555#discussion_r1554574354


##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/** SHOW CREATE CATALOG sql call. */
+public class SqlShowCreateCatalog extends SqlShowCreate {
+
+public static final SqlSpecialOperator OPERATOR =
+new SqlSpecialOperator("SHOW CREATE CATALOG", SqlKind.OTHER_DDL);
+
+protected final SqlIdentifier catalogName;

Review Comment:
   done :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-27299][Runtime/Configuration] flink parsing parameter bug fixed. [flink]

2024-04-06 Thread via GitHub


wolfboys closed pull request #19798: [FLINK-27299][Runtime/Configuration] flink 
parsing parameter bug fixed.
URL: https://github.com/apache/flink/pull/19798


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax [flink]

2024-04-06 Thread via GitHub


liyubin117 commented on PR #24555:
URL: https://github.com/apache/flink/pull/24555#issuecomment-2041047820

   > I'm still confused about this. The base class `SqlShowCreate` already 
defines a protected variable called `sqlIdentifier`. Why not use it to 
represent the catalog name, just like the other subclasses (`SqlShowCreateView`, 
`SqlShowCreateTable`) do?
   
   I did not notice that sqlIdentifier had already been defined; my fault, 
thanks for pointing it out 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


lvyanquan commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2041034979

   > Thanks @lvyanquan for the fix, I left some comments. And I think tests are 
necessary, e.g. a test that mocks an RPC timeout while processing a schema 
change event is welcome.
   
   Added new tests in 
https://github.com/apache/flink-cdc/pull/3128/files#diff-c3455d69be8d51aab2230d40c29944f6d451116539098421169a38a7469ed3c7R114.
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


lvyanquan commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1554559194


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##
@@ -127,4 +136,13 @@ RESPONSE sendRequestToCoordinator(REQUEST request) {
 "Failed to send request to coordinator: " + 
request.toString(), e);
 }
 }
+
+@Override
+public void initializeState(StateInitializationContext context) throws 
Exception {
+if (context.isRestored()) {
+if (getRuntimeContext().getIndexOfThisSubtask() == 0) {

Review Comment:
   There may be multiple subtasks of the SchemaOperator, but we want to clear 
the information in the SchemaRegistryHandler only once during a restart.
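   A possible code note, roughly (the wording is only a suggestion):
   
   ```java
   @Override
   public void initializeState(StateInitializationContext context) throws Exception {
       if (context.isRestored()) {
           // SchemaOperator may run with multiple parallel subtasks, but the
           // coordinator-side schema change information must be cleared exactly
           // once per restart, so only subtask 0 sends the clearing request.
           if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
               // ... notify SchemaRegistry to clear state from the previous run
           }
       }
   }
   ```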



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-06 Thread via GitHub


lvyanquan commented on code in PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#discussion_r1554559016


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##
@@ -111,8 +115,13 @@ private SchemaChangeResponse requestSchemaChange(
 return sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
 }
 
-private ReleaseUpstreamResponse requestReleaseUpstream() {
-return sendRequestToCoordinator(new ReleaseUpstreamRequest());
+private void requestReleaseUpstream() throws InterruptedException {
+CoordinationResponse coordinationResponse =
+sendRequestToCoordinator(new ReleaseUpstreamRequest());
+while (coordinationResponse instanceof SchemaChangeProcessingResponse) 
{

Review Comment:
   Done, added a new pipeline config 
https://github.com/apache/flink-cdc/pull/3128/files#diff-efc0e20a0c40e81813d3be00eadab7bc066598e521b24a85f2c3e5c9b83c2b4bR92
 to control this.



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.event;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+
+/** request for get change result. */

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24939][table] Support `SHOW CREATE CATALOG` syntax [flink]

2024-04-06 Thread via GitHub


LadyForest commented on code in PR #24555:
URL: https://github.com/apache/flink/pull/24555#discussion_r1554549162


##
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateCatalog.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/** SHOW CREATE CATALOG sql call. */
+public class SqlShowCreateCatalog extends SqlShowCreate {
+
+public static final SqlSpecialOperator OPERATOR =
+new SqlSpecialOperator("SHOW CREATE CATALOG", SqlKind.OTHER_DDL);
+
+protected final SqlIdentifier catalogName;

Review Comment:
   I'm still confused about this. The base class `SqlShowCreate` already 
defines a protected variable called `sqlIdentifier`. Why not use it to 
represent the catalog name, just like the other subclasses (`SqlShowCreateView`, 
`SqlShowCreateTable`) do?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-34738) "Deployment - YARN" Page for Flink CDC Chinese Documentation

2024-04-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34738:
---
Labels: pull-request-available  (was: )

> "Deployment - YARN" Page for Flink CDC Chinese Documentation
> 
>
> Key: FLINK-34738
> URL: https://issues.apache.org/jira/browse/FLINK-34738
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation, Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: LvYanquan
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.1.0
>
>
> Translate 
> [https://github.com/apache/flink-cdc/blob/master/docs/content/docs/deployment/yarn.md]
>  into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

