[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleksandr Nitavskyi updated FLINK-35232:

Description:
https://issues.apache.org/jira/browse/FLINK-32877 tracks the ability to specify transport options in the GCS connector. While setting the params enabled there reduced read timeouts, we still see 503 errors leading to Flink job restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in [https://cloud.google.com/storage/docs/retry-strategy#customize-retries]. We need [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods] methods available to Flink users so that they can customize their deployment. In particular, the following settings seem to be the minimum required to align the GCS timeouts with the job's checkpoint config:
* [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
* [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
* [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
* [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
* [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]

Basically, the proposal is to make the timeout tunable via the multiplier, maxAttempts and totalTimeout mechanisms.
All of the config options should be optional, and the defaults should be used when some of the configs are not provided.
> Support for retry settings on GCS connector
> ---
>
> Key: FLINK-35232
> URL: https://issues.apache.org/jira/browse/FLINK-35232
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.15.3, 1.16.2, 1.17.1, 1.19.0, 1.18.1
> Reporter: Vikas M
> Assignee: Ravi Singh
> Priority: Major
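The five settings in the ticket map directly onto the gax RetrySettings builder. A minimal sketch of what the connector could build internally, assuming the gax library on the classpath; the concrete values are illustrative only, not connector defaults:

```java
// Sketch: wiring the proposed options into gax RetrySettings.
// All values below are illustrative, not connector defaults.
import org.threeten.bp.Duration;
import com.google.api.gax.retrying.RetrySettings;

public class GcsRetrySettingsSketch {
    public static RetrySettings build() {
        return RetrySettings.newBuilder()
                .setMaxAttempts(10)                           // maxAttempts
                .setInitialRpcTimeout(Duration.ofSeconds(10)) // initialRpcTimeout
                .setRpcTimeoutMultiplier(2.0)                 // rpcTimeoutMultiplier
                .setMaxRpcTimeout(Duration.ofMinutes(1))      // maxRpcTimeout
                .setTotalTimeout(Duration.ofMinutes(10))      // totalTimeout
                .build();
    }
}
```

Any setter left out keeps the Google client default, which matches the requirement that every option stays optional.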
[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleksandr Nitavskyi updated FLINK-35232:

Description:
https://issues.apache.org/jira/browse/FLINK-32877 tracks the ability to specify transport options in the GCS connector. While setting the params enabled there reduced read timeouts, we still see 503 errors leading to Flink job restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in [https://cloud.google.com/storage/docs/retry-strategy#customize-retries]. We need [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods] methods available to Flink users so that they can customize their deployment. In particular, the following settings seem to be the minimum required to align the GCS timeouts with the job's checkpoint config:
* [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
* [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
* [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
* [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
* [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]

Basically, the proposal is to make the timeout tunable via various mechanisms.
All of the config options should be optional, and the defaults should be used when some of the configs are not provided.
[jira] [Commented] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17841267#comment-17841267 ]

Oleksandr Nitavskyi commented on FLINK-35232:

[~galenwarren] thanks. We have reduced the number of methods to the bare minimum, reflected in the description:
* [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
* [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
* [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
* [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
* [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]

Thus Flink users will be able to align the total retry timeout with the checkpoint timeout, so the job does its best before giving up on committing the data.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
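To see how the five settings interact, here is a small stdlib-only sketch of the worst-case time a single GCS RPC can consume before gax gives up; the growth rule mirrors the documented multiplier-with-cap behaviour, and the parameter values are illustrative:

```java
// Sketch: cumulative worst-case time consumed by retried RPC attempts.
// Mirrors the gax rule: each attempt's timeout grows by rpcTimeoutMultiplier,
// capped at maxRpcTimeout, for at most maxAttempts attempts.
public class RetryBudget {
    static long worstCaseMillis(long initialRpcTimeoutMs, double multiplier,
                                long maxRpcTimeoutMs, int maxAttempts) {
        long total = 0;
        double timeout = initialRpcTimeoutMs;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            total += (long) Math.min(timeout, maxRpcTimeoutMs);
            timeout *= multiplier; // rpcTimeoutMultiplier applied per attempt
        }
        return total;
    }

    public static void main(String[] args) {
        // 5 attempts at 10s initial, x2 growth, 60s cap:
        // 10s + 20s + 40s + 60s + 60s = 190s
        long worst = worstCaseMillis(10_000, 2.0, 60_000, 5);
        System.out.println("worst-case retry time: " + worst + " ms");
        // totalTimeout (and thus this budget) should stay below the
        // job's checkpoint timeout so the commit can still succeed.
    }
}
```

This is the calculation a user would do to pick totalTimeout relative to the checkpoint timeout.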
[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleksandr Nitavskyi updated FLINK-35232:

Description:
https://issues.apache.org/jira/browse/FLINK-32877 tracks the ability to specify transport options in the GCS connector. While setting the params enabled there reduced read timeouts, we still see 503 errors leading to Flink job restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in [https://cloud.google.com/storage/docs/retry-strategy#customize-retries]. We need [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods] methods available to Flink users so that they can customize their deployment. In particular, the following settings seem to be the minimum required to align the GCS timeouts with the job's checkpoint config:
* [maxAttempts|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxAttempts__]
* [initialRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getInitialRpcTimeout__]
* [rpcTimeoutMultiplier|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getRpcTimeoutMultiplier__]
* [maxRpcTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getMaxRpcTimeout__]
* [totalTimeout|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#com_google_api_gax_retrying_RetrySettings_getTotalTimeout__]

All of the config options should be optional, and the defaults should be used when some of the configs are not provided.
[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleksandr Nitavskyi updated FLINK-35232:

Description:
https://issues.apache.org/jira/browse/FLINK-32877 tracks the ability to specify transport options in the GCS connector. While setting the params enabled there reduced read timeouts, we still see 503 errors leading to Flink job restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in [https://cloud.google.com/storage/docs/retry-strategy#customize-retries]. We need [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods] methods available to Flink users so that they can customize their deployment. In particular, the following settings seem to be the minimum required to align the GCS timeouts with the job's checkpoint config:
* maxAttempts
* initialRpcTimeout
* rpcTimeoutMultiplier
* maxRpcTimeout
* totalTimeout

All of the config options should be optional, and the defaults should be used when some of the configs are not provided.
[jira] [Updated] (FLINK-35232) Support for retry settings on GCS connector
[ https://issues.apache.org/jira/browse/FLINK-35232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleksandr Nitavskyi updated FLINK-35232:

Description:
https://issues.apache.org/jira/browse/FLINK-32877 tracks the ability to specify transport options in the GCS connector. While setting the params enabled there reduced read timeouts, we still see 503 errors leading to Flink job restarts.

Thus, in this ticket, we want to specify additional retry settings as noted in [https://cloud.google.com/storage/docs/retry-strategy#customize-retries]. We need [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods] methods available to Flink users so that they can customize their deployment. In particular, the following settings seem to be the minimum required to align the GCS timeouts with the job's checkpoint config:
```
maxAttempts
initialRpcTimeout
rpcTimeoutMultiplier
maxRpcTimeout
totalTimeout
```
All of the config options should be optional, and the defaults should be used when some of the configs are not provided.

was:
https://issues.apache.org/jira/browse/FLINK-32877 tracks the ability to specify transport options in the GCS connector. While setting the params enabled there reduced read timeouts, we still see 503 errors leading to Flink job restarts. Thus, in this ticket, we want to specify additional retry settings as noted in [https://cloud.google.com/storage/docs/retry-strategy#customize-retries]. We want [these|https://cloud.google.com/java/docs/reference/gax/latest/com.google.api.gax.retrying.RetrySettings#methods] methods available to Flink users so that they can customize their deployment.
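If these settings were exposed as Flink filesystem options, the configuration might look like the following flink-conf.yaml fragment. The key names below are hypothetical, chosen here only for illustration; the final option names would be decided in the implementation:

```
gs.retry.max-attempts: 10
gs.retry.init-rpc-timeout: 10 s
gs.retry.rpc-timeout-multiplier: 2.0
gs.retry.max-rpc-timeout: 1 min
gs.retry.total-timeout: 10 min
```

Any key left unset would fall back to the Google client defaults, matching the requirement that all options stay optional.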
[jira] [Commented] (FLINK-35210) Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions
[ https://issues.apache.org/jira/browse/FLINK-35210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839739#comment-17839739 ]

Oleksandr Nitavskyi commented on FLINK-35210:

Thanks [~npfp] for the suggestion. I believe what you propose is often resolved with a wrapper around KafkaSource, which can serve as a layer of indirection for many things, e.g. parallelism config.

Meanwhile, could you please elaborate on how bad parallelism could lead to idle tasks? Do you mean the case where the Source parallelism is higher than the number of partitions, so some Source subtasks consume nothing and there is no watermark advancement unless [Idleness|https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources] is configured?

> Give the option to set automatically the parallelism of the KafkaSource to the number of kafka partitions
> -
>
> Key: FLINK-35210
> URL: https://issues.apache.org/jira/browse/FLINK-35210
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Reporter: Nicolas Perrin
> Priority: Minor
>
> Currently the setting of the `KafkaSource` Flink's operator parallelism needs to be manually chosen, which can lead to highly skewed tasks if the developer doesn't do this job.
> To avoid this issue, I propose to:
> - retrieve dynamically the number of partitions of the topic using `KafkaConsumer.partitionsFor(topic).size()`,
> - set the parallelism of the stream built from the source based on this value.
> This way there won't be any idle tasks.
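The wrapper idea discussed above can be sketched as follows: query the partition count with the admin-free KafkaConsumer API before building the job graph, then apply it as the source parallelism. This assumes kafka-clients on the classpath; the broker address and topic name are placeholders:

```java
// Sketch: derive source parallelism from the topic's partition count.
// Assumes kafka-clients is available; broker/topic values are placeholders.
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionAwareParallelism {
    static int partitionCount(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // partitionsFor() fetches topic metadata without subscribing
            return consumer.partitionsFor(topic).size();
        }
    }
}
// Usage (sketch), inside the job builder:
//   int p = partitionCount("broker:9092", "events");
//   env.fromSource(source, watermarks, "kafka").setParallelism(p);
```

Keeping this in a wrapper (rather than the connector itself) leaves the user in control when parallelism should intentionally differ from the partition count.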
[jira] [Updated] (FLINK-33376) Extend Curator config option for Zookeeper configuration
[ https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleksandr Nitavskyi updated FLINK-33376:

Component/s: Runtime / Coordination

> Extend Curator config option for Zookeeper configuration
>
> Key: FLINK-33376
> URL: https://issues.apache.org/jira/browse/FLINK-33376
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Reporter: Oleksandr Nitavskyi
> Assignee: Oleksandr Nitavskyi
> Priority: Major
>
> In certain cases ZooKeeper requires additional authentication information, for example a list of valid [names for ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property] in order to prevent accidentally connecting to a wrong ensemble.
> Curator allows adding an additional AuthInfo object for such configuration. Thus it would be useful to add one more Map property which would allow passing AuthInfo objects during Curator client creation.
> *Acceptance Criteria:* For Flink users it is possible to configure an auth info list for the Curator framework client.
[jira] [Updated] (FLINK-33376) Extend Curator config option for Zookeeper configuration
[ https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Oleksandr Nitavskyi updated FLINK-33376:

Summary: Extend Curator config option for Zookeeper configuration (was: Add AuthInfo config option for Zookeeper configuration)
[jira] [Commented] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration
[ https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17783220#comment-17783220 ]

Oleksandr Nitavskyi commented on FLINK-33376:

Thanks for the detailed listing. It sounds reasonable to add configuration in the public Flink interface for only these options:
* [authorization|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#authorization(java.lang.String,byte%5B%5D)]
* [maxCloseWaitMs|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#maxCloseWaitMs(int)]
* [simulatedSessionExpirationPercent|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#simulatedSessionExpirationPercent(int)]

It also makes sense not to touch the [compressionProvider|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#compressionProvider(org.apache.curator.framework.api.CompressionProvider)], since Flink doesn't store any BLOB data in ZooKeeper. In general it makes sense to expose only connection-related configuration (auth/timeouts), since it depends on the Flink user's environment.

Once we have aligned on the set of options, i.e. that we would basically add the 3 missing options, we can start the documentation process. Am I right?
[jira] [Commented] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration
[ https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780489#comment-17780489 ] Oleksandr Nitavskyi commented on FLINK-33376: - It would be really good to support something generic enough to translate Flink configuration into Curator configuration, e.g. like the [hadoop config|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#flink-hadoop-%3Ckey%3E] mechanism. But since Curator uses the Builder pattern, I do not see how we can make it generic enough. Probably, as a compromise, it would be reasonable to add support for all missing Curator configurations. If we go this way, here is the list of configurations which Flink currently doesn't configure at all: * [authorization|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#authorization(java.lang.String,byte%5B%5D)] * [canBeReadOnly|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#canBeReadOnly(boolean)] * [compressionProvider|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#compressionProvider(org.apache.curator.framework.api.CompressionProvider)] * [defaultData|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#defaultData(byte%5B%5D)] * [dontUseContainerParents|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#dontUseContainerParents()]/[useContainerParentsIfAvailable|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#useContainerParentsIfAvailable()] * [maxCloseWaitMs|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#maxCloseWaitMs(int)] *
[namespace|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#namespace(java.lang.String)] * [runSafeService|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#runSafeService(java.util.concurrent.Executor)] * [schemaSet|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#schemaSet(org.apache.curator.framework.schema.SchemaSet)] * [simulatedSessionExpirationPercent|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#simulatedSessionExpirationPercent(int)] * [waitForShutdownTimeoutMs|https://curator.apache.org/apidocs/org/apache/curator/framework/CuratorFrameworkFactory.Builder.html#waitForShutdownTimeoutMs(int)]
[jira] [Commented] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration
[ https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780387#comment-17780387 ] Oleksandr Nitavskyi commented on FLINK-33376: - Thanks for the link. If we really require a FLIP, I think it would be nice to anticipate a few more parameters for the Curator framework.
[jira] [Comment Edited] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration
[ https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780387#comment-17780387 ] Oleksandr Nitavskyi edited comment on FLINK-33376 at 10/27/23 1:56 PM: --- Thanks for the link. If we really require a FLIP, I think it would be nice to anticipate a more generic way to configure the Curator framework. was (Author: oleksandr nitavskyi): Thanks for the link. If we really require a Flip, I think it would be nice to anticipate a bit more parameters for Curator framework.
[jira] [Commented] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration
[ https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780373#comment-17780373 ] Oleksandr Nitavskyi commented on FLINK-33376: - [~mapohl] sure, I would be glad to drive this! Sorry, I didn't know that we needed to start a dev-list discussion (we didn't do one for several config options in the past). I will drop an email to ensure visibility with the community.
[jira] [Commented] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration
[ https://issues.apache.org/jira/browse/FLINK-33376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779947#comment-17779947 ] Oleksandr Nitavskyi commented on FLINK-33376: - For the implementation we could add an additional Map config option so that Flink users are able to pass AuthInfo. There is some misalignment: the AuthInfo type is while Map is . As the simplest workaround we can accept on the Flink config interface and use the _getBytes()_ method to adapt the interfaces.
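The adaptation the comment above proposes (a Map-typed Flink option turned into scheme/byte[] pairs via _getBytes()_) can be sketched as follows. This is a minimal, library-free illustration: the class and method names are hypothetical, and in the real implementation the pairs would be wrapped in org.apache.curator.framework.AuthInfo and handed to CuratorFrameworkFactory.Builder#authorization.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ZkAuthInfoAdapter {

    // Hypothetical stand-in for Curator's AuthInfo(String scheme, byte[] auth),
    // kept dependency-free so the sketch is self-contained.
    public static final class SchemeAndAuth {
        public final String scheme;
        public final byte[] auth;

        public SchemeAndAuth(String scheme, byte[] auth) {
            this.scheme = scheme;
            this.auth = auth;
        }
    }

    // Adapt the Map<String, String> accepted on the Flink config interface
    // into (scheme, byte[]) pairs, using getBytes() to bridge the type mismatch.
    public static List<SchemeAndAuth> fromConfig(Map<String, String> authMap) {
        List<SchemeAndAuth> result = new ArrayList<>();
        for (Map.Entry<String, String> e : authMap.entrySet()) {
            result.add(new SchemeAndAuth(
                    e.getKey(), e.getValue().getBytes(StandardCharsets.UTF_8)));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("digest", "user:password");
        for (SchemeAndAuth a : fromConfig(conf)) {
            System.out.println(a.scheme + " -> " + a.auth.length + " bytes");
        }
    }
}
```

One caveat of this sketch: a Map keyed by scheme cannot express several AuthInfo entries with the same scheme, which may matter when choosing the final config shape.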
[jira] [Created] (FLINK-33376) Add AuthInfo config option for Zookeeper configuration
Oleksandr Nitavskyi created FLINK-33376: --- Summary: Add AuthInfo config option for Zookeeper configuration Key: FLINK-33376 URL: https://issues.apache.org/jira/browse/FLINK-33376 Project: Flink Issue Type: Improvement Reporter: Oleksandr Nitavskyi In certain cases ZooKeeper requires additional authentication information. For example, a list of valid [names for the ensemble|https://zookeeper.apache.org/doc/r3.8.0/zookeeperAdmin.html#:~:text=for%20secure%20authentication.-,zookeeper.ensembleAuthName,-%3A%20(Java%20system%20property] prevents accidentally connecting to a wrong ensemble. Curator allows adding an additional AuthInfo object for such configuration. Thus it would be useful to add one more Map property which would allow passing AuthInfo objects during Curator client creation. *Acceptance Criteria:* For Flink users it is possible to configure the auth-info list for the Curator framework client. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration
[ https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-32203: Priority: Minor (was: Major) > Potential ClassLoader memory leak due to log4j configuration > > > Key: FLINK-32203 > URL: https://issues.apache.org/jira/browse/FLINK-32203 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Minor > Labels: pull-request-available > Attachments: classloader_leak.png, > stack_trace_example_with_log4j_creation_on_job_reload.log > > > *Context* > We have encountered a memory leak related to ClassLoaders in Apache Flink. > ChildFirstClassLoader is not properly garbage collected, when job is being > restarted. > Heap Dump has shown that Log4j starts a configuration watch thread, which > then has Strong reference to ChildFirstClassLoader via AccessControlContext. > Since thread is never stopped, ChildFirstClassLoader is never cleaned. > Removal monitorInterval introduced in FLINK-20510 helps to mitigate the > issue, I believe it could be applied to log4j config by default. > *How to reproduce* > Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job > -> in Task Manager dump you should see multiple Log4jThreads > *AC* > We have a configuration which doesn't lead easy to memory leak with default > configuration for Flink users. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration
[ https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726626#comment-17726626 ] Oleksandr Nitavskyi commented on FLINK-32203: - [~chesnay] thanks for looking into the PR (https://github.com/apache/flink/pull/22664). You can see attached an example of the stack trace which we get when the Log4jThread is being created. We ran the job and killed one of the JobManagers to exercise HA and trigger a job restart. While debugging the Log4jThread creation we saw that the stack trace contains the Presto (for checkpoints) or Hadoop S3A (to write output on S3) FileSystems, which are loaded from the plugin ClassLoader (an example stack trace is attached). Do you know whether a plugin ClassLoader instance is created per job when a job is being created? If yes, this instance is probably being passed to Log4jContextFactory, and thus a new Log4j subsystem is being created.
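The mitigation mentioned in the description (removing the polling interval introduced by FLINK-20510) comes down to one Log4j 2 property. A minimal sketch of the relevant fragment, assuming a Log4j 2 properties-style config (exact default file contents vary by Flink version):

```
# log4j.properties (fragment)
# While this Log4j 2 property is set, Log4j starts a configuration watch
# thread that polls the config file; via its AccessControlContext the thread
# strongly references the ClassLoader that created it, so the ClassLoader can
# never be collected. Removing the line (Log4j's default of 0 disables
# polling) prevents the watch thread from being created.
monitorInterval = 30
```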
[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration
[ https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-32203: Description: *Context* We have encountered a memory leak related to ClassLoaders in Apache Flink. ChildFirstClassLoader is not properly garbage collected, when job is being restarted. Heap Dump has shown that Log4j starts a configuration watch thread, which then has Strong reference to ChildFirstClassLoader via AccessControlContext. Since thread is never stopped, ChildFirstClassLoader is never cleaned. Removal monitorInterval introduced in FLINK-20510 helps to mitigate the issue, I believe it could be applied to log4j config by default. *How to reproduce* Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job -> in Task Manager dump you should see multiple *AC* We have a configuration which doesn't lead easy to memory leak with default configuration for Flink users. was: *Context* We have encountered a memory leak related to ClassLoaders in Apache Flink. ChildFirstClassLoader is not properly garbage collected, when job is being restarted. Heap Dump has shown that Log4j starts a configuration watch thread, which then has Strong reference to ChildFirstClassLoader via AccessControlContext. Since thread is never stopped, ChildFirstClassLoader is never cleaned. Removal monitorInterval introduced in FLINK-20510 helps to mitigate the issue, I believe it could be applied to log4j config by default. *AC* We have a configuration which doesn't lead easy to memory leak with default configuration for Flink users. 
[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration
[ https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-32203: Description: *Context* We have encountered a memory leak related to ClassLoaders in Apache Flink. ChildFirstClassLoader is not properly garbage collected, when job is being restarted. Heap Dump has shown that Log4j starts a configuration watch thread, which then has Strong reference to ChildFirstClassLoader via AccessControlContext. Since thread is never stopped, ChildFirstClassLoader is never cleaned. Removal monitorInterval introduced in FLINK-20510 helps to mitigate the issue, I believe it could be applied to log4j config by default. *How to reproduce* Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job -> in Task Manager dump you should see multiple Log4jThreads *AC* We have a configuration which doesn't lead easy to memory leak with default configuration for Flink users. was: *Context* We have encountered a memory leak related to ClassLoaders in Apache Flink. ChildFirstClassLoader is not properly garbage collected, when job is being restarted. Heap Dump has shown that Log4j starts a configuration watch thread, which then has Strong reference to ChildFirstClassLoader via AccessControlContext. Since thread is never stopped, ChildFirstClassLoader is never cleaned. Removal monitorInterval introduced in FLINK-20510 helps to mitigate the issue, I believe it could be applied to log4j config by default. *How to reproduce* Deploy Flink job, which uses Hadoop File System (e.g. s3a). Redeploy the job -> in Task Manager dump you should see multiple *AC* We have a configuration which doesn't lead easy to memory leak with default configuration for Flink users. 
[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration
[ https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-32203: Attachment: stack_trace_example_with_log4j_creation_on_job_reload.log
[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration
[ https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-32203: Attachment: classloader_leak.png
[jira] [Updated] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration
[ https://issues.apache.org/jira/browse/FLINK-32203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-32203: Description: *Context* We have encountered a memory leak related to ClassLoaders in Apache Flink. ChildFirstClassLoader is not properly garbage collected, when job is being restarted. Heap Dump has shown that Log4j starts a configuration watch thread, which then has Strong reference to ChildFirstClassLoader via AccessControlContext. Since thread is never stopped, ChildFirstClassLoader is never cleaned. Removal monitorInterval introduced in FLINK-20510 helps to mitigate the issue, I believe it could be applied to log4j config by default. *AC* We have a configuration which doesn't lead easy to memory leak with default configuration for Flink users.
[jira] [Created] (FLINK-32203) Potential ClassLoader memory leak due to log4j configuration
Oleksandr Nitavskyi created FLINK-32203: --- Summary: Potential ClassLoader memory leak due to log4j configuration Key: FLINK-32203 URL: https://issues.apache.org/jira/browse/FLINK-32203 Project: Flink Issue Type: Bug Reporter: Oleksandr Nitavskyi -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31780) Allow users to disable "Ensemble tracking" for ZooKeeper
[ https://issues.apache.org/jira/browse/FLINK-31780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-31780: Description: In Apache Curator an option to skip ensemble tracking has been available since version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]). This can be useful in certain scenarios in which CuratorFramework accesses ZK clusters via a load balancer or Virtual IPs. Thus, in case a Flink user's ZooKeeper is running behind an LB or Virtual IP, ensemble tracking could be disabled too. In case ZooKeeper is hidden behind a VIP, it can return a URL during Ensemble Tracking which leads to an Unresolved Host Exception inside the Curator Framework; on the Flink level this leads to a cluster restart. Currently HA with ZooKeeper can even lead to JobManager failure. The failure scenario is as follows: # Flink connects to ZooKeeper via the configured URL. # Ensemble tracking obtains a new ensemble URL, which is not necessarily accessible to Flink, because ZooKeeper is behind a VIP. # On reconnect Flink fails to reach ZooKeeper; moreover, due to the "UnresolvedHostException" Flink's JobManager is killed. *Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to disable ensemble tracking. was: In Apache Curator an option to skip ensemble tracking was added since version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) This can be useful in certain scenarios in which CuratorFramework is accessing to ZK clusters via load balancer or Virtual IPs. Thus in case Zookeeper of Flink user is running behind LB or Virtual IP ensemble tracking could be disabled too. In case ZooKeeper is hidden under VIP it can return URL during Ensemble Tracking, which would lead to Unresolved Host Exception inside Curator Framework. On Flink level it would lead to cluster restart.
*Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to disable ensemble tracking for ZooKeeper. > Allow users to disable "Ensemble tracking" for ZooKeeper > > > Key: FLINK-31780 > URL: https://issues.apache.org/jira/browse/FLINK-31780 > Project: Flink > Issue Type: Improvement >Reporter: Oleksandr Nitavskyi >Priority: Major > Labels: pull-request-available > > In Apache Curator an option to skip ensemble tracking was added since version > 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) > This can be useful in certain scenarios in which CuratorFramework is > accessing to ZK clusters via load balancer or Virtual IPs. > Thus in case Zookeeper of Flink user is running behind LB or Virtual IP > ensemble tracking could be disabled too. > In case ZooKeeper is hidden under VIP it can return URL during Ensemble > Tracking, which would lead to Unresolved Host Exception inside Curator > Framework. On Flink level it would lead to cluster restart. > Currently HA with ZooKeeper can even lead to the JobManager failure. The > scenario of the failure is next: > # Flink connects to ZooKeeper via configured URL. > # Ensemble tracking gets a new URL of ensemble, which is not obligatory > accessible for Flink, because Zookeeper is under VIP. > # In case of reconnect Flink fails to Zookeeper, moreover due to > "UnresolvedHostException" Flink's jobManager is killed. > *Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to > disable ensemble tracking for ZooKeeper. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31780) Allow users to disable "Ensemble tracking" for ZooKeeper
[ https://issues.apache.org/jira/browse/FLINK-31780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-31780: Description: In Apache Curator an option to skip ensemble tracking was added since version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) This can be useful in certain scenarios in which CuratorFramework is accessing to ZK clusters via load balancer or Virtual IPs. Thus in case Zookeeper of Flink user is running behind LB or Virtual IP ensemble tracking could be disabled too. In case ZooKeeper is hidden under VIP it can return URL during Ensemble Tracking, which would lead to Unresolved Host Exception inside Curator Framework. On Flink level it would lead to cluster restart. *Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to disable ensemble tracking for ZooKeeper. was: In Apache Curator an option to skip ensemble tracking was added since version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) This can be useful in certain scenarios in which CuratorFramework is accessing to ZK clusters via load balancer or Virtual IPs. Thus in case Zookeeper of Flink user is running behind LB or Virtual IP ensemble tracking could be disabled too. *Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to disable ensemble tracking for ZooKeeper.
[jira] [Updated] (FLINK-31780) Allow users to disable "Ensemble tracking" for ZooKeeper
[ https://issues.apache.org/jira/browse/FLINK-31780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-31780: Description: In Apache Curator an option to skip ensemble tracking was added since version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) This can be useful in certain scenarios in which CuratorFramework is accessing to ZK clusters via load balancer or Virtual IPs. Thus in case Zookeeper of Flink user is running behind LB or Virtual IP ensemble tracking could be disabled too. *Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to disable ensemble tracking for ZooKeeper. was: In Apache Curator an option to skip ensemble tracking was added since version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) This can be useful in certain scenarios in which CuratorFramework is accessing to ZK clusters via load balancer or Virtual IPs. Thus in case Zookeeper of Flink user is running behind LB or Virtual IP ensemble tracking could be disabled too. Moreover enabled ensemble tracking could lead to NPE and thus Flink cluster failure, even if only single node is not available, see fix in Apache Curator: https://github.com/apache/curator/pull/433. *Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to disable ensemble tracking for ZooKeeper. > Allow users to disable "Ensemble tracking" for ZooKeeper > > > Key: FLINK-31780 > URL: https://issues.apache.org/jira/browse/FLINK-31780 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > > In Apache Curator an option to skip ensemble tracking was added since version > 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) > This can be useful in certain scenarios in which CuratorFramework is > accessing to ZK clusters via load balancer or Virtual IPs. 
> Thus in case Zookeeper of Flink user is running behind LB or Virtual IP > ensemble tracking could be disabled too. > *Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to > disable ensemble tracking for ZooKeeper. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31780) Allow users to disable "Ensemble tracking" for ZooKeeper
[ https://issues.apache.org/jira/browse/FLINK-31780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-31780: Description: In Apache Curator an option to skip ensemble tracking was added since version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) This can be useful in certain scenarios in which CuratorFramework is accessing to ZK clusters via load balancer or Virtual IPs. Thus in case Zookeeper of Flink user is running behind LB or Virtual IP ensemble tracking could be disabled too. Moreover enabled ensemble tracking could lead to NPE and thus Flink cluster failure, even if only single node is not available, see fix in Apache Curator: https://github.com/apache/curator/pull/433. *Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to disable ensemble tracking for ZooKeeper. was: In Apache Curator an option to skip ensemble tracking was added since version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) This can be useful in certain scenarios in which CuratorFramework is accessing to ZK clusters via load balancer or Virtual IPs. Thus in case Zookeeper of Flink user is running behind LB or Virtual IP ensemble tracking could be disabled too. *Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to disable ensemble tracking for ZooKeeper. > Allow users to disable "Ensemble tracking" for ZooKeeper > > > Key: FLINK-31780 > URL: https://issues.apache.org/jira/browse/FLINK-31780 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > > In Apache Curator an option to skip ensemble tracking was added since version > 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) > This can be useful in certain scenarios in which CuratorFramework is > accessing to ZK clusters via load balancer or Virtual IPs. 
> Thus in case Zookeeper of Flink user is running behind LB or Virtual IP > ensemble tracking could be disabled too. > Moreover enabled ensemble tracking could lead to NPE and thus Flink cluster > failure, even if only single node is not available, see fix in Apache > Curator: https://github.com/apache/curator/pull/433. > *Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to > disable ensemble tracking for ZooKeeper. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-31780) Allow users to enable Ensemble tracking for ZooKeeper
Oleksandr Nitavskyi created FLINK-31780: --- Summary: Allow users to enable Ensemble tracking for ZooKeeper Key: FLINK-31780 URL: https://issues.apache.org/jira/browse/FLINK-31780 Project: Flink Issue Type: Bug Reporter: Oleksandr Nitavskyi In Apache Curator, an option to skip ensemble tracking was added in version 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]). This can be useful in certain scenarios in which CuratorFramework accesses ZK clusters via a load balancer or virtual IPs. Thus, in case a Flink user's ZooKeeper is running behind an LB or virtual IP, ensemble tracking could be disabled too. *Acceptance Criteria:* Users of Apache Flink have a ZooKeeper config option to disable ensemble tracking for ZooKeeper. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31780) Allow users to disable "Ensemble tracking" for ZooKeeper
[ https://issues.apache.org/jira/browse/FLINK-31780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-31780: Summary: Allow users to disable "Ensemble tracking" for ZooKeeper (was: Allow users to enable Ensemble tracking for ZooKeeper) > Allow users to disable "Ensemble tracking" for ZooKeeper > > > Key: FLINK-31780 > URL: https://issues.apache.org/jira/browse/FLINK-31780 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > > In Apache Curator an option to skip ensemble tracking was added since version > 5.0.0 ([CURATOR-568|https://issues.apache.org/jira/browse/CURATOR-568]) > This can be useful in certain scenarios in which CuratorFramework is > accessing to ZK clusters via load balancer or Virtual IPs. > Thus in case Zookeeper of Flink user is running behind LB or Virtual IP > ensemble tracking could be disabled too. > *Acceptance Criteria:* Users of Apache Flink has a Zookeeper config option to > disable ensemble tracking for ZooKeeper. -- This message was sent by Atlassian Jira (v8.20.10#820010)
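The acceptance criterion above (a config option to disable ensemble tracking) could look like the following flink-conf.yaml sketch. The option name shown is an assumption for illustration only, not confirmed by this thread; consult the documentation of your Flink release for the final name:

```yaml
# flink-conf.yaml (sketch; option name is an assumption, not confirmed here)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-vip.example.com:2181
# Disable Curator's ensemble tracking so the client keeps using the
# configured VIP/LB address instead of hosts reported back by the ensemble,
# avoiding unresolved-host failures described in this ticket.
high-availability.zookeeper.client.ensemble-tracker: false
```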
[jira] (FLINK-29242) Read time out when close write channel [flink-gs-fs-hadoop ]
[ https://issues.apache.org/jira/browse/FLINK-29242 ] Oleksandr Nitavskyi deleted comment on FLINK-29242: - was (Author: oleksandr nitavskyi): While it seems that Flink can do something to be more resilient for such type of errors, here is a link to probably related issue in Google Cloud tracker: https://issuetracker.google.com/issues/191071342?pli=1 > Read time out when close write channel [flink-gs-fs-hadoop ] > > > Key: FLINK-29242 > URL: https://issues.apache.org/jira/browse/FLINK-29242 > Project: Flink > Issue Type: Bug > Components: FileSystems >Affects Versions: 1.15.0 > Environment: flink version: 1.15 > jdk: 1.8 > >Reporter: Jian Zheng >Priority: Major > > h2. Detail > See in GSBlobStorageImpl > {code:java} > @Override > public int write(byte[] content, int start, int length) throws IOException { > LOGGER.trace("Writing {} bytes to blob {}", length, blobIdentifier); > Preconditions.checkNotNull(content); > Preconditions.checkArgument(start >= 0); > Preconditions.checkArgument(length >= 0); > ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length); > int written = writeChannel.write(byteBuffer); > LOGGER.trace("Wrote {} bytes to blob {}", written, blobIdentifier); > return written; > } > @Override > public void close() throws IOException { > LOGGER.trace("Closing write channel to blob {}", blobIdentifier); > writeChannel.close(); > } {code} > when I write data into google cloud storage by flink-gs-fs-haddoop. > The service always has read time out exceptions, which can be reproduced in a > very short time of task execution. > I tried to trace the code and found that it always occurs when the > writeChannel Close code is executed. I tried retrying by modifying the source > code but it didn't solve the problem, the timeout is 20s and the checkpoint > will fail if this problem occurs. > I tried to change the chunk size but found no help, with this component, I > can't write data to gcs via flink. 
> > By the way, I found that 503 service unavailable occurs when create > writeChannel. This problem occurs less often than Read time out, but it needs > to be checked > {code:java} > @Override > public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobIdentifier) { > LOGGER.trace("Creating writeable blob for identifier {}", blobIdentifier); > Preconditions.checkNotNull(blobIdentifier); > BlobInfo blobInfo = > BlobInfo.newBuilder(blobIdentifier.getBlobId()).build(); > com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo); > return new WriteChannel(blobIdentifier, writeChannel); > } > @Override > public GSBlobStorage.WriteChannel writeBlob( > GSBlobIdentifier blobIdentifier, MemorySize chunkSize) { > LOGGER.trace( > "Creating writeable blob for identifier {} with chunk size {}", > blobIdentifier, > chunkSize); > Preconditions.checkNotNull(blobIdentifier); > Preconditions.checkArgument(chunkSize.getBytes() > 0); > BlobInfo blobInfo = > BlobInfo.newBuilder(blobIdentifier.getBlobId()).build(); > com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo); > writeChannel.setChunkSize((int) chunkSize.getBytes()); > return new WriteChannel(blobIdentifier, writeChannel); > } {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-29242) Read time out when close write channel [flink-gs-fs-hadoop ]
[ https://issues.apache.org/jira/browse/FLINK-29242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706863#comment-17706863 ] Oleksandr Nitavskyi commented on FLINK-29242: - While it seems that Flink can do something to be more resilient for such type of errors, here is a link to probably related issue in Google Cloud tracker: https://issuetracker.google.com/issues/191071342?pli=1 > Read time out when close write channel [flink-gs-fs-hadoop ] > > > Key: FLINK-29242 > URL: https://issues.apache.org/jira/browse/FLINK-29242 > Project: Flink > Issue Type: Bug > Components: FileSystems >Affects Versions: 1.15.0 > Environment: flink version: 1.15 > jdk: 1.8 > >Reporter: Jian Zheng >Priority: Major > > h2. Detail > See in GSBlobStorageImpl > {code:java} > @Override > public int write(byte[] content, int start, int length) throws IOException { > LOGGER.trace("Writing {} bytes to blob {}", length, blobIdentifier); > Preconditions.checkNotNull(content); > Preconditions.checkArgument(start >= 0); > Preconditions.checkArgument(length >= 0); > ByteBuffer byteBuffer = ByteBuffer.wrap(content, start, length); > int written = writeChannel.write(byteBuffer); > LOGGER.trace("Wrote {} bytes to blob {}", written, blobIdentifier); > return written; > } > @Override > public void close() throws IOException { > LOGGER.trace("Closing write channel to blob {}", blobIdentifier); > writeChannel.close(); > } {code} > when I write data into google cloud storage by flink-gs-fs-haddoop. > The service always has read time out exceptions, which can be reproduced in a > very short time of task execution. > I tried to trace the code and found that it always occurs when the > writeChannel Close code is executed. I tried retrying by modifying the source > code but it didn't solve the problem, the timeout is 20s and the checkpoint > will fail if this problem occurs. 
> I tried to change the chunk size but found no help, with this component, I > can't write data to gcs via flink. > > By the way, I found that 503 service unavailable occurs when create > writeChannel. This problem occurs less often than Read time out, but it needs > to be checked > {code:java} > @Override > public GSBlobStorage.WriteChannel writeBlob(GSBlobIdentifier blobIdentifier) { > LOGGER.trace("Creating writeable blob for identifier {}", blobIdentifier); > Preconditions.checkNotNull(blobIdentifier); > BlobInfo blobInfo = > BlobInfo.newBuilder(blobIdentifier.getBlobId()).build(); > com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo); > return new WriteChannel(blobIdentifier, writeChannel); > } > @Override > public GSBlobStorage.WriteChannel writeBlob( > GSBlobIdentifier blobIdentifier, MemorySize chunkSize) { > LOGGER.trace( > "Creating writeable blob for identifier {} with chunk size {}", > blobIdentifier, > chunkSize); > Preconditions.checkNotNull(blobIdentifier); > Preconditions.checkArgument(chunkSize.getBytes() > 0); > BlobInfo blobInfo = > BlobInfo.newBuilder(blobIdentifier.getBlobId()).build(); > com.google.cloud.WriteChannel writeChannel = storage.writer(blobInfo); > writeChannel.setChunkSize((int) chunkSize.getBytes()); > return new WriteChannel(blobIdentifier, writeChannel); > } {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-17505) Merge small files produced by StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17443767#comment-17443767 ] Oleksandr Nitavskyi commented on FLINK-17505: - Another workaround for this problem is writing data with Iceberg and consolidating later with Iceberg capabilities. It is quite easy to set up since Flink supports Iceberg output. Also, it seems to be an industry-proof setup. Probably this ticket can be closed > Merge small files produced by StreamingFileSink > --- > > Key: FLINK-17505 > URL: https://issues.apache.org/jira/browse/FLINK-17505 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.10.0 >Reporter: Piotr Nowojski >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > This an alternative approach to FLINK-11499, to solve a problem of creating > many small files with bulk formats in StreamingFileSink (which have to be > rolled on checkpoint). > Merge based approach would require converting {{StreamingFileSink}} from a > sink, to an operator, that would be working exactly as it’s working right > now, with the same limitations (no support for arbitrary rolling policies for > bulk formats), followed by another operator that would be tasked with merging > small files in the background. > In the long term we probably would like to have both merge operator and write > ahead log solution (WAL described in FLINK-11499) as alternatives, as WAL > would behave better if small files are more common, and merge operator could > behave better if small files are rare (because of data skew for example). -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-17505) Merge small files produced by StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138437#comment-17138437 ] Oleksandr Nitavskyi commented on FLINK-17505: - 8. And for sure open question is about the integration points to the existing code. Should it be totally separated in the separate Flink Operator or do we want to keep this as a separate Bucket type for example. > Merge small files produced by StreamingFileSink > --- > > Key: FLINK-17505 > URL: https://issues.apache.org/jira/browse/FLINK-17505 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.10.0 >Reporter: Piotr Nowojski >Priority: Major > > This an alternative approach to FLINK-11499, to solve a problem of creating > many small files with bulk formats in StreamingFileSink (which have to be > rolled on checkpoint). > Merge based approach would require converting {{StreamingFileSink}} from a > sink, to an operator, that would be working exactly as it’s working right > now, with the same limitations (no support for arbitrary rolling policies for > bulk formats), followed by another operator that would be tasked with merging > small files in the background. > In the long term we probably would like to have both merge operator and write > ahead log solution (WAL described in FLINK-11499) as alternatives, as WAL > would behave better if small files are more common, and merge operator could > behave better if small files are rare (because of data skew for example). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-17505) Merge small files produced by StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136731#comment-17136731 ] Oleksandr Nitavskyi edited comment on FLINK-17505 at 6/17/20, 10:02 AM: MergeOperator (ConsolidationOperator) will not be able to replace files atomically (at least on HDFS), so some isolation can be violated. A possible solution would be to produce data into some temp directory (aka [temp bucket |https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]); the Consolidation operator would then merge small files depending on the format and move them to the final destination in the background. Unfortunately, for the S3 FS this would require copying the data. Also, even if the merge does not change any files and simply moves them to the final destination, we add a useful feature to the StreamingFileSink outputs: currently any consumer of the files produced by Flink has to filter for files without suffixes (neither .in-progress nor .pending). We probably want to move this logic to the Flink side.
*Problems:*
1. When to start file consolidation? Ideally we want to perform a merge iteration once the files have been renamed from pending, which happens once the checkpoint is done or upon recovery. But it is not obvious how to reliably react to such events in another operator, so we probably want to merge files periodically on a timer with some configurable period (probably similar to the checkpoint interval). As an alternative, we could merge files instead of committing them from the pending state, but that would require changes inside the Bucket class.
2. When should files actually be merged? There are at least two cases in which files should be merged together and moved to the final directory:
* The desirable size of the input files is achieved.
* The bucket is closed. E.g. in the case of time-series export we should probably be able to compare the time associated with a bucket against the current watermark (if any), so it should be decided by the bucketAssigner and bucketContext.
3. When should files be moved?
* Once they reach the desired file size (or once they were actually merged by reaching the desirable input file size).
* When the bucket is actually closed, e.g. it is a time-series bucket and the BucketAssigner with a bucket context can conclude that the bucket is closed. More detailed thoughts about the meta info have been made precise in this technical doc: https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.b8ibdmatt6t3
* When the bucket is not yet closed (or never will be), but a certain inactivity time has passed.
4. How to handle failures? MergeOperator should perform the merge and then move the merged files to the final directories. Since this operation cannot be made atomic and mutates state on the FS, we should ensure idempotence of the merge/move/source-removal operation. For this we can store some state describing the mutation plan for the input files; we can use Flink state for this or persist the transaction plan on the output FS.
5. How to share files from different slots for merging? We probably want to keep the same parallelism as FileStreamingSink, and each MergeOperator should consider only the files produced by the Sink in the same slot. In that case, to keep the optimal output size on bucket closing, we need another consolidation strategy. So in order to keep efficiency, we want to perform merge operations in parallel.
6. How to discover the files that should be merged? Such files are known by the Bucket class. A possible solution is to forward all newly created filenames to the MergeOperator. Another solution is simply to list open buckets periodically. In case we have high parallelism we risk creating unnecessary load on the underlying file system.
So for this operation, we would prefer to have a parallelism = 1. 7. Should we split files if they are too big? Probably the problem of the big files should be addressed by the proper Checkpoint Policy. was (Author: oleksandr nitavskyi): MergeOperartor (ConsolidationOperator) will not be able to replace files atomically(at least on HDFS), so some isolation can be violated. For this the possible solution would be to produce data to some temp directory (aka [temp bucket |https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]), then the Consolidation operator will merge small files depending on the format and move files to the final destination in the background. Unfortunately for S3 FS, it would require the copying of the data. Also even in case if the merge will not change any files and simply moves files to the final destination we can a useful feature to the FileStreeamingSink outputs. Sinc
[jira] [Commented] (FLINK-17505) Merge small files produced by StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136731#comment-17136731 ] Oleksandr Nitavskyi commented on FLINK-17505: - MergeOperator (ConsolidationOperator) will not be able to replace files atomically (at least on HDFS), so some isolation can be violated. A possible solution would be to produce data into some temp directory (aka [temp bucket |https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]); the Consolidation operator would then merge small files depending on the format and move them to the final destination in the background. Unfortunately, for the S3 FS this would require copying the data. Also, even if the merge does not change any files and simply moves them to the final destination, we add a useful feature to the StreamingFileSink outputs: currently any consumer of the files produced by Flink has to filter for files without suffixes (neither .in-progress nor .pending). We probably want to move this logic to the Flink side.
Problems:
1. When to start file consolidation? Ideally we want to perform a merge iteration once the files have been renamed from pending, which happens once the checkpoint is done or upon recovery. But it is not obvious how to reliably react to such events in another operator, so we probably want to merge files periodically on a timer with some configurable period (probably similar to the checkpoint interval).
2. When should files actually be merged? There are at least two cases in which files should be merged together and moved to the final directory:
* The desirable size of the input files is achieved.
* The bucket is closed. E.g. in the case of time-series export we should probably be able to compare the time associated with a bucket against the current watermark (if any), so it should be decided by the bucketAssigner and bucketContext.
3. When should files be moved?
* Once they reach the desired file size (or once they were actually merged by reaching the desirable input file size).
* When the bucket is actually closed, e.g. it is a time-series bucket and the BucketAssigner with a bucket context can conclude that the bucket is closed. More detailed thoughts about the meta info have been made precise in this technical doc: https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.b8ibdmatt6t3
* When the bucket is not yet closed (or never will be), but a certain inactivity time has passed.
4. How to handle failures? MergeOperator should perform the merge and then move the merged files to the final directories. Since this operation cannot be made atomic and mutates state on the FS, we should ensure idempotence of the merge/move/source-removal operation. For this we can store some state describing the mutation plan for the input files; we can use Flink state for this or persist the transaction plan on the output FS.
5. How to share files from different slots for merging? We probably want to keep the same parallelism as FileStreamingSink, and each MergeOperator should consider only the files produced by the Sink in the same slot. In that case, to keep the optimal output size on bucket closing, we need another consolidation strategy. So in order to keep efficiency, we want to perform merge operations in parallel.
6. How to discover the files that should be merged? Such files are known by the Bucket class. A possible solution is to forward all newly created filenames to the MergeOperator. Another solution is simply to list open buckets periodically. In case we have high parallelism we risk creating unnecessary load on the underlying file system. So for this operation, we would prefer to have a parallelism = 1.
7. Should we split files if they are too big? Probably the problem of big files should be addressed by the proper Checkpoint Policy.
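Point 4 above (persisting a mutation plan so that merge/move/source removal becomes idempotent) can be sketched in a few lines. This is an illustrative sketch only, with hypothetical names and local files standing in for the DFS, not the actual Flink operator: the plan is written atomically before any data file is touched, so re-running after a crash at any point completes the same transaction.

```python
import json
from pathlib import Path


def plan_path(out_dir: Path) -> Path:
    # The plan itself lives on the output FS, as suggested in point 4.
    return out_dir / "_merge_plan.json"


def write_plan(out_dir: Path, sources: list[Path], target: Path) -> None:
    # Persist the mutation plan BEFORE touching any data files.
    plan = {"sources": [str(s) for s in sources], "target": str(target)}
    tmp = plan_path(out_dir).with_suffix(".tmp")
    tmp.write_text(json.dumps(plan))
    tmp.rename(plan_path(out_dir))  # rename publishes the plan atomically


def execute_plan(out_dir: Path) -> None:
    # Idempotent: safe to re-run after a crash at any point.
    p = plan_path(out_dir)
    if not p.exists():
        return  # no pending transaction
    plan = json.loads(p.read_text())
    target = Path(plan["target"])
    sources = [Path(s) for s in plan["sources"]]
    if not target.exists():
        # Merge into a temp file first, then atomically publish it.
        tmp = target.with_suffix(".inprogress")
        with open(tmp, "wb") as out:
            for s in sources:
                out.write(s.read_bytes())
        tmp.rename(target)
    for s in sources:  # source removal may be partially done; retrying is fine
        if s.exists():
            s.unlink()
    p.unlink()  # removing the plan marks the transaction complete
```

If the process dies after the target rename but before source cleanup, the next `execute_plan` run skips the merge (the target exists) and only finishes the deletions, which is the idempotence the comment asks for.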
> Merge small files produced by StreamingFileSink > --- > > Key: FLINK-17505 > URL: https://issues.apache.org/jira/browse/FLINK-17505 > Project: Flink > Issue Type: Improvement > Components: Connectors / FileSystem >Affects Versions: 1.10.0 >Reporter: Piotr Nowojski >Priority: Major > > This an alternative approach to FLINK-11499, to solve a problem of creating > many small files with bulk formats in StreamingFileSink (which have to be > rolled on checkpoint). > Merge based approach would require converting {{StreamingFileSink}} from a > sink, to an operator, that would be working exactly as it’s working right > now, with the same limitations (no support for arbitrary rolling policies for > bulk formats), followed by another operator that would be tasked with merging > small files in the background. > In th
[jira] [Comment Edited] (FLINK-17505) Merge small files produced by StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136731#comment-17136731 ] Oleksandr Nitavskyi edited comment on FLINK-17505 at 6/16/20, 3:19 PM: --- MergeOperator (ConsolidationOperator) will not be able to replace files atomically (at least on HDFS), so some isolation can be violated. A possible solution would be to produce data into some temp directory (aka [temp bucket |https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]); the Consolidation operator would then merge small files depending on the format and move them to the final destination in the background. Unfortunately, for the S3 FS this would require copying the data. Also, even if the merge does not change any files and simply moves them to the final destination, we add a useful feature to the StreamingFileSink outputs: currently any consumer of the files produced by Flink has to filter for files without suffixes (neither .in-progress nor .pending). We probably want to move this logic to the Flink side.
*Problems:*
1. When to start file consolidation? Ideally we want to perform a merge iteration once the files have been renamed from pending, which happens once the checkpoint is done or upon recovery. But it is not obvious how to reliably react to such events in another operator, so we probably want to merge files periodically on a timer with some configurable period (probably similar to the checkpoint interval).
2. When should files actually be merged? There are at least two cases in which files should be merged together and moved to the final directory:
* The desirable size of the input files is achieved.
* The bucket is closed. E.g. in the case of time-series export we should probably be able to compare the time associated with a bucket against the current watermark (if any), so it should be decided by the bucketAssigner and bucketContext.
3. When should files be moved?
* Once they reach the desired file size (or once they were actually merged by reaching the desirable input file size).
* When the bucket is actually closed, e.g. it is a time-series bucket and the BucketAssigner with a bucket context can conclude that the bucket is closed. More detailed thoughts about the meta info have been made precise in this technical doc: https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.b8ibdmatt6t3
* When the bucket is not yet closed (or never will be), but a certain inactivity time has passed.
4. How to handle failures? MergeOperator should perform the merge and then move the merged files to the final directories. Since this operation cannot be made atomic and mutates state on the FS, we should ensure idempotence of the merge/move/source-removal operation. For this we can store some state describing the mutation plan for the input files; we can use Flink state for this or persist the transaction plan on the output FS.
5. How to share files from different slots for merging? We probably want to keep the same parallelism as FileStreamingSink, and each MergeOperator should consider only the files produced by the Sink in the same slot. In that case, to keep the optimal output size on bucket closing, we need another consolidation strategy. So in order to keep efficiency, we want to perform merge operations in parallel.
6. How to discover the files that should be merged? Such files are known by the Bucket class. A possible solution is to forward all newly created filenames to the MergeOperator. Another solution is simply to list open buckets periodically. In case we have high parallelism we risk creating unnecessary load on the underlying file system. So for this operation, we would prefer to have a parallelism = 1.
7. Should we split files if they are too big? Probably the problem of big files should be addressed by the proper Checkpoint Policy.
was (Author: oleksandr nitavskyi): MergeOperartor (ConsolidationOperator) will not be able to replace files atomically(at least on HDFS), so some isolation can be violated. For this the possible solution would be to produce data to some temp directory (aka [temp bucket |https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]), then the Consolidation operator will merge small files depending on the format and move files to the final destination in the background. Unfortunately for S3 FS, it would require the copying of the data. Also even in case if the merge will not change any files and simply moves files to the final destination we can a useful feature to the FileStreeamingSink outputs. Since currently, any consumers of the files produced by Flink should filter files without suffixes(which neither .in-progress no .pending).
[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-10203: Description: New StreamingFileSink ( introduced in 1.6 Flink version ) use HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to have an ability to restore from a certain point of the file after failure and continue to write data. To achieve this recovery functionality the HadoopRecoverableFsDataOutputStream uses "truncate" method which was introduced only in Hadoop 2.7. FLINK-14170 has enabled the usage of StreamingFileSink for OnCheckpointRollingPolicy, but it is still not possible to use StreamingFileSink with DefaultRollingPolicy, which makes writing of the data to HDFS unpractical in scale for HDFS < 2.7. Unfortunately, there are a few official Hadoop distributives which latest version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As the result Flinks Hadoop connector can't work with this distributives. Flink declares that supported Hadoop from version 2.4.0 upwards ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) I guess we should emulate the functionality of "truncate" method for older Hadoop versions. was: New StreamingFileSink ( introduced in 1.6 Flink version ) use HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to have an ability to restore from certain point of file after failure and continue write data. To achieve this recover functionality the HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced only in Hadoop 2.7 . Unfortunately there are a few official Hadoop distributive which latest version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). 
As the result Flinks Hadoop connector can't work with this distributives. Flink declares that supported Hadoop from version 2.4.0 upwards ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) I guess we should emulate the functionality of "truncate" method for older Hadoop versions. > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: API / DataStream, Connectors / FileSystem >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Assignee: Artsem Semianenka >Priority: Major > Labels: pull-request-available > Attachments: legacy truncate logic.pdf > > > The new StreamingFileSink (introduced in Flink 1.6) uses the > HadoopRecoverableFsDataOutputStream wrapper to write data to HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream that > makes it possible to restore from a certain point of the file after a failure > and continue writing data. To achieve this recovery functionality, > HadoopRecoverableFsDataOutputStream uses the "truncate" method, which was > introduced only in Hadoop 2.7. > FLINK-14170 has enabled the usage of StreamingFileSink for > OnCheckpointRollingPolicy, but it is still not possible to use > StreamingFileSink with DefaultRollingPolicy, which makes writing data > to HDFS at scale impractical for HDFS < 2.7. > Unfortunately, there are a few official Hadoop distributions whose latest > versions still use Hadoop 2.6 (among them: Cloudera and Pivotal HD). As > a result, Flink's Hadoop connector can't work with these distributions. > Flink declares that it supports Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of the "truncate" method for older > Hadoop versions. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
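The emulation proposed in the ticket can be illustrated with a minimal copy-and-rename sketch, shown here against the local file system (the class name, helper method, and temp-file suffix are illustrative assumptions; they are not Flink's actual implementation, which would operate on Hadoop's FileSystem API and stream bytes rather than load them into memory):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

public class LegacyTruncate {

    /**
     * Emulates truncate(file, newLength) on file systems without a native
     * truncate: copy the valid prefix to a temporary file, then rename it
     * over the original. A pre-2.7 HDFS variant would use
     * FileSystem.create()/rename() and stream the data instead.
     */
    static void truncateByCopy(Path file, long newLength) throws IOException {
        byte[] content = Files.readAllBytes(file);
        if (newLength > content.length) {
            throw new IOException("cannot truncate beyond current length");
        }
        Path tmp = file.resolveSibling(file.getFileName() + ".truncate-tmp");
        Files.write(tmp, Arrays.copyOf(content, (int) newLength));
        // Rename over the original; on HDFS, rename is the atomic primitive,
        // so readers never observe a half-truncated file.
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("truncate-demo", ".bin");
        Files.write(f, "hello world".getBytes());
        truncateByCopy(f, 5);
        System.out.println(new String(Files.readAllBytes(f))); // prints "hello"
    }
}
```

The attached "legacy truncate logic.pdf" presumably describes the actual design; the point of the sketch is only that truncation can be expressed with the create/copy/rename primitives that old Hadoop versions already provide.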
[jira] [Resolved] (FLINK-11396) MetricStore creates significant GC pressure
[ https://issues.apache.org/jira/browse/FLINK-11396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi resolved FLINK-11396. - Fix Version/s: 1.9.0 Resolution: Fixed > MetricStore creates significant GC pressure > --- > > Key: FLINK-11396 > URL: https://issues.apache.org/jira/browse/FLINK-11396 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Reporter: Oleksandr Nitavskyi >Priority: Major > Fix For: 1.9.0 > > Attachments: GC_parallel_flink_job_manager.log, g1.txt, gc_example.txt > > > On Flink version 1.6.x we observe a significant increase of the latency in > the UI. > After performance profiling, we have concluded that during UI rendering the > back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is > not friendly with G1 GC ergonomics configurations. > On the Flink side, MetricStore creates a huge amount of short-lived objects for > our job, which provokes a lot of Young GC pauses for a non-trivial job with > around 50 operators and a parallelism of 120. > Samples of GC logs are attached. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-11396) MetricStore creates significant GC pressure
[ https://issues.apache.org/jira/browse/FLINK-11396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041971#comment-17041971 ] Oleksandr Nitavskyi commented on FLINK-11396: - Sorry, we still haven't migrated anything significant to the new UI. Shall we close this ticket, since the UI has been significantly changed? If we see some problems again, we can always reopen it with more details :) > MetricStore creates significant GC pressure > --- > > Key: FLINK-11396 > URL: https://issues.apache.org/jira/browse/FLINK-11396 > Project: Flink > Issue Type: Bug > Components: Runtime / Metrics >Reporter: Oleksandr Nitavskyi >Priority: Major > Attachments: GC_parallel_flink_job_manager.log, g1.txt, gc_example.txt > > > On Flink version 1.6.x we observe a significant increase of the latency in > the UI. > After performance profiling, we have concluded that during UI rendering the > back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is > not friendly with G1 GC ergonomics configurations. > On the Flink side, MetricStore creates a huge amount of short-lived objects for > our job, which provokes a lot of Young GC pauses for a non-trivial job with > around 50 operators and a parallelism of 120. > Samples of GC logs are attached. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15904) Make Kafka Consumer work with activated "disableGenericTypes()"
[ https://issues.apache.org/jira/browse/FLINK-15904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040142#comment-17040142 ] Oleksandr Nitavskyi commented on FLINK-15904: - I made a PR, but cannot assign the Jira ticket to myself :( > Make Kafka Consumer work with activated "disableGenericTypes()" > --- > > Key: FLINK-15904 > URL: https://issues.apache.org/jira/browse/FLINK-15904 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Reporter: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > A user reported a problem that the Kafka Consumer doesn't work in that case: > https://lists.apache.org/thread.html/r462a854e8a0ab3512e2906b40411624f3164ea3af7cba61ee94cd760%40%3Cuser.flink.apache.org%3E. > We should use a different constructor for {{ListStateDescriptor}} that takes > {{TypeSerializer}} here: > https://github.com/apache/flink/blob/68cc21e4af71505efa142110e35a1f8b1c25fe6e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L860. > This will circumvent the check. > My full analysis from the email thread: > {quote} > Unfortunately, the fact that the Kafka Sources use Kryo for state > serialization is a very early design misstep that we cannot get rid of for > now. We will get rid of that when the new source interface lands ([1]) and > when we have a new Kafka Source based on that. > As a workaround, we should change the Kafka Consumer to go through a > different constructor of ListStateDescriptor which directly takes a > TypeSerializer instead of a TypeInformation here: [2]. This should sidestep > the "no generic types" check. 
> [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface > [2] > https://github.com/apache/flink/blob/68cc21e4af71505efa142110e35a1f8b1c25fe6e/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L860 > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-11911) KafkaTopicPartition is not a valid POJO
[ https://issues.apache.org/jira/browse/FLINK-11911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17026852#comment-17026852 ] Oleksandr Nitavskyi edited comment on FLINK-11911 at 1/30/20 5:19 PM: -- What if we try to use a custom TypeInformationFactory (https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory), where we can fall back to Kryo if we deserialize old Kryo state? was (Author: oleksandr nitavskyi): Sure, makes sense. What if we try to use custom TypeInformationFactory (https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory), where we can fallback to Kryo if we deserialize old Kryo state? > KafkaTopicPartition is not a valid POJO > --- > > Key: FLINK-11911 > URL: https://issues.apache.org/jira/browse/FLINK-11911 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.8.0 >Reporter: Fokko Driesprong >Assignee: Fokko Driesprong >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > KafkaTopicPartition is not a POJO, and therefore it cannot be serialized > efficiently. This happens when using the KafkaDeserializationSchema. > When enforcing POJOs: > ``` > java.lang.UnsupportedOperationException: Generic types have been disabled in > the ExecutionConfig and type > org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition is > treated as a generic type. 
> at > org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86) > at > org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107) > at > org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52) > at > org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102) > at > org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:288) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:289) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:219) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:856) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > ``` > And in the logs: > ``` > 2019-03-13 16:41:28,217 INFO > org.apache.flink.api.java.typeutils.TypeExtractor - class > org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition > does not contain a setter for field topic > 2019-03-13 16:41:28,221 INFO > org.apache.flink.api.java.typeutils.TypeExtractor - Class class > 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition > cannot be used as a POJO type because not all fields are valid POJO fields, > and must be processed as GenericType. Please read the Flink documentation on > "Data Types & Serialization" for details of the effect on performance. > ``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-11911) KafkaTopicPartition is not a valid POJO
[ https://issues.apache.org/jira/browse/FLINK-11911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17026852#comment-17026852 ] Oleksandr Nitavskyi commented on FLINK-11911: - Sure, makes sense. What if we try to use a custom TypeInformationFactory (https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory), where we can fall back to Kryo if we deserialize old Kryo state? > KafkaTopicPartition is not a valid POJO > --- > > Key: FLINK-11911 > URL: https://issues.apache.org/jira/browse/FLINK-11911 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.8.0 >Reporter: Fokko Driesprong >Assignee: Fokko Driesprong >Priority: Major > Labels: pull-request-available > Fix For: 1.10.0 > > Time Spent: 10m > Remaining Estimate: 0h > > KafkaTopicPartition is not a POJO, and therefore it cannot be serialized > efficiently. This happens when using the KafkaDeserializationSchema. > When enforcing POJOs: > ``` > java.lang.UnsupportedOperationException: Generic types have been disabled in > the ExecutionConfig and type > org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition is > treated as a generic type. 
> at > org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86) > at > org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107) > at > org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52) > at > org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102) > at > org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:288) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:289) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:219) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:856) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > ``` > And in the logs: > ``` > 2019-03-13 16:41:28,217 INFO > org.apache.flink.api.java.typeutils.TypeExtractor - class > org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition > does not contain a setter for field topic > 2019-03-13 16:41:28,221 INFO > org.apache.flink.api.java.typeutils.TypeExtractor - Class class > 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition > cannot be used as a POJO type because not all fields are valid POJO fields, > and must be processed as GenericType. Please read the Flink documentation on > "Data Types & Serialization" for details of the effect on performance. > ``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
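For context on why the TypeExtractor log above rejects KafkaTopicPartition: Flink treats a class as a POJO only if it is public, has a public no-argument constructor, and every non-transient field is either public or exposed through a public getter/setter pair. A hypothetical class satisfying these rules (illustrative only, not the real KafkaTopicPartition, whose fields lack setters) looks like:

```java
// Hypothetical POJO that satisfies Flink's rules; not the actual
// org.apache.flink...KafkaTopicPartition class.
public class TopicPartitionPojo {
    private String topic;
    private int partition;

    // Public no-arg constructor: required for POJO serialization.
    public TopicPartitionPojo() {}

    // Each private field needs a public getter AND setter pair; the missing
    // setter for "topic" is exactly what the TypeExtractor complains about.
    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }

    public static void main(String[] args) {
        TopicPartitionPojo p = new TopicPartitionPojo();
        p.setTopic("events");
        p.setPartition(3);
        System.out.println(p.getTopic() + "-" + p.getPartition()); // prints "events-3"
    }
}
```

Because KafkaTopicPartition fails the getter/setter rule, Flink falls back to treating it as a GenericType serialized with Kryo, which is what the enforced check rejects.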
[jira] [Assigned] (FLINK-12489) Flink on Mesos - Parameterize network resources.
[ https://issues.apache.org/jira/browse/FLINK-12489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi reassigned FLINK-12489: --- Assignee: Oleksandr Nitavskyi > Flink on Mesos - Parameterize network resources. > > > Key: FLINK-12489 > URL: https://issues.apache.org/jira/browse/FLINK-12489 > Project: Flink > Issue Type: Improvement > Components: Deployment / Mesos >Reporter: Oleksandr Nitavskyi >Assignee: Oleksandr Nitavskyi >Priority: Minor > > Mesos supports network resource parameters. It would be nice if Flink could > specify network resource consumption. > Unfortunately, the network resource is not standardized in Mesos, so the fenzo > name "network" should be customizable. > Thus we can introduce two parameters: > 1. Network bandwidth in MB. > 2. Name of the network resource in Mesos, where "network" will be the value > by default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12489) Flink on Mesos - Parameterize network resources.
Oleksandr Nitavskyi created FLINK-12489: --- Summary: Flink on Mesos - Parameterize network resources. Key: FLINK-12489 URL: https://issues.apache.org/jira/browse/FLINK-12489 Project: Flink Issue Type: Improvement Components: Deployment / Mesos Reporter: Oleksandr Nitavskyi Mesos supports network resource parameters. It would be nice if Flink could specify network resource consumption. Unfortunately, the network resource is not standardized in Mesos, so the fenzo name "network" should be customizable. Thus we can introduce two parameters: 1. Network bandwidth in MB. 2. Name of the network resource in Mesos, where "network" will be the value by default. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-12139) Flink on mesos - Parameterize disk space needed.
[ https://issues.apache.org/jira/browse/FLINK-12139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi reassigned FLINK-12139: --- Assignee: Oleksandr Nitavskyi > Flink on mesos - Parameterize disk space needed. > > > Key: FLINK-12139 > URL: https://issues.apache.org/jira/browse/FLINK-12139 > Project: Flink > Issue Type: Improvement > Components: Deployment / Mesos >Reporter: Juan >Assignee: Oleksandr Nitavskyi >Priority: Minor > > We are having a small issue while trying to deploy Flink on Mesos using > Marathon. In our setup of Mesos we are required to specify the amount of > disk space we want for the applications we deploy there. > The current default value in Flink is 0 and it is currently not > parameterizable. This means that we ask for 0 disk space for our instances, so > Flink can't work. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
[ https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi closed FLINK-10342. --- Resolution: Fixed > Kafka duplicate topic consumption when topic name is changed > > > Key: FLINK-10342 > URL: https://issues.apache.org/jira/browse/FLINK-10342 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Assignee: Oleksandr Nitavskyi >Priority: Minor > > If the topic name for a KafkaConsumer is simply renamed, Flink starts to > consume from both the old and the new topic at the same time, which can lead to > unexpected behavior. > Here is a PR with a reproduction: https://github.com/apache/flink/pull/6691 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
[ https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi reopened FLINK-10342: - > Kafka duplicate topic consumption when topic name is changed > > > Key: FLINK-10342 > URL: https://issues.apache.org/jira/browse/FLINK-10342 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Assignee: Oleksandr Nitavskyi >Priority: Minor > > If the topic name for a KafkaConsumer is simply renamed, Flink starts to > consume from both the old and the new topic at the same time, which can lead to > unexpected behavior. > Here is a PR with a reproduction: https://github.com/apache/flink/pull/6691 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
[ https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi closed FLINK-10342. --- Resolution: Won't Fix > Kafka duplicate topic consumption when topic name is changed > > > Key: FLINK-10342 > URL: https://issues.apache.org/jira/browse/FLINK-10342 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Assignee: Oleksandr Nitavskyi >Priority: Minor > > If the topic name for a KafkaConsumer is simply renamed, Flink starts to > consume from both the old and the new topic at the same time, which can lead to > unexpected behavior. > Here is a PR with a reproduction: https://github.com/apache/flink/pull/6691 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11396) MetricStore creates significant GC pressure
[ https://issues.apache.org/jira/browse/FLINK-11396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16746997#comment-16746997 ] Oleksandr Nitavskyi commented on FLINK-11396: - On the GC tuning side, G1 ergonomics concluded that we need a small young memory pool (because short-lived objects are created only during UI usage, which is rare), so an alternative solution would be to set the young memory pool size via parameters or to use ParallelGC. > MetricStore creates significant GC pressure > --- > > Key: FLINK-11396 > URL: https://issues.apache.org/jira/browse/FLINK-11396 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > Attachments: GC_parallel_flink_job_manager.log, g1.txt, gc_example.txt > > > On Flink version 1.6.x we observe a significant increase of the latency in > the UI. > After performance profiling, we have concluded that during UI rendering the > back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is > not friendly with G1 GC ergonomics configurations. > On the Flink side, MetricStore creates a huge amount of short-lived objects for > our job, which provokes a lot of Young GC pauses for a non-trivial job with > around 50 operators and a parallelism of 120. > Samples of GC logs are attached. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
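The two workarounds mentioned in the comment (fixing the young-generation size, or switching the JobManager to the parallel collector) can be passed as JobManager JVM options, for instance via the `env.java.opts.jobmanager` key in flink-conf.yaml. The key should be checked against the Flink version in use, and the 256m value below is an illustrative placeholder, not a tuned recommendation:

```yaml
# flink-conf.yaml (illustrative sketch)

# Option 1: pin the young generation small so G1 ergonomics cannot grow it
env.java.opts.jobmanager: "-Xmn256m"

# Option 2: use the throughput (parallel) collector instead of G1
# env.java.opts.jobmanager: "-XX:+UseParallelGC"
```

Either option trades G1's adaptive sizing for predictable young-generation behavior, which matches the observation that the short-lived MetricStore objects appear only in bursts during UI usage.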
[jira] [Updated] (FLINK-11396) MetricStore creates significant GC pressure
[ https://issues.apache.org/jira/browse/FLINK-11396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-11396: Attachment: GC_parallel_flink_job_manager.log g1.txt gc_example.txt > MetricStore creates significant GC pressure > --- > > Key: FLINK-11396 > URL: https://issues.apache.org/jira/browse/FLINK-11396 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > Attachments: GC_parallel_flink_job_manager.log, g1.txt, gc_example.txt > > > On Flink version 1.6.x we observe a significant increase of the latency in > the UI. > After performance profiling, we have concluded that during UI rendering the > back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is > not friendly with G1 GC ergonomics configurations. > On the Flink side, MetricStore creates a huge amount of short-lived objects for > our job, which provokes a lot of Young GC pauses for a non-trivial job with > around 50 operators and a parallelism of 120. > Samples of GC logs are attached. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11396) MetricStore creates significant GC pressure
Oleksandr Nitavskyi created FLINK-11396: --- Summary: MetricStore creates significant GC pressure Key: FLINK-11396 URL: https://issues.apache.org/jira/browse/FLINK-11396 Project: Flink Issue Type: Bug Reporter: Oleksandr Nitavskyi On Flink version 1.6.x we observe a significant increase of the latency in the UI. After performance profiling, we have concluded that during UI rendering the back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is not friendly with G1 GC ergonomics configurations. On the Flink side, MetricStore creates a huge amount of short-lived objects for our job, which provokes a lot of Young GC pauses for a non-trivial job with around 50 operators and a parallelism of 120. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11396) MetricStore creates significant GC pressure
[ https://issues.apache.org/jira/browse/FLINK-11396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-11396: Description: On Flink version 1.6.x we observe a significant increase of the latency in the UI. After performance profiling, we have concluded that during UI rendering the back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is not friendly with G1 GC ergonomics configurations. On the Flink side, MetricStore creates a huge amount of short-lived objects for our job, which provokes a lot of Young GC pauses for a non-trivial job with around 50 operators and a parallelism of 120. Samples of GC logs are attached. was: On Flink version 1.6.x we observe a significant increase of the latency in UI. After performance profiling, we have concluded that during UI rendering back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is not friendly with G1 GC ergonomics configurations. On Flink side, MetricStore creates a huge amount of short living objects for our job, which provokes a lot of Young GC pauses for a non-small job, with around 50 operators and 120 parallelisms. > MetricStore creates significant GC pressure > --- > > Key: FLINK-11396 > URL: https://issues.apache.org/jira/browse/FLINK-11396 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > > On Flink version 1.6.x we observe a significant increase of the latency in > the UI. > After performance profiling, we have concluded that during UI rendering the > back-end spends 50% of the time on GC pauses, which means that Flink 1.6.x is > not friendly with G1 GC ergonomics configurations. > On the Flink side, MetricStore creates a huge amount of short-lived objects for > our job, which provokes a lot of Young GC pauses for a non-trivial job with > around 50 operators and a parallelism of 120. > Samples of GC logs are attached. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11394) Job UI is not rendered in case of slow back-end
[ https://issues.apache.org/jira/browse/FLINK-11394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-11394: Description: In case back-end requests for the job state take longer than _web.refresh-interval_, the Job UI is not rendered correctly. A request to the back-end is made: !image-2019-01-18-19-53-17-358.png|height=600,width=800! In case one of the refresh calls takes more than _web.refresh-interval_, the next requests are made. !image-2019-01-18-19-53-42-565.png|height=250,width=250! The correct answer has been received on the UI side, but nothing has been rendered. !image-2019-01-18-19-54-36-964.png|height=250,width=250! was: In case back-end requests for the job state take longer than _web.refresh-interval_ Job UI is not rendered correctly. Request to back-end is made: !image-2019-01-18-19-53-17-358.png|height=250,width=450! In case one of refresh calls take more than _web.refresh-interval_ next requests are made. !image-2019-01-18-19-53-42-565.png|height=250,width=250! The correct answer has been received on the UI side, but nothing has been rendered. !image-2019-01-18-19-54-36-964.png|height=250,width=250! > Job UI is not rendered in case of slow back-end > --- > > Key: FLINK-11394 > URL: https://issues.apache.org/jira/browse/FLINK-11394 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > Attachments: image-2019-01-18-19-53-17-358.png, > image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png > > > In case back-end requests for the job state take longer than > _web.refresh-interval_, the Job UI is not rendered correctly. > A request to the back-end is made: > !image-2019-01-18-19-53-17-358.png|height=600,width=800! > In case one of the refresh calls takes more than _web.refresh-interval_, the next > requests are made. > !image-2019-01-18-19-53-42-565.png|height=250,width=250! > The correct answer has been received on the UI side, but nothing has been > rendered. 
> !image-2019-01-18-19-54-36-964.png|height=250,width=250! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11394) Job UI is not rendered in case of slow back-end
[ https://issues.apache.org/jira/browse/FLINK-11394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-11394: Description: In case back-end requests for the job state take longer than _web.refresh-interval_, the Job UI is not rendered correctly. A request to the back-end is made: !image-2019-01-18-19-53-17-358.png|height=400,width=800! In case one of the refresh calls takes more than _web.refresh-interval_, the next requests are made. !image-2019-01-18-19-53-42-565.png|height=400,width=800! The correct answer has been received on the UI side, but nothing has been rendered. !image-2019-01-18-19-54-36-964.png|height=400,width=800! was: In case back-end requests for the job state take longer than _web.refresh-interval_ Job UI is not rendered correctly. Request to back-end is made: !image-2019-01-18-19-53-17-358.png|height=400,width=800! In case one of refresh calls take more than _web.refresh-interval_ next requests are made. !image-2019-01-18-19-53-42-565.png|height=250,width=250! The correct answer has been received on the UI side, but nothing has been rendered. !image-2019-01-18-19-54-36-964.png|height=250,width=250! > Job UI is not rendered in case of slow back-end > --- > > Key: FLINK-11394 > URL: https://issues.apache.org/jira/browse/FLINK-11394 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > Attachments: image-2019-01-18-19-53-17-358.png, > image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png > > > In case back-end requests for the job state take longer than > _web.refresh-interval_, the Job UI is not rendered correctly. > A request to the back-end is made: > !image-2019-01-18-19-53-17-358.png|height=400,width=800! > In case one of the refresh calls takes more than _web.refresh-interval_, the next > requests are made. > !image-2019-01-18-19-53-42-565.png|height=400,width=800! > The correct answer has been received on the UI side, but nothing has been > rendered. 
> !image-2019-01-18-19-54-36-964.png|height=400,width=800! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11394) Job UI is not rendered in case of slow back-end
[ https://issues.apache.org/jira/browse/FLINK-11394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-11394: Description: In case back-end requests for the job state take longer than _web.refresh-interval_, the Job UI is not rendered correctly. A request to the back-end is made: !image-2019-01-18-19-53-17-358.png|height=400,width=800! In case one of the refresh calls takes more than _web.refresh-interval_, the next requests are made. !image-2019-01-18-19-53-42-565.png|height=250,width=250! The correct answer has been received on the UI side, but nothing has been rendered. !image-2019-01-18-19-54-36-964.png|height=250,width=250! was: In case back-end requests for the job state take longer than _web.refresh-interval_ Job UI is not rendered correctly. Request to back-end is made: !image-2019-01-18-19-53-17-358.png|height=500,width=800! In case one of refresh calls take more than _web.refresh-interval_ next requests are made. !image-2019-01-18-19-53-42-565.png|height=250,width=250! The correct answer has been received on the UI side, but nothing has been rendered. !image-2019-01-18-19-54-36-964.png|height=250,width=250! > Job UI is not rendered in case of slow back-end > --- > > Key: FLINK-11394 > URL: https://issues.apache.org/jira/browse/FLINK-11394 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > Attachments: image-2019-01-18-19-53-17-358.png, > image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png > > > In case back-end requests for the job state take longer than > _web.refresh-interval_, the Job UI is not rendered correctly. > A request to the back-end is made: > !image-2019-01-18-19-53-17-358.png|height=400,width=800! > In case one of the refresh calls takes more than _web.refresh-interval_, the next > requests are made. > !image-2019-01-18-19-53-42-565.png|height=250,width=250! > The correct answer has been received on the UI side, but nothing has been > rendered. 
> !image-2019-01-18-19-54-36-964.png|height=250,width=250! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11394) Job UI is not rendered in case of slow back-end
[ https://issues.apache.org/jira/browse/FLINK-11394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-11394: Description: In case back-end requests for the job state take longer than _web.refresh-interval_, the Job UI is not rendered correctly. A request to the back-end is made: !image-2019-01-18-19-53-17-358.png|height=500,width=800! In case one of the refresh calls takes more than _web.refresh-interval_, the next requests are made. !image-2019-01-18-19-53-42-565.png|height=250,width=250! The correct answer has been received on the UI side, but nothing has been rendered. !image-2019-01-18-19-54-36-964.png|height=250,width=250! was: In case back-end requests for the job state take longer than _web.refresh-interval_ Job UI is not rendered correctly. Request to back-end is made: !image-2019-01-18-19-53-17-358.png|height=600,width=800! In case one of refresh calls take more than _web.refresh-interval_ next requests are made. !image-2019-01-18-19-53-42-565.png|height=250,width=250! The correct answer has been received on the UI side, but nothing has been rendered. !image-2019-01-18-19-54-36-964.png|height=250,width=250! > Job UI is not rendered in case of slow back-end > --- > > Key: FLINK-11394 > URL: https://issues.apache.org/jira/browse/FLINK-11394 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Priority: Major > Attachments: image-2019-01-18-19-53-17-358.png, > image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png > > > In case back-end requests for the job state take longer than > _web.refresh-interval_, the Job UI is not rendered correctly. > A request to the back-end is made: > !image-2019-01-18-19-53-17-358.png|height=500,width=800! > In case one of the refresh calls takes more than _web.refresh-interval_, the next > requests are made. > !image-2019-01-18-19-53-42-565.png|height=250,width=250! > The correct answer has been received on the UI side, but nothing has been > rendered. 
> !image-2019-01-18-19-54-36-964.png|height=250,width=250! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11394) Job UI is not rendered in case of slow back-end
Oleksandr Nitavskyi created FLINK-11394: --- Summary: Job UI is not rendered in case of slow back-end Key: FLINK-11394 URL: https://issues.apache.org/jira/browse/FLINK-11394 Project: Flink Issue Type: Bug Reporter: Oleksandr Nitavskyi Attachments: image-2019-01-18-19-53-17-358.png, image-2019-01-18-19-53-42-565.png, image-2019-01-18-19-54-36-964.png If back-end requests for the job state take longer than _web.refresh-interval_, the Job UI is not rendered correctly. A request to the back-end is made: !image-2019-01-18-19-53-17-358.png! If one of the refresh calls takes longer than _web.refresh-interval_, the next requests are made anyway. !image-2019-01-18-19-53-42-565.png! The correct answer has been received on the UI side, but nothing has been rendered. !image-2019-01-18-19-54-36-964.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
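The race described in this report (a refresh response arriving after newer refresh requests were already issued) can be sketched as a stale-response guard on the polling loop: render a response only if it is newer than the last one rendered, so the freshest received answer is always applied and out-of-order older answers are discarded. This is a hypothetical illustration, not the actual Flink web UI code; the class and method names are invented.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch (not Flink web UI code): guard a periodic refresh loop
// against overlapping, out-of-order responses.
public class RefreshGuard {
    private final AtomicLong nextId = new AtomicLong();     // ids handed to outgoing requests
    private final AtomicLong renderedId = new AtomicLong(); // id of the last rendered response

    /** Call when a refresh request is sent; returns its sequence id. */
    public long issueRequest() {
        return nextId.incrementAndGet();
    }

    /** Returns true if the response for requestId should be rendered now. */
    public boolean shouldRender(long requestId) {
        if (requestId <= renderedId.get()) {
            // An equally new or newer response was already rendered; drop this one
            // instead of overwriting fresh state with stale data.
            return false;
        }
        renderedId.set(requestId);
        return true;
    }
}
```

With such a guard, a slow first request cannot suppress the rendering of a later, faster one: the later response renders, and the late arrival of the earlier response is ignored.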
[jira] [Updated] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
[ https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi updated FLINK-10342: Priority: Minor (was: Major) > Kafka duplicate topic consumption when topic name is changed > > > Key: FLINK-10342 > URL: https://issues.apache.org/jira/browse/FLINK-10342 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Assignee: Oleksandr Nitavskyi >Priority: Minor > > In case a topic name is simply renamed for a KafkaConsumer, Flink starts to > consume from both the old and the new topic at the same time, which can lead to > unexpected behavior. > Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
[ https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617172#comment-16617172 ] Oleksandr Nitavskyi edited comment on FLINK-10342 at 9/17/18 11:29 AM: --- Thanks for the comment. I agree that it works as expected from the Flink developer's point of view: old TopicPartition consumption is loaded from the state, and newly requested TopicPartitions are discovered. If there is no intersection, we consume from both sets of TopicPartitions, since state in the KafkaSource operator can never expire. At the same time, this behavior is counterintuitive for Flink users. When a KafkaSource is created consuming "topic 1", it is expected that "topic 1" will be consumed. {code} new KafkaSource("topic 1") {code} If after refactoring the KafkaSource starts to consume another topic, "topic 2": {code} new KafkaSource("topic 2") {code} then it seems intuitive to us that data will come from "topic 2" and only from "topic 2"; the current behavior is a hole in the abstraction. I believe it is worth taking at least one of the following action points: * Add a small check in the state-restoring method that skips topics which are not passed via the class constructor. * Log a warning if the topics in the state and in the constructor are different. * Document this behavior; it can also be a good exercise that clarifies how state is managed and helps users start thinking in a slightly different paradigm. If you think any of these action points is worthwhile, let me know and I will contribute. Thank you was (Author: oleksandr nitavskyi): Thanks for the comment. I agree that it works as expected from the Flink developer's point of view: old TopicPartition consumption is loaded from the state, and newly requested TopicPartitions are discovered. If there is no intersection, we consume from both sets of TopicPartitions, since state in the KafkaSource operator can never expire. 
At the same time, this behavior is counterintuitive for Flink users. When a KafkaSource is created consuming "topic 1", it is expected that "topic 1" will be consumed. {code} new KafkaSource("topic 1") {code} If after refactoring the KafkaSource starts to consume another topic, "topic 2": {code} new Kafka Source("topic 2") {code} then it seems intuitive to us that data will come from "topic 2" and only from "topic 2"; the current behavior is a hole in the abstraction. I believe it is worth taking at least one of the following action points: * Add a small check in the state-restoring method that skips topics which are not passed via the class constructor. * Log a warning if the topics in the state and in the constructor are different. * Document this behavior; it can also be a good exercise that clarifies how state is managed and helps users start thinking in a slightly different paradigm. If you think any of these action points is worthwhile, let me know and I will contribute. Thank you > Kafka duplicate topic consumption when topic name is changed > > > Key: FLINK-10342 > URL: https://issues.apache.org/jira/browse/FLINK-10342 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Assignee: Oleksandr Nitavskyi >Priority: Major > > In case a topic name is simply renamed for a KafkaConsumer, Flink starts to > consume from both the old and the new topic at the same time, which can lead to > unexpected behavior. > Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
[ https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi reopened FLINK-10342: - > Kafka duplicate topic consumption when topic name is changed > > > Key: FLINK-10342 > URL: https://issues.apache.org/jira/browse/FLINK-10342 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Assignee: Oleksandr Nitavskyi >Priority: Major > > In case a topic name is simply renamed for a KafkaConsumer, Flink starts to > consume from both the old and the new topic at the same time, which can lead to > unexpected behavior. > Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
[ https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617172#comment-16617172 ] Oleksandr Nitavskyi commented on FLINK-10342: - Thanks for the comment. I agree that it works as expected from the Flink developer's point of view: old TopicPartition consumption is loaded from the state, and newly requested TopicPartitions are discovered. If there is no intersection, we consume from both sets of TopicPartitions, since state in the KafkaSource operator can never expire. At the same time, this behavior is counterintuitive for Flink users. When a KafkaSource is created consuming "topic 1", it is expected that "topic 1" will be consumed. {code} new KafkaSource("topic 1") {code} If after refactoring the KafkaSource starts to consume another topic, "topic 2": {code} new KafkaSource("topic 2") {code} then it seems intuitive to us that data will come from "topic 2" and only from "topic 2"; the current behavior is a hole in the abstraction. I believe it is worth taking at least one of the following action points: * Add a small check in the state-restoring method that skips topics which are not passed via the class constructor. * Log a warning if the topics in the state and in the constructor are different. * Document this behavior; it can also be a good exercise that clarifies how state is managed and helps users start thinking in a slightly different paradigm. If you think any of these action points is worthwhile, let me know and I will contribute. Thank you > Kafka duplicate topic consumption when topic name is changed > > > Key: FLINK-10342 > URL: https://issues.apache.org/jira/browse/FLINK-10342 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Assignee: Oleksandr Nitavskyi >Priority: Major > > In case a topic name is simply renamed for a KafkaConsumer, Flink starts to > consume from both the old and the new topic at the same time, which can lead to > unexpected behavior. 
> Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
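The first two action points proposed in the comment above (skip restored topics no longer in the constructor, and warn about the mismatch) can be sketched as a filter over the restored offsets. This is a hypothetical illustration, not the actual FlinkKafkaConsumer internals: `RestoredStateFilter`, `filterRestoredOffsets`, and the `TopicPartition` stand-in are invented names, and the record syntax assumes Java 16+.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: when restoring partition offsets from state, keep only
// partitions whose topic is still in the consumer's configured topic list, so
// renaming "topic 1" to "topic 2" stops consumption of "topic 1" instead of
// silently reading both topics.
public class RestoredStateFilter {

    /** Minimal stand-in for Kafka's TopicPartition. */
    public record TopicPartition(String topic, int partition) {}

    public static Map<TopicPartition, Long> filterRestoredOffsets(
            Map<TopicPartition, Long> restored, Set<String> configuredTopics) {
        Map<TopicPartition, Long> kept = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : restored.entrySet()) {
            if (configuredTopics.contains(e.getKey().topic())) {
                kept.put(e.getKey(), e.getValue());
            } else {
                // Candidate place for the proposed warning about a mismatch
                // between topics in the state and in the constructor.
                System.err.println("Skipping restored state for removed topic: "
                        + e.getKey().topic());
            }
        }
        return kept;
    }
}
```

Restoring with `filterRestoredOffsets(restoredOffsets, Set.of("topic 2"))` would then drop any state entries for the renamed "topic 1" while keeping the offsets for "topic 2".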
[jira] [Assigned] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
[ https://issues.apache.org/jira/browse/FLINK-10342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleksandr Nitavskyi reassigned FLINK-10342: --- Assignee: Oleksandr Nitavskyi > Kafka duplicate topic consumption when topic name is changed > > > Key: FLINK-10342 > URL: https://issues.apache.org/jira/browse/FLINK-10342 > Project: Flink > Issue Type: Bug >Reporter: Oleksandr Nitavskyi >Assignee: Oleksandr Nitavskyi >Priority: Major > > In case a topic name is simply renamed for a KafkaConsumer, Flink starts to > consume from both the old and the new topic at the same time, which can lead to > unexpected behavior. > Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691 > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10342) Kafka duplicate topic consumption when topic name is changed
Oleksandr Nitavskyi created FLINK-10342: --- Summary: Kafka duplicate topic consumption when topic name is changed Key: FLINK-10342 URL: https://issues.apache.org/jira/browse/FLINK-10342 Project: Flink Issue Type: Bug Reporter: Oleksandr Nitavskyi In case a topic name is simply renamed for a KafkaConsumer, Flink starts to consume from both the old and the new topic at the same time, which can lead to unexpected behavior. Here is the PR with a reproduction: https://github.com/apache/flink/pull/6691 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn
Oleksandr Nitavskyi created FLINK-9762: -- Summary: CoreOptions.TMP_DIRS wrongly managed on Yarn Key: FLINK-9762 URL: https://issues.apache.org/jira/browse/FLINK-9762 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.5.0 Reporter: Oleksandr Nitavskyi The issue on Yarn is that it is impossible to have different LOCAL_DIRS on the JobManager and the TaskManager, even though the LOCAL_DIRS value depends on the container. CoreOptions.TMP_DIRS is set to its default value during JobManager initialization and added to the configuration object. When a TaskManager is launched, that configuration object is cloned, carrying LOCAL_DIRS values that make sense only for the JobManager container. From the point of view of a YARN container running a TaskManager, CoreOptions.TMP_DIRS is therefore always equal either to the path in flink.yml or to the LOCAL_DIRS of the JobManager (the default behaviour). If the TaskManager's container does not have access to those folders, the TaskManager cannot be started. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
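A minimal sketch of the per-container resolution implied by this report: prefer the container's own LOCAL_DIRS environment variable over a CoreOptions.TMP_DIRS value inherited from the JobManager's cloned configuration. The class and method names here are invented; only CoreOptions.TMP_DIRS and the YARN LOCAL_DIRS variable come from the report itself.

```java
// Hypothetical sketch: each YARN container gets its own LOCAL_DIRS, so a
// TaskManager should trust that variable rather than a TMP_DIRS value that may
// point at directories only the JobManager container can access.
public class TmpDirsResolver {

    /**
     * @param configuredTmpDirs CoreOptions.TMP_DIRS from the (possibly cloned)
     *                          configuration; may be JobManager-only paths
     * @param localDirsEnv      the container's LOCAL_DIRS env var, or null
     * @return the directories this container should actually use
     */
    public static String resolveTmpDirs(String configuredTmpDirs, String localDirsEnv) {
        if (localDirsEnv != null && !localDirsEnv.isEmpty()) {
            return localDirsEnv; // container-local directories take precedence
        }
        return configuredTmpDirs; // fall back to the configured value
    }
}
```

A TaskManager would call this with `System.getenv("LOCAL_DIRS")` at startup, so each container resolves directories it is actually allowed to write to.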