[jira] [Created] (FLINK-35215) The performance of serializerKryo and serializerKryoWithoutRegistration has regressed

2024-04-22 Thread Rui Fan (Jira)
Rui Fan created FLINK-35215:
---

 Summary: The performance of serializerKryo and 
serializerKryoWithoutRegistration has regressed
 Key: FLINK-35215
 URL: https://issues.apache.org/jira/browse/FLINK-35215
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.20.0
Reporter: Rui Fan


The performance of serializerKryo and serializerKryoWithoutRegistration has 
regressed [1][2]. I checked the recent commits and found that FLINK-34954 
changed the related logic.

 

[1] 
http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryo=on=on=off=3=50

[2] 
http://flink-speed.xyz/timeline/#/?exe=1,6,12=serializerKryoWithoutRegistration=on=on=off=3=50
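
A rough way to reproduce locally (a sketch, not the flink-benchmarks harness; it 
assumes the 1.x KryoSerializer(Class, ExecutionConfig) constructor and measures 
only serialization, so absolute numbers will differ from flink-speed):

{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class KryoRegressionSketch {
    public static class MyPojo {
        public int id;
        public String name = "flink";
    }

    public static void main(String[] args) throws Exception {
        ExecutionConfig config = new ExecutionConfig();
        // Drop this line for the serializerKryoWithoutRegistration case.
        config.registerKryoType(MyPojo.class);
        KryoSerializer<MyPojo> serializer = new KryoSerializer<>(MyPojo.class, config);
        DataOutputSerializer out = new DataOutputSerializer(64);
        MyPojo value = new MyPojo();
        int records = 1_000_000;
        long start = System.nanoTime();
        for (int i = 0; i < records; i++) {
            out.clear();
            serializer.serialize(value, out);
        }
        System.out.println("ns per record: " + (System.nanoTime() - start) / (double) records);
    }
}
{code}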

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34944] Use Incremental Source Framework in Flink CDC OceanBase Source Connector [flink-cdc]

2024-04-22 Thread via GitHub


whhe commented on code in PR #3211:
URL: https://github.com/apache/flink-cdc/pull/3211#discussion_r1575680958


##
docs/content.zh/docs/connectors/legacy-flink-cdc-sources/oceanbase-cdc.md:
##
@@ -28,52 +28,106 @@ under the License.
 
 OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。本文介绍了如何设置 OceanBase CDC 连接器以对 
OceanBase 进行 SQL 查询。
 
-## 依赖
 
-为了使用 OceanBase CDC 连接器,您必须提供相关的依赖信息。以下依赖信息适用于使用自动构建工具(如 Maven 或 SBT)构建的项目和带有 
SQL JAR 包的 SQL 客户端。
-为了使用 OceanBase CDC 连接器,您必须提供相关的依赖信息。以下依赖信息适用于使用自动构建工具(如 Maven 或 SBT)构建的项目和带有 
SQL JAR 包的 SQL 客户端。
+### OceanBase CDC 方案
 
-```xml
-<dependency>
-   <groupId>org.apache.flink</groupId>
-   <artifactId>flink-connector-oceanbase-cdc</artifactId>
-   <!-- 请使用已发布的版本依赖,snapshot 版本的依赖需要本地自行编译。 -->
-   <version>3.1-SNAPSHOT</version>
-</dependency>
+名词解释:
 
-```
+- *OceanBase CE*: OceanBase 社区版。OceanBase 的开源版本,兼容 MySQL 
https://github.com/oceanbase/oceanbase 。
+- *OceanBase EE*: OceanBase 企业版。OceanBase 的商业版本,支持 MySQL 和 Oracle 两种兼容模式 
https://www.oceanbase.com 。
+- *OceanBase Cloud*: OceanBase 云数据库 https://www.oceanbase.com/product/cloud 。
+- *Log Proxy CE*: OceanBase 日志代理服务社区版。单独使用时支持 CDC 模式,是一个获取 OceanBase 
社区版事务日志(commit log)的代理服务 https://github.com/oceanbase/oblogproxy 。
+- *Log Proxy EE*: OceanBase 日志代理服务企业版。单独使用时支持 CDC 模式,是一个获取 OceanBase 
企业版事务日志(commit log)的代理服务,目前仅在 OceanBase Cloud 上提供有限的支持, 详情请咨询相关技术支持。
+- *Binlog Service CE*: OceanBase Binlog 服务社区版。OceanBase 社区版的一个兼容 MySQL 
复制协议的解决方案,详情参考 Log Proxy CE Binlog 模式的文档。
+- *Binlog Service EE*: OceanBase Binlog 服务企业版。OceanBase 企业版 MySQL 模式的一个兼容 
MySQL 
复制协议的解决方案,仅可在阿里云使用,详情见[操作指南](https://www.alibabacloud.com/help/zh/apsaradb-for-oceanbase/latest/binlog-overview)。
+- *MySQL Driver*: `mysql-connector-java`,可用于 OceanBase 社区版和 OceanBase 企业版 
MySQL 模式。
+- *OceanBase Driver*: OceanBase JDBC 驱动,支持所有版本的 MySQL 和 Oracle 兼容模式 
https://github.com/oceanbase/obconnector-j 。
 
-如果您是要连接企业版的 OceanBase,您可能需要使用 OceanBase 官方的 JDBC 驱动,这时需要引入如下依赖。
+OceanBase CDC 源端读取方案:
 
-```xml
-<dependency>
-   <groupId>com.oceanbase</groupId>
-   <artifactId>oceanbase-client</artifactId>
-   <version>2.4.2</version>
-</dependency>
-```
+<div>
+<table>
+<thead>
+<tr>
+<th>数据库类型</th>
+<th>支持的驱动</th>
+<th>CDC 连接器</th>
+<th>其他用到的组件</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>OceanBase CE</td>
+<td>
+MySQL Driver: 5.1.47 (内置), 8.0.x 

Review Comment:
   Sure, thanks for the reminder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35214) Update result partition id for remote input channel when unknown input channel is updated

2024-04-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35214:
---
Labels: pull-request-available  (was: )

> Update result partition id for remote input channel when unknown input 
> channel is updated
> -
>
> Key: FLINK-35214
> URL: https://issues.apache.org/jira/browse/FLINK-35214
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> In [FLINK-29768|https://issues.apache.org/jira/browse/FLINK-29768], the 
> result partition in the local input channel has been updated to support 
> speculation. It is necessary to similarly update the result partition ID in 
> the remote input channel.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35214][runtime] Update result partition id for remote input channel when unknown input channel is updated [flink]

2024-04-22 Thread via GitHub


TanYuxin-tyx opened a new pull request, #24701:
URL: https://github.com/apache/flink/pull/24701

   
   
   ## What is the purpose of the change
   
   In https://issues.apache.org/jira/browse/FLINK-29768, the result partition 
in the local input channel has been updated to support speculation. It is 
necessary to similarly update the result partition ID in the remote input 
channel.
   
   
   ## Brief change log
   
 - *Update the result partition ID in the remote input channel.*
   
   
   ## Verifying this change
   
   This change is already covered by the added test 
*testUpdateRemoteInputChannelWithNewPartitionId*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34648] add waitChangeResultRequest and WaitChangeResultResponse to avoid RPC timeout. [flink-cdc]

2024-04-22 Thread via GitHub


lvyanquan commented on PR #3128:
URL: https://github.com/apache/flink-cdc/pull/3128#issuecomment-2071458830

   @leonardBang Can you help to start a CI workflow?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839947#comment-17839947
 ] 

Shuai Xu commented on FLINK-35184:
--

Absolutely, please feel free to start the implementation.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  
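
For context on the chosen values: Long.hashCode() folds the upper and lower 32 
bits together, so 1L and 4294967296L (2^32) share the same hash code, which is 
presumably what makes the no-unique-key bundle conflate the two rows:

{code:java}
public class LongHashCollision {
    public static void main(String[] args) {
        // Long.hashCode(v) == (int) (v ^ (v >>> 32))
        System.out.println(Long.hashCode(1L));          // 1 (1 ^ 0)
        System.out.println(Long.hashCode(4294967296L)); // 1 (lower half 0 XOR upper half 1)
    }
}
{code}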



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839947#comment-17839947
 ] 

Shuai Xu edited comment on FLINK-35184 at 4/23/24 5:46 AM:
---

[~rovboyko], absolutely, please feel free to start the implementation.


was (Author: JIRAUSER300096):
Absolutely, please feel free to start the implementation.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34944] Use Incremental Source Framework in Flink CDC OceanBase Source Connector [flink-cdc]

2024-04-22 Thread via GitHub


yuxiqian commented on code in PR #3211:
URL: https://github.com/apache/flink-cdc/pull/3211#discussion_r1575668027


##
docs/content.zh/docs/connectors/legacy-flink-cdc-sources/oceanbase-cdc.md:
##
@@ -28,52 +28,106 @@ under the License.
 
 OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。本文介绍了如何设置 OceanBase CDC 连接器以对 
OceanBase 进行 SQL 查询。
 
-## 依赖
 
-为了使用 OceanBase CDC 连接器,您必须提供相关的依赖信息。以下依赖信息适用于使用自动构建工具(如 Maven 或 SBT)构建的项目和带有 
SQL JAR 包的 SQL 客户端。
-为了使用 OceanBase CDC 连接器,您必须提供相关的依赖信息。以下依赖信息适用于使用自动构建工具(如 Maven 或 SBT)构建的项目和带有 
SQL JAR 包的 SQL 客户端。
+### OceanBase CDC 方案
 
-```xml
-<dependency>
-   <groupId>org.apache.flink</groupId>
-   <artifactId>flink-connector-oceanbase-cdc</artifactId>
-   <!-- 请使用已发布的版本依赖,snapshot 版本的依赖需要本地自行编译。 -->
-   <version>3.1-SNAPSHOT</version>
-</dependency>
+名词解释:
 
-```
+- *OceanBase CE*: OceanBase 社区版。OceanBase 的开源版本,兼容 MySQL 
https://github.com/oceanbase/oceanbase 。
+- *OceanBase EE*: OceanBase 企业版。OceanBase 的商业版本,支持 MySQL 和 Oracle 两种兼容模式 
https://www.oceanbase.com 。
+- *OceanBase Cloud*: OceanBase 云数据库 https://www.oceanbase.com/product/cloud 。
+- *Log Proxy CE*: OceanBase 日志代理服务社区版。单独使用时支持 CDC 模式,是一个获取 OceanBase 
社区版事务日志(commit log)的代理服务 https://github.com/oceanbase/oblogproxy 。
+- *Log Proxy EE*: OceanBase 日志代理服务企业版。单独使用时支持 CDC 模式,是一个获取 OceanBase 
企业版事务日志(commit log)的代理服务,目前仅在 OceanBase Cloud 上提供有限的支持, 详情请咨询相关技术支持。
+- *Binlog Service CE*: OceanBase Binlog 服务社区版。OceanBase 社区版的一个兼容 MySQL 
复制协议的解决方案,详情参考 Log Proxy CE Binlog 模式的文档。
+- *Binlog Service EE*: OceanBase Binlog 服务企业版。OceanBase 企业版 MySQL 模式的一个兼容 
MySQL 
复制协议的解决方案,仅可在阿里云使用,详情见[操作指南](https://www.alibabacloud.com/help/zh/apsaradb-for-oceanbase/latest/binlog-overview)。
+- *MySQL Driver*: `mysql-connector-java`,可用于 OceanBase 社区版和 OceanBase 企业版 
MySQL 模式。
+- *OceanBase Driver*: OceanBase JDBC 驱动,支持所有版本的 MySQL 和 Oracle 兼容模式 
https://github.com/oceanbase/obconnector-j 。
 
-如果您是要连接企业版的 OceanBase,您可能需要使用 OceanBase 官方的 JDBC 驱动,这时需要引入如下依赖。
+OceanBase CDC 源端读取方案:
 
-```xml
-<dependency>
-   <groupId>com.oceanbase</groupId>
-   <artifactId>oceanbase-client</artifactId>
-   <version>2.4.2</version>
-</dependency>
-```
+<div>
+<table>
+<thead>
+<tr>
+<th>数据库类型</th>
+<th>支持的驱动</th>
+<th>CDC 连接器</th>
+<th>其他用到的组件</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>OceanBase CE</td>
+<td>
+MySQL Driver: 5.1.47 (内置), 8.0.x 

Review Comment:
   Hi @whhe, as requested by the ASF, PR #3212 has excluded the 
`mysql-connector-java` bundle from the released jar package due to 
incompatible OSS licenses. Could you please help fix this in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35169) Recycle buffers to freeSegments before releasing data buffer for sort accumulator

2024-04-22 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839463#comment-17839463
 ] 

Weijie Guo edited comment on FLINK-35169 at 4/23/24 5:17 AM:
-

master via 68a84fd02fb8e288ff7605073f55834468dcf53a.
1.18 via 1b3849c196c25784dc662398385e0b30f2c23a03.
1.19 via 6045bd8bf935b12c8056e98945e054511da9c251.


was (Author: weijie guo):
master via 68a84fd02fb8e288ff7605073f55834468dcf53a.

> Recycle buffers to freeSegments before releasing data buffer for sort 
> accumulator
> -
>
> Key: FLINK-35169
> URL: https://issues.apache.org/jira/browse/FLINK-35169
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> When using sortBufferAccumulator, we should recycle the buffers to 
> freeSegments before releasing the data buffer. The reason is that when 
> getting buffers from the DataBuffer, it may require more buffers than the 
> current quantity available in freeSegments. Consequently, to ensure adequate 
> buffers from DataBuffer, the flushed and recycled buffers should also be 
> added to freeSegments for reuse.
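
The ordering can be illustrated with a small sketch (hypothetical names, not 
the actual Flink classes; only the order of the two steps matters):

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

class SortAccumulatorSketch {
    final Deque<byte[]> freeSegments = new ArrayDeque<>();

    void finishDataBuffer(List<byte[]> flushedBuffers, AutoCloseable dataBuffer) throws Exception {
        // 1) Recycle the flushed buffers into the free pool first, so the next
        //    DataBuffer can always obtain enough segments ...
        freeSegments.addAll(flushedBuffers);
        // 2) ... and only then release the data buffer.
        dataBuffer.close();
    }
}
{code}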



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35169) Recycle buffers to freeSegments before releasing data buffer for sort accumulator

2024-04-22 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo updated FLINK-35169:
---
Fix Version/s: 1.18.2
   1.19.1

> Recycle buffers to freeSegments before releasing data buffer for sort 
> accumulator
> -
>
> Key: FLINK-35169
> URL: https://issues.apache.org/jira/browse/FLINK-35169
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> When using sortBufferAccumulator, we should recycle the buffers to 
> freeSegments before releasing the data buffer. The reason is that when 
> getting buffers from the DataBuffer, it may require more buffers than the 
> current quantity available in freeSegments. Consequently, to ensure adequate 
> buffers from DataBuffer, the flushed and recycled buffers should also be 
> added to freeSegments for reuse.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-1.19][FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]

2024-04-22 Thread via GitHub


reswqa merged PR #24695:
URL: https://github.com/apache/flink/pull/24695


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.18][FLINK-35169][runtime] Recycle buffers to freeSegments before releasing data buffer for sort accumulator [flink]

2024-04-22 Thread via GitHub


reswqa merged PR #24696:
URL: https://github.com/apache/flink/pull/24696


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35211) When synchronizing LOB fields using Oracle CDC, an error occurs

2024-04-22 Thread wangsw (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

wangsw updated FLINK-35211:
---
Environment: 
Flink 1.18.1 

Oracle12c

Oracle cdc 3.0.1

  was:
Flink 1.18.1 

Oracle cdc 3.0.1


> When synchronizing LOB fields using Oracle CDC, an error occurs
> -
>
> Key: FLINK-35211
> URL: https://issues.apache.org/jira/browse/FLINK-35211
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
> Environment: Flink 1.18.1 
> Oracle12c
> Oracle cdc 3.0.1
>Reporter: wangsw
>Priority: Blocker
> Attachments: cdcOracleToPrint.sql
>
>
> {code:java}
> Caused by: java.sql.SQLException: ORA-01291: missing logfile
> ORA-06512: at "SYS.DBMS_LOGMNR", line 58
> ORA-06512: at line 1
> oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1205)
> oracle.jdbc.driver.OracleStatement.executeInternal(OracleStatement.java:1823)
> oracle.jdbc.driver.OracleStatement.execute(OracleStatement.java:1778)
> oracle.jdbc.driver.OracleStatementWrapper.execute(OracleStatementWrapper.java:303)
> io.debezium.jdbc.JdbcConnection.executeWithoutCommitting(JdbcConnection.java:1446)
> io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.startMiningSession(LogMinerStreamingChangeEventSource.java:677)
> io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:244)
>  {code}
> When synchronizing LOB fields using Oracle CDC 3.0.1, an error occurs. 
> I checked that the Debezium version used by the CDC connector is 1.9.7.Final. 
> When I used the same version of Debezium with the same configuration to 
> synchronize LOB data, the error did not occur, so I am asking for help.
> Please see the Flink SQL in the attachment.
> Without the parameter 'debezium.lob.enabled' = 'true', the error does not 
> occur.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34970) Translate architecture documents into Chinese

2024-04-22 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839931#comment-17839931
 ] 

Rui Fan commented on FLINK-34970:
-

Merged to main (1.9.0) via: 9ea69eb1c15b9424fb08d8152e79b2b560d53a5c

> Translate architecture documents into Chinese
> -
>
> Key: FLINK-34970
> URL: https://issues.apache.org/jira/browse/FLINK-34970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Affects Versions: 1.9.0
>Reporter: Caican Cai
>Assignee: Caican Cai
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.1
>
>
> Translate architecture documents into Chinese



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-34970) Translate architecture documents into Chinese

2024-04-22 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-34970.
-
Fix Version/s: kubernetes-operator-1.9.0
   (was: 1.9.1)
   Resolution: Fixed

> Translate architecture documents into Chinese
> -
>
> Key: FLINK-34970
> URL: https://issues.apache.org/jira/browse/FLINK-34970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kubernetes Operator
>Affects Versions: 1.9.0
>Reporter: Caican Cai
>Assignee: Caican Cai
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> Translate architecture documents into Chinese



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34970] Translate architecture documents into Chinese [flink-kubernetes-operator]

2024-04-22 Thread via GitHub


1996fanrui merged PR #809:
URL: https://github.com/apache/flink-kubernetes-operator/pull/809


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35214) Update result partition id for remote input channel when unknown input channel is updated

2024-04-22 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan reassigned FLINK-35214:
-

Assignee: Yuxin Tan

> Update result partition id for remote input channel when unknown input 
> channel is updated
> -
>
> Key: FLINK-35214
> URL: https://issues.apache.org/jira/browse/FLINK-35214
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.20.0
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>
> In [FLINK-29768|https://issues.apache.org/jira/browse/FLINK-29768], the 
> result partition in the local input channel has been updated to support 
> speculation. It is necessary to similarly update the result partition ID in 
> the remote input channel.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35214) Update result partition id for remote input channel when unknown input channel is updated

2024-04-22 Thread Yuxin Tan (Jira)
Yuxin Tan created FLINK-35214:
-

 Summary: Update result partition id for remote input channel when 
unknown input channel is updated
 Key: FLINK-35214
 URL: https://issues.apache.org/jira/browse/FLINK-35214
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.20.0
Reporter: Yuxin Tan


In [FLINK-29768|https://issues.apache.org/jira/browse/FLINK-29768], the result 
partition in the local input channel has been updated to support speculation. 
It is necessary to similarly update the result partition ID in the remote input 
channel.
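
A simplified sketch of the idea (hypothetical names; Flink's actual 
RemoteInputChannel and ShuffleDescriptor types are more involved):

{code:java}
final class RemoteChannelSketch {
    private String partitionId;      // stands in for ResultPartitionID
    private String producerAddress;  // stands in for the connection info

    // Called when the unknown input channel is updated with a new descriptor.
    void updateFromDescriptor(String newPartitionId, String newProducerAddress) {
        this.producerAddress = newProducerAddress;
        // The missing step this ticket adds: a speculative attempt may produce
        // the data under a different ResultPartitionID, so refresh it too.
        this.partitionId = newPartitionId;
    }
}
{code}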



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Roman Boyko (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839928#comment-17839928
 ] 

Roman Boyko commented on FLINK-35184:
-

[~xu_shuai_], OK, I agree with you. So may I start the implementation?

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575597004


##
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java:
##
@@ -346,6 +337,63 @@ public void testSyncPoint() {
 recordContext2.release();
 }
 
+@Test
+void testBufferTimeout() throws InterruptedException {
+batchSize = 5;
+timeout = 1000;
+setup();
+Runnable userCode = () -> valueState.asyncValue();
+
+//  basic timeout ---
+for (int i = 0; i < batchSize - 1; i++) {
+String record = String.format("key%d-r%d", i, i);
+String key = String.format("key%d", batchSize + i);
+RecordContext<String> recordContext = aec.buildContext(record, 
key);
+aec.setCurrentContext(recordContext);
+userCode.run();
+}
+assertThat(aec.timeoutFlag.get()).isFalse();
+assertThat(aec.currentScheduledFuture.isDone()).isFalse();
+assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1);
+
assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1);
+assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
+
+Thread.sleep(timeout + 100);

Review Comment:
   It might be better to avoid using `Thread.sleep` in test cases, as it may 
increase the duration of CI and behave as a flaky test. How about introducing a 
`TestScheduledThreadPoolExecutor` so we can control when each step is 
triggered? An example of this is 
`org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor`.
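
   A minimal sketch of that approach, assuming the scheduled executor could be 
injected into the controller (the wiring is hypothetical; `schedule` and 
`triggerScheduledTasks` come from the existing test utility):

   ```java
   import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;

   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;

   public class ManualTimeoutSketch {
       public static void main(String[] args) {
           ManuallyTriggeredScheduledExecutor scheduler = new ManuallyTriggeredScheduledExecutor();
           AtomicBoolean timeoutFired = new AtomicBoolean(false);

           // Stand-in for the AEC's scheduleTimeout(): the delay value is irrelevant
           // here, because the test decides when it "elapses".
           scheduler.schedule(() -> timeoutFired.set(true), 1000, TimeUnit.MILLISECONDS);

           scheduler.triggerScheduledTasks(); // deterministic, no Thread.sleep
           System.out.println(timeoutFired.get()); // true
       }
   }
   ```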



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575609479


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if 
yes, a trigger will
+ * perform actively when the next state request arrives even if the 
activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this 
task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;

Review Comment:
   Would it be better to reuse existing utility methods like 
`FutureUtils.delay()`? This way AEC won't need to maintain such resources.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575604054


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if 
yes, a trigger will
+ * perform actively when the next state request arrives even if the 
activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this 
task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ScheduledFuture<Void> currentScheduledFuture;
+
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(
+mailboxExecutor,
+stateExecutor,
+DEFAULT_BATCH_SIZE,
+DEFAULT_BUFFER_TIMEOUT,
+DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
 }
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeOut,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeOut = bufferTimeOut;

Review Comment:
   The "O" in "BufferTimeOut" is upper-case while the "o" in timeoutFlag is 
lower-case. It might be better to get them unified to the same convention.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575603596


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached, if 
yes, a trigger will
+ * perform actively when the next state request arrives even if the 
activeQueue has not reached
+ * the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this 
task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;
+
+ScheduledFuture<Void> currentScheduledFuture;
+
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(
+mailboxExecutor,
+stateExecutor,
+DEFAULT_BATCH_SIZE,
+DEFAULT_BUFFER_TIMEOUT,
+DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
 }
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeOut,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeOut = bufferTimeOut;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.timeoutFlag = new AtomicBoolean(false);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeOut > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1,
+new ThreadFactory() {
+@Override
+public Thread newThread(Runnable r) {
+return new Thread(r, "AEC-scheduler");
+}
+});
+this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+// make sure shutdown removes all pending tasks
+
this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+} else {
+this.scheduledExecutor = null;
+}
+
 LOG.info(
 "Create AsyncExecutionController: batchSize {}, 
maxInFlightRecordsNum {}",
 batchSize,
 maxInFlightRecords);
 }
 
+void scheduleTimeout() {
+if (bufferTimeOut > 0) {
+if (currentScheduledFuture != null
+&& !currentScheduledFuture.isDone()
+&& !currentScheduledFuture.isCancelled()) {
+currentScheduledFuture.cancel(false);
+}
+currentScheduledFuture =
+(ScheduledFuture<Void>)
+scheduledExecutor.schedule(
+() -> {
+timeoutFlag.set(true);
+mailboxExecutor.execute(
+() -> triggerIfNeeded(false), 
"AEC-timeout");

Review Comment:
   It might be simpler to remove `timeoutFlag` and invoke 
`triggerIfNeeded(true)` directly.



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -51,15 +56,24 @@ public class AsyncExecutionController {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(AsyncExecutionController.class);
 
-public static final int DEFAULT_BATCH_SIZE = 1000;
-public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+private static final int DEFAULT_BATCH_SIZE = 1000;
+
+private static final int DEFAULT_BUFFER_TIMEOUT = 1000;

Review Comment:
   How about removing the constants and using 
`ASYNC_STATE_BUFFER_SIZE.defaultValue()` from the `ExecutionConfig`? This can 
help avoid maintaining the same value in two places.



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final 

Re: [PR] [FLINK-35196] [Connector / Pulsar] Fix bouncycastle class not found [flink-connector-pulsar]

2024-04-22 Thread via GitHub


wenbingshen commented on PR #91:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/91#issuecomment-2071359293

   > Thanks for your commit. I think `jdk15on` should be dropped in favor of 
the `jdk18on`.
   
   @syhily Thanks for the review. I have upgraded `jdk15on` to `jdk18on`. PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35213) A potential inconsistent table structure issue

2024-04-22 Thread LvYanquan (Jira)
LvYanquan created FLINK-35213:
-

 Summary: A potential inconsistent table structure issue
 Key: FLINK-35213
 URL: https://issues.apache.org/jira/browse/FLINK-35213
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: 3.1.0
Reporter: LvYanquan


Currently, DataSinkWriterOperator will [request 
CreateTableEvent|https://github.com/apache/flink-cdc/blob/313726b09690e82aa56fb5b42e89b535d24dadd7/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java#L149]
 from SchemaRegistry when restarted. 
However, if a SchemaChangeEvent is received during this process, SchemaOperator 
will

1. [request 
SchemaRegistry|https://github.com/apache/flink-cdc/blob/313726b09690e82aa56fb5b42e89b535d24dadd7/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java#L252]
 to update the schema, 
2. and then send FlushEvent. 

As the network situation is quite complex, SchemaRegistry may update the schema 
first and then send a CreateTableEvent with the new schema, which is 
incompatible with the DataChangeEvent.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35212) PyFlink thread mode process can only run once in standalonesession mode

2024-04-22 Thread Wei Yuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839926#comment-17839926
 ] 

Wei Yuan commented on FLINK-35212:
--

I also tested this issue with Java 8 and Python 3.8.6, and another similar pemja 
error occurred.

> PyFlink thread mode process can only run once in standalonesession mode
> ---
>
> Key: FLINK-35212
> URL: https://issues.apache.org/jira/browse/FLINK-35212
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
> Environment: Python 3.10.14
> PyFlink==1.18.1
> openjdk version "11.0.21" 2023-10-17 LTS
> OpenJDK Runtime Environment (Red_Hat-11.0.21.0.9-1.el7_9) (build 
> 11.0.21+9-LTS)
> OpenJDK 64-Bit Server VM (Red_Hat-11.0.21.0.9-1.el7_9) (build 11.0.21+9-LTS, 
> mixed mode, sharing)
>Reporter: Wei Yuan
>Priority: Critical
>
> {code:java}
> from pyflink.common.types import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import Types, WatermarkStrategy, Configuration
> from pyflink.table import EnvironmentSettings, TableEnvironment
> from pyflink.table import StreamTableEnvironment, Schema
> from pyflink.datastream.functions import ProcessFunction, MapFunction
> from pyflink.common.time import Instant
> # init task env
> config = Configuration()
> config.set_string("python.execution-mode", "thread")
> # config.set_string("python.execution-mode", "process")
> config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
> config.set_string("python.executable", "/root/miniconda3/bin/python3")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> table_env = StreamTableEnvironment.create(env)
> # create a batch TableEnvironment
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", 
> "content")
> table_env.create_temporary_view("test_table", table)
> result_table = table_env.sql_query("select *, NOW() as dt from test_table")
> result_ds = table_env.to_data_stream(result_table)
> # def test_func(row):
> # return row
> # result_ds.map(test_func).print()
> result_ds.print()
> env.execute()
> {code}
> Start a standalone session mode cluster by command: 
> {code:java}
> /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/bin/start-cluster.sh{code}
> Submit the thread mode job for the first time; the job will finish successfully.
> {code:java}
> /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/flink run -py 
> bug.py {code}
> Use the above command to submit the job for the second time; an error occurred:
> {code:java}
> Job has been submitted with JobID a4f2728199277bba0500796f7925fa26
> Traceback (most recent call last):
>   File "/home/disk1/bug.py", line 34, in 
>     env.execute()
>   File 
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
>  line 773, in execute
>     return 
> JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", 
> line 1322, in __call__
>     return_value = get_return_value(
>   File 
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", 
> line 146, in deco
>     return f(*a, **kw)
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 
> 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
> : java.util.concurrent.ExecutionException: 
> org.apache.flink.client.program.ProgramInvocationException: Job failed 
> (JobID: a4f2728199277bba0500796f7925fa26)
>         at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>         at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
>         at 
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:171)
>         at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:122)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>         at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>         at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>         at 
> 

[jira] [Created] (FLINK-35212) PyFlink thread mode process can only run once in standalonesession mode

2024-04-22 Thread Wei Yuan (Jira)
Wei Yuan created FLINK-35212:


 Summary: PyFlink thread mode process can only run once in 
standalonesession mode
 Key: FLINK-35212
 URL: https://issues.apache.org/jira/browse/FLINK-35212
 Project: Flink
  Issue Type: Bug
  Components: API / Python
 Environment: Python 3.10.14

PyFlink==1.18.1

openjdk version "11.0.21" 2023-10-17 LTS
OpenJDK Runtime Environment (Red_Hat-11.0.21.0.9-1.el7_9) (build 11.0.21+9-LTS)
OpenJDK 64-Bit Server VM (Red_Hat-11.0.21.0.9-1.el7_9) (build 11.0.21+9-LTS, 
mixed mode, sharing)
Reporter: Wei Yuan


{code:java}
from pyflink.common.types import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types, WatermarkStrategy, Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import StreamTableEnvironment, Schema
from pyflink.datastream.functions import ProcessFunction, MapFunction
from pyflink.common.time import Instant


# init task env
config = Configuration()
config.set_string("python.execution-mode", "thread")
# config.set_string("python.execution-mode", "process")
config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
config.set_string("python.executable", "/root/miniconda3/bin/python3")

env = StreamExecutionEnvironment.get_execution_environment(config)
table_env = StreamTableEnvironment.create(env)

# create a batch TableEnvironment
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", 
"content")
table_env.create_temporary_view("test_table", table)

result_table = table_env.sql_query("select *, NOW() as dt from test_table")
result_ds = table_env.to_data_stream(result_table)

# def test_func(row):
# return row

# result_ds.map(test_func).print()
result_ds.print()

env.execute()
{code}
Start a standalone session mode cluster by command: 
{code:java}
/root/miniconda3/lib/python3.10/site-packages/pyflink/bin/bin/start-cluster.sh{code}
Submit the thread mode job for the first time; the job will finish successfully.
{code:java}
/root/miniconda3/lib/python3.10/site-packages/pyflink/bin/flink run -py bug.py 
{code}
Use the above command to submit the job for the second time; an error occurred:
{code:java}
Job has been submitted with JobID a4f2728199277bba0500796f7925fa26
Traceback (most recent call last):
  File "/home/disk1/bug.py", line 34, in 
    env.execute()
  File 
"/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
 line 773, in execute
    return 
JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", 
line 1322, in __call__
    return_value = get_return_value(
  File 
"/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", 
line 146, in deco
    return f(*a, **kw)
  File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 
326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
a4f2728199277bba0500796f7925fa26)
        at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
        at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
        at 
org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:171)
        at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:122)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: a4f2728199277bba0500796f7925fa26)
        at 

[jira] [Updated] (FLINK-35172) DDL statement is added to the Schema Change Event

2024-04-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35172:
---
Labels: pull-request-available  (was: )

> DDL statement is added to the Schema Change Event
> -
>
> Key: FLINK-35172
> URL: https://issues.apache.org/jira/browse/FLINK-35172
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: melin
>Priority: Major
>  Labels: pull-request-available
>
> The current implementation of the [kafka pipeline data sink connector 
> |https://github.com/apache/flink-cdc/pull/2938] does not write DDL statements 
> to the topic because the original DDL statements are missing. DDL cannot be 
> generated backwards using a Schema Change Event.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35172]DDL statement is added to the Schema Change Event [flink-cdc]

2024-04-22 Thread via GitHub


melin opened a new pull request, #3243:
URL: https://github.com/apache/flink-cdc/pull/3243

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-22 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1575586119


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param <K> The type of key in put access request.
+ * @param <V> The type of value in put access request.
+ */
+public class ForStWriteBatchOperation<K, V> implements ForStDBOperation<Void> {
+
+private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+private final RocksDB db;
+
+private final List<Request<K, V>> batchRequest;
+
+private final WriteOptions writeOptions;
+
+ForStWriteBatchOperation(
+RocksDB db, List<Request<K, V>> batchRequest, WriteOptions 
writeOptions) {
+this.db = db;
+this.batchRequest = batchRequest;
+this.writeOptions = writeOptions;
+}
+
+@Override
+public CompletableFuture<Void> process() throws IOException {
+CompletableFuture<Void> result = new CompletableFuture<>();
+try (WriteBatch writeBatch =
+new WriteBatch(batchRequest.size() * 
PER_RECORD_ESTIMATE_BYTES)) {
+for (Request<K, V> request : batchRequest) {
+ForStInnerTable<K, V> table = request.table;
+if (request.value == null) {
+// put(key, null) == delete(key)
+writeBatch.delete(
+table.getColumnFamilyHandle(), 
table.serializeKey(request.key));
+} else {
+writeBatch.put(
+table.getColumnFamilyHandle(),
+table.serializeKey(request.key),
+table.serializeValue(request.value));
+}
+}
+db.write(writeOptions, writeBatch);
+result.complete(null);
+} catch (RocksDBException e) {
+result.completeExceptionally(e);
+throw new IOException("Error while adding data to ForStDB", e);
+}
+return result;
+}
+
+/** The Put access request for ForStDB. */
+static class Request {
+final K key;
+final V value;
+final ForStInnerTable table;
+
+Request(K key, V value, ForStInnerTable table) {
+this.key = key;
+this.value = value;
+this.table = table;
+}
+
+static  Request of(K key, V value, ForStInnerTable 
table) {

Review Comment:
   I have refined it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-22 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1575585902


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStWriteBatchOperation.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The writeBatch operation implementation for ForStDB.
+ *
+ * @param <K> The type of key in put access request.
+ * @param <V> The type of value in put access request.
+ */
+public class ForStWriteBatchOperation<K, V> implements ForStDBOperation {
+
+    private static final int PER_RECORD_ESTIMATE_BYTES = 100;
+
+    private final RocksDB db;
+
+    private final List<Request<K, V>> batchRequest;
+
+    private final WriteOptions writeOptions;
+
+    ForStWriteBatchOperation(
+            RocksDB db, List<Request<K, V>> batchRequest, WriteOptions writeOptions) {
+        this.db = db;
+        this.batchRequest = batchRequest;
+        this.writeOptions = writeOptions;
+    }
+
+    @Override
+    public CompletableFuture<Void> process() throws IOException {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        try (WriteBatch writeBatch =
+                new WriteBatch(batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) {
+            for (Request<K, V> request : batchRequest) {
+                ForStInnerTable<K, V> table = request.table;
+                if (request.value == null) {
+                    // put(key, null) == delete(key)
+                    writeBatch.delete(
+                            table.getColumnFamilyHandle(), table.serializeKey(request.key));
+                } else {
+                    writeBatch.put(
+                            table.getColumnFamilyHandle(),
+                            table.serializeKey(request.key),
+                            table.serializeValue(request.value));
+                }
+            }
+            db.write(writeOptions, writeBatch);
+            result.complete(null);
+        } catch (RocksDBException e) {
+            result.completeExceptionally(e);
+            throw new IOException("Error while adding data to ForStDB", e);
+        }
+        return result;
+    }
+
+    /** The Put access request for ForStDB. */
+    static class Request<K, V> {

Review Comment:
   They don't have a structural relationship. Using `GetRequest` and 
`PutRequest` to distinguish them is a good idea.
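
   A minimal sketch of that split (names and fields are illustrative only, not 
the final API):

   ```java
   // Illustrative only: separate request types instead of one shared Request class.
   static class PutRequest<K, V> {
       final K key;
       final V value; // null is treated as delete(key), as in the code above
       final ForStInnerTable<K, V> table;

       PutRequest(K key, V value, ForStInnerTable<K, V> table) {
           this.key = key;
           this.value = value;
           this.table = table;
       }
   }

   static class GetRequest<K, V> {
       final K key;
       final ForStInnerTable<K, V> table;

       GetRequest(K key, ForStInnerTable<K, V> table) {
           this.key = key;
           this.table = table;
       }
   }
   ```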



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt [flink]

2024-04-22 Thread via GitHub


ljz2051 commented on code in PR #24681:
URL: https://github.com/apache/flink/pull/24681#discussion_r1575583882


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStValueState.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.asyncprocessing.ContextKey;
+import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
+import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
+import org.apache.flink.runtime.state.v2.InternalValueState;
+import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.io.IOException;
+
+/**
+ * The {@link InternalValueState} implementation for ForStDB.
+ *
+ * @param <K> The type of the key.
+ * @param <V> The type of the value.
+ */
+public class ForStValueState<K, V> extends InternalValueState<K, V>
+        implements ValueState<V>, ForStInnerTable<ContextKey<K>, V> {
+
+    /** The column family which this internal value state belongs to. */
+    private final ColumnFamilyHandle columnFamilyHandle;
+
+    /** The serialized key builder, which should be thread-safe. */
+    private final ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;
+
+    /** The data output stream used for value serialization, which should be thread-safe. */
+    private final ThreadLocal<DataOutputSerializer> valueSerializerView;
+
+    /** The data input stream used for value deserialization, which should be thread-safe. */
+    private final ThreadLocal<DataInputDeserializer> valueDeserializerView;
+
+    public ForStValueState(
+            StateRequestHandler stateRequestHandler,
+            ColumnFamilyHandle columnFamily,
+            ValueStateDescriptor<V> valueStateDescriptor,
+            ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder,

Review Comment:
   I think we could just pass the `Supplier` to `ForStValueState` and keep the 
`ThreadLocal` semantics inside ForStState.
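
   A small sketch of that idea, assuming the caller provides a `Supplier` and 
the state class builds the `ThreadLocal` itself via `ThreadLocal.withInitial(...)`:

   ```java
   import java.util.function.Supplier;

   // Illustrative only: the ThreadLocal stays an implementation detail of the state.
   class PerThreadView<T> {
       private final ThreadLocal<T> view;

       PerThreadView(Supplier<T> initializer) {
           this.view = ThreadLocal.withInitial(initializer);
       }

       T get() {
           return view.get(); // lazily created once per calling thread
       }
   }
   ```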



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34727) RestClusterClient.requestJobResult throw ConnectionClosedException when the accumulator data is large

2024-04-22 Thread Wancheng Xiao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839922#comment-17839922
 ] 

Wancheng Xiao commented on FLINK-34727:
---

[~Weijie Guo] Hello, can you help find someone to take a look? We’ve been 
troubled by this issue for some time.

> RestClusterClient.requestJobResult throw ConnectionClosedException when the 
> accumulator data is large
> -
>
> Key: FLINK-34727
> URL: https://issues.apache.org/jira/browse/FLINK-34727
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.16.2, 1.19.0
>Reporter: Wancheng Xiao
>Priority: Critical
>  Labels: pull-request-available
> Attachments: AbstractHandler.png, AbstractRestHandler.png, 
> MiniDispatcher.png, RestServerEndpoint.png, flink_bug_complex.log, 
> flink_bug_simple.log, image-2024-03-19-15-51-20-150.png
>
>
> The task succeeded, but "RestClusterClient.requestJobResult()" encountered 
> an error reporting ConnectionClosedException. (Channel became inactive)
> After debugging, it is speculated that the problem occurred in the Flink 
> server-side "AbstractRestHandler.respondToRequest()", in 
> "response.thenAccept(resp -> HandlerUtils.sendResponse())": this 
> "thenAccept()" does not propagate the future returned by sendResponse, so the 
> server shutdown can proceed before the response has been sent. I suspect that 
> "thenAccept()" needs to be replaced with "thenCompose()".
> The details are as follows:
>  
> *Pseudocode:*
> !image-2024-03-19-15-51-20-150.png|width=802,height=222!
>  
> *Server handling steps:*
> netty-thread: got request
> flink-dispatcher-thread: exec requestJobResult[6] and complete 
> shutDownFuture[8], then call HandlerUtils.sendResponse[13](netty async write)
> netty-thread: write some data to channel.(not done)
> flink-dispatcher-thread: call inFlightRequestTracker.deregisterRequest[15]
> netty-thread: write some data to channel failed, channel not active
> I added some logging to trace this bug:
> !AbstractHandler.png|width=406,height=313!
> !AbstractRestHandler.png|width=418,height=322!
> !MiniDispatcher.png|width=419,height=277!
> !RestServerEndpoint.png|width=419,height=279!
> then i got:
> /{*}then call requestJobResult and shutDownFuture.complete; (close channel 
> when request deregistered){*}/
> 2024-03-17 18:01:34.788 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
> JobExecutionResultHandler gateway.requestJobStatus complete. 
> [jobStatus=FINISHED]
> /{*}submit sendResponse{*}/
> 2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - submit 
> HandlerUtils.sendResponse().
> /{*}thenAccept(sendResponse()) is complete, will call inFlightRequestTracker, 
> but sendResponse's return future not completed{*}  /
> 2024-03-17 18:01:34.821 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
> requestProcessingFuture complete. 
> [requestProcessingFuture=java.util.concurrent.CompletableFuture@1329aca5[Completed
>  normally]]
> /{*}sendResponse's write task is still running{*}/
> 2024-03-17 18:01:34.822 [flink-rest-server-netty-worker-thread-10] INFO  
> o.a.f.s.netty4.io.netty.handler.stream.ChunkedWriteHandler  - write
> /{*}deregister request and then shut down, then channel close{*}/
> 2024-03-17 18:01:34.826 [flink-akka.actor.default-dispatcher-20] INFO  
> o.a.flink.runtime.rest.handler.job.JobExecutionResultHandler  - call 
> inFlightRequestTracker.deregisterRequest() done
> 2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
> o.a.f.shaded.netty4.io.netty.channel.DefaultChannelPipeline  - pipeline close.
> 2024-03-17 18:01:34.827 [flink-rest-server-netty-worker-thread-10] INFO  
> org.apache.flink.runtime.rest.handler.util.HandlerUtils  - lastContentFuture 
> complete. [future=DefaultChannelPromise@621f03ea(failure: 
> java.nio.channels.ClosedChannelException)]
> *more details in flink_bug_complex.log*
>  
>  
>  
> Additionally:
> During the process of investigating this bug, 
> FutureUtils.retryOperationWithDelay swallowed the first occurrence of the 
> "Channel became inactive" exception and, after several retries, the server 
> was shut down; the client then threw a "Connection refused" exception, which 
> had some impact on the troubleshooting process. Could we consider adding some 
> logging here to aid in future diagnostics?
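
A minimal, self-contained illustration of the difference the reporter describes 
(stand-in method names, not the actual Flink handler code):

{code:java}
import java.util.concurrent.CompletableFuture;

public class ThenAcceptVsThenCompose {

    // Stand-in for HandlerUtils.sendResponse(): an async write that finishes later.
    static CompletableFuture<Void> sendResponse(String resp) {
        return CompletableFuture.runAsync(() -> System.out.println("wrote: " + resp));
    }

    public static void main(String[] args) {
        CompletableFuture<String> response = CompletableFuture.completedFuture("ok");

        // thenAccept: completes once sendResponse() has merely been *invoked*;
        // the write future is dropped, so a shutdown chained on this future
        // can race with the network write.
        CompletableFuture<Void> racy = response.thenAccept(r -> sendResponse(r));

        // thenCompose: completes only when the future returned by sendResponse()
        // completes, i.e. after the response has actually been written out.
        CompletableFuture<Void> safe = response.thenCompose(r -> sendResponse(r));

        safe.join();
    }
}
{code}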



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35187) FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines

2024-04-22 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839921#comment-17839921
 ] 

Rui Fan commented on FLINK-35187:
-

Hi [~lsy], I want to check with you: will this great feature be completed in 
1.20.0 or 2.0.0? What's your plan?

If it's 1.20, would you mind recording it in the 1.20 release doc[1]? Thank 
you in advance.

[1] https://cwiki.apache.org/confluence/display/FLINK/1.20+Release

 

> FLIP-435: Introduce a New Materialized Table for Simplifying Data Pipelines
> ---
>
> Key: FLINK-35187
> URL: https://issues.apache.org/jira/browse/FLINK-35187
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Gateway
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>
> This is an umbrella issue for FLIP-435: Introduce a New Materialized Table 
> for Simplifying Data Pipelines, see FLIP design doc for more detail: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] Using commons-lang3 for exception checking [flink-kubernetes-operator]

2024-04-22 Thread via GitHub


haoxins commented on PR #818:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/818#issuecomment-2071309944

   > Thanks @haoxins for the fix!
   > 
   > The checkstyle[1] is not passed, please run `mvn spotless:apply` and try 
to build on your Local first, thanks~
   > 
   > [1] 
https://github.com/apache/flink-kubernetes-operator/actions/runs/8778998199/job/24132429588?pr=818#step:5:11735
   
   done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35184) Hash collision inside MiniBatchStreamingJoin operator

2024-04-22 Thread Shuai Xu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839920#comment-17839920
 ] 

Shuai Xu commented on FLINK-35184:
--

Hi [~rovboyko], actually hash collisions can't be avoided even when using 
BinaryRowData, which can only reduce the probability to some extent. And the 
solution you mentioned works for me.

> Hash collision inside MiniBatchStreamingJoin operator
> -
>
> Key: FLINK-35184
> URL: https://issues.apache.org/jira/browse/FLINK-35184
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Roman Boyko
>Priority: Major
>
> The hash collision is possible for InputSideHasNoUniqueKeyBundle. To 
> reproduce it just launch the following test within 
> StreamingMiniBatchJoinOperatorTest:
>  
> {code:java}
> @Tag("miniBatchSize=6")
> @Test
> public void testInnerJoinWithNoUniqueKeyHashCollision(TestInfo testInfo) 
> throws Exception {
> leftTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id1", "val1"}));
> rightTypeInfo =
> InternalTypeInfo.of(
> RowType.of(
> new LogicalType[] {new IntType(), new 
> BigIntType()},
> new String[] {"id2", "val2"}));
> leftKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> leftTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> rightKeySelector =
> HandwrittenSelectorUtil.getRowDataSelector(
> new int[] {0},
> rightTypeInfo.toRowType().getChildren().toArray(new 
> LogicalType[0]));
> joinKeyTypeInfo = InternalTypeInfo.of(new IntType());
> super.beforeEach(testInfo);
> testHarness.setStateTtlProcessingTime(1);
> testHarness.processElement2(insertRecord(1, 1L));
> testHarness.processElement1(insertRecord(1, 4294967296L));
> testHarness.processElement2(insertRecord(1, 4294967296L));
> testHarness.processElement2(deleteRecord(1, 1L));
> testHarness.close();
> assertor.shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, 
> 4294967296L, 1, 4294967296L));
> } {code}
>  
>  
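
For reference, the two values in the test are chosen precisely because they 
collide under Java's `Long.hashCode`, which folds the upper 32 bits into the 
lower ones:

{code:java}
public class LongHashCollision {
    public static void main(String[] args) {
        // Long.hashCode(v) == (int) (v ^ (v >>> 32)),
        // so 1L and 2^32 = 4294967296L both hash to 1.
        System.out.println(Long.hashCode(1L));          // 1
        System.out.println(Long.hashCode(4294967296L)); // 1
    }
}
{code}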



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35026][runtime][config] Introduce async execution configurations [flink]

2024-04-22 Thread via GitHub


fredia commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575573086


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +108,91 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
+/**
+ * The flag to indicate whether the {@link #bufferTimeOut} is reached; if so, a
+ * trigger will fire proactively when the next state request arrives, even if the
+ * activeQueue has not reached the {@link #batchSize}.
+ */
+final AtomicBoolean timeoutFlag;
+
+/** The executor service that schedules and calls the triggers of this 
task. */
+final ScheduledThreadPoolExecutor scheduledExecutor;

Review Comment:
    I added a `close()` method that does two things:
   1. drain all in-flight records
   2. shut down the `scheduledExecutor`
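
   A rough sketch of such a `close()` (assuming a `drainInflightRecords`-style 
helper and the field names from this PR; the actual signatures may differ):

   ```java
   // Sketch only; not the final implementation.
   public void close() throws Exception {
       // 1. Wait until all in-flight state requests have been processed.
       drainInflightRecords(0);
       // 2. Stop the thread that schedules and fires the timeout triggers.
       scheduledExecutor.shutdownNow();
   }
   ```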



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-32086) Cleanup non-reported managed directory on exit of TM

2024-04-22 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan reassigned FLINK-32086:
---

Assignee: Feifan Wang  (was: Zakelly Lan)

> Cleanup non-reported managed directory on exit of TM
> 
>
> Key: FLINK-32086
> URL: https://issues.apache.org/jira/browse/FLINK-32086
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Feifan Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-22 Thread via GitHub


Zakelly commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2071298219

   > Hi @Zakelly, the code LGTM. I have one question: why not leverage the 
capability of Reactor for this kind of async callback? It seems very similar 
to me; anything I missed? Just curious.
   
   Do you mean Project Reactor?
   The main reason is that we want to carefully customize and control the 
processing of async requests, to make sure it correctly interacts with Flink's 
components (back-pressure, mailbox, watermark and checkpoint). So what we 
really need is the basic functionality of futures, and we built up the whole 
async processing on top of this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35041) IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed

2024-04-22 Thread Weijie Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weijie Guo reassigned FLINK-35041:
--

Assignee: Feifan Wang

> IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration failed
> --
>
> Key: FLINK-35041
> URL: https://issues.apache.org/jira/browse/FLINK-35041
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Assignee: Feifan Wang
>Priority: Blocker
>
> {code:java}
> Apr 08 03:22:45 03:22:45.450 [ERROR] 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration
>  -- Time elapsed: 0.034 s <<< FAILURE!
> Apr 08 03:22:45 org.opentest4j.AssertionFailedError: 
> Apr 08 03:22:45 
> Apr 08 03:22:45 expected: false
> Apr 08 03:22:45  but was: true
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> Apr 08 03:22:45   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> Apr 08 03:22:45   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.DiscardRecordedStateObject.verifyDiscard(DiscardRecordedStateObject.java:34)
> Apr 08 03:22:45   at 
> org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandleTest.testSharedStateReRegistration(IncrementalRemoteKeyedStateHandleTest.java:211)
> Apr 08 03:22:45   at java.lang.reflect.Method.invoke(Method.java:498)
> Apr 08 03:22:45   at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> Apr 08 03:22:45   at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> {code}
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=58782=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=9238]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34970] Translate architecture documents into Chinese [flink-kubernetes-operator]

2024-04-22 Thread via GitHub


caicancai commented on PR #809:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/809#issuecomment-2071289098

   squashed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32086) Cleanup non-reported managed directory on exit of TM

2024-04-22 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839917#comment-17839917
 ] 

xiaogang zhou commented on FLINK-32086:
---

[~Zakelly] yes, no problem

> Cleanup non-reported managed directory on exit of TM
> 
>
> Key: FLINK-32086
> URL: https://issues.apache.org/jira/browse/FLINK-32086
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35188][table-api] Introduce CatalogMaterializedTable and related interface to support materialized table [flink]

2024-04-22 Thread via GitHub


flinkbot commented on PR #24700:
URL: https://github.com/apache/flink/pull/24700#issuecomment-2071286954

   
   ## CI report:
   
   * 637d4b173de5779cacbfb1c17f11e8f8fdabc795 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32086) Cleanup non-reported managed directory on exit of TM

2024-04-22 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839916#comment-17839916
 ] 

Zakelly Lan commented on FLINK-32086:
-

[~Feifan Wang] I'm not working on this. [~zhoujira86], are you working on 
this? Do you mind if [~Feifan Wang] takes it?

> Cleanup non-reported managed directory on exit of TM
> 
>
> Key: FLINK-32086
> URL: https://issues.apache.org/jira/browse/FLINK-32086
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35188) Introduce CatalogMaterializedTable and related interface to support materialized table

2024-04-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35188:
---
Labels: pull-request-available  (was: )

> Introduce CatalogMaterializedTable and related interface to support 
> materialized table
> --
>
> Key: FLINK-35188
> URL: https://issues.apache.org/jira/browse/FLINK-35188
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Assignee: dalongliu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35188][table-api] Introduce CatalogMaterializedTable and related interface to support materialized table [flink]

2024-04-22 Thread via GitHub


lsyldliu opened a new pull request, #24700:
URL: https://github.com/apache/flink/pull/24700

   ## What is the purpose of the change
   
   Introduce CatalogMaterializedTable and related interface to support 
materialized table, see https://issues.apache.org/jira/browse/FLINK-35188 for 
more detail.
   
   
   ## Brief change log
   
 - *Introduce CatalogMaterializedTable interface*
 - *Introduce RefreshHandler interface*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - *Added unit test 
CatalogBaseTableResolutionTest#testCatalogMaterializedTableResolution*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35211) when synchronize LOB fields using oracle cdc, an error occurs

2024-04-22 Thread wangsw (Jira)
wangsw created FLINK-35211:
--

 Summary: when synchronize LOB fields using oracle cdc, an error 
occurs
 Key: FLINK-35211
 URL: https://issues.apache.org/jira/browse/FLINK-35211
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
 Environment: Flink 1.18.1 

Oracle cdc 3.0.1
Reporter: wangsw
 Attachments: cdcOracleToPrint.sql

{code:java}
Caused by: java.sql.SQLException: ORA-01291: missing logfile
ORA-06512: at "SYS.DBMS_LOGMNR", line 58
ORA-06512: at line 1
oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1205)
oracle.jdbc.driver.OracleStatement.executeInternal(OracleStatement.java:1823)
oracle.jdbc.driver.OracleStatement.execute(OracleStatement.java:1778)
oracle.jdbc.driver.OracleStatementWrapper.execute(OracleStatementWrapper.java:303)
io.debezium.jdbc.JdbcConnection.executeWithoutCommitting(JdbcConnection.java:1446)
io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.startMiningSession(LogMinerStreamingChangeEventSource.java:677)
io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:244)
 {code}
When synchronizing LOB fields using Oracle CDC 3.0.1, an error occurs.

I checked that the Debezium version used by the CDC connector is 1.9.7.Final. 
When I used the same version of Debezium with the same configuration to 
synchronize LOB data, the error did not occur, so I am asking for help.

Please see the Flink SQL in the attachment.

Without the parameter 'debezium.lob.enabled' = 'true', the error does not 
occur.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35088) watermark alignment maxAllowedWatermarkDrift and updateInterval param need check

2024-04-22 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839906#comment-17839906
 ] 

Rui Fan commented on FLINK-35088:
-

Thanks [~martijnvisser] for the ping, I will take a look in detail this week.

> watermark alignment maxAllowedWatermarkDrift and updateInterval param need 
> check
> 
>
> Key: FLINK-35088
> URL: https://issues.apache.org/jira/browse/FLINK-35088
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core, Runtime / Coordination
>Affects Versions: 1.16.1
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-11-20-12-29-951.png
>
>
> When I use watermark alignment:
> 1. Setting maxAllowedWatermarkDrift to a negative number initially led me to 
> believe it could support delaying the consumption of the source, so I tried 
> it. The upstream data flow then hung indefinitely.
> Root cause:
> {code:java}
> long maxAllowedWatermark = globalCombinedWatermark.getTimestamp()             
>     + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();  {code}
> If maxAllowedWatermarkDrift is negative, then in SourceOperator 
> maxAllowedWatermark < lastEmittedWatermark, so the SourceReader is blocked 
> indefinitely and cannot recover.
> I'm not sure if this is a supported feature of watermark alignment. If it's 
> not, I think an additional parameter validation should be implemented to 
> throw an exception on the client side if the value is negative.
> 2. The updateInterval parameter also lacks validation. If I set it to 0, the 
> job manager throws an exception when starting the job. The JDK class 
> java.util.concurrent.ScheduledThreadPoolExecutor performs the validation and 
> throws the exception.
> {code:java}
> java.lang.IllegalArgumentException: null
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565)
>  ~[?:1.8.0_351]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.(SourceCoordinator.java:191)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:59)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.(RecreateOnResetOperatorCoordinator.java:42)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
>  ~[flink-dist_2.12-1.16.1.jar:1.16.1]
>   at 
> 
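
A minimal sketch of the client-side validation suggested in the description (a 
hypothetical helper using Flink's `org.apache.flink.util.Preconditions`; not 
the actual fix):

{code:java}
import org.apache.flink.util.Preconditions;

import java.time.Duration;

// Hypothetical helper, for illustration only.
final class WatermarkAlignmentParamsValidator {

    static void validate(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        Preconditions.checkArgument(
                !maxAllowedWatermarkDrift.isNegative(),
                "maxAllowedWatermarkDrift must be >= 0, but was %s",
                maxAllowedWatermarkDrift);
        Preconditions.checkArgument(
                updateInterval.compareTo(Duration.ZERO) > 0,
                "updateInterval must be > 0, but was %s",
                updateInterval);
    }
}
{code}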

Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]

2024-04-22 Thread via GitHub


mas-chen commented on code in PR #24564:
URL: https://github.com/apache/flink/pull/24564#discussion_r1575521871


##
docs/layouts/shortcodes/generated/rest_v1_dispatcher.html:
##
@@ -1277,6 +1277,9 @@
   "parallelism" : {
 "type" : "integer"
   },
+  "slotSharingGroupId" : {

Review Comment:
   fixed!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]

2024-04-22 Thread via GitHub


mas-chen commented on PR #24564:
URL: https://github.com/apache/flink/pull/24564#issuecomment-2071218991

   @ruanhang1993 I'm inclined to merge this in the next few days after the 
thorough feedback from Alex. Let me know if you have any concerns


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-3.0][minor][cdc][docs] Fix outdated StarRocks quickstart guide [flink-cdc]

2024-04-22 Thread via GitHub


Jiabao-Sun merged PR #3242:
URL: https://github.com/apache/flink-cdc/pull/3242


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [minor][cdc][docs] Fix outdated StarRocks quickstart guide [flink-cdc]

2024-04-22 Thread via GitHub


Jiabao-Sun merged PR #3238:
URL: https://github.com/apache/flink-cdc/pull/3238


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34444] Initial implementation of JM operator metric rest api [flink]

2024-04-22 Thread via GitHub


mas-chen commented on code in PR #24564:
URL: https://github.com/apache/flink/pull/24564#discussion_r1575497962


##
docs/layouts/shortcodes/generated/rest_v1_dispatcher.html:
##
@@ -1277,6 +1277,9 @@
   "parallelism" : {
 "type" : "integer"
   },
+  "slotSharingGroupId" : {

Review Comment:
   yup



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-22 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839816#comment-17839816
 ] 

Martijn Visser commented on FLINK-35210:


There isn't enough information about used/tested versions of Flink, but I think 
the actual solution should be fixing bugs (if there are any) instead of finding 
workarounds like the one proposed in the ticket. It sounds trivial at first, 
but the moment you have to take into account things like new partitions being 
added on a source that's already in use, this becomes less of an easy fix. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
 already showed some considerations for those edge cases as well.

> Give the option to set automatically the parallelism of the KafkaSource to 
> the number of kafka partitions
> -
>
> Key: FLINK-35210
> URL: https://issues.apache.org/jira/browse/FLINK-35210
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Nicolas Perrin
>Priority: Minor
>
> Currently the parallelism of Flink's `KafkaSource` operator needs to be 
> chosen manually, which can lead to highly skewed tasks if the developer 
> doesn't do this job.
> To avoid this issue, I propose to:
> - retrieve the number of partitions of the topic dynamically using 
> `KafkaConsumer.partitionsFor(topic).size()`,
> - set the parallelism of the stream built from the source based on this value.
> This way there won't be any idle tasks.
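
A sketch of that proposal (topic name, bootstrap servers, `env` and 
`kafkaSource` are placeholders; note it only captures the partition count at 
job submission time, not partitions added later):

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.setProperty("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

int partitions;
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    // Number of partitions of the topic at job submission time.
    partitions = consumer.partitionsFor("my-topic").size();
}

env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .setParallelism(partitions);
{code}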



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35168][State] Basic State Iterator for async processing [flink]

2024-04-22 Thread via GitHub


jectpro7 commented on PR #24690:
URL: https://github.com/apache/flink/pull/24690#issuecomment-2070594485

   Hi @Zakelly, the code LGTM. I have one question: why not leverage the 
capability of Reactor for this kind of async callback? It seems very similar 
to me; anything I missed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35165) AdaptiveBatch Scheduler should not restrict the default source parallelism to the max parallelism set

2024-04-22 Thread Venkata krishnan Sowrirajan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839806#comment-17839806
 ] 

Venkata krishnan Sowrirajan commented on FLINK-35165:
-

JFYI, I am working on the fix for this issue.

> AdaptiveBatch Scheduler should not restrict the default source parallelism to 
> the max parallelism set
> -
>
> Key: FLINK-35165
> URL: https://issues.apache.org/jira/browse/FLINK-35165
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Venkata krishnan Sowrirajan
>Priority: Major
>
> Copy-pasting the reasoning mentioned on this [discussion 
> thread|https://lists.apache.org/thread/o887xhvvmn2rg5tyymw348yl2mqt23o7].
> Let me state why I think 
> "{_}jobmanager.adaptive-batch-scheduler.default-source-parallelism{_}" should 
> not be bound by the 
> "{_}jobmanager.adaptive-batch-scheduler.max-parallelism{_}".
>  *  Source vertex is unique and does not have any upstream vertices - 
> Downstream vertices read shuffled data partitioned by key, which is not the 
> case for the Source vertex
>  * Limiting source parallelism by downstream vertices' max parallelism is 
> incorrect
>  * If we say for "semantic consistency" the source vertex parallelism has to 
> be bound by the overall job's max parallelism, it can lead to following 
> issues:
>  ** High filter selectivity with huge amounts of data to read
>  ** Setting high "*jobmanager.adaptive-batch-scheduler.max-parallelism*" so 
> that source parallelism can be set higher can lead to small blocks and 
> sub-optimal performance.
>  ** Setting high "*jobmanager.adaptive-batch-scheduler.max-parallelism*" 
> requires careful tuning of network buffer configurations which is unnecessary 
> in cases where it is not required just so that the source parallelism can be 
> set high.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-22 Thread Nicolas Perrin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839756#comment-17839756
 ] 

Nicolas Perrin commented on FLINK-35210:


That's indeed how I proceeded in our case.

 I'm sorry, my choice of words was a bit confusing.

We are running a Flink job on EMR and we had a lot of issues where some task 
instances would be randomly killed. From what we understood, one of the reasons 
was that we used a Flink parallelism for the application equal to a multiple 
of the number of instances. So some of the task instances could end up reading 
one or more partitions while others would read nothing while still holding the 
dedicated resources. This behaviour was amplified by the fact that we've got 
several Kafka sources.
It appeared to us that this was a source of instability, so we ended up forcing 
the parallelism of the source operator to the number of kafka partitions.

> Give the option to set automatically the parallelism of the KafkaSource to 
> the number of kafka partitions
> -
>
> Key: FLINK-35210
> URL: https://issues.apache.org/jira/browse/FLINK-35210
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Nicolas Perrin
>Priority: Minor
>
> Currently the parallelism of Flink's `KafkaSource` operator needs to be 
> chosen manually, which can lead to highly skewed tasks if the developer 
> doesn't do this job.
> To avoid this issue, I propose to:
> - retrieve the number of partitions of the topic dynamically using 
> `KafkaConsumer.partitionsFor(topic).size()`,
> - set the parallelism of the stream built from the source based on this value.
> This way there won't be any idle tasks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-22 Thread Nicolas Perrin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839756#comment-17839756
 ] 

Nicolas Perrin edited comment on FLINK-35210 at 4/22/24 3:45 PM:
-

That's indeed how I proceeded in our case.

 I'm sorry, my choice of words was a bit confusing.

We run a Flink job on EMR and we had a lot of issues where some task instances 
would be randomly killed. From what we understood, one of the reasons was that 
we used a Flink parallelism for the application equal to a multiple of the 
number of instances. So some of the task instances could end up reading one or 
more partitions while others would read nothing while still holding the 
dedicated resources. This behaviour was amplified by the fact that we've got 
several Kafka sources.
It appeared to us that this was a source of instability, so we ended up forcing 
the parallelism of the source operator to the number of kafka partitions.


was (Author: JIRAUSER305202):
That's indeed how I proceeded in our case.

 I'm sorry, my choice of words was a bit confusing.

We are running a Flink job on EMR and we had a lot of issues where some task 
instances would be randomly killed. From what we understood, one of the reason 
was that we used a Flink parallelism for the application equals to a multiple 
of the number of instances. So some of the task instances could end up reading 
one or more partitions while some others would read nothing while having the 
dedicated resources. This behaviour was amplified by the fact that we've got 
several kafka sources.
It appeared to us that this was a source of instability, so we ended up forcing 
the parallelism of the source operator to the number of kafka partitions.

> Give the option to set automatically the parallelism of the KafkaSource to 
> the number of kafka partitions
> -
>
> Key: FLINK-35210
> URL: https://issues.apache.org/jira/browse/FLINK-35210
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Nicolas Perrin
>Priority: Minor
>
> Currently the parallelism of Flink's `KafkaSource` operator needs to be 
> chosen manually, which can lead to highly skewed tasks if the developer 
> doesn't do this job.
> To avoid this issue, I propose to:
> - retrieve the number of partitions of the topic dynamically using 
> `KafkaConsumer.partitionsFor(topic).size()`,
> - set the parallelism of the stream built from the source based on this value.
> This way there won't be any idle tasks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35138][Connectors/Kafka] Drop support for Flink 1.17 [flink-connector-kafka]

2024-04-22 Thread via GitHub


hlteoh37 commented on code in PR #96:
URL: 
https://github.com/apache/flink-connector-kafka/pull/96#discussion_r1574969556


##
.github/workflows/weekly.yml:
##
@@ -45,11 +41,14 @@ jobs:
   jdk: '8, 11, 17, 21',
   branch: main
 }, {
-  flink: 1.17.2,
-  branch: v3.1
+  flink: 1.18.1,
+  branch: v3.2
+}, {
+  flink: 1.19.0,
+  branch: v3.2,
+  jdk: '8, 11, 17, 21',

Review Comment:
   nit: it would be nice to order the `flink` and `branch` in the list the same 
way. (both increasing down / both decreasing down)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35182] Bump org.apache.commons:commons-compress from 1.24.0 to 1.26.1 for Flink Pulsar connector [flink-connector-pulsar]

2024-04-22 Thread via GitHub


syhily commented on code in PR #90:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/90#discussion_r1574953259


##
pom.xml:
##
@@ -397,6 +397,20 @@ under the License.
 ${commons-compress.version}
 
 
+
+
+org.apache.commons
+commons-lang3
+3.14.0
+
+
+
+
+commons-io
+commons-io
+2.15.1

Review Comment:
   Move the version to the properties.



##
pom.xml:
##
@@ -397,6 +397,20 @@ under the License.
 ${commons-compress.version}
 
 
+
+
+org.apache.commons
+commons-lang3
+3.14.0

Review Comment:
   Move the version to the properties.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35196] [Connector / Pulsar] Fix bouncycastle class not found [flink-connector-pulsar]

2024-04-22 Thread via GitHub


syhily commented on PR #91:
URL: 
https://github.com/apache/flink-connector-pulsar/pull/91#issuecomment-2069874438

   Thanks for your commit. I think `jdk15on` should be dropped in favor of 
`jdk18on`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-22 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1574944960


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。
 
-## Features
-### Core
+Flink Kubernetes Operator 通过自定义资源定义(CRD)来扩展 Kubernetes API,以便通过本地 k8s 工具(如 
kubectl)管理和操作 Flink 部署。Operator 的核心功能包括:
+
+
+
+## 特征
+
+
+
+### 核心
 - Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
+  - 运行、暂停和删除应用程序
+  - 有状态和无状态应用程序升级
+  - 保存点的触发和管理
+  - 任务管理器的扩展和缩减
 - Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
 - [Deployment Modes]({{< ref 
"docs/custom-resource/overview#application-deployments" >}}):
-  - Application cluster
-  - Session cluster
-  - Session job
+  - 应用程序集群
+  - 会话集群
+  - 会话作业
 - Built-in [High 
Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/)
   
 - Extensible framework
   - [Custom validators]({{< ref 
"docs/operations/plugins#custom-flink-resource-validators" >}})
   - [Custom resource listeners]({{< ref 
"docs/operations/plugins#custom-flink-resource-listeners" >}})  
 - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) 
management
-  - Default configurations with dynamic updates
-  - Per job configuration
-  - Environment variables
+  - 默认配置与动态更新
+  - 作业配置
+  - 任务管理器配置
 - POD augmentation via [Pod Templates]({{< ref 
"docs/custom-resource/pod-template" >}})
-  - Native Kubernetes POD definitions
-  - Layering (Base/JobManager/TaskManager overrides)
+  - 原生Kubernetes POD定义
+  - 用于自定义容器和资源
 - [Job Autoscaler]({{< ref "docs/custom-resource/autoscaler" >}})
-  - Collect lag and utilization metrics
-  - Scale job vertices to the ideal parallelism
-  - Scale up and down as the load changes
-### Operations
+  - 收集延迟和利用率指标
+  - 根据指标自动调整任务管理器数量
+  - 根据负载的变化进行扩展和缩减
+
+
+
+### 运营
 - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}})
-  - Utilizes the well-established [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
-  - Pluggable metrics reporters
-  - Detailed resources and kubernetes api access metrics
+  - 使用成熟的 [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
+  - 可插拔的指标报告器
+  - 详细的资源和 kubernetes api 访问指标
 - Fully-customizable [Logging]({{< ref 
"docs/operations/metrics-logging#logging" >}})
-  - Default log configuration
-  - Per job log configuration
-  - Sidecar based log forwarders
-- Flink Web UI and REST Endpoint Access
-  - Fully supported Flink Native Kubernetes [service expose 
types](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
-  - Dynamic [Ingress templates]({{< ref "docs/operations/ingress" >}})
+  - 默认日志配置
+  - 每个作业日志配置
+  - 基于 sidecar 的日志转发器
+- Flink Web UI 和 REST 端点访问
+  - 完整支持 Flink 原生 Kubernetes 
[服务暴露类型](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
+  - 通过 [Ingress 模板]({{< ref "docs/operations/ingress" >}}) 动态暴露服务
 - [Helm based installation]({{< ref "docs/operations/helm" >}})
-  - Automated [RBAC configuration]({{< ref "docs/operations/rbac" >}})
-  - Advanced customization techniques
-- Up-to-date public 

[jira] [Commented] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-22 Thread Oleksandr Nitavskyi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839739#comment-17839739
 ] 

Oleksandr Nitavskyi commented on FLINK-35210:
-

Thanks [~npfp] for the suggestion. I believe what you propose is often solved 
with a wrapper around KafkaSource, which can serve as a layer of indirection 
for a lot of things, e.g. parallelism configuration.

Meanwhile, could you please elaborate on how bad parallelism leads to idle 
tasks? Do you mean the case where the Source parallelism is higher than the 
number of partitions, so that some Source subtasks consume nothing and there 
is no watermark advancement unless 
[Idleness|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources]
 is configured?

> Give the option to set automatically the parallelism of the KafkaSource to 
> the number of kafka partitions
> -
>
> Key: FLINK-35210
> URL: https://issues.apache.org/jira/browse/FLINK-35210
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Nicolas Perrin
>Priority: Minor
>
> Currently the parallelism of the `KafkaSource` Flink operator needs to be 
> chosen manually, which can lead to highly skewed tasks if the developer 
> doesn't do this job.
> To avoid this issue, I propose to:
> - retrieve dynamically the number of partitions of the topic using 
> `KafkaConsumer.partitionsFor(topic).size()`,
> - set the parallelism of the stream built from the source based on this value.
> This way there won't be any idle tasks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35178) Checkpoint CLAIM mode does not fully control snapshot ownership

2024-04-22 Thread elon_X (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839522#comment-17839522
 ] 

elon_X edited comment on FLINK-35178 at 4/22/24 3:20 PM:
-

[~lijinzhong] 

Thank you for your response.

I have been using the default value (true) for the 
"state.checkpoints.create-subdir" parameter. However, when I tested by setting 
this value to false, the result was the same, which might indicate I'm doing 
something wrong?

Additionally, I've encountered another issue. Even though I set 
{{{}state.checkpoints.num-retained=3{}}}, the older job's checkpoint versions 
are not being discarded even if they are not referenced. Only the checkpoint 
specified by the {{-s}} option (chk-x) is discarded.

As shown in the diagram below, I restored from chk-34, but only chk-34 was 
discarded, while chk-32 and chk-33 continue to exist indefinitely.

!image-2024-04-22-15-16-02-381.png!


was (Author: JIRAUSER303028):
[~lijinzhong] 

Thank you for your response.

I have been using the default value (true) for the 
"state.checkpoints.create-subdir" parameter. However, when I tested by setting 
this value to false, the result was the same, which might indicate I'm doing 
something wrong.

Additionally, I've encountered another issue. Even though I set 
{{{}state.checkpoints.num-retained=3{}}}, the older job's checkpoint versions 
are not being discarded even if they are not referenced. Only the checkpoint 
specified by the {{-s}} option (chk-x) is discarded.

As shown in the diagram below, I restored from chk-34, but only chk-34 was 
discarded, while chk-32 and chk-33 continue to exist indefinitely.

!image-2024-04-22-15-16-02-381.png!

> Checkpoint CLAIM mode does not fully control snapshot ownership
> ---
>
> Key: FLINK-35178
> URL: https://issues.apache.org/jira/browse/FLINK-35178
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: elon_X
>Priority: Major
> Attachments: image-2024-04-20-14-51-21-062.png, 
> image-2024-04-22-15-16-02-381.png
>
>
> When I enable incremental checkpointing, and the task fails or is canceled 
> for some reason, restarting the task from {{-s checkpoint_path}} with 
> {{restoreMode CLAIM}} allows the Flink job to recover from the last 
> checkpoint, it just discards the previous checkpoint.
> Then I found that this leads to the following two cases:
> 1. If the new checkpoint_x meta file does not reference files in the shared 
> directory under the previous jobID:         
> the shared and taskowned directories from the previous Job will be left as 
> empty directories, and these two directories will persist without being 
> deleted by Flink. !image-2024-04-20-14-51-21-062.png!
> 2. If the new checkpoint_x meta file references files in the shared directory 
> under the previous jobID:
> the chk-(x-1) from the previous job will be discarded, but there will still 
> be state data in the shared directory under that job, which might persist for 
> a relatively long time. Here arises the question: the previous job is no 
> longer running, and it's unclear whether users should delete the state data. 
> Deleting it could lead to errors when the task is restarted, as the meta 
> might reference files that can no longer be found; this could be confusing 
> for users.
>  
> The potential solution might be to reuse the previous job's jobID when 
> restoring from {{{}-s checkpoint_path{}}}, or to add a new parameter that 
> allows users to specify the jobID they want to recover from;
>  
> Please correct me if there's anything I've misunderstood.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-22 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1574944490


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。
 
-## Features
-### Core
+Flink Kubernetes Operator 通过自定义资源定义(CRD)来扩展 Kubernetes API,以便通过本地 k8s 工具(如 
kubectl)管理和操作 Flink 部署。Operator 的核心功能包括:
+
+
+
+## 特征
+
+
+
+### 核心
 - Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
+  - 运行、暂停和删除应用程序
+  - 有状态和无状态应用程序升级
+  - 保存点的触发和管理
+  - 任务管理器的扩展和缩减
 - Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
 - [Deployment Modes]({{< ref 
"docs/custom-resource/overview#application-deployments" >}}):
-  - Application cluster
-  - Session cluster
-  - Session job
+  - 应用程序集群
+  - 会话集群
+  - 会话作业
 - Built-in [High 
Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/)
   
 - Extensible framework
   - [Custom validators]({{< ref 
"docs/operations/plugins#custom-flink-resource-validators" >}})
   - [Custom resource listeners]({{< ref 
"docs/operations/plugins#custom-flink-resource-listeners" >}})  
 - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) 
management
-  - Default configurations with dynamic updates
-  - Per job configuration
-  - Environment variables
+  - 默认配置与动态更新
+  - 作业配置
+  - 任务管理器配置
 - POD augmentation via [Pod Templates]({{< ref 
"docs/custom-resource/pod-template" >}})
-  - Native Kubernetes POD definitions
-  - Layering (Base/JobManager/TaskManager overrides)
+  - 原生Kubernetes POD定义
+  - 用于自定义容器和资源
 - [Job Autoscaler]({{< ref "docs/custom-resource/autoscaler" >}})
-  - Collect lag and utilization metrics
-  - Scale job vertices to the ideal parallelism
-  - Scale up and down as the load changes
-### Operations
+  - 收集延迟和利用率指标
+  - 根据指标自动调整任务管理器数量
+  - 根据负载的变化进行扩展和缩减
+
+
+
+### 运营
 - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}})
-  - Utilizes the well-established [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
-  - Pluggable metrics reporters
-  - Detailed resources and kubernetes api access metrics
+  - 使用成熟的 [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
+  - 可插拔的指标报告器
+  - 详细的资源和 kubernetes api 访问指标
 - Fully-customizable [Logging]({{< ref 
"docs/operations/metrics-logging#logging" >}})
-  - Default log configuration
-  - Per job log configuration
-  - Sidecar based log forwarders
-- Flink Web UI and REST Endpoint Access
-  - Fully supported Flink Native Kubernetes [service expose 
types](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
-  - Dynamic [Ingress templates]({{< ref "docs/operations/ingress" >}})
+  - 默认日志配置
+  - 每个作业日志配置
+  - 基于 sidecar 的日志转发器
+- Flink Web UI 和 REST 端点访问
+  - 完整支持 Flink 原生 Kubernetes 
[服务暴露类型](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
+  - 通过 [Ingress 模板]({{< ref "docs/operations/ingress" >}}) 动态暴露服务
 - [Helm based installation]({{< ref "docs/operations/helm" >}})
-  - Automated [RBAC configuration]({{< ref "docs/operations/rbac" >}})
-  - Advanced customization techniques
-- Up-to-date public 

Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-22 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1574944144


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。
 
-## Features
-### Core
+Flink Kubernetes Operator 通过自定义资源定义(CRD)来扩展 Kubernetes API,以便通过本地 k8s 工具(如 
kubectl)管理和操作 Flink 部署。Operator 的核心功能包括:
+
+
+
+## 特征
+
+
+
+### 核心
 - Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})

Review Comment:
   At first, I thought it was fine not to translate it. I translated 
"Fully-automated", but I didn't think it was necessarily appropriate.



##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。
 
-## Features
-### Core
+Flink Kubernetes Operator 通过自定义资源定义(CRD)来扩展 Kubernetes API,以便通过本地 k8s 工具(如 
kubectl)管理和操作 Flink 部署。Operator 的核心功能包括:
+
+
+
+## 特征
+
+
+
+### 核心
 - Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
+  - 运行、暂停和删除应用程序
+  - 有状态和无状态应用程序升级
+  - 保存点的触发和管理
+  - 任务管理器的扩展和缩减
 - Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
 - [Deployment Modes]({{< ref 
"docs/custom-resource/overview#application-deployments" >}}):
-  - Application cluster
-  - Session cluster
-  - Session job
+  - 应用程序集群
+  - 会话集群
+  - 会话作业
 - Built-in [High 
Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/)
   
 - Extensible framework
   - [Custom validators]({{< ref 
"docs/operations/plugins#custom-flink-resource-validators" >}})
   - [Custom resource listeners]({{< ref 
"docs/operations/plugins#custom-flink-resource-listeners" >}})  
 - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) 
management
-  - Default configurations with dynamic updates
-  - Per job configuration
-  - Environment variables
+  - 默认配置与动态更新
+  - 作业配置
+  - 任务管理器配置
 - POD augmentation via [Pod 

Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-22 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1574942660


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许你直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-22 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1574941355


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许您直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。
 
-## Features
-### Core
+Flink Kubernetes Operator 通过自定义资源定义(CRD)来扩展 Kubernetes API,以便通过本地 k8s 工具(如 
kubectl)管理和操作 Flink 部署。Operator的核心功能包括:
+
+
+
+## 特征
+
+
+
+### 核心
 - Fully-automated [Job Lifecycle Management]({{< ref 
"docs/custom-resource/job-management" >}})
-  - Running, suspending and deleting applications
-  - Stateful and stateless application upgrades
-  - Triggering and managing savepoints
-  - Handling errors, rolling-back broken upgrades
+  - 运行、暂停和删除应用程序
+  - 有状态和无状态应用程序升级
+  - 保存点的触发和管理
+  - 任务管理器的扩展和缩减
 - Multiple Flink version support: v1.15, v1.16, v1.17, v1.18
 - [Deployment Modes]({{< ref 
"docs/custom-resource/overview#application-deployments" >}}):
-  - Application cluster
-  - Session cluster
-  - Session job
+  - 应用程序集群
+  - 会话集群
+  - 会话作业
 - Built-in [High 
Availability](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/)
   
 - Extensible framework
   - [Custom validators]({{< ref 
"docs/operations/plugins#custom-flink-resource-validators" >}})
   - [Custom resource listeners]({{< ref 
"docs/operations/plugins#custom-flink-resource-listeners" >}})  
 - Advanced [Configuration]({{< ref "docs/operations/configuration" >}}) 
management
-  - Default configurations with dynamic updates
-  - Per job configuration
-  - Environment variables
+  - 默认配置与动态更新
+  - 作业配置
+  - 任务管理器配置
 - POD augmentation via [Pod Templates]({{< ref 
"docs/custom-resource/pod-template" >}})
-  - Native Kubernetes POD definitions
-  - Layering (Base/JobManager/TaskManager overrides)
+  - 原生Kubernetes POD定义
+  - 用于自定义容器和资源
 - [Job Autoscaler]({{< ref "docs/custom-resource/autoscaler" >}})
-  - Collect lag and utilization metrics
-  - Scale job vertices to the ideal parallelism
-  - Scale up and down as the load changes
-### Operations
+  - 收集延迟和利用率指标
+  - 根据指标自动调整任务管理器数量
+  - 根据负载的变化进行扩展和缩减
+
+
+
+### 运营
 - Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}})
-  - Utilizes the well-established [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
-  - Pluggable metrics reporters
-  - Detailed resources and kubernetes api access metrics
+  - 使用成熟的 [Flink Metric 
System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
+  - 可插拔的指标报告器
+  - 详细的资源和 kubernetes api 访问指标
 - Fully-customizable [Logging]({{< ref 
"docs/operations/metrics-logging#logging" >}})
-  - Default log configuration
-  - Per job log configuration
-  - Sidecar based log forwarders
-- Flink Web UI and REST Endpoint Access
-  - Fully supported Flink Native Kubernetes [service expose 
types](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
-  - Dynamic [Ingress templates]({{< ref "docs/operations/ingress" >}})
+  - 默认日志配置
+  - 每个作业日志配置
+  - 基于 sidecar 的日志转发器
+- Flink Web UI 和 REST 端点访问
+  - 完整支持 Flink 原生 Kubernetes 
[服务暴露类型](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#accessing-flinks-web-ui)
+  - 通过 [Ingress 模板]({{< ref "docs/operations/ingress" >}}) 动态暴露服务
 - [Helm based installation]({{< ref "docs/operations/helm" >}})
-  - Automated [RBAC configuration]({{< ref "docs/operations/rbac" >}})
-  - Advanced customization techniques
-- Up-to-date public repositories

Re: [PR] [FLINK-34980] Translate overview document into Chinese [flink-kubernetes-operator]

2024-04-22 Thread via GitHub


caicancai commented on code in PR #810:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/810#discussion_r1574940019


##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许您直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。

Review Comment:
   done



##
docs/content.zh/docs/concepts/overview.md:
##
@@ -24,86 +24,93 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Overview
-Flink Kubernetes Operator acts as a control plane to manage the complete 
deployment lifecycle of Apache Flink applications. Although Flink’s native 
Kubernetes integration already allows you to directly deploy Flink applications 
on a running Kubernetes(k8s) cluster, [custom 
resources](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 and the [operator 
pattern](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) have 
also become central to a Kubernetes native deployment experience.
+
 
-Flink Kubernetes Operator aims to capture the responsibilities of a human 
operator who is managing Flink deployments. Human operators have deep knowledge 
of how Flink deployments ought to behave, how to start clusters, how to deploy 
jobs, how to upgrade them and how to react if there are problems. The main goal 
of the operator is the automation of these activities, which cannot be achieved 
through the Flink native integration alone.
+# 概述
+Flink Kubernetes Operator 扮演控制平面的角色,用于管理 Apache Flink 应用程序的完整部署生命周期。尽管 Flink 
的原生 Kubernetes 集成已经允许您直接在运行的 Kubernetes(k8s) 集群上部署 Flink 应用程序,但 
[自定义资源](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/)
 和 
[operator模式](https://kubernetes.io/docs/concepts/extend-kubernetes/operator/) 
也已成为 Kubernetes 本地部署体验的核心。
 
-## Features
-### Core
+Flink Kubernetes Operator 通过自定义资源定义(CRD)来扩展 Kubernetes API,以便通过本地 k8s 工具(如 
kubectl)管理和操作 Flink 部署。Operator的核心功能包括:

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in batch mode [flink]

2024-04-22 Thread via GitHub


flinkbot commented on PR #24699:
URL: https://github.com/apache/flink/pull/24699#issuecomment-2069755053

   
   ## CI report:
   
   * 0fee492a076714f17aa8548b94cb55c4f52fc446 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-22 Thread via GitHub


flinkbot commented on PR #24698:
URL: https://github.com/apache/flink/pull/24698#issuecomment-2069754200

   
   ## CI report:
   
   * 2b98fd70bb820730897b52e71931d182ebe2d638 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions

2024-04-22 Thread Nicolas Perrin (Jira)
Nicolas Perrin created FLINK-35210:
--

 Summary: Give the option to set automatically the parallelism of 
the KafkaSource to the number of kafka partitions
 Key: FLINK-35210
 URL: https://issues.apache.org/jira/browse/FLINK-35210
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Nicolas Perrin


Currently the parallelism of the `KafkaSource` Flink operator needs to be 
chosen manually, which can lead to highly skewed tasks if the developer 
doesn't do this job.

To avoid this issue, I propose to:
- retrieve dynamically the number of partitions of the topic using 
`KafkaConsumer.partitionsFor(topic).size()`,
- set the parallelism of the stream built from the source based on this value.

This way there won't be any idle tasks.
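
A minimal sketch of the idea, assuming the Kafka client is available at 
job-assembly time (the helper name {{deriveParallelism}} is illustrative, not 
an agreed API):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class PartitionParallelism {

    /** Queries the topic metadata and returns the topic's partition count. */
    public static int deriveParallelism(Properties kafkaProps, String topic) {
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                kafkaProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            return consumer.partitionsFor(topic).size();
        }
    }
}
{code}

The stream built from the source could then call 
{{setParallelism(deriveParallelism(kafkaProps, topic))}}; this issue proposes 
to automate exactly that inside the connector.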



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33722) MATCH_RECOGNIZE in batch mode ignores events order

2024-04-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33722:
---
Labels: pull-request-available  (was: )

> MATCH_RECOGNIZE in batch mode ignores events order
> --
>
> Key: FLINK-33722
> URL: https://issues.apache.org/jira/browse/FLINK-33722
> Project: Flink
>  Issue Type: Bug
>  Components: Library / CEP
>Affects Versions: 1.17.1
>Reporter: Grzegorz Kołakowski
>Priority: Major
>  Labels: pull-request-available
>
> MATCH_RECOGNIZE in batch mode seems to ignore ORDER BY clause. Let's consider 
> the following example:
> {code:sql}
> FROM events
> MATCH_RECOGNIZE (
> PARTITION BY user_id
> ORDER BY ts ASC
> MEASURES
> FIRST(A.ts) as _start,
> LAST(A.ts) as _middle,
> LAST(B.ts) as _finish
> ONE ROW PER MATCH
> AFTER MATCH SKIP PAST LAST ROW
> PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
> DEFINE
> A AS active is false,
> B AS active is true
> ) AS T {code}
> where _events_ is a Postgresql table containing ~1 records.
> {code:java}
> CREATE TABLE events (
>   id INT,
>   user_id INT,
>   ts TIMESTAMP(3),
>   active BOOLEAN,
>   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:postgresql://postgres:5432/test',
> 'username' = 'test',
> 'password' = 'test',
> 'table-name' = 'events'
> ); {code}
> It can happen that _finish is smaller than _start or _middle, which is wrong.
> {noformat}
> user_id  _start                   _middle                  _finish
>       1  2023-11-23 14:34:42.346  2023-11-23 14:34:48.370  2023-11-23 14:34:44.264{noformat}
>  
> Repository where I reproduced the problem: 
> [https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging]
> 
>  
> According to [~dwysakowicz]:  In BATCH the CepOperator is always created to 
> process records in processing time:
> [https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54]
> A comparator is passed along to the operator covering the sorting on ts 
> field: 
> [https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173]
>  but this is only secondary sorting. It is applied only within records of the 
> same timestamp.
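
To make the last point concrete: the comparator only breaks ties, so full 
event-time ordering would need a primary sort on the rowtime field. An 
illustrative comparator, assuming Flink's {{RowData}} API (field position 2 
and precision 3 are example values, not taken from the issue):

{code:java}
import java.util.Comparator;

import org.apache.flink.table.data.RowData;

final class EventTimeOrder {

    // Primary ordering by the rowtime field; position/precision are examples.
    static final Comparator<RowData> BY_ROWTIME =
            Comparator.comparingLong(row -> row.getTimestamp(2, 3).getMillisecond());
}
{code}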



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in batch mode [flink]

2024-04-22 Thread via GitHub


grzegorz8 opened a new pull request, #24699:
URL: https://github.com/apache/flink/pull/24699

   
   
   ## What is the purpose of the change
   
   Currently, MATCH_RECOGNIZE completely ignores the ORDER BY clause in batch 
mode: the events are processed in the order they arrive, which makes no sense. 
This pull request adds proper event ordering in Pattern Recognition for batch.
   
   ## Brief change log
   
   - Fix events ordering in MATCH_RECOGNIZE in batch mode
   
   ## Verifying this change
   
   This change added tests and can be verified with 
`org.apache.flink.table.planner.runtime.batch.sql.MatchRecognizeITCase`. 
Existing tests have been modified.
   A few new tests have been added - they are similar to the ones in 
`org.apache.flink.table.planner.runtime.stream.sql.MatchRecognizeITCase`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35158) Error handling in StateFuture's callback

2024-04-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35158:
---
Labels: pull-request-available  (was: )

> Error handling in StateFuture's callback
> 
>
> Key: FLINK-35158
> URL: https://issues.apache.org/jira/browse/FLINK-35158
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-22 Thread via GitHub


fredia opened a new pull request, #24698:
URL: https://github.com/apache/flink/pull/24698

   
   
   ## What is the purpose of the change
   
   This PR implements error handling in StateFuture's callbacks, making the 
job fail when any exception occurs in a StateFuture callback.
   
   
   ## Brief change log
   
   - Add exception handling to the implementation class of `StateFuture`.
   - Wire the exception in `StateFuture`'s callback to 
`environment.failExternally`; a sketch of the pattern is shown below.
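
   A minimal, framework-agnostic sketch of that pattern (the class here is 
illustrative; the real wiring lives in the runtime's `StateFuture` 
implementation):

   ```java
   import java.util.function.Consumer;

   final class FailingCallbackRunner {

       // In the actual runtime this would be environment::failExternally.
       private final Consumer<Throwable> failExternally;

       FailingCallbackRunner(Consumer<Throwable> failExternally) {
           this.failExternally = failExternally;
       }

       void runCallback(Runnable userCallback) {
           try {
               userCallback.run();
           } catch (Throwable t) {
               // Fail the task instead of silently dropping the error.
               failExternally.accept(t);
           }
       }
   }
   ```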
   
   
   ## Verifying this change
   
   
   This change added/updated tests and can be verified as follows:
   - `AsyncExecutionControllerTest#testException`
   - `ContextStateFutureImplTest`
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): ( no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Adding Kafka Connector v3.2.0 [flink-web]

2024-04-22 Thread via GitHub


dannycranmer opened a new pull request, #738:
URL: https://github.com/apache/flink-web/pull/738

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35027) Implement checkpoint drain in AsyncExecutionController

2024-04-22 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839662#comment-17839662
 ] 

Yanfei Lei commented on FLINK-35027:


Merged into master via 9336760

> Implement checkpoint drain in AsyncExecutionController
> --
>
> Key: FLINK-35027
> URL: https://issues.apache.org/jira/browse/FLINK-35027
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / Task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35028) Timer firing under async execution model

2024-04-22 Thread Yanfei Lei (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839659#comment-17839659
 ] 

Yanfei Lei commented on FLINK-35028:


Merged into master via 0b2e988

> Timer firing under async execution model
> 
>
> Key: FLINK-35028
> URL: https://issues.apache.org/jira/browse/FLINK-35028
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends, Runtime / Task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19

2024-04-22 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-35138:
--
Fix Version/s: kafka-3.2.0

> Release flink-connector-kafka v3.2.0 for Flink 1.19
> ---
>
> Key: FLINK-35138
> URL: https://issues.apache.org/jira/browse/FLINK-35138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.2.0
>
>
> https://github.com/apache/flink-connector-kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]

2024-04-22 Thread via GitHub


fredia commented on PR #24672:
URL: https://github.com/apache/flink/pull/24672#issuecomment-2069427265

   Thanks all for the detailed review, merged


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35138][Connectors/Kafka] Drop support for Flink 1.17 [flink-connector-kafka]

2024-04-22 Thread via GitHub


boring-cyborg[bot] commented on PR #96:
URL: 
https://github.com/apache/flink-connector-kafka/pull/96#issuecomment-2069423693

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-35028) Timer firing under async execution model

2024-04-22 Thread Yanfei Lei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanfei Lei resolved FLINK-35028.

Resolution: Resolved

> Timer firing under async execution model
> 
>
> Key: FLINK-35028
> URL: https://issues.apache.org/jira/browse/FLINK-35028
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends, Runtime / Task
>Reporter: Yanfei Lei
>Assignee: Yanfei Lei
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35028][runtime] Timer firing under async execution model [flink]

2024-04-22 Thread via GitHub


fredia merged PR #24672:
URL: https://github.com/apache/flink/pull/24672


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19

2024-04-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35138:
---
Labels: pull-request-available  (was: )

> Release flink-connector-kafka v3.2.0 for Flink 1.19
> ---
>
> Key: FLINK-35138
> URL: https://issues.apache.org/jira/browse/FLINK-35138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
>
> https://github.com/apache/flink-connector-kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35133][Connectors/Cassandra] Adding support for Flink 1.19 [flink-connector-cassandra]

2024-04-22 Thread via GitHub


echauchot commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/28#discussion_r1574738000


##
.github/workflows/weekly.yml:
##
@@ -22,36 +22,24 @@ on:
 - cron: "0 0 * * 0"
   workflow_dispatch:
 jobs:
-  # tests that current connector iteration does not break compatibility with 
last 2 minor released Flink versions
-  non-main-version:
+  compile_and_test:
 if: github.repository_owner == 'apache'
+strategy:
+  matrix:
+flink_branches: [{

Review Comment:
   perfect :+1: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35133][Connectors/Cassandra] Adding support for Flink 1.19 [flink-connector-cassandra]

2024-04-22 Thread via GitHub


dannycranmer commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/28#discussion_r1574736573


##
.github/workflows/weekly.yml:
##
@@ -22,36 +22,24 @@ on:
 - cron: "0 0 * * 0"
   workflow_dispatch:
 jobs:
-  # tests that current connector iteration does not break compatibility with 
last 2 minor released Flink versions
-  non-main-version:
+  compile_and_test:
 if: github.repository_owner == 'apache'
+strategy:
+  matrix:
+flink_branches: [{

Review Comment:
   Yeah that makes sense. I need to update to include the new v3.2 branch, so 
will add them then. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19

2024-04-22 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-35138:
-

Assignee: Danny Cranmer

> Release flink-connector-kafka v3.2.0 for Flink 1.19
> ---
>
> Key: FLINK-35138
> URL: https://issues.apache.org/jira/browse/FLINK-35138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>
> https://github.com/apache/flink-connector-kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19

2024-04-22 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-35138:
--
Summary: Release flink-connector-kafka v3.2.0 for Flink 1.19  (was: Release 
flink-connector-kafka vX.X.X for Flink 1.19)

> Release flink-connector-kafka v3.2.0 for Flink 1.19
> ---
>
> Key: FLINK-35138
> URL: https://issues.apache.org/jira/browse/FLINK-35138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Danny Cranmer
>Priority: Major
>
> https://github.com/apache/flink-connector-kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35133) Release flink-connector-cassandra v3.2.0 for Flink 1.19

2024-04-22 Thread Danny Cranmer (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839651#comment-17839651
 ] 

Danny Cranmer commented on FLINK-35133:
---

3.2.0-RC1 Vote thread: 
https://lists.apache.org/thread/28x1n2p5cxl66t4w6vrq06o9c1j050fj

> Release flink-connector-cassandra v3.2.0 for Flink 1.19
> ---
>
> Key: FLINK-35133
> URL: https://issues.apache.org/jira/browse/FLINK-35133
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Cassandra
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
> Fix For: cassandra-3.2.0
>
>
> https://github.com/apache/flink-connector-cassandra



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Adding Cassandra Connector v3.2.0 [flink-web]

2024-04-22 Thread via GitHub


dannycranmer opened a new pull request, #737:
URL: https://github.com/apache/flink-web/pull/737

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35092][cdc][starrocks] Add starrocks integration test cases [flink-cdc]

2024-04-22 Thread via GitHub


banmoy commented on PR #3231:
URL: https://github.com/apache/flink-cdc/pull/3231#issuecomment-2069316077

   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-35209) Add a DeserializationSchema decorator that counts deserialize errors

2024-04-22 Thread Nicolas Perrin (Jira)
Nicolas Perrin created FLINK-35209:
--

 Summary: Add a DeserializationSchema decorator that counts 
deserialize errors
 Key: FLINK-35209
 URL: https://issues.apache.org/jira/browse/FLINK-35209
 Project: Flink
  Issue Type: New Feature
  Components: API / Core
Reporter: Nicolas Perrin


I would like to propose a PR that implements a decorator for 
`DeserializationSchema`.

The decorator wraps a `DeserializationSchema` object. Its purpose is to catch 
any deserialization errors that occur when deserializing messages. The 
decorator has a flag that decides whether to fail in this case. If it keeps 
the errors silent, it counts them in a `flink.metrics.Counter` so the user can 
monitor them. This PR is ready to be created.

This decorator could be improved further with a sink that collects all the 
messages causing deserialization errors.
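
A rough sketch of what the decorator could look like, assuming Flink's 
standard `DeserializationSchema` interface (the class name and the metric 
name are placeholders, not an agreed API):

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.Counter;

public class CountingDeserializationSchema<T> implements DeserializationSchema<T> {

    private final DeserializationSchema<T> inner;
    private final boolean failOnError;
    private transient Counter errorCounter;

    public CountingDeserializationSchema(DeserializationSchema<T> inner, boolean failOnError) {
        this.inner = inner;
        this.failOnError = failOnError;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        inner.open(context);
        // Register the counter on the operator's metric group.
        errorCounter = context.getMetricGroup().counter("numDeserializeErrors");
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            return inner.deserialize(message);
        } catch (IOException | RuntimeException e) {
            if (failOnError) {
                throw e;
            }
            errorCounter.inc();
            return null; // by contract, a null result means the message is skipped
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}
{code}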



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35133][Connectors/Cassandra] Adding support for Flink 1.19 [flink-connector-cassandra]

2024-04-22 Thread via GitHub


echauchot commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/28#discussion_r1574680771


##
.github/workflows/weekly.yml:
##
@@ -22,36 +22,24 @@ on:
 - cron: "0 0 * * 0"
   workflow_dispatch:
 jobs:
-  # tests that current connector iteration does not break compatibility with 
last 2 minor released Flink versions
-  non-main-version:
+  compile_and_test:
 if: github.repository_owner == 'apache'
+strategy:
+  matrix:
+flink_branches: [{

Review Comment:
   It is true that all the PRs are tested against the last 2 released Flink 
versions, but don't you think that the weekly workflow should also test 
against these versions and not only the snapshots?



##
.github/workflows/weekly.yml:
##
@@ -22,36 +22,24 @@ on:
 - cron: "0 0 * * 0"
   workflow_dispatch:
 jobs:
-  # tests that current connector iteration does not break compatibility with 
last 2 minor released Flink versions
-  non-main-version:
+  compile_and_test:
 if: github.repository_owner == 'apache'
+strategy:
+  matrix:
+flink_branches: [{
+  flink: 1.18-SNAPSHOT,
+  branch: main
+}, {
+  flink: 1.19-SNAPSHOT,
+  branch: main
+}, {
+  flink: 1.20-SNAPSHOT,
+  branch: main
+}]
 uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
 with:
-  flink_version: 1.17.2
-  connector_branch: main
-  skip_archunit_tests: true

Review Comment:
   Cool that we no longer need to test against 1.17.2 or skip the archunit 
tests!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34227) Job doesn't disconnect from ResourceManager

2024-04-22 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839630#comment-17839630
 ] 

Ryan Skraba commented on FLINK-34227:
-

1.18 AdaptiveScheduler / Test (module: table) 
https://github.com/apache/flink/actions/runs/8769422951/job/24065034854#step:10:14503
1.20 AdaptiveScheduler / Test (module: table) 
https://github.com/apache/flink/actions/runs/8777471561/job/24082689462#step:10:13087

> Job doesn't disconnect from ResourceManager
> ---
>
> Key: FLINK-34227
> URL: https://issues.apache.org/jira/browse/FLINK-34227
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Matthias Pohl
>Assignee: Matthias Pohl
>Priority: Critical
>  Labels: github-actions, pull-request-available, test-stability
> Attachments: FLINK-34227.7e7d69daebb438b8d03b7392c9c55115.log, 
> FLINK-34227.log
>
>
> https://github.com/XComp/flink/actions/runs/7634987973/job/20800205972#step:10:14557
> {code}
> [...]
> "main" #1 prio=5 os_prio=0 tid=0x7f4b7000 nid=0x24ec0 waiting on 
> condition [0x7fccce1eb000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xbdd52618> (a 
> java.util.concurrent.CompletableFuture$Signaller)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>   at 
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>   at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2131)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2099)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2077)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:876)
>   at 
> org.apache.flink.table.planner.runtime.stream.sql.WindowDistinctAggregateITCase.testHopWindow_Cube(WindowDistinctAggregateITCase.scala:550)
> [...]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [minor][docs] Fix outdated StarRocks quickstart guide [flink-cdc]

2024-04-22 Thread via GitHub


yuxiqian commented on PR #3238:
URL: https://github.com/apache/flink-cdc/pull/3238#issuecomment-2069248748

   @Jiabao-Sun Sure! Done in #3242.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35175) HadoopDataInputStream can't compile with Hadoop 3.2.3

2024-04-22 Thread Ryan Skraba (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839628#comment-17839628
 ] 

Ryan Skraba commented on FLINK-35175:
-

Failures from builds not including the fix: 
1.20 Hadoop 3.1.3 / Compile 
https://github.com/apache/flink/actions/runs/8747381080/job/24005737445#step:6:1560
1.20 Hadoop 3.1.3 / Compile 
https://github.com/apache/flink/actions/runs/8769422914/job/24064887346#step:6:1759


> HadoopDataInputStream can't compile with Hadoop 3.2.3
> -
>
> Key: FLINK-35175
> URL: https://issues.apache.org/jira/browse/FLINK-35175
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.20.0
>Reporter: Ryan Skraba
>Assignee: Hangxiang Yu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>
> Unfortunately, introduced in FLINK-35045: 
> [PREADWRITEBUFFER|https://github.com/apache/flink/commit/a312a3bdd258e0ff7d6f94e979b32e2bc762b82f#diff-3ed57be01895ba0f792110e40f4283427c55528f11a5105b4bf34ebd4e6fef0dR182]
>  was added in Hadoop releases 
> [3.3.0|https://github.com/apache/hadoop/blob/rel/release-3.3.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72]
>  and 
> [2.10.0|https://github.com/apache/hadoop/blob/rel/release-2.10.0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java#L72].
> It doesn't exist in flink.hadoop.version 
> [3.2.3|https://github.com/apache/hadoop/blob/rel/release-3.2.3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java],
>  which we are using in end-to-end tests.
> {code:java}
> 00:23:55.093 [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile 
> (default-compile) on project flink-hadoop-fs: Compilation failure: 
> Compilation failure: 
> 00:23:55.093 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[151,63]
>  cannot find symbol
> 00:23:55.094 [ERROR]   symbol:   variable READBYTEBUFFER
> 00:23:55.094 [ERROR]   location: interface 
> org.apache.hadoop.fs.StreamCapabilities
> 00:23:55.094 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[182,63]
>  cannot find symbol
> 00:23:55.094 [ERROR]   symbol:   variable PREADBYTEBUFFER
> 00:23:55.094 [ERROR]   location: interface 
> org.apache.hadoop.fs.StreamCapabilities
> 00:23:55.094 [ERROR] 
> /home/vsts/work/1/s/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java:[183,43]
>  incompatible types: long cannot be converted to 
> org.apache.hadoop.io.ByteBufferPool
> 00:23:55.094 [ERROR] -> [Help 1] {code}
> * 1.20 compile_cron_hadoop313 
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=59012=logs=87489130-75dc-54e4-1f45-80c30aa367a3=73da6d75-f30d-5d5a-acbe-487a9dcff678=3630
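
One possible compatibility approach (an assumption on my side, not 
necessarily the merged fix) is to probe the capability by its string key, 
which {{hasCapability()}} on older Hadoop releases simply answers with 
{{false}}, instead of referencing a constant that only exists from Hadoop 
2.10.0/3.3.0 on:

{code:java}
import org.apache.hadoop.fs.FSDataInputStream;

final class HadoopCapabilityProbe {

    // Value of StreamCapabilities.PREADBYTEBUFFER in Hadoop >= 3.3.0.
    private static final String PREAD_BYTE_BUFFER = "in:preadbytebuffer";

    static boolean supportsByteBufferPositionedRead(FSDataInputStream in) {
        return in.hasCapability(PREAD_BYTE_BUFFER);
    }
}
{code}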



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

