[jira] [Closed] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT

2024-05-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun closed FLINK-35274.
--

> Occasional failure issue with Flink CDC Db2 UT
> --
>
> Key: FLINK-35274
> URL: https://issues.apache.org/jira/browse/FLINK-35274
> Project: Flink
>  Issue Type: Bug
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 3.1.0
>
>
> Occasional failure issue with Flink CDC Db2 UT. Because the tableId in Db2 
> redo log data does not include the database name, the table schema is 
> occasionally not found when a task restarts after an exception. I will fix 
> it by supplementing the database name.
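
The mismatch described in the issue can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the class, the method names, the string-keyed cache, and the sample identifiers (TESTDB, DB2INST1, PRODUCTS) are all hypothetical and are not taken from the Db2 connector. It shows why a lookup keyed by a table ID without the database name misses a schema stored under the full ID, and why supplementing the database name makes the lookup succeed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the connector's real classes or identifiers.
final class TableIdLookupSketch {

    // Joins the parts of a table identifier with dots; the database part is optional.
    static String tableKey(String database, String schema, String table) {
        String prefix = (database == null) ? "" : database + ".";
        return prefix + schema + "." + table;
    }

    // Returns the cached schema for the key, or null when the key is unknown.
    static String lookup(Map<String, String> schemaCache, String key) {
        return schemaCache.get(key);
    }

    public static void main(String[] args) {
        // Schemas are stored under the full identifier, including the database name.
        Map<String, String> schemaCache = new HashMap<>();
        schemaCache.put(tableKey("TESTDB", "DB2INST1", "PRODUCTS"), "id INT, name VARCHAR");

        // An identifier without the database name does not match the stored key.
        String fromLog = tableKey(null, "DB2INST1", "PRODUCTS");
        // Supplementing the database name restores the match.
        String supplemented = tableKey("TESTDB", "DB2INST1", "PRODUCTS");

        System.out.println(fromLog + " -> " + lookup(schemaCache, fromLog));
        System.out.println(supplemented + " -> " + lookup(schemaCache, supplemented));
    }
}
```

The sketch does not model why the failure is only occasional; it only shows the key mismatch itself.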



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT

2024-05-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun resolved FLINK-35274.

Resolution: Fixed

Fixed via cdc
* master: a7cb46f7621568486a069a7ae01a7b86ebb0a801
* release-3.1: d556f29475a52234a98bcc65db959483a10beb52

> Occasional failure issue with Flink CDC Db2 UT
> --
>
> Key: FLINK-35274
> URL: https://issues.apache.org/jira/browse/FLINK-35274
> Project: Flink
>  Issue Type: Bug
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 3.1.0
>
>
> Occasional failure issue with Flink CDC Db2 UT. Because the tableId in Db2 
> redo log data does not include the database name, the table schema is 
> occasionally not found when a task restarts after an exception. I will fix 
> it by supplementing the database name.





Re: [PR] [fix] repair a snapshot-split bug: [flink-cdc]

2024-05-05 Thread via GitHub


yuxiqian commented on PR #2968:
URL: https://github.com/apache/flink-cdc/pull/2968#issuecomment-2095245702

   Hi @AidenPerce, the Db2 CI fix has been merged into `master` branch, could 
you please rebase this PR and see if this problem persists? Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT

2024-05-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun reassigned FLINK-35274:
--

Assignee: Xin Gong

> Occasional failure issue with Flink CDC Db2 UT
> --
>
> Key: FLINK-35274
> URL: https://issues.apache.org/jira/browse/FLINK-35274
> Project: Flink
>  Issue Type: Bug
>Reporter: Xin Gong
>Assignee: Xin Gong
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 3.1.0
>
>
> Occasional failure issue with Flink CDC Db2 UT. Because the tableId in Db2 
> redo log data does not include the database name, the table schema is 
> occasionally not found when a task restarts after an exception. I will fix 
> it by supplementing the database name.





Re: [PR] [FLINK-35274][cdc-connector][db2] Fix occasional failure issue with Flink CDC Db2 UT [flink-cdc]

2024-05-05 Thread via GitHub


Jiabao-Sun merged PR #3283:
URL: https://github.com/apache/flink-cdc/pull/3283





Re: [PR] [FLINK-35274][cdc-connector][db2] Fix occasional failure issue with Flink CDC Db2 UT [flink-cdc]

2024-05-05 Thread via GitHub


Jiabao-Sun merged PR #3284:
URL: https://github.com/apache/flink-cdc/pull/3284





Re: [PR] [FLINK-35245][cdc-connector][tidb] Add metrics for flink-connector-tidb-cdc [flink-cdc]

2024-05-05 Thread via GitHub


xieyi888 commented on PR #3266:
URL: https://github.com/apache/flink-cdc/pull/3266#issuecomment-2095236881

   @yuxiqian @czy006 I have made the changes. Please review.





Re: [PR] [fix] repair a snapshot-split bug: [flink-cdc]

2024-05-05 Thread via GitHub


yuxiqian commented on PR #2968:
URL: https://github.com/apache/flink-cdc/pull/2968#issuecomment-2095235515

   Hi @AidenPerce, sorry about the inconvenience. I think it's related to a 
glitch in Db2 incremental connector and should be fixed by #3283. Will try to 
get it merged asap.





Re: [PR] [FLINK-35262] Bridge between AsyncKeyedStateBackend and AsyncExecutionController [flink]

2024-05-05 Thread via GitHub


fredia commented on code in PR #24740:
URL: https://github.com/apache/flink/pull/24740#discussion_r1590503035


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -104,15 +105,21 @@ public class AsyncExecutionController implements 
StateRequestHandler {
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
-StateExecutor stateExecutor,
+AsyncKeyedStateBackend asyncKeyedStateBackend,
 int maxParallelism,
 int batchSize,
 long bufferTimeout,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
 this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
-this.stateExecutor = stateExecutor;
+if (asyncKeyedStateBackend != null) {

Review Comment:
   Would it be better to put the initialization of the `state executor` in the 
`StreamOperator`?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java:
##
@@ -242,6 +248,25 @@ private KeyedStateStore 
checkPreconditionsAndGetKeyedStateStore(
 return keyedStateStore;
 }
 
+// TODO: Reconstruct this after StateManager is ready in FLIP-410.

Review Comment:
   How about renaming it to `getValueState()`?
   BTW, when the user provides the stateDescriptorV2, this method is used, and 
when the user provides the stateDescriptorV1, the old `getState` is used, 
right?  Can stateDescriptorV2 and stateDescriptorV1 coexist?



##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java:
##
@@ -54,7 +59,14 @@ public class ForStKeyedStateBackendBuilder
 
 protected final Logger logger = LoggerFactory.getLogger(getClass());
 
+private static final int KEY_SERIALIZER_BUFFER_START_SIZE = 32;
+
+private static final int VALUE_SERIALIZER_BUFFER_START_SIZE = 128;

Review Comment:
   Why is 128 chosen here?






[jira] [Assigned] (FLINK-35232) Support for retry settings on GCS connector

2024-05-05 Thread Xintong Song (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xintong Song reassigned FLINK-35232:


Assignee: Oleksandr Nitavskyi  (was: Ravi Singh)

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment. In particular, the following settings seem to be the minimum 
> required to align the GCS timeout with the job's checkpoint config:
>  * 
> [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
>  * 
> [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
>  * 
> [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
>  * 
> [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
>  * 
> [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]
>  
> Basically, the proposal is to make the timeout tunable via the multiplier, 
> maxAttempts, and totalTimeout mechanisms.
> All of the config options should be optional, and the default values should 
> be used when some of the options are not provided.
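
To make the interaction of the five settings listed in the description concrete, here is a simplified worst-case model in plain Java. It is a sketch under assumptions, not the gax RetrySettings implementation: it assumes every attempt runs into its RPC timeout and it ignores retry delays and jitter. The class and method names are hypothetical; only the parameter names mirror the settings from the ticket.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model; it does not call the gax library.
final class RetryTimeoutSketch {

    // Returns the per-attempt RPC timeouts in milliseconds, assuming each attempt
    // times out. The timeout starts at initialRpcTimeoutMs, grows by
    // rpcTimeoutMultiplier per attempt, and is capped by maxRpcTimeoutMs. Attempts
    // stop after maxAttempts or once totalTimeoutMs has been used up.
    static List<Long> attemptTimeouts(
            int maxAttempts,
            long initialRpcTimeoutMs,
            double rpcTimeoutMultiplier,
            long maxRpcTimeoutMs,
            long totalTimeoutMs) {
        List<Long> timeouts = new ArrayList<>();
        long elapsed = 0;
        double next = initialRpcTimeoutMs;
        for (int attempt = 0; attempt < maxAttempts && elapsed < totalTimeoutMs; attempt++) {
            long timeout = Math.min((long) next, maxRpcTimeoutMs);
            // Never let a single attempt exceed what is left of the total budget.
            timeout = Math.min(timeout, totalTimeoutMs - elapsed);
            timeouts.add(timeout);
            elapsed += timeout;
            next = next * rpcTimeoutMultiplier;
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // maxAttempts=5, initial=1s, multiplier=2.0, maxRpcTimeout=5s, totalTimeout=20s
        System.out.println(attemptTimeouts(5, 1000L, 2.0, 5000L, 20000L));
        // Same settings with a tighter 10s total budget
        System.out.println(attemptTimeouts(5, 1000L, 2.0, 5000L, 10000L));
    }
}
```

In this model the sum of the returned timeouts is an upper bound on how long a single failing request can block, which is the number one would compare against the job's checkpoint timeout.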





[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector

2024-05-05 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843615#comment-17843615
 ] 

Xintong Song commented on FLINK-35232:
--

Thanks for the information. I'm assigning this to [~Oleksandr Nitavskyi] who 
provided the PR.

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>  Labels: pull-request-available
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment. In particular, the following settings seem to be the minimum 
> required to align the GCS timeout with the job's checkpoint config:
>  * 
> [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
>  * 
> [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
>  * 
> [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
>  * 
> [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
>  * 
> [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]
>  
> Basically, the proposal is to make the timeout tunable via the multiplier, 
> maxAttempts, and totalTimeout mechanisms.
> All of the config options should be optional, and the default values should 
> be used when some of the options are not provided.





[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector

2024-05-05 Thread Vikas M (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843612#comment-17843612
 ] 

Vikas M commented on FLINK-35232:
-

> BTW, the ticket is currently assigned to [~singhravidutt], who is neither 
> the reporter of the ticket nor the author of the PR. Does anyone know the 
> context?

Regarding the above: this happened because I cloned 
https://issues.apache.org/jira/browse/FLINK-32877 to create this ticket, and 
Jira did not let me change the assignee once the ticket was created. Please 
feel free to change the assignee as appropriate, thanks.

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>  Labels: pull-request-available
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment. In particular, the following settings seem to be the minimum 
> required to align the GCS timeout with the job's checkpoint config:
>  * 
> [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
>  * 
> [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
>  * 
> [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
>  * 
> [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
>  * 
> [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]
>  
> Basically, the proposal is to make the timeout tunable via the multiplier, 
> maxAttempts, and totalTimeout mechanisms.
> All of the config options should be optional, and the default values should 
> be used when some of the options are not provided.





Re: [PR] [fix] repair a snapshot-split bug: [flink-cdc]

2024-05-05 Thread via GitHub


AidenPerce commented on PR #2968:
URL: https://github.com/apache/flink-cdc/pull/2968#issuecomment-2095121064

   > Hi @AidenPerce, could you please rebase this PR with latest `master` 
branch before it could be merged? Renaming like `com.ververica.cdc` to 
`org.apache.flink.cdc` might be necessary.
   
   I have rebased the branch of this PR, but there are some problems in the 
test cases of the db2-connector that prevent me from 
   updating this PR. I don't know why the test case fails; this is the check 
log:  
https://github.com/AidenPerce/flink-cdc-connectors/actions/runs/8963238039/job/24613239338





Re: [PR] [FLINK-35262] Bridge between AsyncKeyedStateBackend and AsyncExecutionController [flink]

2024-05-05 Thread via GitHub


masteryhx commented on code in PR #24740:
URL: https://github.com/apache/flink/pull/24740#discussion_r1590495309


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -74,15 +76,20 @@ public void setup(
 final int asyncBufferSize = 
environment.getExecutionConfig().getAsyncStateBufferSize();
 final long asyncBufferTimeout =
 environment.getExecutionConfig().getAsyncStateBufferTimeout();
-// TODO: initial state executor and set state executor for aec
+
+AsyncKeyedStateBackend asyncKeyedStateBackend =
+Preconditions.checkNotNull(
+stateHandler.getAsyncKeyedStateBackend(),
+"Current State Backend doesn't support async access");
 this.asyncExecutionController =
 new AsyncExecutionController(
 mailboxExecutor,
-null,
+asyncKeyedStateBackend.createStateExecutor(),
 maxParallelism,
 asyncBufferSize,
 asyncBufferTimeout,
 inFlightRecordsLimit);
+asyncKeyedStateBackend.setup(asyncExecutionController);

Review Comment:
   Thanks for the suggestion.
   I think it's reasonable to maintain `setup` in AEC.






[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector

2024-05-05 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843606#comment-17843606
 ] 

Xintong Song commented on FLINK-35232:
--

I think we can just move forward with this Jira ticket.

I noticed there's already a PR opened. I can help review and merge it. And it 
would be helpful if [~galenwarren] can also take a look at it. Not only 
committers but everyone can help review PRs. Committers are only needed for the 
final merge.

BTW, the ticket is currently assigned to [~singhravidutt], who is neither the 
reporter of the ticket nor the author of the PR. Does anyone know the context?

> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / FileSystem
>Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
>Reporter: Vikas M
>Assignee: Ravi Singh
>Priority: Major
>  Labels: pull-request-available
>
> https://issues.apache.org/jira/browse/FLINK-32877 is tracking ability to 
> specify transport options in GCS connector. While setting the params enabled 
> here reduced read timeouts, we still see 503 errors leading to Flink job 
> restarts.
> Thus, in this ticket, we want to specify additional retry settings as noted 
> in 
> [https://cloud.google.com/storage/docs/retry-strategy#customize-retries.|https://cloud.google.com/storage/docs/retry-strategy#customize-retries]
> We need 
> [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods]
>  methods available for Flink users so that they can customize their 
> deployment. In particular, the following settings seem to be the minimum 
> required to align the GCS timeout with the job's checkpoint config:
>  * 
> [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
>  * 
> [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
>  * 
> [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
>  * 
> [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
>  * 
> [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]
>  
> Basically, the proposal is to make the timeout tunable via the multiplier, 
> maxAttempts, and totalTimeout mechanisms.
> All of the config options should be optional, and the default values should 
> be used when some of the options are not provided.





Re: [PR] [FLINK-35245][cdc-connector][tidb] Add metrics for flink-connector-tidb-cdc [flink-cdc]

2024-05-05 Thread via GitHub


xieyi888 commented on code in PR #3266:
URL: https://github.com/apache/flink-cdc/pull/3266#discussion_r1590486774


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetricsTest.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Unit test for {@link TiDBSourceMetrics}. */
+public class TiDBSourceMetricsTest {
+private static final String FETCH_EVENTTIME_LAG = 
"currentFetchEventTimeLag";

Review Comment:
   > Maybe use MetricNames.CURRENT_FETCH_EVENT_TIME_LAG to replace it?
   
   Thanks @czy006, I have used MetricNames.CURRENT_EMIT_EVENT_TIME_LAG,
   MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, and
   MetricNames.SOURCE_IDLE_TIME
   instead.






Re: [PR] [fix] repair a snapshot-split bug: [flink-cdc]

2024-05-05 Thread via GitHub


AidenPerce closed pull request #2968: [fix] repair a snapshot-split bug:
URL: https://github.com/apache/flink-cdc/pull/2968





Re: [PR] [tidb] Add metrics for tidb connector [flink-cdc]

2024-05-05 Thread via GitHub


xieyi888 closed pull request #1974: [tidb] Add metrics for tidb connector
URL: https://github.com/apache/flink-cdc/pull/1974





[jira] [Closed] (FLINK-35244) Correct the package for flink-connector-tidb-cdc test

2024-05-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun closed FLINK-35244.
--

>  Correct the package for flink-connector-tidb-cdc test
> --
>
> Key: FLINK-35244
> URL: https://issues.apache.org/jira/browse/FLINK-35244
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Xie Yi
>Assignee: Xie Yi
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-04-26-16-19-39-297.png
>
>
> Test cases for flink-connector-tidb-cdc should be under the
> *org.apache.flink.cdc.connectors.tidb* package
> instead of *org.apache.flink.cdc.connectors*.
> !image-2024-04-26-16-19-39-297.png!
>  
>  
>  
>  





[jira] [Resolved] (FLINK-35244) Correct the package for flink-connector-tidb-cdc test

2024-05-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun resolved FLINK-35244.

Fix Version/s: cdc-3.2.0
   Resolution: Fixed

Resolved via cdc-master: 002b16ed4e155b01374040ff302b7536d9c41245

>  Correct the package for flink-connector-tidb-cdc test
> --
>
> Key: FLINK-35244
> URL: https://issues.apache.org/jira/browse/FLINK-35244
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Xie Yi
>Assignee: Xie Yi
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
> Attachments: image-2024-04-26-16-19-39-297.png
>
>
> Test cases for flink-connector-tidb-cdc should be under the
> *org.apache.flink.cdc.connectors.tidb* package
> instead of *org.apache.flink.cdc.connectors*.
> !image-2024-04-26-16-19-39-297.png!
>  
>  
>  
>  





[jira] [Updated] (FLINK-35244) Correct the package for flink-connector-tidb-cdc test

2024-05-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun updated FLINK-35244:
---
Summary:  Correct the package for flink-connector-tidb-cdc test  (was: Move 
package for flink-connector-tidb-cdc test)

>  Correct the package for flink-connector-tidb-cdc test
> --
>
> Key: FLINK-35244
> URL: https://issues.apache.org/jira/browse/FLINK-35244
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Xie Yi
>Assignee: Xie Yi
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-04-26-16-19-39-297.png
>
>
> Test cases for flink-connector-tidb-cdc should be under the
> *org.apache.flink.cdc.connectors.tidb* package
> instead of *org.apache.flink.cdc.connectors*.
> !image-2024-04-26-16-19-39-297.png!
>  
>  
>  
>  





Re: [PR] [FLINK-35244][cdc-connector][tidb] Move package for flink-connector-tidb-cdc test [flink-cdc]

2024-05-05 Thread via GitHub


Jiabao-Sun merged PR #3265:
URL: https://github.com/apache/flink-cdc/pull/3265





[jira] [Resolved] (FLINK-34517) environment configs ignored when calling procedure operation

2024-05-05 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia resolved FLINK-34517.
--
Fix Version/s: 1.18.2
   1.20.0
   1.19.1
   Resolution: Fixed

> environment configs ignored when calling procedure operation
> 
>
> Key: FLINK-34517
> URL: https://issues.apache.org/jira/browse/FLINK-34517
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: JustinLee
>Assignee: JustinLee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.2, 1.20.0, 1.19.1
>
>
> When calling a procedure in Flink SQL, the ProcedureContext only 
> contains the underlying application-specific config, not the 
> environment-specific config.
> To be more specific, consider a Flink SQL app whose 
> StreamExecutionEnvironment has a config1. When executing a SQL query, 
> config1 takes effect, but when calling a SQL procedure, config1 does not, 
> which is apparently not the expected behavior.
>  
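
The expected behavior described in the issue can be illustrated with a small stand-alone Java sketch. Everything here is an assumption made for illustration: the class and method names are hypothetical, plain maps stand in for Flink's configuration objects, and the precedence rule (application-level options override environment-level ones) is a guess rather than something stated in the ticket or taken from the actual fix. The point is only that an option set on the environment, such as config1, should remain visible in the configuration handed to a procedure.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; plain maps stand in for Flink configuration objects.
final class ProcedureConfigSketch {

    // Layers application-level options on top of environment-level options.
    // Assumption: application-level values win when both define the same key.
    static Map<String, String> effectiveConfig(
            Map<String, String> environmentConfig, Map<String, String> applicationConfig) {
        Map<String, String> merged = new HashMap<>(environmentConfig);
        merged.putAll(applicationConfig);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> environmentConfig = new HashMap<>();
        environmentConfig.put("config1", "set-on-environment");

        Map<String, String> applicationConfig = new HashMap<>();
        applicationConfig.put("config2", "set-on-application");

        // Passing only applicationConfig to a procedure would lose config1;
        // the merged view keeps both options visible.
        System.out.println(effectiveConfig(environmentConfig, applicationConfig));
    }
}
```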





[jira] [Comment Edited] (FLINK-34517) environment configs ignored when calling procedure operation

2024-05-05 Thread luoyuxia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823069#comment-17823069
 ] 

luoyuxia edited comment on FLINK-34517 at 5/6/24 1:27 AM:
--

1.18: 620c5a7aeba448d247107ad44a6ba6f1e759052e

1.19: fa426f104baa1343a07695dcf4c4984814f0fde4

master: 24b6f7cbf94fca154fc8680e8b3393abd68b8e77


was (Author: luoyuxia):
1.18: 620c5a7aeba448d247107ad44a6ba6f1e759052e

1.19: todo

master: 24b6f7cbf94fca154fc8680e8b3393abd68b8e77

> environment configs ignored when calling procedure operation
> 
>
> Key: FLINK-34517
> URL: https://issues.apache.org/jira/browse/FLINK-34517
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: JustinLee
>Assignee: JustinLee
>Priority: Major
>  Labels: pull-request-available
>
> When calling a procedure in Flink SQL, the ProcedureContext only 
> contains the underlying application-specific config, not the 
> environment-specific config.
> To be more specific, consider a Flink SQL app whose 
> StreamExecutionEnvironment has a config1. When executing a SQL query, 
> config1 takes effect, but when calling a SQL procedure, config1 does not, 
> which is apparently not the expected behavior.
>  





Re: [PR] [BP-1.19][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-05-05 Thread via GitHub


luoyuxia merged PR #24656:
URL: https://github.com/apache/flink/pull/24656


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.19][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]

2024-05-05 Thread via GitHub


luoyuxia commented on PR #24656:
URL: https://github.com/apache/flink/pull/24656#issuecomment-2095050182

   @JustinLeesin Thanks for the PR!





[jira] [Resolved] (FLINK-32843) [JUnit5 Migration] The jobmaster package of flink-runtime module

2024-05-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun resolved FLINK-32843.

Fix Version/s: 1.20.0
   Resolution: Fixed

Resolved via master: beb0b167bdcf95f27be87a214a69a174fd49d256

> [JUnit5 Migration] The jobmaster package of flink-runtime module
> 
>
> Key: FLINK-32843
> URL: https://issues.apache.org/jira/browse/FLINK-32843
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Rui Fan
>Assignee: RocMarshal
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
>






[jira] [Updated] (FLINK-34379) table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError

2024-05-05 Thread dalongliu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dalongliu updated FLINK-34379:
--
Fix Version/s: 1.17.3
   1.18.2

> table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError
> --
>
> Key: FLINK-34379
> URL: https://issues.apache.org/jira/browse/FLINK-34379
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.2, 1.18.1
> Environment: 1.17.1
>Reporter: zhu
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.17.3, 1.18.2, 1.20.0, 1.19.1
>
>
> When running in batch mode, I UNION ALL about 50 tables and then join the 
> result with another table. Compiling the execution plan throws 
> OutOfMemoryError: Java heap space, which did not happen in 1.15.2. However, 
> both 1.17.2 and 1.18.1 throw the same error, which causes the JobManager to 
> restart. The cause has been traced to 
> table.optimizer.dynamic-filtering.enabled, which defaults to true. When I 
> set table.optimizer.dynamic-filtering.enabled to false, the query compiles 
> and executes normally.
> code
> TableEnvironment.create(EnvironmentSettings.newInstance()
> .withConfiguration(configuration)
> .inBatchMode().build())
> sql=select att,filename,'table0' as mo_name from table0 UNION All select 
> att,filename,'table1' as mo_name from table1 UNION All select 
> att,filename,'table2' as mo_name from table2 UNION All select 
> att,filename,'table3' as mo_name from table3 UNION All select 
> att,filename,'table4' as mo_name from table4 UNION All select 
> att,filename,'table5' as mo_name from table5 UNION All select 
> att,filename,'table6' as mo_name from table6 UNION All select 
> att,filename,'table7' as mo_name from table7 UNION All select 
> att,filename,'table8' as mo_name from table8 UNION All select 
> att,filename,'table9' as mo_name from table9 UNION All select 
> att,filename,'table10' as mo_name from table10 UNION All select 
> att,filename,'table11' as mo_name from table11 UNION All select 
> att,filename,'table12' as mo_name from table12 UNION All select 
> att,filename,'table13' as mo_name from table13 UNION All select 
> att,filename,'table14' as mo_name from table14 UNION All select 
> att,filename,'table15' as mo_name from table15 UNION All select 
> att,filename,'table16' as mo_name from table16 UNION All select 
> att,filename,'table17' as mo_name from table17 UNION All select 
> att,filename,'table18' as mo_name from table18 UNION All select 
> att,filename,'table19' as mo_name from table19 UNION All select 
> att,filename,'table20' as mo_name from table20 UNION All select 
> att,filename,'table21' as mo_name from table21 UNION All select 
> att,filename,'table22' as mo_name from table22 UNION All select 
> att,filename,'table23' as mo_name from table23 UNION All select 
> att,filename,'table24' as mo_name from table24 UNION All select 
> att,filename,'table25' as mo_name from table25 UNION All select 
> att,filename,'table26' as mo_name from table26 UNION All select 
> att,filename,'table27' as mo_name from table27 UNION All select 
> att,filename,'table28' as mo_name from table28 UNION All select 
> att,filename,'table29' as mo_name from table29 UNION All select 
> att,filename,'table30' as mo_name from table30 UNION All select 
> att,filename,'table31' as mo_name from table31 UNION All select 
> att,filename,'table32' as mo_name from table32 UNION All select 
> att,filename,'table33' as mo_name from table33 UNION All select 
> att,filename,'table34' as mo_name from table34 UNION All select 
> att,filename,'table35' as mo_name from table35 UNION All select 
> att,filename,'table36' as mo_name from table36 UNION All select 
> att,filename,'table37' as mo_name from table37 UNION All select 
> att,filename,'table38' as mo_name from table38 UNION All select 
> att,filename,'table39' as mo_name from table39 UNION All select 
> att,filename,'table40' as mo_name from table40 UNION All select 
> att,filename,'table41' as mo_name from table41 UNION All select 
> att,filename,'table42' as mo_name from table42 UNION All select 
> att,filename,'table43' as mo_name from table43 UNION All select 
> att,filename,'table44' as mo_name from table44 UNION All select 
> att,filename,'table45' as mo_name from table45 UNION All select 
> att,filename,'table46' as mo_name from table46 UNION All select 
> att,filename,'table47' as mo_name from table47 UNION All select 
> att,filename,'table48' as mo_name from table48 UNION All select 
> att,filename,'table49' as mo_name from table49 UNION All select 
> att,filename,'table50' as mo_name from table50 UNION All select 
> att,filename,'table51' as mo_name from table51 UNION All select 

[jira] [Comment Edited] (FLINK-34379) table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError

2024-05-05 Thread dalongliu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842220#comment-17842220
 ] 

dalongliu edited comment on FLINK-34379 at 5/6/24 1:16 AM:
---

Release-1.19: f321970111cfb6f340bd2eb0795cf24b81d583a6
Release-1.18: 9d0858ee745bc835efa78a34d849d5f3ecb89f6d
Release-1.17: d2f93a5527b05583fc97bbae511ca0ac95325c02


was (Author: lsy):
Release-1.19: f321970111cfb6f340bd2eb0795cf24b81d583a6


> table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError
> --
>
> Key: FLINK-34379
> URL: https://issues.apache.org/jira/browse/FLINK-34379
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.2, 1.18.1
> Environment: 1.17.1
>Reporter: zhu
>Assignee: Jeyhun Karimov
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.20.0, 1.19.1
>
>
> When running in batch mode, I UNION ALL about 50 tables and then join the 
> result with another table. Compiling the execution plan throws 
> OutOfMemoryError: Java heap space, which did not happen in 1.15.2. However, 
> both 1.17.2 and 1.18.1 throw the same error, which causes the JobManager to 
> restart. The cause has been traced to 
> table.optimizer.dynamic-filtering.enabled, which defaults to true. When I 
> set table.optimizer.dynamic-filtering.enabled to false, the query compiles 
> and executes normally.
> code
> TableEnvironment.create(EnvironmentSettings.newInstance()
> .withConfiguration(configuration)
> .inBatchMode().build())
> sql=select att,filename,'table0' as mo_name from table0 UNION All select 
> att,filename,'table1' as mo_name from table1 UNION All select 
> att,filename,'table2' as mo_name from table2 UNION All select 
> att,filename,'table3' as mo_name from table3 UNION All select 
> att,filename,'table4' as mo_name from table4 UNION All select 
> att,filename,'table5' as mo_name from table5 UNION All select 
> att,filename,'table6' as mo_name from table6 UNION All select 
> att,filename,'table7' as mo_name from table7 UNION All select 
> att,filename,'table8' as mo_name from table8 UNION All select 
> att,filename,'table9' as mo_name from table9 UNION All select 
> att,filename,'table10' as mo_name from table10 UNION All select 
> att,filename,'table11' as mo_name from table11 UNION All select 
> att,filename,'table12' as mo_name from table12 UNION All select 
> att,filename,'table13' as mo_name from table13 UNION All select 
> att,filename,'table14' as mo_name from table14 UNION All select 
> att,filename,'table15' as mo_name from table15 UNION All select 
> att,filename,'table16' as mo_name from table16 UNION All select 
> att,filename,'table17' as mo_name from table17 UNION All select 
> att,filename,'table18' as mo_name from table18 UNION All select 
> att,filename,'table19' as mo_name from table19 UNION All select 
> att,filename,'table20' as mo_name from table20 UNION All select 
> att,filename,'table21' as mo_name from table21 UNION All select 
> att,filename,'table22' as mo_name from table22 UNION All select 
> att,filename,'table23' as mo_name from table23 UNION All select 
> att,filename,'table24' as mo_name from table24 UNION All select 
> att,filename,'table25' as mo_name from table25 UNION All select 
> att,filename,'table26' as mo_name from table26 UNION All select 
> att,filename,'table27' as mo_name from table27 UNION All select 
> att,filename,'table28' as mo_name from table28 UNION All select 
> att,filename,'table29' as mo_name from table29 UNION All select 
> att,filename,'table30' as mo_name from table30 UNION All select 
> att,filename,'table31' as mo_name from table31 UNION All select 
> att,filename,'table32' as mo_name from table32 UNION All select 
> att,filename,'table33' as mo_name from table33 UNION All select 
> att,filename,'table34' as mo_name from table34 UNION All select 
> att,filename,'table35' as mo_name from table35 UNION All select 
> att,filename,'table36' as mo_name from table36 UNION All select 
> att,filename,'table37' as mo_name from table37 UNION All select 
> att,filename,'table38' as mo_name from table38 UNION All select 
> att,filename,'table39' as mo_name from table39 UNION All select 
> att,filename,'table40' as mo_name from table40 UNION All select 
> att,filename,'table41' as mo_name from table41 UNION All select 
> att,filename,'table42' as mo_name from table42 UNION All select 
> att,filename,'table43' as mo_name from table43 UNION All select 
> att,filename,'table44' as mo_name from table44 UNION All select 
> att,filename,'table45' as mo_name from table45 UNION All select 
> att,filename,'table46' as mo_name from table46 UNION All select 
> att,filename,'table47' as mo_name from table47 UNION All select 
> 
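
The reproduction query quoted above is a long chain of UNION ALL branches that differ only in the table index. A minimal sketch of generating such a query programmatically, assuming tables named table0..tableN with `att` and `filename` columns as in the report; the class and method names are hypothetical and not part of Flink:

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class UnionAllQuerySketch {

    // Builds one "select ... from tableI" branch per index in [0, tableCount)
    // and joins the branches with UNION ALL, mirroring the shape of the reported query.
    static String buildUnionAll(int tableCount) {
        return IntStream.range(0, tableCount)
                .mapToObj(i -> "select att,filename,'table" + i + "' as mo_name from table" + i)
                .collect(Collectors.joining(" UNION ALL "));
    }

    public static void main(String[] args) {
        // Print a small instance; the report uses roughly 50 branches.
        System.out.println(buildUnionAll(3));
    }
}
```

The resulting string could then be passed to a batch-mode TableEnvironment to check whether plan compilation still fails with table.optimizer.dynamic-filtering.enabled left at its default.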

Re: [PR] [BP-1.18][FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]

2024-05-05 Thread via GitHub


lsyldliu merged PR #24744:
URL: https://github.com/apache/flink/pull/24744





Re: [PR] [FLINK-32843][JUnit5 Migration] Migrate the jobmaster package of flink-runtime module to JUnit5 [flink]

2024-05-05 Thread via GitHub


Jiabao-Sun merged PR #24723:
URL: https://github.com/apache/flink/pull/24723





Re: [PR] [BP-1.17][FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]

2024-05-05 Thread via GitHub


lsyldliu merged PR #24743:
URL: https://github.com/apache/flink/pull/24743





Re: [PR] [BP-1.18][FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]

2024-05-05 Thread via GitHub


jeyhunkarimov commented on PR #24744:
URL: https://github.com/apache/flink/pull/24744#issuecomment-2094931570

   Hi @lsyldliu, the failure was related to something else IMO; I could not 
reproduce it locally. In fact, after retriggering, the CI seems to be green. 





Re: [PR] [BP-1.17][FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]

2024-05-05 Thread via GitHub


jeyhunkarimov commented on PR #24743:
URL: https://github.com/apache/flink/pull/24743#issuecomment-2094931086

   Hi @lsyldliu, this should be fixed now. Could you please check? Thanks!





Re: [PR] [BP-1.18][FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]

2024-05-05 Thread via GitHub


jeyhunkarimov commented on PR #24744:
URL: https://github.com/apache/flink/pull/24744#issuecomment-2094879998

   @flinkbot run azure





[jira] [Updated] (FLINK-35291) Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme

2024-05-05 Thread LiuZeshan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LiuZeshan updated FLINK-35291:
--
Description: 
We are doing performance testing on Flink CDC 3.0 and found, through an Arthas 
profile, a significant performance bottleneck in the deserialization of row 
data. The main problem lies in the String.format call in the 
BinaryRecordDataGenerator class, so we have made some simple performance 
optimizations.
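
As a rough illustration of why a per-record String.format call can show up in a profile, the sketch below compares it with plain concatenation and with a list built once up front. The class name, method names, and the "f" + index naming pattern are hypothetical; they are not taken from BinaryRecordDataGenerator or from the actual patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class FieldNameSketch {

    // Per-call formatting: the "f%d" pattern is parsed again on every invocation.
    // Locale.ROOT keeps the digits ASCII regardless of the default locale.
    static String nameWithFormat(int index) {
        return String.format(Locale.ROOT, "f%d", index);
    }

    // Same result via plain concatenation, with no format-string parsing.
    static String nameWithConcat(int index) {
        return "f" + index;
    }

    // When the names depend only on the field count, build them once and reuse the list.
    static List<String> precomputeNames(int fieldCount) {
        List<String> names = new ArrayList<>(fieldCount);
        for (int i = 0; i < fieldCount; i++) {
            names.add(nameWithConcat(i));
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> cached = precomputeNames(3);
        for (int i = 0; i < cached.size(); i++) {
            // Both approaches must agree before one is swapped for the other.
            if (!nameWithFormat(i).equals(cached.get(i))) {
                throw new IllegalStateException("mismatch at index " + i);
            }
        }
        System.out.println(cached);
    }
}
```

Whether caching or concatenation is the better fit depends on how often the surrounding code runs per record, which only a profile of the real class can show.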

test environment:
 * flink: 1.20-SNAPSHOT master
 * flink-cdc: 3.2-SNAPSHOT master
 * 1CU minicluster mode

{code:java}
source:
  type: mysql
  hostname: localhost
  port: 3308
  username: root
  password: 123456
  tables: test.user_behavior
  server-id: 5400-5404
  #server-time-zone: UTC
  scan.startup.mode: earliest-offset
  debezium.poll.interval.ms: 10

sink:
  type: values
  name: Values Sink
  materialized.in.memory: false
  print.enabled: false

pipeline:
  name: Sync MySQL Database to Values
  parallelism: 1{code}
 

*before optimization: 3.5w/s (about 35,000/s)* 
!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=MTRjZGIyNWYyYmVlY2YwNDNmYjExZDE4MjRhMGYyYzlfcVRuM0JBYXpTem9qUWRxdkY0NGZmVkpWc1cxMnlzaE9fVG9rZW46RklTbWJUNkVYb2s0WGF4eEttWWN6M0hIbjJTXzE3MTQ5MjU4OTY6MTcxNDkyOTQ5Nl9WNA|width=361,height=179!

[^cdc-3.0-1c.html]

^The flame graph shows that approximately 24.45% of the 
time is spent in String.format.^

!image-2024-05-06-00-29-34-618.png|width=583,height=171!

 

*after optimization: 5w/s (about 50,000/s)* 

!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YjRkMDRmYTkzNzRiNjBmMzVmN2VlYTYyMGRmMGU0ZDRfcFIyNGNGMEViSzRjektpdVFWYTYyUnJQbWJjd1lnb3dfVG9rZW46V2ZXVGJ2T3lDb3dCSmF4WVZvTGMzc2h2bmpmXzE3MTQ5MjU5NTM6MTcxNDkyOTU1M19WNA|width=363,height=174!
 
 [^cdc-3.0-1c-2.html]

After optimization, 4.7% (extractBeforeDataRecord + extractAfterDataRecord) of 
the time is still spent in 
org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.. 
Perhaps we can optimize it further.

!image-2024-05-06-00-37-16-028.png|width=379,height=107!

 

  was:
We are doing performance testing on Flink CDC 3.0 and found, through an Arthas 
profile, a significant performance bottleneck in the serialization of row 
data. The main problem lies in the String.format call in the 
BinaryRecordDataGenerator class, so we have made some simple performance 
optimizations.

test environment:
 * flink: 1.20-SNAPSHOT master
 * flink-cdc: 3.2-SNAPSHOT master
 * 1CU minicluster mode

{code:java}
source:
  type: mysql
  hostname: localhost
  port: 3308
  username: root
  password: 123456
  tables: test.user_behavior
  server-id: 5400-5404
  #server-time-zone: UTC
  scan.startup.mode: earliest-offset
  debezium.poll.interval.ms: 10

sink:
  type: values
  name: Values Sink
  materialized.in.memory: false
  print.enabled: false

pipeline:
  name: Sync MySQL Database to Values
  parallelism: 1{code}
 

*before optimization: 3.5w/s (about 35,000/s)* 
!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=MTRjZGIyNWYyYmVlY2YwNDNmYjExZDE4MjRhMGYyYzlfcVRuM0JBYXpTem9qUWRxdkY0NGZmVkpWc1cxMnlzaE9fVG9rZW46RklTbWJUNkVYb2s0WGF4eEttWWN6M0hIbjJTXzE3MTQ5MjU4OTY6MTcxNDkyOTQ5Nl9WNA|width=361,height=179!

[^cdc-3.0-1c.html]

^The flame graph shows that approximately 24.45% of the 
time is spent in String.format.^

!image-2024-05-06-00-29-34-618.png|width=583,height=171!

 

*after optimization: 5w/s (about 50,000/s)* 

!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YjRkMDRmYTkzNzRiNjBmMzVmN2VlYTYyMGRmMGU0ZDRfcFIyNGNGMEViSzRjektpdVFWYTYyUnJQbWJjd1lnb3dfVG9rZW46V2ZXVGJ2T3lDb3dCSmF4WVZvTGMzc2h2bmpmXzE3MTQ5MjU5NTM6MTcxNDkyOTU1M19WNA|width=363,height=174!
 
 [^cdc-3.0-1c-2.html]

After optimization, 4.7% (extractBeforeDataRecord + extractAfterDataRecord) of 
the time is still spent in 
org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.. 
Perhaps we can optimize it further.

!image-2024-05-06-00-37-16-028.png|width=379,height=107!

 


> Improve the ROW data deserialization performance of 
> DebeziumEventDeserializationScheme
> --
>
> Key: FLINK-35291
> URL: https://issues.apache.org/jira/browse/FLINK-35291
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 1.20.0
>Reporter: LiuZeshan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: cdc-3.0-1c-2.html, cdc-3.0-1c.html, 
> image-2024-05-06-00-29-34-618.png, image-2024-05-06-00-37-16-028.png
>
>
> We are doing performance testing on Flink CDC 3.0 and found, through an 
> Arthas profile, a significant performance bottleneck in the 
> deserialization of row data. The main problem lies in the String.format 

[jira] [Updated] (FLINK-35291) Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme

2024-05-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35291:
---
Labels: pull-request-available  (was: )

> Improve the ROW data deserialization performance of 
> DebeziumEventDeserializationScheme
> --
>
> Key: FLINK-35291
> URL: https://issues.apache.org/jira/browse/FLINK-35291
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: 1.20.0
>Reporter: LiuZeshan
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: cdc-3.0-1c-2.html, cdc-3.0-1c.html, 
> image-2024-05-06-00-29-34-618.png, image-2024-05-06-00-37-16-028.png
>
>
> We are doing performance testing on Flink CDC 3.0 and found, through an 
> Arthas profile, a significant performance bottleneck in the 
> serialization of row data. The main problem lies in the String.format call 
> in the BinaryRecordDataGenerator class, so we have made some simple 
> performance optimizations.
> test environment:
>  * flink: 1.20-SNAPSHOT master
>  * flink-cdc: 3.2-SNAPSHOT master
>  * 1CU minicluster mode
> {code:java}
> source:
>   type: mysql
>   hostname: localhost
>   port: 3308
>   username: root
>   password: 123456
>   tables: test.user_behavior
>   server-id: 5400-5404
>   #server-time-zone: UTC
>   scan.startup.mode: earliest-offset
>   debezium.poll.interval.ms: 10
> sink:
>   type: values
>   name: Values Sink
>   materialized.in.memory: false
>   print.enabled: false
> pipeline:
>   name: Sync MySQL Database to Values
>   parallelism: 1{code}
>  
> *before optimization: 3.5w/s (about 35,000/s)* 
> !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=MTRjZGIyNWYyYmVlY2YwNDNmYjExZDE4MjRhMGYyYzlfcVRuM0JBYXpTem9qUWRxdkY0NGZmVkpWc1cxMnlzaE9fVG9rZW46RklTbWJUNkVYb2s0WGF4eEttWWN6M0hIbjJTXzE3MTQ5MjU4OTY6MTcxNDkyOTQ5Nl9WNA|width=361,height=179!
> [^cdc-3.0-1c.html]
> ^The flame graph shows that approximately 24.45% of the 
> time is spent in String.format.^
> !image-2024-05-06-00-29-34-618.png|width=583,height=171!
>  
> *after optimization: 5w/s (about 50,000/s)* 
> !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YjRkMDRmYTkzNzRiNjBmMzVmN2VlYTYyMGRmMGU0ZDRfcFIyNGNGMEViSzRjektpdVFWYTYyUnJQbWJjd1lnb3dfVG9rZW46V2ZXVGJ2T3lDb3dCSmF4WVZvTGMzc2h2bmpmXzE3MTQ5MjU5NTM6MTcxNDkyOTU1M19WNA|width=363,height=174!
>  
>  [^cdc-3.0-1c-2.html]
> After optimization, 4.7% (extractBeforeDataRecord + extractAfterDataRecord) 
> of the time is still spent in 
> org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.. 
> Perhaps we can optimize it further.
> !image-2024-05-06-00-37-16-028.png|width=379,height=107!
>  





[jira] [Created] (FLINK-35291) Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme

2024-05-05 Thread LiuZeshan (Jira)
LiuZeshan created FLINK-35291:
-

 Summary: Improve the ROW data deserialization performance of 
DebeziumEventDeserializationScheme
 Key: FLINK-35291
 URL: https://issues.apache.org/jira/browse/FLINK-35291
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: 1.20.0
Reporter: LiuZeshan
 Fix For: 1.20.0
 Attachments: cdc-3.0-1c-2.html, cdc-3.0-1c.html, 
image-2024-05-06-00-29-34-618.png, image-2024-05-06-00-37-16-028.png

We are doing performance testing on Flink CDC 3.0 and found, through an Arthas 
profile, a significant performance bottleneck in the serialization of row 
data. The main problem lies in the String.format call in the 
BinaryRecordDataGenerator class, so we have made some simple performance 
optimizations.

test environment:
 * flink: 1.20-SNAPSHOT master
 * flink-cdc: 3.2-SNAPSHOT master
 * 1CU minicluster mode

{code:java}
source:
  type: mysql
  hostname: localhost
  port: 3308
  username: root
  password: 123456
  tables: test.user_behavior
  server-id: 5400-5404
  #server-time-zone: UTC
  scan.startup.mode: earliest-offset
  debezium.poll.interval.ms: 10

sink:
  type: values
  name: Values Sink
  materialized.in.memory: false
  print.enabled: false

pipeline:
  name: Sync MySQL Database to Values
  parallelism: 1{code}
 

*before optimization: 3.5w/s (about 35,000/s)* 
!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=MTRjZGIyNWYyYmVlY2YwNDNmYjExZDE4MjRhMGYyYzlfcVRuM0JBYXpTem9qUWRxdkY0NGZmVkpWc1cxMnlzaE9fVG9rZW46RklTbWJUNkVYb2s0WGF4eEttWWN6M0hIbjJTXzE3MTQ5MjU4OTY6MTcxNDkyOTQ5Nl9WNA|width=361,height=179!

[^cdc-3.0-1c.html]

^The flame graph shows that approximately 24.45% of the 
time is spent in String.format.^

!image-2024-05-06-00-29-34-618.png|width=583,height=171!

 

*after optimization: 5w/s (about 50,000/s)* 

!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YjRkMDRmYTkzNzRiNjBmMzVmN2VlYTYyMGRmMGU0ZDRfcFIyNGNGMEViSzRjektpdVFWYTYyUnJQbWJjd1lnb3dfVG9rZW46V2ZXVGJ2T3lDb3dCSmF4WVZvTGMzc2h2bmpmXzE3MTQ5MjU5NTM6MTcxNDkyOTU1M19WNA|width=363,height=174!
 
 [^cdc-3.0-1c-2.html]

After optimization, 4.7% (extractBeforeDataRecord + extractAfterDataRecord) of 
the time is still spent in 
org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.. 
Perhaps we can optimize it further.

!image-2024-05-06-00-37-16-028.png|width=379,height=107!

 





Re: [PR] [FLINK-35270]Enrich information in logs, making it easier for debugging [flink]

2024-05-05 Thread via GitHub


HCTommy commented on PR #24747:
URL: https://github.com/apache/flink/pull/24747#issuecomment-2094854809

   Hi @fredia, I improved some log messages, which I think will help people 
debug in production environments. Could you please review when you have time?





[jira] [Commented] (FLINK-35180) Instant in row doesn't convert to correct type in python thread mode

2024-05-05 Thread Wouter Zorgdrager (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843573#comment-17843573
 ] 

Wouter Zorgdrager commented on FLINK-35180:
---

I have encountered the same bug. The only addition I can make is that in 
`thread` mode, the object is a `pemja.PyJObject` which wraps 
`java.time.Instant`. That should be a `pyflink.common.time.Instant`. 

> Instant in row doesn't convert to correct type in python thread mode
> 
>
> Key: FLINK-35180
> URL: https://issues.apache.org/jira/browse/FLINK-35180
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Reporter: Wei Yuan
>Priority: Major
>
> {code:java}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import Types, WatermarkStrategy, Configuration
> from pyflink.table import EnvironmentSettings, TableEnvironment
> from pyflink.table import StreamTableEnvironment, Schema
> from pyflink.datastream.functions import ProcessFunction, MapFunction
> # init task env
> config = Configuration()
> # config.set_string("python.execution-mode", "thread")
> config.set_string("python.execution-mode", "process")
> config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
> config.set_string("python.executable", "/root/miniconda3/bin/python3")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> table_env = StreamTableEnvironment.create(env)
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", 
> "content")
> table_env.create_temporary_view("test_table", table)
> result_table = table_env.sql_query("select *, NOW() as dt from test_table")
> result_ds = table_env.to_data_stream(result_table)
> def test_func(row):
>     print(row)
>     return row
> result_ds.map(test_func)
> env.execute(){code}
> output in process mode:
> {code:java}
> Row(id=1, content='Hi', dt=Instant<1713609386, 27100>)
> Row(id=2, content='Hello', dt=Instant<1713609386, 58000>) {code}
> output in thread mode:
> {code:java}
>  
> Row(id=1, content='Hi', dt=)
> Traceback (most recent call last):
>   File "/home/disk1/yuanwei/bug.py", line 31, in 
>     env.execute()
>   File 
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
>  line 773, in execute
>     return 
> JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", 
> line 1322, in __call__
>     return_value = get_return_value(
>   File 
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", 
> line 146, in deco
>     return f(*a, **kw)
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 
> 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
> : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>         at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
>         at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
>         at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>         at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>         at 
> org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>         at 
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
>         at 
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
>         at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
>         at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>         at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>         at 

[jira] [Closed] (FLINK-35290) Wrong Instant type conversion TableAPI to Datastream in thread mode

2024-05-05 Thread Wouter Zorgdrager (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wouter Zorgdrager closed FLINK-35290.
-
Resolution: Duplicate

> Wrong Instant type conversion TableAPI to Datastream in thread mode
> ---
>
> Key: FLINK-35290
> URL: https://issues.apache.org/jira/browse/FLINK-35290
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.18.1
>Reporter: Wouter Zorgdrager
>Priority: Major
>
> In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` column into a 
> DataStream, you get a `pyflink.common.time.Instant`. First of all, I'm 
> wondering whether this is expected behavior, since in the Table API 
> `TIMESTAMP_LTZ` maps to a Python `datetime`. Can't the same be done for the 
> DataStream API? Nevertheless, if we switch from `process` to `thread` mode 
> for execution, the `TIMESTAMP_LTZ(3)` value gets mapped to a 
> `pemja.PyJObject` (which wraps a `java.time.Instant`) rather than a 
> `pyflink.common.time.Instant`. Note that if I only use the DataStream API 
> and read `Types.Instant()` directly, the conversion works fine in both 
> `thread` and `process` mode.
> Below is a minimal example exposing the bug:
> ```
> EXECUTION_MODE = "thread"  # or "process"
> config = Configuration()
> config.set_string("python.execution-mode", EXECUTION_MODE)
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().set("parallelism.default", "1")
> t_env.get_config().set("python.fn-execution.bundle.size", "1")
> t_env.get_config().set("python.execution-mode", EXECUTION_MODE)
> def to_epoch_ms(row: Row):
>     print(type(row[1]))
>     return row[1].to_epoch_milli()
> t_env.to_data_stream(
>     t_env.from_elements(
>         [
>             (1, datetime(year=2024, day=10, month=9, hour=9)),
>             (2, datetime(year=2024, day=10, month=9, hour=12)),
>             (3, datetime(year=2024, day=22, month=11, hour=12)),
>         ],
>         DataTypes.ROW(
>             [
>                 DataTypes.FIELD("id", DataTypes.INT()),
>                 DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
>             ]
>         ),
>     )
> ).map(to_epoch_ms, output_type=Types.LONG()).print()
> env.execute()
> ```





[jira] [Commented] (FLINK-35290) Wrong Instant type conversion TableAPI to Datastream in thread mode

2024-05-05 Thread Wouter Zorgdrager (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843572#comment-17843572
 ] 

Wouter Zorgdrager commented on FLINK-35290:
---

It seems this bug has already been reported here 
https://issues.apache.org/jira/browse/FLINK-35180. Will close this issue. 



[jira] [Created] (FLINK-35290) Wrong Instant type conversion TableAPI to Datastream in thread mode

2024-05-05 Thread Wouter Zorgdrager (Jira)
Wouter Zorgdrager created FLINK-35290:
-

 Summary: Wrong Instant type conversion TableAPI to Datastream in 
thread mode
 Key: FLINK-35290
 URL: https://issues.apache.org/jira/browse/FLINK-35290
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.18.1
Reporter: Wouter Zorgdrager


In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` column into a 
DataStream, you get a `pyflink.common.time.Instant`. First of all, I'm 
wondering whether this is expected behavior, since in the Table API 
`TIMESTAMP_LTZ` maps to a Python `datetime`. Can't the same be done for the 
DataStream API? Nevertheless, if we switch from `process` to `thread` mode for 
execution, the `TIMESTAMP_LTZ(3)` value gets mapped to a `pemja.PyJObject` 
(which wraps a `java.time.Instant`) rather than a 
`pyflink.common.time.Instant`. Note that if I only use the DataStream API and 
read `Types.Instant()` directly, the conversion works fine in both `thread` 
and `process` mode.

Below is a minimal example exposing the bug:

```
from datetime import datetime

from pyflink.common import Configuration, Row, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment

EXECUTION_MODE = "thread"  # or "process"
config = Configuration()
config.set_string("python.execution-mode", EXECUTION_MODE)

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("python.fn-execution.bundle.size", "1")
t_env.get_config().set("python.execution-mode", EXECUTION_MODE)


def to_epoch_ms(row: Row):
    # Prints pyflink.common.time.Instant in process mode,
    # but pemja.PyJObject in thread mode.
    print(type(row[1]))
    return row[1].to_epoch_milli()


t_env.to_data_stream(
    t_env.from_elements(
        [
            (1, datetime(year=2024, day=10, month=9, hour=9)),
            (2, datetime(year=2024, day=10, month=9, hour=12)),
            (3, datetime(year=2024, day=22, month=11, hour=12)),
        ],
        DataTypes.ROW(
            [
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
            ]
        ),
    )
).map(to_epoch_ms, output_type=Types.LONG()).print()
env.execute()
```
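
Until the conversion is consistent, a job that has to run in both modes could normalise the value defensively. The following is only a sketch and not part of the report: `to_epoch_ms_any` is a hypothetical helper, and it assumes that the object seen in `thread` mode forwards the Java method `toEpochMilli()` of the wrapped `java.time.Instant`, which has not been verified against pemja. The two stand-in classes exist only so that the snippet runs without a Flink installation.

```python
from datetime import datetime, timezone


class FakePyFlinkInstant:
    """Stand-in for pyflink.common.time.Instant (process mode)."""

    def __init__(self, millis):
        self._millis = millis

    def to_epoch_milli(self):
        return self._millis


class FakeJavaInstant:
    """Stand-in for a wrapped java.time.Instant (thread mode, assumed API)."""

    def __init__(self, millis):
        self._millis = millis

    def toEpochMilli(self):
        return self._millis


def to_epoch_ms_any(value):
    """Return epoch milliseconds for any of the timestamp shapes described above."""
    if hasattr(value, "to_epoch_milli"):  # PyFlink Instant
        return value.to_epoch_milli()
    if hasattr(value, "toEpochMilli"):  # assumption: Java method exposed by the wrapper
        return value.toEpochMilli()
    if isinstance(value, datetime):  # Table API style datetime
        if value.tzinfo is None:
            # Assumption: a naive datetime is interpreted as UTC.
            value = value.replace(tzinfo=timezone.utc)
        return int(value.timestamp() * 1000)
    raise TypeError(f"unsupported timestamp type: {type(value)!r}")
```

Passing `to_epoch_ms_any` to `map` in place of a function that calls `row[1].to_epoch_milli()` directly would keep the job independent of the execution mode, provided the assumption about the wrapper holds.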





Re: [PR] [FLINK-35112][python] Fix membership for Row class PyFlink [flink]

2024-05-05 Thread via GitHub


flinkbot commented on PR #24756:
URL: https://github.com/apache/flink/pull/24756#issuecomment-2094773254

   
   ## CI report:
   
   * ebca67d26c29e755aef515140ad2aacc6a6e6835 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35112) Membership for Row class does not include field names

2024-05-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35112:
---
Labels: pull-request-available  (was: )

> Membership for Row class does not include field names
> -
>
> Key: FLINK-35112
> URL: https://issues.apache.org/jira/browse/FLINK-35112
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.18.1
>Reporter: Wouter Zorgdrager
>Priority: Minor
>  Labels: pull-request-available
>
> In the Row class in PyFlink I cannot do a membership check for field names. 
> This minimal example will show the unexpected behavior:
> ```
> from pyflink.common import Row
> row = Row(name="Alice", age=11)
> # Expected to be True, but is False
> print("name" in row)
> person = Row("name", "age")
> # This is True, as expected
> print('name' in person)
> ```
> The related code in the Row class is:
> ```
>     def __contains__(self, item):
>         return item in self._values
> ```
> It should be relatively easy to fix with the following code:
> ```
>     def __contains__(self, item):
>         if hasattr(self, "_fields"):
>             return item in self._fields
>         else:
>             return item in self._values
> ```
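
The effect of the proposed `__contains__` can be tried without PyFlink. The sketch below uses `MiniRow`, a hypothetical and heavily simplified stand-in for `pyflink.common.Row`; it only assumes, as the issue text implies, that `_fields` exists solely on rows built from keyword arguments.

```python
class MiniRow:
    """Simplified stand-in for pyflink.common.Row; not the real class."""

    def __init__(self, *args, **kwargs):
        if args and kwargs:
            raise ValueError("use either positional or keyword arguments")
        if kwargs:
            # Named row: remember field names alongside the values.
            self._fields = list(kwargs.keys())
            self._values = list(kwargs.values())
        else:
            # Positional row: values only, no _fields attribute.
            self._values = list(args)

    def __contains__(self, item):
        # Proposed behavior: check field names when they exist,
        # otherwise fall back to the values.
        if hasattr(self, "_fields"):
            return item in self._fields
        return item in self._values


named = MiniRow(name="Alice", age=11)
positional = MiniRow("name", "age")

print("name" in named)       # True: field names are checked
print("Alice" in named)      # False: values are no longer consulted
print("name" in positional)  # True: falls back to the values
```

One consequence worth noting is the second line: with this change, a value lookup on a named row returns False, so callers that relied on the old behavior would have to check the values explicitly.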





[PR] [FLINK-35112][Python] Fix membership for Row class PyFlink [flink]

2024-05-05 Thread via GitHub


wzorgdrager opened a new pull request, #24756:
URL: https://github.com/apache/flink/pull/24756

   
   
   ## What is the purpose of the change
   This pull request adds support for a membership check for field names in a 
Row in PyFlink. If field names are not defined, it will check membership in the 
values. For example:
   ```
   user_row = Row(name="Alice", age=22)
   print("name" in user_row)
   # Evaluates to True
   
   user_row = Row("Alice", 22)
   print("Alice" in user_row)
   # Evaluates to True
   ```
   
   This change is more in line with how dictionaries behave in Python. 
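
   For comparison, this is how membership works on a plain Python `dict`; the variable names are illustrative only:

   ```python
   user = {"name": "Alice", "age": 22}

   print("name" in user)            # True: keys are checked
   print("Alice" in user)           # False: values are not
   print("Alice" in user.values())  # True: values need an explicit view
   ```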
   
   
   ## Brief change log
   - Change `in` behaviour for `Row` class.
     - If field names are defined, `KEY in row` will check if `KEY` exists in field names.
     - If field names do not exist, `KEY in row` will check if `KEY` exists in field values (current behaviour).
   
   
   ## Verifying this change
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





Re: [PR] [FLINK-33132] Flink Connector Redshift TableSink Implementation [flink-connector-aws]

2024-05-05 Thread via GitHub


Samrat002 commented on PR #114:
URL: 
https://github.com/apache/flink-connector-aws/pull/114#issuecomment-2094737962

   > I have left some comments and will continue the review later. I believe 
this PR is incomplete, right? We still need to add tests.
   
   Yes, tests were not added because they would increase the size of the PR. 
   As discussed offline, I will reduce the scope of this PR to Async v2 only 
and move everything else to a follow-up PR.





Re: [PR] [FLINK-33132] Flink Connector Redshift TableSink Implementation [flink-connector-aws]

2024-05-05 Thread via GitHub


Samrat002 commented on code in PR #114:
URL: 
https://github.com/apache/flink-connector-aws/pull/114#discussion_r1590270090


##
flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.redshift.executor.RedshiftS3Util;
+import org.apache.flink.connector.redshift.mode.SinkMode;
+import org.apache.flink.connector.redshift.options.RedshiftOptions;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Dynamic Table Factory. */
+@PublicEvolving
+public class RedshiftDynamicTableFactory implements DynamicTableSinkFactory {
+public static final String IDENTIFIER = "redshift";
+
+public static final ConfigOption HOSTNAME =
+ConfigOptions.key("hostname")
+.stringType()
+.noDefaultValue()
+.withDeprecatedKeys("AWS Redshift cluster hostname.");
+
+public static final ConfigOption PORT =
+ConfigOptions.key("port")
+.intType()
+.defaultValue(5439)
+.withDeprecatedKeys("AWS Redshift port number.\nDefault 
value : 5439.");
+
+public static final ConfigOption USERNAME =
+ConfigOptions.key("username")
+.stringType()
+.noDefaultValue()
+.withDescription("AWS Redshift Cluster username.");
+
+public static final ConfigOption PASSWORD =
+ConfigOptions.key("password")
+.stringType()
+.noDefaultValue()
+.withDescription("AWS Redshift cluster password.");
+
+public static final ConfigOption DATABASE_NAME =
+ConfigOptions.key("sink.database-name")
+.stringType()
+.defaultValue("dev")
+.withDescription(
+"AWS Redshift cluster database name. Default value 
set to `dev`.");
+
+public static final ConfigOption TABLE_NAME =
+ConfigOptions.key("sink.table-name")
+.stringType()
+.noDefaultValue()
+.withDescription("AWS Redshift cluster sink table name.");
+
+public static final ConfigOption SINK_BATCH_SIZE =
+ConfigOptions.key("sink.batch-size")
+.intType()
+.defaultValue(1000)
+.withDescription(
+"`sink.batch-size` determines the maximum size of 
batch, in terms of the number of records, "
++ "at which data will trigger a flush 
operation."
++ " When the number of records exceeds 
this threshold, the system initiates a flush to manage the data.\n"
++ "Default Value: 1000");
+
+public static final ConfigOption SINK_FLUSH_INTERVAL =
+ConfigOptions.key("sink.flush-interval")
+.durationType()
+.defaultValue(Duration.ofSeconds(1L))
+.withDescription(
+"the flush interval mills, over this time, 
asynchronous threads will flush data. The default value is 1s.");
+
+public static final ConfigOption SINK_MAX_RETRIES =
+ConfigOptions.key("sink.max-retries")
+.intType()
+.defaultValue(3)
+  

Re: [PR] [FLINK-33132] Flink Connector Redshift TableSink Implementation [flink-connector-aws]

2024-05-05 Thread via GitHub


Samrat002 commented on code in PR #114:
URL: 
https://github.com/apache/flink-connector-aws/pull/114#discussion_r1590269554


##
flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.redshift.executor.RedshiftS3Util;
+import org.apache.flink.connector.redshift.mode.SinkMode;
+import org.apache.flink.connector.redshift.options.RedshiftOptions;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Dynamic Table Factory. */
+@PublicEvolving
+public class RedshiftDynamicTableFactory implements DynamicTableSinkFactory {
+public static final String IDENTIFIER = "redshift";
+
+public static final ConfigOption HOSTNAME =
+ConfigOptions.key("hostname")
+.stringType()
+.noDefaultValue()
+.withDeprecatedKeys("AWS Redshift cluster hostname.");
+
+public static final ConfigOption PORT =
+ConfigOptions.key("port")
+.intType()
+.defaultValue(5439)
+.withDeprecatedKeys("AWS Redshift port number.\nDefault 
value : 5439.");
+
+public static final ConfigOption USERNAME =
+ConfigOptions.key("username")
+.stringType()
+.noDefaultValue()
+.withDescription("AWS Redshift Cluster username.");
+
+public static final ConfigOption PASSWORD =
+ConfigOptions.key("password")
+.stringType()
+.noDefaultValue()
+.withDescription("AWS Redshift cluster password.");
+
+public static final ConfigOption DATABASE_NAME =
+ConfigOptions.key("sink.database-name")
+.stringType()
+.defaultValue("dev")

Review Comment:
   `dev` is the default database that Redshift creates. I assumed that if the 
user does not provide a database name in the config, the connector should fall 
back to that default.
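
   The fallback described here amounts to the following lookup. This is a language-neutral sketch written in Python, not the connector's code; `resolve_database_name` is a hypothetical helper, and only the option key `sink.database-name` and the default `dev` come from the diff above.

   ```python
   DEFAULT_DATABASE = "dev"  # default database named in this review comment


   def resolve_database_name(options):
       """Return the configured database name, or the default when it is absent."""
       return options.get("sink.database-name", DEFAULT_DATABASE)


   print(resolve_database_name({}))                                # dev
   print(resolve_database_name({"sink.database-name": "sales"}))  # sales
   ```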






Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]

2024-05-05 Thread via GitHub


GOODBOY008 commented on PR #24730:
URL: https://github.com/apache/flink/pull/24730#issuecomment-2094663611

   @afedulov Could you help review this PR? Thank you.

