[jira] [Closed] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT
[ https://issues.apache.org/jira/browse/FLINK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun closed FLINK-35274. -- > Occasional failure issue with Flink CDC Db2 UT > -- > > Key: FLINK-35274 > URL: https://issues.apache.org/jira/browse/FLINK-35274 > Project: Flink > Issue Type: Bug > Reporter: Xin Gong > Assignee: Xin Gong > Priority: Critical > Labels: pull-request-available > Fix For: 3.1.0 > > > Occasional failure issue with Flink CDC Db2 UT. Because the Db2 redo-log data's TableId does not include the database name, the table schema is occasionally not found when the task restarts after an exception. I will fix it by supplementing the database name. -- This message was sent by Atlassian Jira (v8.20.10#820010)
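The failure mode described above can be illustrated with a minimal, self-contained sketch. This is hypothetical code: `TableIdSketch`, `qualify`, and the cache layout are inventions for illustration, not the actual Flink CDC Db2 connector implementation. The idea is that a schema cache keyed by fully qualified `db.schema.table` ids misses when the redo log yields only `schema.table`, and hits once the database name is supplemented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not the actual Flink CDC code) of why a TableId that
// omits the database name breaks schema lookup after a task restart: the
// schema cache is keyed by the fully qualified "db.schema.table" id, so a
// redo-log TableId of just "schema.table" misses the cache.
public class TableIdSketch {
    static final Map<String, String> SCHEMA_CACHE = new HashMap<>();

    // Supplement the missing database name before using the id as a key.
    static String qualify(String tableId, String databaseName) {
        return tableId.chars().filter(c -> c == '.').count() == 2
                ? tableId                        // already db.schema.table
                : databaseName + "." + tableId;  // supplement database name
    }

    public static void main(String[] args) {
        SCHEMA_CACHE.put("TESTDB.DB2INST1.PRODUCTS", "CREATE TABLE ...");
        String redoLogId = "DB2INST1.PRODUCTS"; // Db2 redo log omits the db name
        // Raw lookup misses; qualified lookup hits.
        System.out.println(SCHEMA_CACHE.containsKey(redoLogId));                    // false
        System.out.println(SCHEMA_CACHE.containsKey(qualify(redoLogId, "TESTDB"))); // true
    }
}
```

Under this (assumed) keying scheme, the intermittent nature of the failure follows naturally: the lookup only happens on the restart path, so the bug surfaces only when a task restarts after an exception.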
[jira] [Resolved] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT
[ https://issues.apache.org/jira/browse/FLINK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun resolved FLINK-35274. Resolution: Fixed Fixed via cdc * master: a7cb46f7621568486a069a7ae01a7b86ebb0a801 * release-3.1: d556f29475a52234a98bcc65db959483a10beb52
Re: [PR] [fix] repair a snapshot-split bug: [flink-cdc]
yuxiqian commented on PR #2968: URL: https://github.com/apache/flink-cdc/pull/2968#issuecomment-2095245702 Hi @AidenPerce, the Db2 CI fix has been merged into `master` branch, could you please rebase this PR and see if this problem persists? Thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-35274) Occasional failure issue with Flink CDC Db2 UT
[ https://issues.apache.org/jira/browse/FLINK-35274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun reassigned FLINK-35274: -- Assignee: Xin Gong
Re: [PR] [FLINK-35274][cdc-connector][db2] Fix occasional failure issue with Flink CDC Db2 UT [flink-cdc]
Jiabao-Sun merged PR #3283: URL: https://github.com/apache/flink-cdc/pull/3283
Re: [PR] [FLINK-35274][cdc-connector][db2] Fix occasional failure issue with Flink CDC Db2 UT [flink-cdc]
Jiabao-Sun merged PR #3284: URL: https://github.com/apache/flink-cdc/pull/3284
Re: [PR] [FLINK-35245][cdc-connector][tidb] Add metrics for flink-connector-tidb-cdc [flink-cdc]
xieyi888 commented on PR #3266: URL: https://github.com/apache/flink-cdc/pull/3266#issuecomment-2095236881 @yuxiqian @czy006 I have made the changes, please review.
Re: [PR] [fix] repair a snapshot-split bug: [flink-cdc]
yuxiqian commented on PR #2968: URL: https://github.com/apache/flink-cdc/pull/2968#issuecomment-2095235515 Hi @AidenPerce, sorry about the inconvenience. I think it's related to a glitch in the Db2 incremental connector and should be fixed by #3283. Will try to get it merged asap.
Re: [PR] [FLINK-35262] Bridge between AsyncKeyedStateBackend and AsyncExecutionController [flink]
fredia commented on code in PR #24740: URL: https://github.com/apache/flink/pull/24740#discussion_r1590503035 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -104,15 +105,21 @@ public class AsyncExecutionController implements StateRequestHandler { public AsyncExecutionController( MailboxExecutor mailboxExecutor, -StateExecutor stateExecutor, +AsyncKeyedStateBackend asyncKeyedStateBackend, int maxParallelism, int batchSize, long bufferTimeout, int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); -this.stateExecutor = stateExecutor; +if (asyncKeyedStateBackend != null) { Review Comment: Would it be better to put the initialization of the `state executor` in the `StreamOperator`? ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java: ## @@ -242,6 +248,25 @@ private KeyedStateStore checkPreconditionsAndGetKeyedStateStore( return keyedStateStore; } +// TODO: Reconstruct this after StateManager is ready in FLIP-410. Review Comment: How about renaming it to `getValueState()`? BTW, when the user provides the stateDescriptorV2, this method is used, and when the user provides the stateDescriptorV1, the old `getState` is used, right? Can stateDescriptorV2 and stateDescriptorV1 coexist? ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java: ## @@ -54,7 +59,14 @@ public class ForStKeyedStateBackendBuilder protected final Logger logger = LoggerFactory.getLogger(getClass()); +private static final int KEY_SERIALIZER_BUFFER_START_SIZE = 32; + +private static final int VALUE_SERIALIZER_BUFFER_START_SIZE = 128; Review Comment: Why is 128 chosen here? -- This is an automated message from the Apache Git Service. 
[jira] [Assigned] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xintong Song reassigned FLINK-35232: Assignee: Oleksandr Nitavskyi (was: Ravi Singh) > Support for retry settings on GCS connector > --- > > Key: FLINK-35232 > URL: https://issues.apache.org/jira/browse/FLINK-35232 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem > Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1 > Reporter: Vikas M > Assignee: Oleksandr Nitavskyi > Priority: Major > Labels: pull-request-available > > https://issues.apache.org/jira/browse/FLINK-32877 is tracking the ability to specify transport options in the GCS connector. While the transport options enabled there reduced read timeouts, we still see 503 errors leading to Flink job restarts. > Thus, in this ticket, we want to specify additional retry settings as noted in [https://cloud.google.com/storage/docs/retry-strategy#customize-retries]. > We need [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods] methods available to Flink users so that they can customize their deployment.
In particular, the following settings seem to be the minimum required to adjust the GCS timeout to the job's checkpoint config: > * [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__] > * [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__] > * [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__] > * [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__] > * [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__] > > Basically, the proposal is to make the timeout tunable via the multiplier, maxAttempts, and totalTimeout mechanisms. > All of the config options should be optional, and defaults should be used when some of the configs are not provided.
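How these settings interact can be sketched with a small, self-contained simulation. This is illustrative only, not the gax implementation; the method name `rpcTimeoutsMillis` and the sample numbers are invented for this sketch. Per-attempt RPC timeouts start at initialRpcTimeout, grow by rpcTimeoutMultiplier, are capped at maxRpcTimeout, and retrying stops at maxAttempts or once the totalTimeout budget is exhausted.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not the gax implementation) of the retry schedule
// defined by the RetrySettings knobs listed above.
public class RetryScheduleSketch {
    static List<Long> rpcTimeoutsMillis(long initialRpcTimeout, double rpcTimeoutMultiplier,
                                        long maxRpcTimeout, int maxAttempts, long totalTimeout) {
        List<Long> timeouts = new ArrayList<>();
        double current = initialRpcTimeout;
        long elapsed = 0;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Each attempt's timeout is capped at maxRpcTimeout.
            long capped = (long) Math.min(current, maxRpcTimeout);
            if (elapsed + capped > totalTimeout) {
                break; // totalTimeout budget exhausted
            }
            timeouts.add(capped);
            elapsed += capped;
            current *= rpcTimeoutMultiplier; // grow for the next attempt
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // e.g. 1s initial, 2x growth, 8s cap, up to 6 attempts, 20s total budget
        System.out.println(rpcTimeoutsMillis(1000, 2.0, 8000, 6, 20000));
    }
}
```

With these sample numbers the per-attempt schedule comes out as 1 s, 2 s, 4 s, 8 s: the fifth attempt would push the cumulative time past the 20 s total budget, so retrying stops before maxAttempts is reached. This is the sense in which the multiplier, maxAttempts, and totalTimeout mechanisms jointly bound how long the connector keeps retrying relative to the job's checkpoint timeout.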
[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843615#comment-17843615 ] Xintong Song commented on FLINK-35232: -- Thanks for the information. I'm assigning this to [~Oleksandr Nitavskyi] who provided the PR.
[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843612#comment-17843612 ] Vikas M commented on FLINK-35232: - > BTW, the ticket is currently assigned to [~singhravidutt], who is neither the reporter of the ticket nor the author of the PR. Anyone knows the context? Regarding the above: it occurred because I cloned https://issues.apache.org/jira/browse/FLINK-32877 to create this ticket, and it did not let me change the assignee once the ticket was created. Please feel free to change the assignee as appropriate, thanks.
Re: [PR] [fix] repair a snapshot-split bug: [flink-cdc]
AidenPerce commented on PR #2968: URL: https://github.com/apache/flink-cdc/pull/2968#issuecomment-2095121064 > Hi @AidenPerce, could you please rebase this PR with latest `master` branch before it could be merged? Renaming like `com.ververica.cdc` to `org.apache.flink.cdc` might be necessary. I have rebased the branch of this PR, but there are some problems in the Db2 connector test cases that prevent me from updating this PR. I don't know why the cases don't pass; this is the check log: https://github.com/AidenPerce/flink-cdc-connectors/actions/runs/8963238039/job/24613239338
Re: [PR] [FLINK-35262] Bridge between AsyncKeyedStateBackend and AsyncExecutionController [flink]
masteryhx commented on code in PR #24740: URL: https://github.com/apache/flink/pull/24740#discussion_r1590495309 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -74,15 +76,20 @@ public void setup( final int asyncBufferSize = environment.getExecutionConfig().getAsyncStateBufferSize(); final long asyncBufferTimeout = environment.getExecutionConfig().getAsyncStateBufferTimeout(); -// TODO: initial state executor and set state executor for aec + +AsyncKeyedStateBackend asyncKeyedStateBackend = +Preconditions.checkNotNull( +stateHandler.getAsyncKeyedStateBackend(), +"Current State Backend doesn't support async access"); this.asyncExecutionController = new AsyncExecutionController( mailboxExecutor, -null, +asyncKeyedStateBackend.createStateExecutor(), maxParallelism, asyncBufferSize, asyncBufferTimeout, inFlightRecordsLimit); +asyncKeyedStateBackend.setup(asyncExecutionController); Review Comment: Thanks for the suggestion. I think it's reasonable to maintain `setup` in AEC. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843606#comment-17843606 ] Xintong Song commented on FLINK-35232: -- I think we can just move forward with this Jira ticket. I noticed there's already a PR opened. I can help review and merge it. And it would be helpful if [~galenwarren] can also take a look at it. Not only committers but everyone can help review PRs. Committers are only needed for the final merge. BTW, the ticket is currently assigned to [~singhravidutt], who is neither the reporter of the ticket nor the author of the PR. Anyone knows the context?
Re: [PR] [FLINK-35245][cdc-connector][tidb] Add metrics for flink-connector-tidb-cdc [flink-cdc]
xieyi888 commented on code in PR #3266: URL: https://github.com/apache/flink-cdc/pull/3266#discussion_r1590486774 ## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetricsTest.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.tidb.metrics; + +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.testutils.MetricListener; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Unit test for {@link TiDBSourceMetrics}. */ +public class TiDBSourceMetricsTest { +private static final String FETCH_EVENTTIME_LAG = "currentFetchEventTimeLag"; Review Comment: > Maybe can use MetricNames.CURRENT_FETCH_EVENT_TIME_LAG Replace it? Thanks @czy006 , I had use MetricNames.CURRENT_EMIT_EVENT_TIME_LAG, MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, MetricNames.SOURCE_IDLE_TIME instead -- This is an automated message from the Apache Git Service. 
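To make the metric names discussed above concrete, here is a rough illustration of what a `currentFetchEventTimeLag`-style gauge measures. This sketch uses a stand-in `Gauge` interface, not Flink's `org.apache.flink.metrics.Gauge`, and `FetchLagSketch`/`fetchEventTimeLag` are hypothetical names: the lag is simply the difference between processing time and the event time of the last fetched record.

```java
// Minimal sketch (stand-in Gauge interface, not Flink's metrics API or the
// actual TiDB connector code) of a fetch-event-time-lag style gauge.
public class FetchLagSketch {
    interface Gauge<T> { T getValue(); }

    // Lag = processing time "now" minus the event time of the last fetched record.
    static Gauge<Long> fetchEventTimeLag(java.util.function.LongSupplier now,
                                         java.util.function.LongSupplier lastFetchEventTime) {
        return () -> now.getAsLong() - lastFetchEventTime.getAsLong();
    }

    public static void main(String[] args) {
        // Fixed clocks so the value is deterministic in this sketch; a real
        // connector would use System.currentTimeMillis() for "now".
        Gauge<Long> lag = fetchEventTimeLag(() -> 10_000L, () -> 7_500L);
        System.out.println(lag.getValue()); // 2500
    }
}
```

Registering such gauges under shared constants (as the PR does with `MetricNames.CURRENT_FETCH_EVENT_TIME_LAG` and friends) keeps metric names consistent across connectors and lets tests look them up by the same constant.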
Re: [PR] [fix] repair a snapshot-split bug: [flink-cdc]
AidenPerce closed pull request #2968: [fix] repair a snapshot-split bug: URL: https://github.com/apache/flink-cdc/pull/2968
Re: [PR] [tidb] Add metrics for tidb connector [flink-cdc]
xieyi888 closed pull request #1974: [tidb] Add metrics for tidb connector URL: https://github.com/apache/flink-cdc/pull/1974
[jira] [Closed] (FLINK-35244) Correct the package for flink-connector-tidb-cdc test
[ https://issues.apache.org/jira/browse/FLINK-35244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun closed FLINK-35244. -- > Correct the package for flink-connector-tidb-cdc test > -- > > Key: FLINK-35244 > URL: https://issues.apache.org/jira/browse/FLINK-35244 > Project: Flink > Issue Type: Improvement > Components: Flink CDC > Reporter: Xie Yi > Assignee: Xie Yi > Priority: Major > Labels: pull-request-available > Fix For: cdc-3.2.0 > > Attachments: image-2024-04-26-16-19-39-297.png > > > Test cases for flink-connector-tidb-cdc should be under the *org.apache.flink.cdc.connectors.tidb* package instead of *org.apache.flink.cdc.connectors*. > !image-2024-04-26-16-19-39-297.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-35244) Correct the package for flink-connector-tidb-cdc test
[ https://issues.apache.org/jira/browse/FLINK-35244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun resolved FLINK-35244. Fix Version/s: cdc-3.2.0 Resolution: Fixed Resolved via cdc-master: 002b16ed4e155b01374040ff302b7536d9c41245
[jira] [Updated] (FLINK-35244) Correct the package for flink-connector-tidb-cdc test
[ https://issues.apache.org/jira/browse/FLINK-35244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun updated FLINK-35244: --- Summary: Correct the package for flink-connector-tidb-cdc test (was: Move package for flink-connector-tidb-cdc test)
Re: [PR] [FLINK-35244][cdc-connector][tidb] Move package for flink-connector-tidb-cdc test [flink-cdc]
Jiabao-Sun merged PR #3265: URL: https://github.com/apache/flink-cdc/pull/3265
[jira] [Resolved] (FLINK-34517) environment configs ignored when calling procedure operation
[ https://issues.apache.org/jira/browse/FLINK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] luoyuxia resolved FLINK-34517. -- Fix Version/s: 1.18.2 1.20.0 1.19.1 Resolution: Fixed > environment configs ignored when calling procedure operation > > > Key: FLINK-34517 > URL: https://issues.apache.org/jira/browse/FLINK-34517 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.18.0 > Reporter: JustinLee > Assignee: JustinLee > Priority: Major > Labels: pull-request-available > Fix For: 1.18.2, 1.20.0, 1.19.1 > > > When calling a procedure operation in Flink SQL, the ProcedureContext only contains the underlying application-specific config, not the environment-specific config. > To be more specific: in a Flink SQL app whose StreamExecutionEnvironment has a config1, config1 takes effect when executing a SQL query but not when calling a SQL procedure, which is clearly not the expected behavior.
[jira] [Comment Edited] (FLINK-34517) environment configs ignored when calling procedure operation
[ https://issues.apache.org/jira/browse/FLINK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823069#comment-17823069 ] luoyuxia edited comment on FLINK-34517 at 5/6/24 1:27 AM: -- 1.18: 620c5a7aeba448d247107ad44a6ba6f1e759052e 1.19: fa426f104baa1343a07695dcf4c4984814f0fde4 master: 24b6f7cbf94fca154fc8680e8b3393abd68b8e77 was (Author: luoyuxia): 1.18: 620c5a7aeba448d247107ad44a6ba6f1e759052e 1.19: todo master: 24b6f7cbf94fca154fc8680e8b3393abd68b8e77
Re: [PR] [BP-1.19][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
luoyuxia merged PR #24656: URL: https://github.com/apache/flink/pull/24656
Re: [PR] [BP-1.19][FLINK-34517][table]fix environment configs ignored when calling procedure operation [flink]
luoyuxia commented on PR #24656: URL: https://github.com/apache/flink/pull/24656#issuecomment-2095050182 @JustinLeesin Thanks for the PR!
[jira] [Resolved] (FLINK-32843) [JUnit5 Migration] The jobmaster package of flink-runtime module
[ https://issues.apache.org/jira/browse/FLINK-32843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun resolved FLINK-32843. Fix Version/s: 1.20.0 Resolution: Fixed Resolved via master: beb0b167bdcf95f27be87a214a69a174fd49d256 > [JUnit5 Migration] The jobmaster package of flink-runtime module > > > Key: FLINK-32843 > URL: https://issues.apache.org/jira/browse/FLINK-32843 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Rui Fan >Assignee: RocMarshal >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34379) table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError
[ https://issues.apache.org/jira/browse/FLINK-34379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dalongliu updated FLINK-34379: -- Fix Version/s: 1.17.3 1.18.2 > table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError > -- > > Key: FLINK-34379 > URL: https://issues.apache.org/jira/browse/FLINK-34379 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.17.2, 1.18.1 > Environment: 1.17.1 >Reporter: zhu >Assignee: Jeyhun Karimov >Priority: Critical > Labels: pull-request-available > Fix For: 1.17.3, 1.18.2, 1.20.0, 1.19.1 > > > When using batch computing, I union all about 50 tables and then join another > table. When compiling the execution plan, > an OutOfMemoryError: Java heap space is thrown, which was not a problem in > 1.15.2. However, both 1.17.2 and 1.18.1 throw the same error, which causes the > jobmanager to restart. Currently, it has been found that this is caused by > table.optimizer.dynamic-filtering.enabled, which defaults to true. When I set > table.optimizer.dynamic-filtering.enabled to false, it can be compiled and > executed normally > code > TableEnvironment.create(EnvironmentSettings.newInstance() > .withConfiguration(configuration) > .inBatchMode().build()) > sql=select att,filename,'table0' as mo_name from table0 UNION All select > att,filename,'table1' as mo_name from table1 UNION All select > att,filename,'table2' as mo_name from table2 UNION All select > att,filename,'table3' as mo_name from table3 UNION All select > att,filename,'table4' as mo_name from table4 UNION All select > att,filename,'table5' as mo_name from table5 UNION All select > att,filename,'table6' as mo_name from table6 UNION All select > att,filename,'table7' as mo_name from table7 UNION All select > att,filename,'table8' as mo_name from table8 UNION All select > att,filename,'table9' as mo_name from table9 UNION All select > att,filename,'table10' as mo_name from table10 UNION All select > att,filename,'table11' as
mo_name from table11 UNION All select > att,filename,'table12' as mo_name from table12 UNION All select > att,filename,'table13' as mo_name from table13 UNION All select > att,filename,'table14' as mo_name from table14 UNION All select > att,filename,'table15' as mo_name from table15 UNION All select > att,filename,'table16' as mo_name from table16 UNION All select > att,filename,'table17' as mo_name from table17 UNION All select > att,filename,'table18' as mo_name from table18 UNION All select > att,filename,'table19' as mo_name from table19 UNION All select > att,filename,'table20' as mo_name from table20 UNION All select > att,filename,'table21' as mo_name from table21 UNION All select > att,filename,'table22' as mo_name from table22 UNION All select > att,filename,'table23' as mo_name from table23 UNION All select > att,filename,'table24' as mo_name from table24 UNION All select > att,filename,'table25' as mo_name from table25 UNION All select > att,filename,'table26' as mo_name from table26 UNION All select > att,filename,'table27' as mo_name from table27 UNION All select > att,filename,'table28' as mo_name from table28 UNION All select > att,filename,'table29' as mo_name from table29 UNION All select > att,filename,'table30' as mo_name from table30 UNION All select > att,filename,'table31' as mo_name from table31 UNION All select > att,filename,'table32' as mo_name from table32 UNION All select > att,filename,'table33' as mo_name from table33 UNION All select > att,filename,'table34' as mo_name from table34 UNION All select > att,filename,'table35' as mo_name from table35 UNION All select > att,filename,'table36' as mo_name from table36 UNION All select > att,filename,'table37' as mo_name from table37 UNION All select > att,filename,'table38' as mo_name from table38 UNION All select > att,filename,'table39' as mo_name from table39 UNION All select > att,filename,'table40' as mo_name from table40 UNION All select > att,filename,'table41' as mo_name from 
table41 UNION All select > att,filename,'table42' as mo_name from table42 UNION All select > att,filename,'table43' as mo_name from table43 UNION All select > att,filename,'table44' as mo_name from table44 UNION All select > att,filename,'table45' as mo_name from table45 UNION All select > att,filename,'table46' as mo_name from table46 UNION All select > att,filename,'table47' as mo_name from table47 UNION All select > att,filename,'table48' as mo_name from table48 UNION All select > att,filename,'table49' as mo_name from table49 UNION All select > att,filename,'table50' as mo_name from table50 UNION All select > att,filename,'table51' as mo_name from table51 UNION All select
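The query quoted above is a mechanical 50-plus-way UNION ALL over identically shaped tables, so it can be generated programmatically; a minimal sketch reproducing the pattern for the 52 tables quoted (the query text in the issue is truncated, so only the visible pattern is assumed):

```python
# Reproduces the reporter's repetitive UNION ALL query shape for N tables.
def union_all_query(n_tables: int) -> str:
    selects = [
        f"select att,filename,'table{i}' as mo_name from table{i}"
        for i in range(n_tables)
    ]
    return " UNION All ".join(selects)

sql = union_all_query(52)
# Reporter's stated workaround until the fix landed: disable dynamic
# filtering via table.optimizer.dynamic-filtering.enabled = false.
```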
[jira] [Comment Edited] (FLINK-34379) table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError
[ https://issues.apache.org/jira/browse/FLINK-34379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842220#comment-17842220 ] dalongliu edited comment on FLINK-34379 at 5/6/24 1:16 AM: --- Release-1.19: f321970111cfb6f340bd2eb0795cf24b81d583a6 Release-1.18: 9d0858ee745bc835efa78a34d849d5f3ecb89f6d Release-1.17: d2f93a5527b05583fc97bbae511ca0ac95325c02 was (Author: lsy): Release-1.19: f321970111cfb6f340bd2eb0795cf24b81d583a6 > table.optimizer.dynamic-filtering.enabled lead to OutOfMemoryError > -- > > Key: FLINK-34379 > URL: https://issues.apache.org/jira/browse/FLINK-34379 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.17.2, 1.18.1 > Environment: 1.17.1 >Reporter: zhu >Assignee: Jeyhun Karimov >Priority: Critical > Labels: pull-request-available > Fix For: 1.20.0, 1.19.1 > > > When using batch computing, I union all about 50 tables and then join another > table. When compiling the execution plan, > an OutOfMemoryError: Java heap space is thrown, which was not a problem in > 1.15.2. However, both 1.17.2 and 1.18.1 throw the same error, which causes the > jobmanager to restart. 
Currently, it has been found that this is caused by > table.optimizer.dynamic-filtering.enabled, which defaults to true. When I set > table.optimizer.dynamic-filtering.enabled to false, it can be compiled and > executed normally > code > TableEnvironment.create(EnvironmentSettings.newInstance() > .withConfiguration(configuration) > .inBatchMode().build()) > sql=select att,filename,'table0' as mo_name from table0 UNION All select > att,filename,'table1' as mo_name from table1 UNION All select > att,filename,'table2' as mo_name from table2 UNION All select > att,filename,'table3' as mo_name from table3 UNION All select > att,filename,'table4' as mo_name from table4 UNION All select > att,filename,'table5' as mo_name from table5 UNION All select > att,filename,'table6' as mo_name from table6 UNION All select > att,filename,'table7' as mo_name from table7 UNION All select > att,filename,'table8' as mo_name from table8 UNION All select > att,filename,'table9' as mo_name from table9 UNION All select > att,filename,'table10' as mo_name from table10 UNION All select > att,filename,'table11' as mo_name from table11 UNION All select > att,filename,'table12' as mo_name from table12 UNION All select > att,filename,'table13' as mo_name from table13 UNION All select > att,filename,'table14' as mo_name from table14 UNION All select > att,filename,'table15' as mo_name from table15 UNION All select > att,filename,'table16' as mo_name from table16 UNION All select > att,filename,'table17' as mo_name from table17 UNION All select > att,filename,'table18' as mo_name from table18 UNION All select > att,filename,'table19' as mo_name from table19 UNION All select > att,filename,'table20' as mo_name from table20 UNION All select > att,filename,'table21' as mo_name from table21 UNION All select > att,filename,'table22' as mo_name from table22 UNION All select > att,filename,'table23' as mo_name from table23 UNION All select > att,filename,'table24' as mo_name from table24 UNION All select > 
att,filename,'table25' as mo_name from table25 UNION All select > att,filename,'table26' as mo_name from table26 UNION All select > att,filename,'table27' as mo_name from table27 UNION All select > att,filename,'table28' as mo_name from table28 UNION All select > att,filename,'table29' as mo_name from table29 UNION All select > att,filename,'table30' as mo_name from table30 UNION All select > att,filename,'table31' as mo_name from table31 UNION All select > att,filename,'table32' as mo_name from table32 UNION All select > att,filename,'table33' as mo_name from table33 UNION All select > att,filename,'table34' as mo_name from table34 UNION All select > att,filename,'table35' as mo_name from table35 UNION All select > att,filename,'table36' as mo_name from table36 UNION All select > att,filename,'table37' as mo_name from table37 UNION All select > att,filename,'table38' as mo_name from table38 UNION All select > att,filename,'table39' as mo_name from table39 UNION All select > att,filename,'table40' as mo_name from table40 UNION All select > att,filename,'table41' as mo_name from table41 UNION All select > att,filename,'table42' as mo_name from table42 UNION All select > att,filename,'table43' as mo_name from table43 UNION All select > att,filename,'table44' as mo_name from table44 UNION All select > att,filename,'table45' as mo_name from table45 UNION All select > att,filename,'table46' as mo_name from table46 UNION All select > att,filename,'table47' as mo_name from table47 UNION All select >
Re: [PR] [BP-1.18][FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]
lsyldliu merged PR #24744: URL: https://github.com/apache/flink/pull/24744 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32843][JUnit5 Migration] Migrate the jobmaster package of flink-runtime module to JUnit5 [flink]
Jiabao-Sun merged PR #24723: URL: https://github.com/apache/flink/pull/24723 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]
lsyldliu merged PR #24743: URL: https://github.com/apache/flink/pull/24743 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.18][FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]
jeyhunkarimov commented on PR #24744: URL: https://github.com/apache/flink/pull/24744#issuecomment-2094931570 Hi @lsyldliu, the failure was related to something else IMO; I could not reproduce it locally. In fact, retriggering the CI seems to make it green. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.17][FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]
jeyhunkarimov commented on PR #24743: URL: https://github.com/apache/flink/pull/24743#issuecomment-2094931086 Hi @lsyldliu should be fixed now. Could you please check? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-1.18][FLINK-34379][table] Fix OutOfMemoryError with large queries [flink]
jeyhunkarimov commented on PR #24744: URL: https://github.com/apache/flink/pull/24744#issuecomment-2094879998 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35291) Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme
[ https://issues.apache.org/jira/browse/FLINK-35291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LiuZeshan updated FLINK-35291: -- Description: We are doing performance testing on Flink cdc 3.0 and found through the arthas profile that there is a significant performance bottleneck in the deserialization of row data. The main problem lies in the String. format in the BinaryRecordDataGenerator class, so we have made simple performance optimizations. test environment: * flink: 1.20-SNAPSHOT master * flink-cdc: 3.2-SNAPSHOT master * 1CU minicluster mode {code:java} source: type: mysql hostname: localhost port: 3308 username: root password: 123456 tables: test.user_behavior server-id: 5400-5404 #server-time-zone: UTC scan.startup.mode: earliest-offset debezium.poll.interval.ms: 10 sink: type: values name: Values Sink materialized.in.memory: false print.enabled: false pipeline: name: Sync MySQL Database to Values parallelism: 1{code} *before optimization: 3.5w/s* !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=MTRjZGIyNWYyYmVlY2YwNDNmYjExZDE4MjRhMGYyYzlfcVRuM0JBYXpTem9qUWRxdkY0NGZmVkpWc1cxMnlzaE9fVG9rZW46RklTbWJUNkVYb2s0WGF4eEttWWN6M0hIbjJTXzE3MTQ5MjU4OTY6MTcxNDkyOTQ5Nl9WNA|width=361,height=179! [^cdc-3.0-1c.html] ^Analyzing the flame chart, it can be found that approximately 24.45% of the time is spent on string.format.^ !image-2024-05-06-00-29-34-618.png|width=583,height=171! *after optimization: 5w/s* !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YjRkMDRmYTkzNzRiNjBmMzVmN2VlYTYyMGRmMGU0ZDRfcFIyNGNGMEViSzRjektpdVFWYTYyUnJQbWJjd1lnb3dfVG9rZW46V2ZXVGJ2T3lDb3dCSmF4WVZvTGMzc2h2bmpmXzE3MTQ5MjU5NTM6MTcxNDkyOTU1M19WNA|width=363,height=174! [^cdc-3.0-1c-2.html] After optimization, 4.7%(extractBeforeDataRecord+extractAfterDataRecord) of the time is still spent on org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.. Perhaps we can further optimize it. 
!image-2024-05-06-00-37-16-028.png|width=379,height=107! was: We are doing performance testing on Flink cdc 3.0 and found through the arthas profile that there is a significant performance bottleneck in the serialization of row data. The main problem lies in the String. format in the BinaryRecordDataGenerator class, so we have made simple performance optimizations. test environment: * flink: 1.20-SNAPSHOT master * flink-cdc: 3.2-SNAPSHOT master * 1CU minicluster mode {code:java} source: type: mysql hostname: localhost port: 3308 username: root password: 123456 tables: test.user_behavior server-id: 5400-5404 #server-time-zone: UTC scan.startup.mode: earliest-offset debezium.poll.interval.ms: 10 sink: type: values name: Values Sink materialized.in.memory: false print.enabled: false pipeline: name: Sync MySQL Database to Values parallelism: 1{code} *before optimization: 3.5w/s* !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=MTRjZGIyNWYyYmVlY2YwNDNmYjExZDE4MjRhMGYyYzlfcVRuM0JBYXpTem9qUWRxdkY0NGZmVkpWc1cxMnlzaE9fVG9rZW46RklTbWJUNkVYb2s0WGF4eEttWWN6M0hIbjJTXzE3MTQ5MjU4OTY6MTcxNDkyOTQ5Nl9WNA|width=361,height=179! [^cdc-3.0-1c.html] ^Analyzing the flame chart, it can be found that approximately 24.45% of the time is spent on string.format.^ !image-2024-05-06-00-29-34-618.png|width=583,height=171! *after optimization: 5w/s* !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YjRkMDRmYTkzNzRiNjBmMzVmN2VlYTYyMGRmMGU0ZDRfcFIyNGNGMEViSzRjektpdVFWYTYyUnJQbWJjd1lnb3dfVG9rZW46V2ZXVGJ2T3lDb3dCSmF4WVZvTGMzc2h2bmpmXzE3MTQ5MjU5NTM6MTcxNDkyOTU1M19WNA|width=363,height=174! [^cdc-3.0-1c-2.html] After optimization, 4.7%(extractBeforeDataRecord+extractAfterDataRecord) of the time is still spent on org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.. Perhaps we can further optimize it. !image-2024-05-06-00-37-16-028.png|width=379,height=107! 
> Improve the ROW data deserialization performance of > DebeziumEventDeserializationScheme > -- > > Key: FLINK-35291 > URL: https://issues.apache.org/jira/browse/FLINK-35291 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: 1.20.0 >Reporter: LiuZeshan >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: cdc-3.0-1c-2.html, cdc-3.0-1c.html, > image-2024-05-06-00-29-34-618.png, image-2024-05-06-00-37-16-028.png > > > We are doing performance testing on Flink CDC 3.0 and found through the > arthas profiler that there is a significant performance bottleneck in the > deserialization of row data. The main problem lies in the String.format
[jira] [Updated] (FLINK-35291) Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme
[ https://issues.apache.org/jira/browse/FLINK-35291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-35291: --- Labels: pull-request-available (was: ) > Improve the ROW data deserialization performance of > DebeziumEventDeserializationScheme > -- > > Key: FLINK-35291 > URL: https://issues.apache.org/jira/browse/FLINK-35291 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: 1.20.0 >Reporter: LiuZeshan >Priority: Minor > Labels: pull-request-available > Fix For: 1.20.0 > > Attachments: cdc-3.0-1c-2.html, cdc-3.0-1c.html, > image-2024-05-06-00-29-34-618.png, image-2024-05-06-00-37-16-028.png > > > We are doing performance testing on Flink cdc 3.0 and found through the > arthas profile that there is a significant performance bottleneck in the > serialization of row data. The main problem lies in the String. format in the > BinaryRecordDataGenerator class, so we have made simple performance > optimizations. > test environment: > * flink: 1.20-SNAPSHOT master > * flink-cdc: 3.2-SNAPSHOT master > * 1CU minicluster mode > {code:java} > source: > type: mysql > hostname: localhost > port: 3308 > username: root > password: 123456 > tables: test.user_behavior > server-id: 5400-5404 > #server-time-zone: UTC > scan.startup.mode: earliest-offset > debezium.poll.interval.ms: 10 > sink: > type: values > name: Values Sink > materialized.in.memory: false > print.enabled: false > pipeline: > name: Sync MySQL Database to Values > parallelism: 1{code} > > *before optimization: 3.5w/s* > !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=MTRjZGIyNWYyYmVlY2YwNDNmYjExZDE4MjRhMGYyYzlfcVRuM0JBYXpTem9qUWRxdkY0NGZmVkpWc1cxMnlzaE9fVG9rZW46RklTbWJUNkVYb2s0WGF4eEttWWN6M0hIbjJTXzE3MTQ5MjU4OTY6MTcxNDkyOTQ5Nl9WNA|width=361,height=179! 
> [^cdc-3.0-1c.html] > ^Analyzing the flame chart, it can be found that approximately 24.45% of the > time is spent on string.format.^ > !image-2024-05-06-00-29-34-618.png|width=583,height=171! > > *after optimization: 5w/s* > !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YjRkMDRmYTkzNzRiNjBmMzVmN2VlYTYyMGRmMGU0ZDRfcFIyNGNGMEViSzRjektpdVFWYTYyUnJQbWJjd1lnb3dfVG9rZW46V2ZXVGJ2T3lDb3dCSmF4WVZvTGMzc2h2bmpmXzE3MTQ5MjU5NTM6MTcxNDkyOTU1M19WNA|width=363,height=174! > > [^cdc-3.0-1c-2.html] > After optimization, 4.7%(extractBeforeDataRecord+extractAfterDataRecord) of > the time is still spent on > org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.. > Perhaps we can further optimize it. > !image-2024-05-06-00-37-16-028.png|width=379,height=107! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35291) Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme
LiuZeshan created FLINK-35291: - Summary: Improve the ROW data deserialization performance of DebeziumEventDeserializationScheme Key: FLINK-35291 URL: https://issues.apache.org/jira/browse/FLINK-35291 Project: Flink Issue Type: Improvement Components: Flink CDC Affects Versions: 1.20.0 Reporter: LiuZeshan Fix For: 1.20.0 Attachments: cdc-3.0-1c-2.html, cdc-3.0-1c.html, image-2024-05-06-00-29-34-618.png, image-2024-05-06-00-37-16-028.png We are doing performance testing on Flink cdc 3.0 and found through the arthas profile that there is a significant performance bottleneck in the serialization of row data. The main problem lies in the String. format in the BinaryRecordDataGenerator class, so we have made simple performance optimizations. test environment: * flink: 1.20-SNAPSHOT master * flink-cdc: 3.2-SNAPSHOT master * 1CU minicluster mode {code:java} source: type: mysql hostname: localhost port: 3308 username: root password: 123456 tables: test.user_behavior server-id: 5400-5404 #server-time-zone: UTC scan.startup.mode: earliest-offset debezium.poll.interval.ms: 10 sink: type: values name: Values Sink materialized.in.memory: false print.enabled: false pipeline: name: Sync MySQL Database to Values parallelism: 1{code} *before optimization: 3.5w/s* !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=MTRjZGIyNWYyYmVlY2YwNDNmYjExZDE4MjRhMGYyYzlfcVRuM0JBYXpTem9qUWRxdkY0NGZmVkpWc1cxMnlzaE9fVG9rZW46RklTbWJUNkVYb2s0WGF4eEttWWN6M0hIbjJTXzE3MTQ5MjU4OTY6MTcxNDkyOTQ5Nl9WNA|width=361,height=179! [^cdc-3.0-1c.html] ^Analyzing the flame chart, it can be found that approximately 24.45% of the time is spent on string.format.^ !image-2024-05-06-00-29-34-618.png|width=583,height=171! 
*after optimization: 5w/s* !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YjRkMDRmYTkzNzRiNjBmMzVmN2VlYTYyMGRmMGU0ZDRfcFIyNGNGMEViSzRjektpdVFWYTYyUnJQbWJjd1lnb3dfVG9rZW46V2ZXVGJ2T3lDb3dCSmF4WVZvTGMzc2h2bmpmXzE3MTQ5MjU5NTM6MTcxNDkyOTU1M19WNA|width=363,height=174! [^cdc-3.0-1c-2.html] After optimization, 4.7%(extractBeforeDataRecord+extractAfterDataRecord) of the time is still spent on org/apache/flink/cdc/runtime/typeutils/BinaryRecordDataGenerator.. Perhaps we can further optimize it. !image-2024-05-06-00-37-16-028.png|width=379,height=107! -- This message was sent by Atlassian Jira (v8.20.10#820010)
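The optimization direction described in the flame-chart analysis above — stop paying for a string-formatting call on every record by computing the formatted value once per distinct input and reusing it — can be illustrated with a small language-neutral sketch. This is a hypothetical Python illustration of the caching idea, not Flink CDC's actual Java code; the function and table/column names are invented:

```python
# Hypothetical sketch: cache the result of an expensive per-record format
# so the hot deserialization loop pays for it only once per distinct key.
from functools import lru_cache

calls = {"formats": 0}   # counts how often the underlying format runs

@lru_cache(maxsize=None)
def field_key(table: str, column: str) -> str:
    calls["formats"] += 1
    return f"{table}.{column}"   # stand-in for the costly String.format

# simulate a hot loop over many records of the same table/column
keys = [field_key("user_behavior", "att") for _ in range(10_000)]
# the underlying format ran once, not 10,000 times
```

The same effect can be had without a cache decorator by precomputing the formatted strings when the schema is first seen and storing them alongside the record generator.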
Re: [PR] [FLINK-35270]Enrich information in logs, making it easier for debugging [flink]
HCTommy commented on PR #24747: URL: https://github.com/apache/flink/pull/24747#issuecomment-2094854809 Hi @fredia, I optimized some logs, which I think will be helpful for people debugging in a production environment. Could you please review when you have time? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35180) Instant in row doesn't convert to correct type in python thread mode
[ https://issues.apache.org/jira/browse/FLINK-35180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843573#comment-17843573 ] Wouter Zorgdrager commented on FLINK-35180: --- I have encountered the same bug. The only addition I can make is that in `thread` mode, the object is a `pemja.PyJObject` which wraps `java.time.Instant`. That should be a `pyflink.common.time.Instant`. > Instant in row doesn't convert to correct type in python thread mode > > > Key: FLINK-35180 > URL: https://issues.apache.org/jira/browse/FLINK-35180 > Project: Flink > Issue Type: Bug > Components: API / Python >Reporter: Wei Yuan >Priority: Major > > {code:java} > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.common import Types, WatermarkStrategy, Configuration > from pyflink.table import EnvironmentSettings, TableEnvironment > from pyflink.table import StreamTableEnvironment, Schema > from pyflink.datastream.functions import ProcessFunction, MapFunction > # init task env > config = Configuration() > # config.set_string("python.execution-mode", "thread") > config.set_string("python.execution-mode", "process") > config.set_string("python.client.executable", "/root/miniconda3/bin/python3") > config.set_string("python.executable", "/root/miniconda3/bin/python3") > env = StreamExecutionEnvironment.get_execution_environment(config) > table_env = StreamTableEnvironment.create(env) > table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", > "content") > table_env.create_temporary_view("test_table", table) > result_table = table_env.sql_query("select *, NOW() as dt from test_table") > result_ds = table_env.to_data_stream(result_table) > def test_func(row): > print(row) > return row > result_ds.map(test_func) > env.execute(){code} > output in process mode: > {code:java} > Row(id=1, content='Hi', dt=Instant<1713609386, 27100>) > Row(id=2, content='Hello', dt=Instant<1713609386, 58000>) {code} > output in thread mode: > {code:java} > > 
Row(id=1, content='Hi', dt=) > Traceback (most recent call last): > File "/home/disk1/yuanwei/bug.py", line 31, in > env.execute() > File > "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", > line 773, in execute > return > JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph)) > File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", > line 1322, in {}call{} > return_value = get_return_value( > File > "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", > line 146, in deco > return f(*a, **kw) > File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line > 326, in get_return_value > raise Py4JJavaError( > py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute. > : org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > at > org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > at > 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > at
[jira] [Closed] (FLINK-35290) Wrong Instant type conversion TableAPI to Datastream in thread mode
[ https://issues.apache.org/jira/browse/FLINK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wouter Zorgdrager closed FLINK-35290. - Resolution: Duplicate > Wrong Instant type conversion TableAPI to Datastream in thread mode > --- > > Key: FLINK-35290 > URL: https://issues.apache.org/jira/browse/FLINK-35290 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.18.1 >Reporter: Wouter Zorgdrager >Priority: Major > > In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` type into a > Datastream, we get a `pyflink.common.time.Instant` type. First of all, I'm > wondering if this is expected behavior as in the TableAPI, `TIMESTAMP_LTZ` > maps to a Python `datetime`. Can't the same be done for the DatastreamAPI? > Nevertheless, if we switch from `process` to `thread` mode for execution, the > `TIMESTAMP_LTZ(3)` gets mapped to `pemja.PyJObject` (which wraps a > `java.time.Instant`) rather than `pyflink.common.time.Instant`. Note that if > I only use the DatastreamAPI and read `Types.Instant()` directly, the > conversion in both `thread` and `process` mode seems to work just fine. 
> Below a minimal example exposing the bug: > ``` > EXECUTION_MODE = "thread" # or "process" > config = Configuration() > config.set_string("python.execution-mode", EXECUTION_MODE) > env = StreamExecutionEnvironment.get_execution_environment() > t_env = StreamTableEnvironment.create(env) > t_env.get_config().set("parallelism.default", "1") > t_env.get_config().set("python.fn-execution.bundle.size", "1") > t_env.get_config().set("python.execution-mode", EXECUTION_MODE) > def to_epoch_ms(row: Row): > print(type(row[1])) > return row[1].to_epoch_milli() > t_env.to_data_stream( > t_env.from_elements( > [ > (1, datetime(year=2024, day=10, month=9, hour=9)), > (2, datetime(year=2024, day=10, month=9, hour=12)), > (3, datetime(year=2024, day=22, month=11, hour=12)), > ], > DataTypes.ROW( > [ > DataTypes.FIELD("id", DataTypes.INT()), > DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)), > ] > ), > ) > ).map(to_epoch_ms, output_type=Types.LONG()).print() > env.execute() > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35290) Wrong Instant type conversion TableAPI to Datastream in thread mode
[ https://issues.apache.org/jira/browse/FLINK-35290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17843572#comment-17843572 ] Wouter Zorgdrager commented on FLINK-35290: --- It seems this bug has already been reported here https://issues.apache.org/jira/browse/FLINK-35180. Will close this issue. > Wrong Instant type conversion TableAPI to Datastream in thread mode > --- > > Key: FLINK-35290 > URL: https://issues.apache.org/jira/browse/FLINK-35290 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.18.1 >Reporter: Wouter Zorgdrager >Priority: Major > > In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` type into a > Datastream, we get an `pyflink.common.time.Instant` type. First of all, I'm > wondering if this is expected behavior as in the TableAPI, `TIMESTAMP_LTZ` > maps to a Python `datetime`. Can't the same be done for the DatastreamAPI? > Nevertheless, if we switch from `process` to `thread` mode for execution, the > `TIMESTAMP_LTZ(3)` gets mapped to `pemja.PyJObject' (which wraps a > `java.time.Instant`) rather than `pyflink.common.time.Instant`. Note that if > I only use the DatastreamAPI and read `Types.Instant()` directly, the > conversion in both `thread` and `process` mode seem to work just fine. 
> Below a minimal example exposing the bug:
> ```
> from datetime import datetime
>
> from pyflink.common import Configuration, Row, Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import DataTypes, StreamTableEnvironment
>
> EXECUTION_MODE = "thread"  # or "process"
>
> config = Configuration()
> config.set_string("python.execution-mode", EXECUTION_MODE)
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().set("parallelism.default", "1")
> t_env.get_config().set("python.fn-execution.bundle.size", "1")
> t_env.get_config().set("python.execution-mode", EXECUTION_MODE)
>
>
> def to_epoch_ms(row: Row):
>     print(type(row[1]))
>     return row[1].to_epoch_milli()
>
>
> t_env.to_data_stream(
>     t_env.from_elements(
>         [
>             (1, datetime(year=2024, day=10, month=9, hour=9)),
>             (2, datetime(year=2024, day=10, month=9, hour=12)),
>             (3, datetime(year=2024, day=22, month=11, hour=12)),
>         ],
>         DataTypes.ROW(
>             [
>                 DataTypes.FIELD("id", DataTypes.INT()),
>                 DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
>             ]
>         ),
>     )
> ).map(to_epoch_ms, output_type=Types.LONG()).print()
> env.execute()
> ```

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35290) Wrong Instant type conversion TableAPI to Datastream in thread mode
Wouter Zorgdrager created FLINK-35290:
-----------------------------------------

             Summary: Wrong Instant type conversion TableAPI to Datastream in thread mode
                 Key: FLINK-35290
                 URL: https://issues.apache.org/jira/browse/FLINK-35290
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.18.1
            Reporter: Wouter Zorgdrager

In PyFlink, if you convert a table with a `TIMESTAMP_LTZ(3)` type into a Datastream, we get a `pyflink.common.time.Instant` type. First of all, I'm wondering if this is expected behavior, as in the TableAPI `TIMESTAMP_LTZ` maps to a Python `datetime`. Can't the same be done for the DataStream API? Nevertheless, if we switch from `process` to `thread` mode for execution, the `TIMESTAMP_LTZ(3)` gets mapped to `pemja.PyJObject` (which wraps a `java.time.Instant`) rather than `pyflink.common.time.Instant`. Note that if I only use the DataStream API and read `Types.Instant()` directly, the conversion in both `thread` and `process` modes seems to work just fine.

Below a minimal example exposing the bug:
```
from datetime import datetime

from pyflink.common import Configuration, Row, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment

EXECUTION_MODE = "thread"  # or "process"

config = Configuration()
config.set_string("python.execution-mode", EXECUTION_MODE)
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("python.fn-execution.bundle.size", "1")
t_env.get_config().set("python.execution-mode", EXECUTION_MODE)


def to_epoch_ms(row: Row):
    print(type(row[1]))
    return row[1].to_epoch_milli()


t_env.to_data_stream(
    t_env.from_elements(
        [
            (1, datetime(year=2024, day=10, month=9, hour=9)),
            (2, datetime(year=2024, day=10, month=9, hour=12)),
            (3, datetime(year=2024, day=22, month=11, hour=12)),
        ],
        DataTypes.ROW(
            [
                DataTypes.FIELD("id", DataTypes.INT()),
                DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP_LTZ(3)),
            ]
        ),
    )
).map(to_epoch_ms, output_type=Types.LONG()).print()
env.execute()
```

-- This message was sent by Atlassian Jira (v8.20.10#820010)
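The example above calls `to_epoch_milli()` on the value produced for the `TIMESTAMP_LTZ(3)` column, which only works when the value is a `pyflink.common.time.Instant` rather than a raw `pemja.PyJObject`. A minimal pure-Python stand-in (an assumption: this mirrors the seconds-plus-nanos representation and `toEpochMilli` semantics of `java.time.Instant`, not PyFlink's actual class) shows the conversion the reporter relies on:

```python
class Instant:
    """Simplified stand-in for pyflink.common.time.Instant: seconds and nanos since the epoch."""

    def __init__(self, seconds: int, nanos: int):
        self.seconds = seconds
        self.nanos = nanos

    def to_epoch_milli(self) -> int:
        # Combine whole seconds with the millisecond part of the nanos field.
        return self.seconds * 1000 + self.nanos // 1_000_000


# 2024-09-10T09:00:00Z as epoch seconds, zero nanos:
ts = Instant(1725958800, 0)
print(ts.to_epoch_milli())  # 1725958800000
```

In `thread` mode the mapped value is an opaque `PyJObject`, so the `to_epoch_milli` attribute lookup fails, which is the observable symptom of the bug.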
Re: [PR] [FLINK-35112][python] Fix membership for Row class PyFlink [flink]
flinkbot commented on PR #24756:
URL: https://github.com/apache/flink/pull/24756#issuecomment-2094773254

## CI report:

* ebca67d26c29e755aef515140ad2aacc6a6e6835 UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35112) Membership for Row class does not include field names
[ https://issues.apache.org/jira/browse/FLINK-35112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-35112:
-----------------------------------
    Labels: pull-request-available  (was: )

> Membership for Row class does not include field names
> -----------------------------------------------------
>
> Key: FLINK-35112
> URL: https://issues.apache.org/jira/browse/FLINK-35112
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.18.1
> Reporter: Wouter Zorgdrager
> Priority: Minor
> Labels: pull-request-available
>
> In the Row class in PyFlink I cannot do a membership check for field names.
> This minimal example will show the unexpected behavior:
> ```
> from pyflink.common import Row
>
> row = Row(name="Alice", age=11)
> # Expected to be True, but is False
> print("name" in row)
>
> person = Row("name", "age")
> # This is True, as expected
> print('name' in person)
> ```
> The related code in the Row class is:
> ```
> def __contains__(self, item):
>     return item in self._values
> ```
> It should be relatively easy to fix with the following code:
> ```
> def __contains__(self, item):
>     if hasattr(self, "_fields"):
>         return item in self._fields
>     else:
>         return item in self._values
> ```

-- This message was sent by Atlassian Jira (v8.20.10#820010)
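The proposed `__contains__` change can be exercised outside Flink with a cut-down `Row`. This is a sketch that assumes PyFlink's internal `_fields`/`_values` attributes and reproduces only the membership logic, not the real class:

```python
class Row:
    """Cut-down model of pyflink.common.Row: keyword args define named fields,
    positional args define values only (no field names)."""

    def __init__(self, *args, **kwargs):
        if args and kwargs:
            raise ValueError("Cannot mix positional and keyword arguments")
        if kwargs:
            self._fields = list(kwargs.keys())
            self._values = list(kwargs.values())
        else:
            self._values = list(args)

    def __contains__(self, item):
        # Proposed fix: prefer field names when they exist,
        # otherwise fall back to value membership (current behaviour).
        if hasattr(self, "_fields"):
            return item in self._fields
        return item in self._values


row = Row(name="Alice", age=11)
print("name" in row)     # True with the fix (was False before)
person = Row("name", "age")
print("name" in person)  # True: positional Row checks values
```

Note the trade-off the fix implies: for a keyword-constructed `Row`, membership now checks names only, so `"Alice" in Row(name="Alice")` becomes False, which matches how `in` behaves for `dict` keys.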
[PR] [FLINK-35112][Python] Fix membership for Row class PyFlink [flink]
wzorgdrager opened a new pull request, #24756:
URL: https://github.com/apache/flink/pull/24756

## What is the purpose of the change

This pull request adds support for a membership check for field names in a Row in PyFlink. If field names are not defined, it will check membership in the values. For example:

```
user_row = Row(name="Alice", age=22)
print("name" in user_row)   # Evaluates to True

user_row = Row("Alice", 22)
print("Alice" in user_row)  # Evaluates to True
```

This change is more in line with how dictionaries behave in Python.

## Brief change log

- Change `in` behaviour for `Row` class:
  - If field names are defined, `KEY in row` will check if `KEY` exists in field names.
  - If field names do not exist, `KEY in row` will check if `KEY` exists in field values (current behaviour).

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33132] Flink Connector Redshift TableSink Implementation [flink-connector-aws]
Samrat002 commented on PR #114:
URL: https://github.com/apache/flink-connector-aws/pull/114#issuecomment-2094737962

> I have left some comments, I will continue the review later. I believe this PR is incomplete right? we still need to add tests.

Yes, tests were not added, since they would increase the size of the PR. As discussed offline, I will reduce the scope of this PR to only Async v2 and move the other changes to a succeeding PR.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33132] Flink Connector Redshift TableSink Implementation [flink-connector-aws]
Samrat002 commented on code in PR #114:
URL: https://github.com/apache/flink-connector-aws/pull/114#discussion_r1590270090

##
flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.redshift.executor.RedshiftS3Util;
+import org.apache.flink.connector.redshift.mode.SinkMode;
+import org.apache.flink.connector.redshift.options.RedshiftOptions;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Dynamic Table Factory. */
+@PublicEvolving
+public class RedshiftDynamicTableFactory implements DynamicTableSinkFactory {
+    public static final String IDENTIFIER = "redshift";
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDeprecatedKeys("AWS Redshift cluster hostname.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5439)
+                    .withDeprecatedKeys("AWS Redshift port number.\nDefault value : 5439.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("AWS Redshift Cluster username.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("AWS Redshift cluster password.");
+
+    public static final ConfigOption<String> DATABASE_NAME =
+            ConfigOptions.key("sink.database-name")
+                    .stringType()
+                    .defaultValue("dev")
+                    .withDescription(
+                            "AWS Redshift cluster database name. Default value set to `dev`.");
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("sink.table-name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("AWS Redshift cluster sink table name.");
+
+    public static final ConfigOption<Integer> SINK_BATCH_SIZE =
+            ConfigOptions.key("sink.batch-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "`sink.batch-size` determines the maximum size of batch, in terms of the number of records, "
+                                    + "at which data will trigger a flush operation."
+                                    + " When the number of records exceeds this threshold, the system initiates a flush to manage the data.\n"
+                                    + "Default Value: 1000");
+
+    public static final ConfigOption<Duration> SINK_FLUSH_INTERVAL =
+            ConfigOptions.key("sink.flush-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1L))
+                    .withDescription(
+                            "the flush interval mills, over this time, asynchronous threads will flush data. The default value is 1s.");
+
+    public static final ConfigOption<Integer> SINK_MAX_RETRIES =
+            ConfigOptions.key("sink.max-retries")
+                    .intType()
+                    .defaultValue(3)
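The `sink.batch-size` and `sink.flush-interval` options quoted above combine a count trigger with a time trigger: flush when the buffer reaches the batch size, or when an asynchronous timer sees the interval elapse. A language-neutral sketch of that policy (hypothetical helper names; this is not the connector's code, just the described behavior):

```python
import time


class BatchingSink:
    """Buffer records; flush when the batch size or the flush interval is exceeded."""

    def __init__(self, batch_size, flush_interval_s, flush_fn, clock=time.monotonic):
        self.batch_size = batch_size
        self.flush_interval_s = flush_interval_s
        self.flush_fn = flush_fn  # e.g. stage the batch to S3 and COPY into Redshift
        self.clock = clock
        self.buffer = []
        self.last_flush = clock()

    def write(self, record):
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:  # count trigger (sink.batch-size)
            self.flush()

    def maybe_flush_on_timer(self):
        # Time trigger (sink.flush-interval), called periodically by a flusher thread.
        if self.buffer and self.clock() - self.last_flush >= self.flush_interval_s:
            self.flush()

    def flush(self):
        self.flush_fn(self.buffer)
        self.buffer = []
        self.last_flush = self.clock()
```

With `batch_size=1000` and `flush_interval_s=1.0` this matches the defaults in the quoted options: a busy stream flushes every 1000 records, a quiet one at most one second after the first buffered record.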
Re: [PR] [FLINK-33132] Flink Connector Redshift TableSink Implementation [flink-connector-aws]
Samrat002 commented on code in PR #114:
URL: https://github.com/apache/flink-connector-aws/pull/114#discussion_r1590269554

##
flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.redshift.executor.RedshiftS3Util;
+import org.apache.flink.connector.redshift.mode.SinkMode;
+import org.apache.flink.connector.redshift.options.RedshiftOptions;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Dynamic Table Factory. */
+@PublicEvolving
+public class RedshiftDynamicTableFactory implements DynamicTableSinkFactory {
+    public static final String IDENTIFIER = "redshift";
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDeprecatedKeys("AWS Redshift cluster hostname.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(5439)
+                    .withDeprecatedKeys("AWS Redshift port number.\nDefault value : 5439.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("AWS Redshift Cluster username.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("AWS Redshift cluster password.");
+
+    public static final ConfigOption<String> DATABASE_NAME =
+            ConfigOptions.key("sink.database-name")
+                    .stringType()
+                    .defaultValue("dev")

Review Comment:
   `dev` is the default database name created by Redshift. I assumed that if the user doesn't provide a database name in the config, it should fall back to the default one.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35240][Connectors][format]Disable FLUSH_AFTER_WRITE_VALUE to avoid flush per record for csv format [flink]
GOODBOY008 commented on PR #24730:
URL: https://github.com/apache/flink/pull/24730#issuecomment-2094663611

@afedulov Would you help review this PR? Thank you~

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
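The PR above (FLINK-35240) disables Jackson's `FLUSH_AFTER_WRITE_VALUE` so the CSV writer no longer flushes after every record. A small counting-writer sketch (pure Python, illustrative only; the names here are not Flink or Jackson APIs) shows why per-record flushing is costly:

```python
import io


class CountingWriter(io.StringIO):
    """In-memory writer that counts how often flush() is called."""

    def __init__(self):
        super().__init__()
        self.flush_count = 0

    def flush(self):
        self.flush_count += 1
        super().flush()


def write_csv(writer, rows, flush_per_record):
    for row in rows:
        writer.write(",".join(map(str, row)) + "\n")
        if flush_per_record:
            writer.flush()  # roughly what FLUSH_AFTER_WRITE_VALUE causes
    writer.flush()  # one flush at the end (e.g. on checkpoint)


rows = [(i, i * i) for i in range(1000)]

eager = CountingWriter()
write_csv(eager, rows, flush_per_record=True)
print(eager.flush_count)  # 1001: one flush per record plus the final one

lazy = CountingWriter()
write_csv(lazy, rows, flush_per_record=False)
print(lazy.flush_count)   # 1: only the final flush
```

Both variants produce identical output; only the number of (potentially expensive, syscall-backed) flushes differs, which is the point of the fix.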