Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
yunfengzhou-hub commented on code in PR #23944: URL: https://github.com/apache/flink/pull/23944#discussion_r1429620891

## flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java:

@@ -40,8 +43,16 @@ public static JobGraph getJobGraph(
         FlinkPipelineTranslator pipelineTranslator =
                 getPipelineTranslator(userClassloader, pipeline);

-        return pipelineTranslator.translateToJobGraph(
-                pipeline, optimizerConfiguration, defaultParallelism);
+        JobGraph jobGraph =
+                pipelineTranslator.translateToJobGraph(
+                        pipeline, optimizerConfiguration, defaultParallelism);
+
+        Map parallelismOverrides =
+                optimizerConfiguration.get(PipelineOptions.PARALLELISM_OVERRIDES);
+        jobGraph.getJobConfiguration()
+                .set(PipelineOptions.PARALLELISM_OVERRIDES, parallelismOverrides);
+

Review Comment: Thanks for the suggestion @JunRuiLee. I agree it better distinguishes between an unset configuration and a configured but empty map. I have updated the PR accordingly.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33704][BP 1.18][Filesytems] Update GCS filesystems to latest available versions [flink]
czchen commented on PR #23935: URL: https://github.com/apache/flink/pull/23935#issuecomment-1859660606 Shall be https://github.com/apache/flink/pull/23837
[jira] [Created] (FLINK-33870) Split the HighAvailabilityServices into LeaderServices and PersistentServices
Yangze Guo created FLINK-33870: -- Summary: Split the HighAvailabilityServices into LeaderServices and PersistentServices Key: FLINK-33870 URL: https://issues.apache.org/jira/browse/FLINK-33870 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Yangze Guo Assignee: Yangze Guo -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32667) Enable users to independently adjust the high availability strategies related to jobs through configuration
[ https://issues.apache.org/jira/browse/FLINK-32667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo updated FLINK-32667:
--
Description: In OLAP scenarios, we only require the leader election services for the Dispatcher / ResourceManager and RestEndpoint in the JobManager process. Leader election services and persistent services are redundant for jobs and may impact cluster performance. To generate HA services suitable for OLAP scenarios, we introduce the high-availability.enable-job-recovery parameter. When users enable HA with Kubernetes or ZooKeeper and set this option to false, we will select the combination of DefaultLeaderServices and EmbeddedPersistentServices. Additionally, we will set the JobMaster's LeaderElectionService and LeaderRetrieverService to the Standalone version.

was: When a flink session cluster use zk or k8s high availability service, it will store jobs in zk or ConfigMap. When we submit flink olap jobs to the session cluster, they always turn off restart strategy. These jobs with no-restart-strategy should not be stored in zk or ConfigMap in k8s

> Enable users to independently adjust the high availability strategies related to jobs through configuration
> Key: FLINK-32667
> URL: https://issues.apache.org/jira/browse/FLINK-32667
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.18.0
> Reporter: Fang Yong
> Assignee: Yangze Guo
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> In OLAP scenarios, we only require the leader election services for the Dispatcher / ResourceManager and RestEndpoint in the JobManager process. Leader election services and persistent services are redundant for jobs and may impact cluster performance. To generate HA services suitable for OLAP scenarios, we introduce the high-availability.enable-job-recovery parameter.
> When users enable HA with Kubernetes or ZooKeeper and set this option to false, we will select the combination of DefaultLeaderServices and EmbeddedPersistentServices. Additionally, we will set the JobMaster's LeaderElectionService and LeaderRetrieverService to the Standalone version.
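For illustration, a session-cluster configuration using the proposed option might look like the fragment below. This is a hedged sketch: `high-availability` and `high-availability.storageDir` are existing Flink options, while `high-availability.enable-job-recovery` is the option proposed in FLINK-32667 and may change before release.

```yaml
# Sketch only: high-availability.enable-job-recovery is the option proposed in
# FLINK-32667, not yet a released Flink key.
high-availability: kubernetes                 # keep leader election for Dispatcher / ResourceManager / RestEndpoint
high-availability.storageDir: s3://flink-ha/  # persistent services for cluster-level metadata
high-availability.enable-job-recovery: false  # skip job-level HA services in OLAP scenarios
```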
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798026#comment-17798026 ] Dong Lin commented on FLINK-25509:
--
[~ruanhang1993] I think we need to update flink-kafka-connector's master branch to depend on Flink 1.18.0 (it currently depends on Flink 1.17.0) before putting in your feature. A new branch/version of flink-kafka-connector will then be created in its upcoming release.

> FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
> Key: FLINK-25509
> URL: https://issues.apache.org/jira/browse/FLINK-25509
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / Common, Connectors / Kafka
> Reporter: Dong Lin
> Assignee: Hang Ruan
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> This feature is needed to migrate applications which use KafkaDeserializationSchema::isEndOfStream() from FlinkKafkaConsumer to KafkaSource.
> Please check out https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records for the motivation and the proposed changes.
[jira] [Updated] (FLINK-32667) Enable users to independently adjust the high availability strategies related to jobs through configuration
[ https://issues.apache.org/jira/browse/FLINK-32667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo updated FLINK-32667:
--
Summary: Enable users to independently adjust the high availability strategies related to jobs through configuration (was: Use standalone store and embedding writer for jobs with no-restart-strategy in session cluster)

> Enable users to independently adjust the high availability strategies related to jobs through configuration
> Key: FLINK-32667
> URL: https://issues.apache.org/jira/browse/FLINK-32667
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.18.0
> Reporter: Fang Yong
> Assignee: Yangze Guo
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> When a flink session cluster use zk or k8s high availability service, it will store jobs in zk or ConfigMap. When we submit flink olap jobs to the session cluster, they always turn off restart strategy. These jobs with no-restart-strategy should not be stored in zk or ConfigMap in k8s
Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
JunRuiLee commented on code in PR #23944: URL: https://github.com/apache/flink/pull/23944#discussion_r1429569157

## flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java:

@@ -40,8 +43,16 @@ public static JobGraph getJobGraph(
         FlinkPipelineTranslator pipelineTranslator =
                 getPipelineTranslator(userClassloader, pipeline);

-        return pipelineTranslator.translateToJobGraph(
-                pipeline, optimizerConfiguration, defaultParallelism);
+        JobGraph jobGraph =
+                pipelineTranslator.translateToJobGraph(
+                        pipeline, optimizerConfiguration, defaultParallelism);
+
+        Map parallelismOverrides =
+                optimizerConfiguration.get(PipelineOptions.PARALLELISM_OVERRIDES);
+        jobGraph.getJobConfiguration()
+                .set(PipelineOptions.PARALLELISM_OVERRIDES, parallelismOverrides);
+

Review Comment: I suggest using `getOptional` instead of `get` for fetching `PARALLELISM_OVERRIDES`, to ensure that the `jobConfiguration` only contains what is explicitly set by users, like below:

`optimizerConfiguration.getOptional(PipelineOptions.PARALLELISM_OVERRIDES).ifPresent(map -> jobGraph.getJobConfiguration().set(PipelineOptions.PARALLELISM_OVERRIDES, map));`

Although the current jobConfiguration does not yet include all job-level configurations, in the long term this jobConfiguration field should contain, and only contain, the job-level configuration items explicitly configured by the user.
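Outside Flink, the distinction JunRuiLee describes can be sketched with a toy stand-in for `Configuration`. Everything below is illustrative (the class, method shapes, and the key string are assumptions, not Flink's actual API); the point is only why `getOptional` preserves the "never set" state while `get` cannot.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Minimal stand-in for a configuration object, illustrating why getOptional()
// distinguishes "never set" from "explicitly set to an empty map".
class ConfigSketch {
    private final Map<String, Map<String, Integer>> entries = new HashMap<>();

    void set(String key, Map<String, Integer> value) {
        entries.put(key, value);
    }

    // get() falls back to a default value, so callers cannot tell whether
    // the user actually configured the key.
    Map<String, Integer> get(String key) {
        return entries.getOrDefault(key, new HashMap<>());
    }

    // getOptional() is empty unless the key was explicitly set.
    Optional<Map<String, Integer>> getOptional(String key) {
        return Optional.ofNullable(entries.get(key));
    }

    public static void main(String[] args) {
        ConfigSketch optimizerConf = new ConfigSketch();
        ConfigSketch jobConf = new ConfigSketch(); // plays the role of jobGraph.getJobConfiguration()

        // With getOptional(), nothing is copied when the user never set overrides,
        // so jobConf stays free of defaults the user did not write.
        optimizerConf.getOptional("pipeline.jobvertex-parallelism-overrides")
                .ifPresent(m -> jobConf.set("pipeline.jobvertex-parallelism-overrides", m));
        System.out.println(
                jobConf.getOptional("pipeline.jobvertex-parallelism-overrides").isPresent()); // false
    }
}
```

With `get()` instead, the copy would always run and `jobConf` would end up holding an empty map even for users who never touched the option.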
[jira] [Comment Edited] (FLINK-33588) Fix Flink Checkpointing Statistics Bug
[ https://issues.apache.org/jira/browse/FLINK-33588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798025#comment-17798025 ] Tongtong Zhu edited comment on FLINK-33588 at 12/18/23 6:52 AM: [~jingge] I have made the necessary modifications as per your request. Can this PR be released in version 1.18.1? https://github.com/apache/flink/pull/23931 was (Author: JIRAUSER301126): [~jingge] I have made the necessary modifications as per your request. Can this PR be released in version 1.18.1? > Fix Flink Checkpointing Statistics Bug > -- > > Key: FLINK-33588 > URL: https://issues.apache.org/jira/browse/FLINK-33588 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.14.5, 1.16.0, 1.17.0, 1.15.2, 1.14.6, 1.18.0, 1.17.1 >Reporter: Tongtong Zhu >Assignee: Tongtong Zhu >Priority: Critical > Labels: pull-request-available > Fix For: 1.19.0, 1.18.1 > > Attachments: FLINK-33588.patch, image-2023-12-11-17-35-23-391.png, > image-2023-12-13-11-35-43-780.png, image-2023-12-15-13-59-28-201.png, > newCommit-FLINK-33688.patch > > > When the Flink task is first started, the checkpoint data is null due to the > lack of data, and Percentile throws a null pointer exception when calculating > the percentage. After multiple tests, I found that it is necessary to set an > initial value for the statistical data value of the checkpoint when the > checkpoint data is null (i.e. at the beginning of the task) to solve this > problem. > The following is an abnormal description of the bug: > 2023-09-13 15:02:54,608 ERROR > org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler > [] - Unhandled exception. 
> org.apache.commons.math3.exception.NullArgumentException: input array
>     at org.apache.commons.math3.util.MathArrays.verifyValues(MathArrays.java:1650) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.commons.math3.stat.descriptive.AbstractUnivariateStatistic.test(AbstractUnivariateStatistic.java:158) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:272) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:241) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics$CommonMetricsSnapshot.getPercentile(DescriptiveStatisticsHistogramStatistics.java:159) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics.getQuantile(DescriptiveStatisticsHistogramStatistics.java:53) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.checkpoint.StatsSummarySnapshot.getQuantile(StatsSummarySnapshot.java:108) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.rest.messages.checkpoints.StatsSummaryDto.valueOf(StatsSummaryDto.java:81) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:129) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:84) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:58) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler.handleRequest(AbstractAccessExecutionGraphHandler.java:68) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) [?:1.8.0_151]
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) [?:1.8.0_151]
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_151]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_151]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_151]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_151]
>     at
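The fix described in this issue, seeding the checkpoint statistics with an initial value so the percentile calculation never sees a null or empty sample array, can be sketched without Flink or commons-math3. All names below are illustrative assumptions, not the actual patch; a nearest-rank estimator stands in for the interpolating `Percentile` of commons-math3, since the guard is the point, not the estimator.

```java
import java.util.Arrays;

// Hedged sketch of the guard described in FLINK-33588: when no checkpoint
// statistics exist yet (task just started), return a default instead of
// computing a percentile over a null or empty array.
class QuantileSketch {
    static double quantileOrDefault(double[] samples, double q, double dflt) {
        if (samples == null || samples.length == 0) {
            return dflt; // the "initial value" applied before any checkpoint completes
        }
        double[] sorted = samples.clone();
        Arrays.sort(sorted);
        // nearest-rank percentile for 0 < q <= 1
        int idx = (int) Math.ceil(q * sorted.length) - 1;
        return sorted[Math.max(idx, 0)];
    }

    public static void main(String[] args) {
        // no data yet -> the default, where commons-math3 would throw NullArgumentException
        System.out.println(quantileOrDefault(new double[0], 0.5, 0.0));
        System.out.println(quantileOrDefault(new double[] {3, 1, 2}, 0.5, 0.0));
    }
}
```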
[jira] [Updated] (FLINK-32667) Use standalone store and embedding writer for jobs with no-restart-strategy in session cluster
[ https://issues.apache.org/jira/browse/FLINK-32667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo updated FLINK-32667:
--
Parent: FLINK-33852
Issue Type: Sub-task (was: Improvement)

> Use standalone store and embedding writer for jobs with no-restart-strategy in session cluster
> Key: FLINK-32667
> URL: https://issues.apache.org/jira/browse/FLINK-32667
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.18.0
> Reporter: Fang Yong
> Assignee: Yangze Guo
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> When a flink session cluster use zk or k8s high availability service, it will store jobs in zk or ConfigMap. When we submit flink olap jobs to the session cluster, they always turn off restart strategy. These jobs with no-restart-strategy should not be stored in zk or ConfigMap in k8s
[jira] [Updated] (FLINK-32667) Use standalone store and embedding writer for jobs with no-restart-strategy in session cluster
[ https://issues.apache.org/jira/browse/FLINK-32667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo updated FLINK-32667:
--
Parent: (was: FLINK-25318)
Issue Type: Improvement (was: Sub-task)

> Use standalone store and embedding writer for jobs with no-restart-strategy in session cluster
> Key: FLINK-32667
> URL: https://issues.apache.org/jira/browse/FLINK-32667
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.18.0
> Reporter: Fang Yong
> Assignee: Yangze Guo
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> When a flink session cluster use zk or k8s high availability service, it will store jobs in zk or ConfigMap. When we submit flink olap jobs to the session cluster, they always turn off restart strategy. These jobs with no-restart-strategy should not be stored in zk or ConfigMap in k8s
Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]
X-czh commented on code in PR #23447: URL: https://github.com/apache/flink/pull/23447#discussion_r1429559942

## flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:

@@ -126,9 +140,14 @@ public synchronized void updateCurrentExecutionAttempts(Collection j
                 taskMetricStore.getSubtaskMetricStore(subtaskIndex))
                         .ifPresent(
-                                subtaskMetricStore ->
-                                        subtaskMetricStore.retainAttempts(
-                                                attempts.getCurrentAttempts()));
+                                subtaskMetricStore -> {
+                                        subtaskMetricStore.retainAttempts(

Review Comment: Good catch! But I'm wondering whether this is a good approach here. It complicates the code and makes maintenance more difficult. Since the duplication was introduced to overcome the issue that WebInterface task metric queries currently do not account for subtasks, how about leaving it unremoved for now and creating a new JIRA on updating the WebInterface to account for subtasks? I'd be willing to help with that as well. cc @JunRuiLee @wanglijie95
Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
yunfengzhou-hub commented on PR #23944: URL: https://github.com/apache/flink/pull/23944#issuecomment-1859639546 Thanks for the comments @reswqa. I have updated the PR according to the comments.
Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
yunfengzhou-hub commented on PR #23944: URL: https://github.com/apache/flink/pull/23944#issuecomment-1859639249 @flinkbot run azure
Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
yunfengzhou-hub commented on code in PR #23944: URL: https://github.com/apache/flink/pull/23944#discussion_r1429556103

## flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java:

@@ -153,6 +154,8 @@ public class JobGraphGenerator implements Visitor {
     private final boolean useLargeRecordHandler;
+    private final Map parallelismOverrides;

Review Comment: Thanks for pointing this out. It seems that neither `JobGraphGenerator` nor `StreamingJobGraphGenerator` has applied this logic. I'll move this to FlinkPipelineTranslationUtil.getJobGraph, which helps to apply this logic regardless of Plan or StreamGraph.
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on code in PR #23922: URL: https://github.com/apache/flink/pull/23922#discussion_r1429554092

## flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:

@@ -359,6 +361,50 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }

+    @Test
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        final EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());
+
+        String relocatedDbLogPrefix =
+                RocksDBResourceContainer.resolveRelocatedDbLogPrefix(
+                        instanceRocksDBPath.getAbsolutePath());
+        java.nio.file.Path[] relocatedDbLogs;
+        try {
+            relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath());
+            assertTrue(relocatedDbLogs[0].getFileName().startsWith(relocatedDbLogPrefix));
+            // add a rolled log file
+            Files.createTempFile(relocatedDBLogDir.toPath(), relocatedDbLogPrefix, ".suffix");

Review Comment: Resolved.
Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
reswqa commented on code in PR #23944: URL: https://github.com/apache/flink/pull/23944#discussion_r1429533482

## flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java:

@@ -153,6 +154,8 @@ public class JobGraphGenerator implements Visitor {
     private final boolean useLargeRecordHandler;
+    private final Map parallelismOverrides;

Review Comment: IIRC, this (`JobGraphGenerator`) should only be used in the `DataSet` API. I wonder whether we really need to support this configuration for it, since this part of the API has been deprecated since 1.18.

Review Comment: Another question: have we already done the same support for the DataStream API (from `StreamJobGraphGenerator`)?

## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:

@@ -1151,6 +1151,43 @@ public void testOverridingJobVertexParallelisms() throws Exception {
     Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 42);
 }

+    @Test
+    public void testOverridingJobVertexParallelismsWithJobSubmission() throws Exception {
+        JobVertex v1 = new JobVertex("v1");
+        v1.setParallelism(1);
+        JobVertex v2 = new JobVertex("v2");
+        v2.setParallelism(2);
+        JobVertex v3 = new JobVertex("v3");
+        v3.setParallelism(3);
+        jobGraph = new JobGraph(jobGraph.getJobID(), "job", v1, v2, v3);
+        jobGraph.getJobConfiguration()

Review Comment: It seems that config through the `JobGraph` has a higher priority than the `Dispatcher` itself, so we may not need to introduce new tests. Just let `testOverridingJobVertexParallelisms` exercise the two configurations at the same time; that also tests the priority between the two.
[jira] [Commented] (FLINK-32346) JdbcNumericBetweenParametersProvider Sharding key boundaries large storage long integer overflow, use BigDecimal instead Long
[ https://issues.apache.org/jira/browse/FLINK-32346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798019#comment-17798019 ] zhilinli commented on FLINK-32346:
--
Hi community, can I take this feature? If this feature is supported, it will support all integer types with no limit on length. [~libenchao] [~martijnvisser]

> JdbcNumericBetweenParametersProvider Sharding key boundaries large storage long integer overflow, use BigDecimal instead Long
> Key: FLINK-32346
> URL: https://issues.apache.org/jira/browse/FLINK-32346
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC
> Reporter: zhilinli
> Priority: Major
> Attachments: image-2023-06-15-16-42-16-773.png, image-2023-06-15-16-46-13-188.png
>
> *JdbcNumericBetweenParametersProvider.class* Sharding key boundaries stored as Long can overflow for large values; use BigDecimal instead of Long so that wide types such as DecimalType(30,0), which cannot be stored in a LONG, are compatible. Can be assigned to me; I want to complete it.
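The overflow this issue describes can be reproduced in a few lines. The method names below are illustrative, not the connector's actual API; the sketch only shows why the span of a wide numeric range wraps around in `long` arithmetic while `BigDecimal` handles it.

```java
import java.math.BigDecimal;

// Sketch of FLINK-32346: computing the size of a numeric sharding range with
// long arithmetic can overflow, while BigDecimal copes with arbitrarily wide
// DECIMAL(30,0)-style boundaries. Hypothetical names, not the connector code.
class ShardRangeSketch {
    static long spanWithLong(long min, long max) {
        return max - min + 1; // overflows (wraps around) for wide ranges
    }

    static BigDecimal spanWithBigDecimal(BigDecimal min, BigDecimal max) {
        return max.subtract(min).add(BigDecimal.ONE);
    }

    public static void main(String[] args) {
        long min = Long.MIN_VALUE, max = Long.MAX_VALUE;
        System.out.println(spanWithLong(min, max)); // wraps around to 0
        System.out.println(spanWithBigDecimal(
                BigDecimal.valueOf(min), BigDecimal.valueOf(max))); // 18446744073709551616
    }
}
```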
Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
flinkbot commented on PR #23944: URL: https://github.com/apache/flink/pull/23944#issuecomment-1859594720

## CI report:

* baad6c06ef71afca753ef0c5f8fb2af0a3e8c2ce UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-33534) PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission request
[ https://issues.apache.org/jira/browse/FLINK-33534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33534:
--
Labels: pull-request-available (was: )

> PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission request
> Key: FLINK-33534
> URL: https://issues.apache.org/jira/browse/FLINK-33534
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Configuration, Runtime / REST
> Affects Versions: 1.18.0, 1.17.1
> Reporter: Gyula Fora
> Assignee: Yunfeng Zhou
> Priority: Major
> Labels: pull-request-available
>
> PARALLELISM_OVERRIDES are currently only applied when they are part of the JobManager / cluster configuration. When this config is provided as part of the JarRunRequestBody it is completely ignored and does not take effect. The main reason is that the dispatcher reads this value from its own configuration object and does not include the extra configs passed through the REST request. This is a blocker for supporting the autoscaler properly for FlinkSessionJobs.
[PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]
yunfengzhou-hub opened a new pull request, #23944: URL: https://github.com/apache/flink/pull/23944 ## What is the purpose of the change This pull request supports configuring the PARALLELISM_OVERRIDES configuration during the job submission process. Before this PR, Flink only allowed configuring this parameter before the Dispatcher, i.e. the cluster, starts. ## Brief change log - Pass PARALLELISM_OVERRIDES from environment to job graph. - Make dispatcher use PARALLELISM_OVERRIDES from job graph before submitting job. ## Verifying this change This change is covered by newly added tests in JarRunHandlerParameterTest and DispatcherTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
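The dispatcher-side precedence the PR introduces can be sketched in plain Java. All names below are illustrative stand-ins, not Flink's real classes: overrides carried in the job graph's own configuration win over the cluster-wide ones, and unknown vertex ids are ignored.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of how a dispatcher could merge PARALLELISM_OVERRIDES
 * from the cluster configuration and from the submitted job graph. */
public class ParallelismOverrideSketch {

    public static Map<String, Integer> applyOverrides(
            Map<String, Integer> vertexParallelism,
            Map<String, String> clusterOverrides,
            Map<String, String> jobGraphOverrides) {
        Map<String, String> merged = new HashMap<>(clusterOverrides);
        merged.putAll(jobGraphOverrides); // settings from the job submission take precedence

        Map<String, Integer> result = new HashMap<>(vertexParallelism);
        // only vertices that actually exist in the graph are overridden
        merged.forEach((vertexId, p) ->
                result.computeIfPresent(vertexId, (id, old) -> Integer.parseInt(p)));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> defaults = new HashMap<>();
        defaults.put("source", 2);
        defaults.put("sink", 2);
        System.out.println(
                applyOverrides(defaults, Map.of("source", "4"), Map.of("source", "8")));
    }
}
```

The actual change wires the map through `JobGraph#getJobConfiguration()`, so the dispatcher no longer has to read only its own configuration object.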
Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]
wanglijie95 commented on code in PR #23447: URL: https://github.com/apache/flink/pull/23447#discussion_r1429465647 ## flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java: ## @@ -355,16 +356,21 @@ public String toString() { /** * The CurrentAttempts holds the attempt number of the current representative execution attempt, - * and the attempt numbers of all the running attempts. + * the attempt numbers of all the running attempts, and whether the current execution has + * reached terminal state. */ public static final class CurrentAttempts implements Serializable { private final int representativeAttempt; private final Set<Integer> currentAttempts; -public CurrentAttempts(int representativeAttempt, Set<Integer> currentAttempts) { +private final boolean terminal; Review Comment: Maybe `terminal` -> `reachTerminalState` or `isTerminalState` ## flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java: ## @@ -57,6 +60,17 @@ public class MetricStore { private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class); +/** + * The set holds the names of the transient metrics which are no longer useful after a subtask + * reaches terminal state and shall be removed to avoid misleading users. + */ Review Comment: Maybe add one more line: "Note that there may be other transient metrics, we currently only support cleaning these three" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]
JunRuiLee commented on code in PR #23447: URL: https://github.com/apache/flink/pull/23447#discussion_r1429512642 ## flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java: ## @@ -126,9 +140,14 @@ public synchronized void updateCurrentExecutionAttempts(Collection j taskMetricStore.getSubtaskMetricStore( subtaskIndex)) .ifPresent( -subtaskMetricStore -> - subtaskMetricStore.retainAttempts( - attempts.getCurrentAttempts())); +subtaskMetricStore -> { + subtaskMetricStore.retainAttempts( Review Comment: Since the metrics for subtasks also exist in the taskMetricsStore in the form of taskInfo.subtaskIndex + "." + name, it is also necessary to clean up the transient metrics stored in the taskMetricsStore. Otherwise, it could lead to inconsistent behaviors that may confuse users, as depicted in the screenshot below. ![image](https://github.com/apache/flink/assets/107924572/ac8de662-7129-4445-a793-f0facca352e8) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
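JunRuiLee's point — that subtask metrics also live in the task-level store under a `"<subtaskIndex>.<metricName>"` key — can be illustrated with a dependency-free sketch. The structure and metric names below are assumptions for illustration, not the real `MetricStore`:

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: transient metrics must be removed from BOTH the
 * subtask store and the task store, where they are keyed by subtask index,
 * otherwise the two views of the same metric become inconsistent. */
public class TransientMetricCleanupSketch {

    // Illustrative metric names (the PR's actual list may differ).
    static final Set<String> TRANSIENT_METRICS =
            Set.of("currentInputWatermark", "busyTimeMsPerSecond", "backPressuredTimeMsPerSecond");

    public static void removeTransientMetrics(
            Map<String, String> subtaskMetrics,
            Map<String, String> taskMetrics,
            int subtaskIndex) {
        for (String name : TRANSIENT_METRICS) {
            subtaskMetrics.remove(name);                    // subtask-level entry
            taskMetrics.remove(subtaskIndex + "." + name);  // mirrored task-level entry
        }
    }
}
```

Cleaning only one of the two maps would reproduce exactly the confusing UI state shown in the screenshot above.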
[jira] [Comment Edited] (FLINK-33565) The concurrentExceptions doesn't work
[ https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798003#comment-17798003 ] Rui Fan edited comment on FLINK-33565 at 12/18/23 4:21 AM: --- Hi [~mapohl], I have finished the POC, and want to check whether my solution is fine for you. The POC can be found in this [PR|https://github.com/apache/flink/pull/23867]; this PR will be updated in the future, and I have backed up a permanent branch here: [https://github.com/1996fanrui/flink/commits/33565/permanent-backup1/] The core idea is: for each failure, the RestartStrategy will return whether the current failure is a new attempt (based on FLIP-364). * If it's a new attempt, it is a root exception, and it will be the latest root exception. * If it's not a new attempt, it will be a concurrentException and will be added to the latest root exception. Core changes: my POC branch has 3 commits related to this JIRA: # [FLINK-33565][Exception] Archive exceptions into the exception history immediately when they occur, instead of archiving them when restarting # [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt # [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging The first commit is a refactoring; actually, I don't know why exceptions were archived when restarting the task instead of immediately. It meant that when one task failed, we could only see the exception history after Flink restarted the task. So the first commit is only a refactoring: it archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting. The second commit is related to the restart strategy, adding a return value that indicates whether the current failure is a new attempt. The third commit is the core solution of this JIRA: * If it's a new attempt, it is a root exception, and it will be the latest root exception. * If it's not a new attempt, it will be a concurrentException and will be added to the latest root exception. 
In the last commit, I added a demo job with 6 regions; all tasks fail when processing the first record, and this demo job can be run directly. Here is the result: we can see all failed tasks in the WebUI. If you agree with my solution, I can go ahead. If not, we can discuss first. Looking forward to your opinions, thanks~ !screenshot-1.png|width=923,height=468! was (Author: fanrui): Hi [~mapohl], I have finished the POC, and wanna to check whether my solution is fine for you. POC can be found in this [PR|https://github.com/apache/flink/pull/23867], this PR will be updated in the future, and I have backup a permanent branch here: https://github.com/1996fanrui/flink/commits/33565/permanent-backup1/ Core idea is: For each failure, RestartStrategy will return whether current failure is a new attempt(Based on FLIP-364). * If it's a new attempt, it's root exception * If it's not a new attempt, it will be a concurrentException and it will be added to the latest RootException. Core changes: My poc branch has 3 commits related to this JIRA: # [FLINK-33565][Exception] Archive exceptions into the exception history immediately when they occur, instead of archiving them when restarting # [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt # [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging The first commit is refactoring, actually, I don't know why archiving exception when restarting task instead of immediately. It means, when one task failure, we can see the exception history after flink restart this task. So the first commit is only a refactoring. It archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting. The second commit is related to restart strategy, adding a return value indicates whether current failure is a new attempt. 
The third commit is core solution of this JIRA: * If it's a new attempt, it's root exception * If it's not a new attempt, it will be a concurrentException and it will be added to the latest RootException. The last commit, I added a job demo with 6 regions, all tasks will fail when processing the first record, this demo job can be run directly. Here is the result, we can see all failed tasks in the WebUI. If you agree with my solution, I can go ahead. If not, we can discuss first. Looking forward to your opinions, thanks~ !screenshot-1.png! > The concurrentExceptions doesn't work > - > > Key: FLINK-33565 > URL: https://issues.apache.org/jira/browse/FLINK-33565 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.17.1 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Fix For: 1.19.0 > >
[jira] [Commented] (FLINK-33565) The concurrentExceptions doesn't work
[ https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798003#comment-17798003 ] Rui Fan commented on FLINK-33565: - Hi [~mapohl], I have finished the POC, and want to check whether my solution is fine for you. The POC can be found in this [PR|https://github.com/apache/flink/pull/23867]; this PR will be updated in the future, and I have backed up a permanent branch here: https://github.com/1996fanrui/flink/commits/33565/permanent-backup1/ The core idea is: for each failure, the RestartStrategy will return whether the current failure is a new attempt (based on FLIP-364). * If it's a new attempt, it is a root exception * If it's not a new attempt, it will be a concurrentException and will be added to the latest root exception. Core changes: my POC branch has 3 commits related to this JIRA: # [FLINK-33565][Exception] Archive exceptions into the exception history immediately when they occur, instead of archiving them when restarting # [FLINK-33565][Exception] Restart strategy checks whether current failure is a new attempt # [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging The first commit is a refactoring; actually, I don't know why exceptions were archived when restarting the task instead of immediately. It meant that when one task failed, we could only see the exception history after Flink restarted the task. So the first commit is only a refactoring: it archives exceptions into the exception history immediately when they occur, instead of archiving them when restarting. The second commit is related to the restart strategy, adding a return value that indicates whether the current failure is a new attempt. The third commit is the core solution of this JIRA: * If it's a new attempt, it is a root exception * If it's not a new attempt, it will be a concurrentException and will be added to the latest root exception. 
In the last commit, I added a demo job with 6 regions; all tasks fail when processing the first record, and this demo job can be run directly. Here is the result: we can see all failed tasks in the WebUI. If you agree with my solution, I can go ahead. If not, we can discuss first. Looking forward to your opinions, thanks~ !screenshot-1.png! > The concurrentExceptions doesn't work > - > > Key: FLINK-33565 > URL: https://issues.apache.org/jira/browse/FLINK-33565 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.17.1 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Fix For: 1.19.0 > > > First of all, thanks to [~mapohl] for helping double-check in advance that > this was indeed a bug. > Displaying exception history in WebUI is supported in FLINK-6042. > h1. What's the concurrentExceptions? > When an execution fails due to an exception, other executions in the same > region will also restart, and the first exception is the rootException. If other > restarted executions also report exceptions at this time, we hope to collect > these exceptions and display them to the user as concurrentExceptions. > h2. What's this bug? > The concurrentExceptions is always empty in production, even if other > executions report exceptions at very close times. > h1. Why doesn't it work? > If one job has an all-to-all shuffle, this job only has one region, and this > region has a lot of executions. If one execution throws an exception: > * JobMaster will mark the state as FAILED for this execution. > * The rest of the executions of this region will be marked as CANCELING. > ** This call stack can be found at FLIP-364 > [part-4.2.3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy#FLIP364:Improvetherestartstrategy-4.2.3Detailedcodeforfull-failover] > > When these executions throw exceptions as well, the JobMaster will mark the > state from CANCELING to CANCELED instead of FAILED. 
> The CANCELED executions won't go through the FAILED logic, so their exceptions are > ignored. > Note: all reports are executed inside the JobMaster RPC thread, which is a single > thread, so these reports are executed serially. Only one execution is > marked FAILED, and the rest of the executions will be marked CANCELED later. > h1. How to fix it? > After an offline discussion with [~mapohl], we need to discuss with the community whether we should > keep the concurrentExceptions first. > * If no, we can remove the related logic directly > * If yes, we discuss how to fix it later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
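The merging rule proposed in the comments above — a failure that opens a new restart attempt becomes a root exception, and later failures in the same attempt are appended as concurrent exceptions — can be sketched as follows. The class names are illustrative, not Flink's real scheduler types:

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal sketch of the proposed exception-history merging. */
public class ExceptionHistorySketch {

    public static final class Entry {
        public final Throwable root;
        public final List<Throwable> concurrentExceptions = new ArrayList<>();
        Entry(Throwable root) { this.root = root; }
    }

    private final List<Entry> history = new ArrayList<>();

    /** isNewAttempt would come from the restart strategy (FLIP-364). */
    public void archive(Throwable failure, boolean isNewAttempt) {
        if (isNewAttempt || history.isEmpty()) {
            history.add(new Entry(failure));                       // becomes the latest root
        } else {
            history.get(history.size() - 1)
                   .concurrentExceptions.add(failure);             // merged into the latest root
        }
    }

    public List<Entry> getHistory() { return history; }
}
```

Because each failure is archived immediately (the first commit's refactoring), a CANCELED execution's exception is no longer silently dropped — it simply lands in the `concurrentExceptions` list of the current root.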
[jira] [Updated] (FLINK-33565) The concurrentExceptions doesn't work
[ https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-33565: Attachment: screenshot-1.png > The concurrentExceptions doesn't work > - > > Key: FLINK-33565 > URL: https://issues.apache.org/jira/browse/FLINK-33565 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0, 1.17.1 >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Fix For: 1.19.0 > > Attachments: screenshot-1.png > > > First of all, thanks to [~mapohl] for helping double-check in advance that > this was indeed a bug . > Displaying exception history in WebUI is supported in FLINK-6042. > h1. What's the concurrentExceptions? > When an execution fails due to an exception, other executions in the same > region will also restart, and the first Exception is rootException. If other > restarted executions also report Exception at this time, we hope to collect > these exceptions and Displayed to the user as concurrentExceptions. > h2. What's this bug? > The concurrentExceptions is always empty in production, even if other > executions report exception at very close times. > h1. Why doesn't it work? > If one job has all-to-all shuffle, this job only has one region, and this > region has a lot of executions. If one execution throw exception: > * JobMaster will mark the state as FAILED for this execution. > * The rest of executions of this region will be marked to CANCELING. > ** This call stack can be found at FLIP-364 > [part-4.2.3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy#FLIP364:Improvetherestartstrategy-4.2.3Detailedcodeforfull-failover] > > When these executions throw exception as well, it JobMaster will mark the > state from CANCELING to CANCELED instead of FAILED. > The CANCELED execution won't call FAILED logic, so their exceptions are > ignored. 
> Note: all reports are executed inside the JobMaster RPC thread, which is a single > thread, so these reports are executed serially. Only one execution is > marked FAILED, and the rest of the executions will be marked CANCELED later. > h1. How to fix it? > After an offline discussion with [~mapohl], we need to discuss with the community whether we should > keep the concurrentExceptions first. > * If no, we can remove the related logic directly > * If yes, we discuss how to fix it later. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records
[ https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798000#comment-17798000 ] Hang Ruan commented on FLINK-25509: --- Hi, [~martijnvisser] & [~lindong]. Flink 1.18 has been released, and I think we can push this feature forward now. Which Kafka connector version should we put it into? This feature relies on the interface introduced in 1.18 and is not compatible with 1.17 or earlier. > FLIP-208: Add RecordEvaluator to dynamically stop source based on > de-serialized records > --- > > Key: FLINK-25509 > URL: https://issues.apache.org/jira/browse/FLINK-25509 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common, Connectors / Kafka >Reporter: Dong Lin >Assignee: Hang Ruan >Priority: Major > Labels: pull-request-available, stale-assigned > > This feature is needed to migrate applications which use > KafkaDeserializationSchema::isEndOfStream() from using FlinkKafkaConsumer to > using KafkaSource. > Please checkout > https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records > for the motivation and the proposed changes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
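For readers unfamiliar with FLIP-208, the evaluator's contract can be illustrated with a small dependency-free stand-in. The real interface ships with Flink; the names below only loosely mirror it, and the consumption loop is a simplification of what a source reader does per split:

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in sketch for the FLIP-208 idea: the evaluator sees each
 * de-serialized record and decides when the source should stop. */
public class RecordEvaluatorSketch {

    @FunctionalInterface
    public interface RecordEvaluator<T> {
        boolean isEndOfStream(T record);
    }

    /** Emits records until the evaluator fires; the end-of-stream record
     * itself is not emitted, mirroring the old
     * KafkaDeserializationSchema#isEndOfStream behavior. */
    public static <T> List<T> consume(List<T> records, RecordEvaluator<T> evaluator) {
        List<T> emitted = new ArrayList<>();
        for (T record : records) {
            if (evaluator.isEndOfStream(record)) {
                break; // finish the split without emitting the sentinel
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(consume(List.of("a", "b", "END", "c"), "END"::equals)); // prints [a, b]
    }
}
```

This is what lets applications that relied on `isEndOfStream()` migrate from `FlinkKafkaConsumer` to `KafkaSource` without losing the dynamic-stop behavior.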
Re: [PR] [FLINK-33795] Add a new config to forbid autoscale execution in the configured excluded periods [flink-kubernetes-operator]
flashJd commented on code in PR #728: URL: https://github.com/apache/flink-kubernetes-operator/pull/728#discussion_r1429453681 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java: ## @@ -94,4 +99,106 @@ public static boolean excludeVerticesFromScaling( conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds)); return anyAdded; } + +/** Quartz doesn't have the invertTimeRange flag so rewrite this method. */ +static boolean isTimeIncluded(CronCalendar cron, long timeInMillis) { +if (cron.getBaseCalendar() != null +&& !cron.getBaseCalendar().isTimeIncluded(timeInMillis)) { +return false; +} else { +return cron.getCronExpression().isSatisfiedBy(new Date(timeInMillis)); +} +} + +static Optional<DailyCalendar> interpretAsDaily(String subExpression) { +String[] splits = subExpression.split("-"); +if (splits.length != 2) { +return Optional.empty(); +} +try { +DailyCalendar daily = new DailyCalendar(splits[0], splits[1]); +daily.setInvertTimeRange(true); +return Optional.of(daily); +} catch (Exception e) { +return Optional.empty(); +} +} + +static Optional<CronCalendar> interpretAsCron(String subExpression) { +try { +return Optional.of(new CronCalendar(subExpression)); +} catch (Exception e) { +return Optional.empty(); Review Comment: An incorrect expression config will first be validated in `DefaultValidator`, which reports the exception there. I've added extra tests in `DefaultValidatorTest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
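The daily excluded-period semantics the PR implements (an inverted time range, which Quartz's `CronCalendar` lacks out of the box) can be shown with a dependency-free `java.time` sketch. The PR itself builds on Quartz's `DailyCalendar`/`CronCalendar`; this deliberately does not, so the logic stays self-contained:

```java
import java.time.LocalTime;

/** Simplified stand-in for the daily excluded-period check. */
public class ExcludedPeriodSketch {

    /** True if now falls inside the daily window [start, end] during which
     * autoscaling decisions should be skipped. */
    public static boolean inExcludedPeriod(LocalTime now, LocalTime start, LocalTime end) {
        return !now.isBefore(start) && !now.isAfter(end);
    }

    public static void main(String[] args) {
        LocalTime start = LocalTime.of(9, 0);
        LocalTime end = LocalTime.of(17, 30);
        System.out.println(inExcludedPeriod(LocalTime.of(12, 0), start, end)); // prints true
        System.out.println(inExcludedPeriod(LocalTime.of(18, 0), start, end)); // prints false
    }
}
```

Quartz's `DailyCalendar` normally *excludes* the configured range from firing; `setInvertTimeRange(true)` flips that, which is why the PR sets it when interpreting a `HH:mm-HH:mm` sub-expression as a daily window.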
Re: [PR] [FLINK-33795] Add a new config to forbid autoscale execution in the configured excluded periods [flink-kubernetes-operator]
flashJd commented on code in PR #728: URL: https://github.com/apache/flink-kubernetes-operator/pull/728#discussion_r1429452589 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java: ## @@ -94,4 +99,108 @@ public static boolean excludeVerticesFromScaling( conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds)); return anyAdded; } + +/** Quartz doesn't have the invertTimeRange flag so rewrite this method. */ +static boolean isTimeIncluded(CronCalendar cron, long timeInMillis) { Review Comment: Moved to `CalendarUtils`, and the corresponding tests moved to `CalendarUtilsTest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]
X-czh commented on PR #23447: URL: https://github.com/apache/flink/pull/23447#issuecomment-1859486000 @wanglijie95 Kindly remind~ Could you help take a look when you have time? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-33868) Add checkpoint metrics: the latency to write the file
Jufang He created FLINK-33868: - Summary: Add checkpoint metrics: the latency to write the file Key: FLINK-33868 URL: https://issues.apache.org/jira/browse/FLINK-33868 Project: Flink Issue Type: Sub-task Reporter: Jufang He -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33869) Add checkpoint metrics: the latency to close the file
Jufang He created FLINK-33869: - Summary: Add checkpoint metrics: the latency to close the file Key: FLINK-33869 URL: https://issues.apache.org/jira/browse/FLINK-33869 Project: Flink Issue Type: Sub-task Reporter: Jufang He -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33867) Add checkpoint metrics: the rate of file write
Jufang He created FLINK-33867: - Summary: Add checkpoint metrics: the rate of file write Key: FLINK-33867 URL: https://issues.apache.org/jira/browse/FLINK-33867 Project: Flink Issue Type: Sub-task Reporter: Jufang He -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making
[ https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jufang He updated FLINK-33856: -- Description: When Flink makes a checkpoint, the interaction performance with the external file system has a great impact on the overall time consumed. Therefore, it is easy to observe the bottleneck point by adding performance indicators where the task interacts with the external file storage system. These include: the rate of file writes, the latency to write the file, and the latency to close the file. Adding the above metrics on the Flink side has the following advantages: it is convenient to compare the end-to-end time of different tasks, and there is no need to distinguish the type of external storage system, since the metrics can be collected uniformly in the FsCheckpointStreamFactory. was: When Flink makes a Checkpoint, the interaction performance with the external file system has a great impact on the overall time-consuming of the checkpoint. Therefore, it is easy to observe the bottleneck point by adding performance indicators when the task interacts with the external file storage system. These include: the rate of file write , the latency to write the file, the latency to close the file. In flink side add the above metrics has the following advantages: convenient statistical different task E2E time-consuming; do not need to distinguish the type of external storage system, can be unified in the FsCheckpointStreamFactory. > Add metrics to monitor the interaction performance between task and external > storage system in the process of checkpoint making > --- > > Key: FLINK-33856 > URL: https://issues.apache.org/jira/browse/FLINK-33856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.18.0 >Reporter: Jufang He >Priority: Major > > When Flink makes a checkpoint, the interaction performance with the external > file system has a great impact on the overall time consumed. 
Therefore, it > is easy to observe the bottleneck point by adding performance indicators where > the task interacts with the external file storage system. These include: the > rate of file writes, the latency to write the file, and the latency to close the > file. > Adding the above metrics on the Flink side has the following advantages: it is convenient > to compare the end-to-end time of different tasks, and there is no need to distinguish the > type of external storage system, since the metrics can be collected uniformly in the > FsCheckpointStreamFactory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
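One dependency-free way such metrics could be captured — a hypothetical wrapper, not Flink's actual FsCheckpointStreamFactory code — is to wrap the checkpoint output stream. Because the wrapper sits above the concrete stream, it works uniformly for any external storage system, which is exactly the advantage the issue describes:

```java
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical sketch: measuring write latency and bytes written at the
 * stream level, independent of the backing storage system. */
public class MeasuredCheckpointStream extends OutputStream {

    private final OutputStream delegate;
    private long bytesWritten;
    private long writeNanos;

    public MeasuredCheckpointStream(OutputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(int b) throws IOException {
        long start = System.nanoTime();
        delegate.write(b);
        writeNanos += System.nanoTime() - start; // feeds a write-latency metric
        bytesWritten++;                          // feeds a write-rate metric
    }

    public long getBytesWritten() { return bytesWritten; }

    public long getWriteNanos() { return writeNanos; }
}
```

Close latency could be measured the same way by timing an overridden `close()`.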
Re: [PR] [FLINK-30960] Fix a corner case oom error [flink-connector-jdbc]
bs352 commented on code in PR #77: URL: https://github.com/apache/flink-connector-jdbc/pull/77#discussion_r1429434411 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java: ## @@ -153,6 +153,16 @@ public void open(int taskNumber, int numTasks) throws IOException { if (!closed) { try { flush(); +} catch (FlushingRuntimeException e) { +/* + * We ignore this FlushingRuntimeException, as it is + * only thrown when flushingException was assigned to, + * in a former run of this scheduler thread, in the next + * catch clause. In that case, we already have + * flushException cached, waiting for the next task + * manager thread's flush call which would throw a new + * FlushingRuntimeException causing job failure. Review Comment: I see, the close method calls flush as well. In this case, the scheduler is already shutdown before calling flush, so it will never loop. It won't hurt if we throw the new FlushingRuntimeException here instead, as it will propagate up to be handled by TaskManager the same way as a regular RuntimeException. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
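The handoff pattern under discussion — a background flusher caching the failure so that the next task-thread flush rethrows it and fails the job visibly — can be sketched like this (class and field names are illustrative, not the connector's actual code):

```java
/** Sketch of the flush-error handoff: the scheduler thread must not die
 * silently, so it records the failure for the task-manager thread. */
public class FlushHandoffSketch {

    private volatile RuntimeException flushException;
    private int flushedBatches;

    /** Called periodically from the scheduler thread. */
    public void scheduledFlush() {
        try {
            doFlush();
        } catch (RuntimeException e) {
            flushException = e; // remembered for the next task-thread flush
        }
    }

    /** Called from the task-manager thread (e.g. on checkpoint or close). */
    public void flush() {
        if (flushException != null) {
            // surfaces the cached failure so the job fails instead of looping
            throw new RuntimeException("Writing records failed", flushException);
        }
        doFlush();
        flushedBatches++;
    }

    protected void doFlush() { /* write the buffered batch */ }

    public int getFlushedBatches() { return flushedBatches; }
}
```

The corner case discussed in the review is `close()`: since the scheduler is already shut down at that point, rethrowing there cannot re-enter the loop, so propagating the exception is safe.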
Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]
Myasuka commented on code in PR #83: URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1429398494 ## src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.benchmark.ttl; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.state.benchmark.StateBenchmarkBase; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys; +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState; +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; + +/** Implementation for list state benchmark testing. 
*/ +public class TtlListStateBenchmark extends TtlStateBenchmarkBase {+private final String STATE_NAME = "listState"; +private final ListStateDescriptor<Long> STATE_DESC = +configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class)); +private ListState<Long> listState; +private List<Long> dummyLists; + +public static void main(String[] args) throws RunnerException { +Options opt = +new OptionsBuilder() +.verbosity(VerboseMode.NORMAL) +.include(".*" + TtlListStateBenchmark.class.getCanonicalName() + ".*") +.build(); + +new Runner(opt).run(); +} + +@Setup +public void setUp() throws Exception { +keyedStateBackend = createKeyedStateBackend(); +listState = getListState(keyedStateBackend, STATE_DESC); +dummyLists = new ArrayList<>(listValueCount); +for (int i = 0; i < listValueCount; ++i) { +dummyLists.add(random.nextLong()); +} +keyIndex = new AtomicInteger(); +} + +@Setup(Level.Iteration) +public void setUpPerIteration() throws Exception { +for (int i = 0; i < setupKeyCount; ++i) { +keyedStateBackend.setCurrentKey((long) i); +listState.add(random.nextLong()); +} +// make sure only one sst file left, so all get invocation will access this single file, +// to prevent the spike caused by different key distribution in multiple sst files, +// the more access to the older sst file, the lower throughput will be. +if (keyedStateBackend instanceof RocksDBKeyedStateBackend) { Review Comment: I'm not sure whether such a refactor could impact the performance results significantly, could you please provide some data if possible? If so, I think we can refactor the tests with FLINK-33864. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]
liyubin117 commented on code in PR #23678: URL: https://github.com/apache/flink/pull/23678#discussion_r1429393701 ## flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java: ## @@ -141,15 +143,15 @@ private DataGeneratorContainer createContainer( } private void validateFieldOptions(String name, DataType type, ReadableConfig options) { -ConfigOption<Boolean> lenOption = +ConfigOption<Boolean> varLenOption = key(DataGenConnectorOptionsUtil.FIELDS + "." + name + "." + DataGenConnectorOptionsUtil.VAR_LEN) .booleanType() .defaultValue(false); -options.getOptional(lenOption) +options.getOptional(varLenOption) Review Comment: Sorry for the misunderstanding. I invoke this to filter tables that contain `'fields.#.var-len' = 'true'`; then we can use `ifPresent` to check whether such an option applies. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-32721] [planner] Support CharType for the MaxAggFunction [flink]
flinkbot commented on PR #23943: URL: https://github.com/apache/flink/pull/23943#issuecomment-1859429375 ## CI report: * ffc32e900cfb448b602f9134d6ea3e247564d722 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-32721] [planner] Support CharType for the MaxAggFunction [flink]
liuyongvs opened a new pull request, #23943: URL: https://github.com/apache/flink/pull/23943 ## What is the purpose of the change * Support CharType for the MaxAggFunction. ## Brief change log * Support CharType for the MaxAggFunction like MinAggFunction ## Verifying this change * original unit test ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no)
[jira] [Commented] (FLINK-33719) Cleanup the usage of deprecated StreamTableEnvironment#toRetractStream
[ https://issues.apache.org/jira/browse/FLINK-33719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797988#comment-17797988 ] Jane Chan commented on FLINK-33719: --- [~jackylau] assigned to you. > Cleanup the usage of deprecated StreamTableEnvironment#toRetractStream > -- > > Key: FLINK-33719 > URL: https://issues.apache.org/jira/browse/FLINK-33719 > Project: Flink > Issue Type: Sub-task >Reporter: Jane Chan >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-33719) Cleanup the usage of deprecated StreamTableEnvironment#toRetractStream
[ https://issues.apache.org/jira/browse/FLINK-33719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jane Chan reassigned FLINK-33719: - Assignee: Jacky Lau > Cleanup the usage of deprecated StreamTableEnvironment#toRetractStream > -- > > Key: FLINK-33719 > URL: https://issues.apache.org/jira/browse/FLINK-33719 > Project: Flink > Issue Type: Sub-task >Reporter: Jane Chan >Assignee: Jacky Lau >Priority: Major > Labels: pull-request-available >
[jira] [Commented] (FLINK-32721) agg max/min supports char type
[ https://issues.apache.org/jira/browse/FLINK-32721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797987#comment-17797987 ] Jacky Lau commented on FLINK-32721: --- hi [~lsy], sorry for the late response, I will modify it > agg max/min supports char type > -- > > Key: FLINK-32721 > URL: https://issues.apache.org/jira/browse/FLINK-32721 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: Jacky Lau >Assignee: Jacky Lau >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.19.0 > > > {code:java} > // flink > Flink SQL> CREATE TABLE Orders ( > > name char(10), > > price DECIMAL(32,2), > > buyer ROW, > > order_time TIMESTAMP(3) > > ) WITH ( > > 'connector' = 'datagen' > > ); > [INFO] Execute statement succeed. > Flink SQL> select max(name) from Orders; > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: Max aggregate function does not > support type: ''CHAR''. > Please re-check the data type. {code} > {code:java} > // mysql > CREATE TABLE IF NOT EXISTS `docs` ( > `id` int(6) unsigned NOT NULL, > `rev` int(3) unsigned NOT NULL, > `content` char(200) NOT NULL, > PRIMARY KEY (`id`,`rev`) > ) DEFAULT CHARSET=utf8; > INSERT INTO `docs` (`id`, `rev`, `content`) VALUES > ('1', '1', 'The earth is flat'), > ('2', '1', 'One hundred angels can dance on the head of a pin'), > ('1', '2', 'The earth is flat and rests on a bull\'s horn'), > ('1', '3', 'The earth is like a ball.'); > select max(content) from docs; > // result > |max(content)| > The earth is like a ball.{code}
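Max over a CHAR column is plain lexicographic comparison, as the MySQL example above shows. The following is a self-contained sketch of that aggregation semantics — it is not Flink's actual MaxAggFunction (which operates on internal StringData), only an illustration of the expected behavior:

```java
// Minimal generic max accumulator; for Strings, compareTo gives the
// lexicographic order that max(char_column) is expected to follow.
public class MaxAcc<T extends Comparable<T>> {

    private T max; // null until the first non-null value arrives

    public void accumulate(T value) {
        // NULL values are ignored, matching SQL aggregate semantics.
        if (value != null && (max == null || value.compareTo(max) > 0)) {
            max = value;
        }
    }

    public T getMax() {
        return max;
    }
}
```

Feeding it the rows from the MySQL example reproduces the documented result, since `'The earth is like a ball.'` compares greater than the other strings.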
Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]
LadyForest commented on code in PR #23678: URL: https://github.com/apache/flink/pull/23678#discussion_r1429375718 ## flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java: ## @@ -141,15 +143,15 @@ private DataGeneratorContainer createContainer( } private void validateFieldOptions(String name, DataType type, ReadableConfig options) { -ConfigOption<Boolean> lenOption = +ConfigOption<Boolean> varLenOption = key(DataGenConnectorOptionsUtil.FIELDS + "." + name + "." + DataGenConnectorOptionsUtil.VAR_LEN) .booleanType() .defaultValue(false); -options.getOptional(lenOption) +options.getOptional(varLenOption) Review Comment: > the prior name is not clear enough, so I use a new one instead. I mean L#155 `filter(option -> option)`
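The `filter(option -> option)` idiom being discussed can be shown in isolation. This is a self-contained sketch; `getOptional` here is a stand-in for `ReadableConfig#getOptional`, not the connector's real code:

```java
import java.util.Optional;

public class VarLenOptionCheck {

    // Stand-in for options.getOptional(varLenOption): empty when the user
    // did not set 'fields.#.var-len' for the field at all.
    public static Optional<Boolean> getOptional(Boolean configured) {
        return Optional.ofNullable(configured);
    }

    // filter(option -> option) keeps only Optionals holding TRUE, so the
    // ifPresent block runs solely for fields that set the option to 'true'.
    public static boolean validationTriggered(Boolean configured) {
        final boolean[] ran = {false};
        getOptional(configured)
                .filter(option -> option) // drops FALSE and unset values
                .ifPresent(v -> ran[0] = true);
        return ran[0];
    }
}
```

The filter relies on auto-unboxing of the `Boolean` value, which is why an unset option (empty Optional) and an explicit `false` both skip the validation branch.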
Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
xuyangzhong commented on code in PR #23827: URL: https://github.com/apache/flink/pull/23827#discussion_r1429367900 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java: ## @@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable<Accumulator> others) throws Exception { assertKeyNotPresent(acc, key); acc.map.put(key, other.map.get(key)); } +for (final StringData key : other.retractMap.keys()) { Review Comment: IIUC, for retract streams, if -U comes before +U, +U might also be eliminated?
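The out-of-order case raised here (a -U arriving before its matching +U) can be sketched with plain maps. This is only an illustration of the retract-map idea under discussion, not Flink's actual JsonObjectAggFunction accumulator:

```java
import java.util.HashMap;
import java.util.Map;

public class RetractingMapAcc {

    public final Map<String, Long> map = new HashMap<>();          // accumulated key -> value
    public final Map<String, Integer> retractMap = new HashMap<>(); // retractions seen before their insertion

    public void accumulate(String key, long value) {
        // A pending retraction cancels the incoming insertion
        // (the -U came first, so the +U must be eliminated).
        Integer pending = retractMap.get(key);
        if (pending != null) {
            if (pending == 1) {
                retractMap.remove(key);
            } else {
                retractMap.put(key, pending - 1);
            }
            return;
        }
        map.put(key, value);
    }

    public void retract(String key) {
        // If the key is present, drop it; otherwise remember the retraction
        // so a later out-of-order insertion is cancelled instead of kept.
        if (map.remove(key) == null) {
            retractMap.merge(key, 1, Integer::sum);
        }
    }
}
```

With this bookkeeping, `-U` followed by `+U` leaves the accumulator empty, which is the behavior the reviewer is asking about.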
[jira] [Created] (FLINK-33866) KafkaSinkBuilder in flink-connector-kafka references DeliveryGuarantee in flink-connector-base
Kurt Ostfeld created FLINK-33866: Summary: KafkaSinkBuilder in flink-connector-kafka references DeliveryGuarantee in flink-connector-base Key: FLINK-33866 URL: https://issues.apache.org/jira/browse/FLINK-33866 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: kafka-3.0.2 Reporter: Kurt Ostfeld I have a Flink project that has code like: ``` KafkaSink.builder().setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) ``` This worked with flink-connector-kafka 3.0.1 as well as past versions of Flink. This fails to compile with flink-connector-kafka 3.0.2 because that release changed flink-connector-base to a provided dependency so the reference to the DeliveryGuarantee class becomes a compiler error.
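A likely workaround for the reported compile error is to declare flink-connector-base explicitly in the user project, since 3.0.2 marks it as a provided dependency of the Kafka connector. This is an editor's sketch of the idea, not an official fix; the version shown is illustrative and must match the Flink version actually in use:

```xml
<!-- Illustrative Maven fragment: flink-connector-kafka 3.0.2 no longer
     brings flink-connector-base in at compile scope, so the class
     DeliveryGuarantee must be supplied by the user project. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-base</artifactId>
  <version>1.18.0</version> <!-- assumption: match your Flink runtime version -->
</dependency>
```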
[jira] [Closed] (FLINK-33764) Incorporate GC / Heap metrics in autoscaler decisions
[ https://issues.apache.org/jira/browse/FLINK-33764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-33764. -- Resolution: Fixed merged to main f6adb400e1c87f06faec948379c264eebba71166 > Incorporate GC / Heap metrics in autoscaler decisions > - > > Key: FLINK-33764 > URL: https://issues.apache.org/jira/browse/FLINK-33764 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Gyula Fora >Assignee: Gyula Fora >Priority: Major > Labels: pull-request-available > > The autoscaler currently doesn't use any GC/HEAP metrics as part of the > scaling decisions. > While the long term goal may be to support vertical scaling (increasing TM > sizes) currently this is out of scope for the autoscaler. > However it is very important to detect cases where the throughput of certain > vertices or the entire pipeline is critically affected by long GC pauses. In > these cases the current autoscaler logic would wrongly assume a low true > processing rate and scale the pipeline too high, ramping up costs and causing > further issues. > Using the improved GC metrics introduced in > https://issues.apache.org/jira/browse/FLINK-33318 we should measure the GC > pauses and simply block scaling decisions if the pipeline spends too much > time garbage collecting and notify the user about the required action to > increase memory.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora merged PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726
Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]
Zakelly commented on code in PR #83: URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1429208690 ## src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.benchmark.ttl; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.state.benchmark.StateBenchmarkBase; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys; +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState; +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; + +/** Implementation for list state benchmark testing. 
*/ +public class TtlListStateBenchmark extends TtlStateBenchmarkBase { +private final String STATE_NAME = "listState"; +private final ListStateDescriptor<Long> STATE_DESC = +configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class)); +private ListState<Long> listState; +private List<Long> dummyLists; + +public static void main(String[] args) throws RunnerException { +Options opt = +new OptionsBuilder() +.verbosity(VerboseMode.NORMAL) +.include(".*" + TtlListStateBenchmark.class.getCanonicalName() + ".*") +.build(); + +new Runner(opt).run(); +} + +@Setup +public void setUp() throws Exception { +keyedStateBackend = createKeyedStateBackend(); +listState = getListState(keyedStateBackend, STATE_DESC); +dummyLists = new ArrayList<>(listValueCount); +for (int i = 0; i < listValueCount; ++i) { +dummyLists.add(random.nextLong()); +} +keyIndex = new AtomicInteger(); +} + +@Setup(Level.Iteration) +public void setUpPerIteration() throws Exception { +for (int i = 0; i < setupKeyCount; ++i) { +keyedStateBackend.setCurrentKey((long) i); +listState.add(random.nextLong()); +} +// make sure only one sst file is left, so all get invocations will access this single file, +// to prevent the spike caused by different key distribution in multiple sst files; +// the more accesses to the older sst file, the lower the throughput will be. +if (keyedStateBackend instanceof RocksDBKeyedStateBackend) { Review Comment: Actually, in order to prevent small changes from affecting the existing test results, I purposely did not use class abstraction. I suggest we introduce a refactor to the existing tests along with [FLINK-33864](https://issues.apache.org/jira/browse/FLINK-33864), since it can greatly affect test results. WDYT?
Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]
Zakelly commented on code in PR #83: URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1429207983 ## src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java: ## @@ -0,0 +1,48 @@ +package org.apache.flink.state.benchmark.ttl; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.state.benchmark.StateBenchmarkBase; +import org.openjdk.jmh.annotations.Param; + +import java.util.concurrent.TimeUnit; + +/** The base class for state tests with ttl. */ +public class TtlStateBenchmarkBase extends StateBenchmarkBase { + +/** The expired time of ttl. */ +public enum ExpiredTimeOptions { + +/** 5 seconds. */ +Seconds5(5000), Review Comment: Considering that the warm-up will take `10` seconds, `5` seconds is enough for the test to enter the key expiration phase, and it is not too short. I will give a comparison of the results under different configurations.
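The timing argument above — a 5-second TTL expires well inside a 10-second warm-up — rests on the usual last-access-timestamp check. The following is a minimal sketch of that visibility rule, loosely modeled on (but not identical to) Flink's TTL state wrappers:

```java
public class TtlValue<V> {

    public final V value;
    public final long lastAccessTimestamp; // millis when written/updated

    public TtlValue(V value, long lastAccessTimestamp) {
        this.value = value;
        this.lastAccessTimestamp = lastAccessTimestamp;
    }

    // Visibility check on read: an entry becomes invisible once ttlMillis
    // has elapsed since its last recorded access. With ttl = 5000 ms, every
    // key written at the start of a 10-second warm-up is expired by its end.
    public static <V> V getUnexpired(TtlValue<V> entry, long ttlMillis, long now) {
        if (entry == null || now - entry.lastAccessTimestamp >= ttlMillis) {
            return null; // expired (or absent) entries look like missing state
        }
        return entry.value;
    }
}
```

The exact visibility semantics (e.g. `OnCreateAndWrite` vs `OnReadAndWrite` update types) are configurable in Flink's StateTtlConfig; this sketch only shows why the 5-second setting reaches the expiration phase during the benchmark.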
Re: [PR] [FLINK-33859] Support OpenSearch v2 [flink-connector-opensearch]
reta commented on PR #38: URL: https://github.com/apache/flink-connector-opensearch/pull/38#issuecomment-1859212485 > this is what I tried to do and this is how it looks like with this PR Yep, it looks reasonable
Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]
Myasuka commented on code in PR #83: URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1429199281 ## src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java: ## @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.benchmark.ttl; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.state.benchmark.StateBenchmarkBase; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys; +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState; +import static org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount; +import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount; + +/** Implementation for list state benchmark testing. 
*/ +public class TtlListStateBenchmark extends TtlStateBenchmarkBase { +private final String STATE_NAME = "listState"; +private final ListStateDescriptor<Long> STATE_DESC = +configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class)); +private ListState<Long> listState; +private List<Long> dummyLists; + +public static void main(String[] args) throws RunnerException { +Options opt = +new OptionsBuilder() +.verbosity(VerboseMode.NORMAL) +.include(".*" + TtlListStateBenchmark.class.getCanonicalName() + ".*") +.build(); + +new Runner(opt).run(); +} + +@Setup +public void setUp() throws Exception { +keyedStateBackend = createKeyedStateBackend(); +listState = getListState(keyedStateBackend, STATE_DESC); +dummyLists = new ArrayList<>(listValueCount); +for (int i = 0; i < listValueCount; ++i) { +dummyLists.add(random.nextLong()); +} +keyIndex = new AtomicInteger(); +} + +@Setup(Level.Iteration) +public void setUpPerIteration() throws Exception { +for (int i = 0; i < setupKeyCount; ++i) { +keyedStateBackend.setCurrentKey((long) i); +listState.add(random.nextLong()); +} +// make sure only one sst file is left, so all get invocations will access this single file, +// to prevent the spike caused by different key distribution in multiple sst files; +// the more accesses to the older sst file, the lower the throughput will be. +if (keyedStateBackend instanceof RocksDBKeyedStateBackend) { Review Comment: I noticed that there is so much duplicated code here; can we reduce this code via generics or a more abstract class?
## src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java: ## @@ -0,0 +1,48 @@ +package org.apache.flink.state.benchmark.ttl; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.state.benchmark.StateBenchmarkBase; +import org.openjdk.jmh.annotations.Param; + +import java.util.concurrent.TimeUnit; + +/** The base class for state tests with ttl. */ +public class TtlStateBenchmarkBase extends StateBenchmarkBase { + +/** The expired time of ttl. */ +public enum ExpiredTimeOptions { + +/** 5 seconds. */ +Seconds5(5000), Review Comment:
Re: [PR] [FLINK-33433][rest] Introduce async-profiler to support profiling Job… [flink]
Myasuka commented on code in PR #23820: URL: https://github.com/apache/flink/pull/23820#discussion_r1429195464 ## flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ProfilingService}. 
*/ +public class ProfilingServiceTest { + +private static final Logger LOG = LoggerFactory.getLogger(ProfilingServiceTest.class); +private static final Configuration configs = new Configuration(); +private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf events."; +private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer symbols found."; +private static final String resourceID = "TestJobManager"; +private static final long profilingDuration = 3L; +private static final int historySizeLimit = 3; + +private ProfilingService profilingService; + +@BeforeAll +static void beforeAll() { +configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit); +} + +@BeforeEach +void setUp(@TempDir Path tempDir) { +configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString()); +profilingService = ProfilingService.getInstance(configs); +verifyConfigsWorks(profilingService, tempDir); Review Comment: The `setUp` method run with `@BeforeEach` should not include any test behavior. If you want to verify the configs, please add a separate unit test for that. For the `tempDir` problem, you can introduce `@TempDir Path tempDir` as a private field in the `ProfilingServiceTest` class to avoid such a variable in the `setUp` method.
Re: [PR] [FLINK-33433][rest] Introduce async-profiler to support profiling Job… [flink]
Myasuka commented on code in PR #23820: URL: https://github.com/apache/flink/pull/23820#discussion_r1429195243 ## flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java: ## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ProfilingService}. 
*/ +public class ProfilingServiceTest { + +private static final Logger LOG = LoggerFactory.getLogger(ProfilingServiceTest.class); +private static final Configuration configs = new Configuration(); +private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf events."; +private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer symbols found."; +private static final String resourceID = "TestJobManager"; +private static final long profilingDuration = 3L; +private static final int historySizeLimit = 3; + +private ProfilingService profilingService; + +@BeforeAll +static void beforeAll() { +configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit); +} + +@BeforeEach +void setUp(@TempDir Path tempDir) { +configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString()); +profilingService = ProfilingService.getInstance(configs); +verifyConfigsWorks(profilingService, tempDir); +} + +@AfterEach +void tearDown() throws IOException { +profilingService.close(); +} + +@Test +public void testSingleInstance() throws IOException { +ProfilingService instance = ProfilingService.getInstance(configs); +Assertions.assertEquals(profilingService, instance); +instance.close(); +} + +@Test +void testFailedRequestUnderProfiling() throws ExecutionException, InterruptedException { +ProfilingInfo profilingInfo = +profilingService +.requestProfiling(resourceID, 10, ProfilingInfo.ProfilingMode.ITIMER) +.get(); +Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, profilingInfo.getStatus()); +try { +profilingService +.requestProfiling( +resourceID, profilingDuration, ProfilingInfo.ProfilingMode.ITIMER) +.get(); +Assertions.fail("Duplicate profiling request should throw with IllegalStateException."); +} catch (Exception e) { +Assertions.assertTrue(e.getCause() instanceof IllegalStateException); +} +} + +@Test +@Timeout(value = 1, unit = TimeUnit.MINUTES) +public void testAllProfilingMode() throws ExecutionException, InterruptedException { +for 
(ProfilingInfo.ProfilingMode mode : ProfilingInfo.ProfilingMode.values()) { +ProfilingInfo profilingInfo = +profilingService.requestProfiling(resourceID, profilingDuration, mode).get(); +if (isNoPermissionOrAllocateSymbol(profilingInfo)) { +LOG.warn( +"Ignoring failed profiling instance in {} mode, which caused by no permission.", +profilingInfo.getProfilingMode()); +continue; +} +Assertions.assertEquals( +ProfilingInfo.ProfilingStatus.RUNNING, +profilingInfo.getStatus(), +String.format( +"Submitting profiling request should be succeed or no permission, but got
[jira] [Updated] (FLINK-33865) exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration
[ https://issues.apache.org/jira/browse/FLINK-33865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-33865: Description: exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration. h2. Reason: When exponential-delay.attempts-before-reset-backoff is set via the job Configuration instead of the cluster configuration, ExecutionConfig#configure will call RestartStrategies#parseConfiguration to create the ExponentialDelayRestartStrategyConfiguration. And then RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will create the ExponentialDelayRestartBackoffTimeStrategyFactory from the ExponentialDelayRestartStrategyConfiguration. Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, so ExponentialDelayRestartStrategyConfiguration doesn't support exponential-delay.attempts-before-reset-backoff. So if we set exponential-delay.attempts-before-reset-backoff at the job level, it won't be supported. h2. Solution If we use the ExponentialDelayRestartStrategyConfiguration to save restartStrategy-related options in the ExecutionConfig, all new options set at the job level will be missed. So we can use the Configuration to save the restartStrategy options inside of ExecutionConfig. !image-2023-12-17-17-56-59-138.png! was: exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration. Reason: when exponential-delay.attempts-before-reset-backoff is set via the job Configuration instead of the cluster configuration, ExecutionConfig#configure will call RestartStrategies#parseConfiguration to create the RestartStrategyConfiguration. And then RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will create the ExponentialDelayRestartBackoffTimeStrategyFactory from the RestartStrategyConfiguration. Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, so it doesn't support exponential-delay.attempts-before-reset-backoff.
I had a misunderstanding during FLINK-32895; I thought the RestartBackoffTimeStrategyFactoryLoader#createRestartBackoffTimeStrategyFactory would create the ExponentialDelayRestartBackoffTimeStrategyFactory from the clusterConfiguration. !image-2023-12-17-17-56-59-138.png! > exponential-delay.attempts-before-reset-backoff doesn't work when it's set in > Job Configuration > --- > > Key: FLINK-33865 > URL: https://issues.apache.org/jira/browse/FLINK-33865 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Attachments: image-2023-12-17-17-56-59-138.png > > > exponential-delay.attempts-before-reset-backoff doesn't work when it's set in > Job Configuration. > h2. Reason: > When exponential-delay.attempts-before-reset-backoff is set via the job > Configuration instead of the cluster configuration, > ExecutionConfig#configure will call RestartStrategies#parseConfiguration to > create the ExponentialDelayRestartStrategyConfiguration. And then > RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will > create the ExponentialDelayRestartBackoffTimeStrategyFactory from the > ExponentialDelayRestartStrategyConfiguration. > Since 1.19, RestartStrategies and RestartStrategyConfiguration are > deprecated, so ExponentialDelayRestartStrategyConfiguration doesn't support > exponential-delay.attempts-before-reset-backoff. So if we set > exponential-delay.attempts-before-reset-backoff at the job level, it won't be > supported. > h2. Solution > If we use the ExponentialDelayRestartStrategyConfiguration to save > restartStrategy-related options in the ExecutionConfig, all new options > set at the job level will be missed. > So we can use the Configuration to save the restartStrategy options inside of > ExecutionConfig. > !image-2023-12-17-17-56-59-138.png!
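For reference, the intended effect of attempts-before-reset-backoff can be sketched as follows. The class and field names are invented here, and the reset-by-wrapping semantics are an approximation of the real ExponentialDelayRestartBackoffTimeStrategy, not its actual implementation:

```java
public class ExponentialDelaySketch {

    public final long initialBackoffMs;
    public final long maxBackoffMs;
    public final double backoffMultiplier;
    public final int attemptsBeforeResetBackoff; // approximates the option under discussion

    public ExponentialDelaySketch(
            long initialBackoffMs,
            long maxBackoffMs,
            double backoffMultiplier,
            int attemptsBeforeResetBackoff) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
        this.backoffMultiplier = backoffMultiplier;
        this.attemptsBeforeResetBackoff = attemptsBeforeResetBackoff;
    }

    // Backoff for the given consecutive-failure attempt (0-based).
    // The delay grows geometrically, is capped at maxBackoffMs, and once
    // the configured attempt count is reached, it restarts from the
    // initial backoff instead of staying at the maximum forever.
    public long backoffFor(int attempt) {
        int effective = attempt % attemptsBeforeResetBackoff;
        double delay = initialBackoffMs * Math.pow(backoffMultiplier, effective);
        return Math.min((long) delay, maxBackoffMs);
    }
}
```

The bug in this ticket is that configuring such an option at the job level was silently dropped because the value round-tripped through the deprecated ExponentialDelayRestartStrategyConfiguration, which has no field for it.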
Re: [PR] [FLINK-33865][runtime] Support setting `exponential-delay.attempts-before-reset-backoff` when it's set in Job Configuration [flink]
flinkbot commented on PR #23942: URL: https://github.com/apache/flink/pull/23942#issuecomment-1859194965 ## CI report: * fce08fa4150978ab8c7ea743ac17cacc6d565d26 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-33865) exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration
[ https://issues.apache.org/jira/browse/FLINK-33865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-33865: --- Labels: pull-request-available (was: ) > exponential-delay.attempts-before-reset-backoff doesn't work when it's set in > Job Configuration > --- > > Key: FLINK-33865 > URL: https://issues.apache.org/jira/browse/FLINK-33865 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Attachments: image-2023-12-17-17-56-59-138.png > > > exponential-delay.attempts-before-reset-backoff doesn't work when it's set in > Job Configuration. > Reason: When exponential-delay.attempts-before-reset-backoff is set via the job > Configuration instead of the cluster configuration, > ExecutionConfig#configure will call RestartStrategies#parseConfiguration to > create the RestartStrategyConfiguration. And then > RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will > create the ExponentialDelayRestartBackoffTimeStrategyFactory from the > RestartStrategyConfiguration. > Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, > so it doesn't support exponential-delay.attempts-before-reset-backoff. > I had a misunderstanding during FLINK-32895: I thought the > RestartBackoffTimeStrategyFactoryLoader#createRestartBackoffTimeStrategyFactory > would create the ExponentialDelayRestartBackoffTimeStrategyFactory from the > clusterConfiguration. > !image-2023-12-17-17-56-59-138.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-33865][runtime] Support setting `exponential-delay.attempts-before-reset-backoff` when it's set in Job Configuration [flink]
1996fanrui opened a new pull request, #23942: URL: https://github.com/apache/flink/pull/23942 Draft PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33859] Support OpenSearch v2 [flink-connector-opensearch]
snuyanzin commented on PR #38: URL: https://github.com/apache/flink-connector-opensearch/pull/38#issuecomment-1859140916 At least we could build/test with what is allowed; this is what I tried to do, and this is how it looks with this PR:

| OpenSearch | JDK 8 | JDK 11 | JDK 17 (Flink 1.18+) | JDK 21 (Flink 1.19+) |
|---|---|---|---|---|
| v1 | ✅ | ✅ | ✅ | ✅ |
| v2 | no, since the OpenSearch v2 baseline is JDK 11 | ✅ | ✅ | ✅ |

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
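A common way to express such a per-JDK build matrix in Maven is a JDK-activated profile, so the v2 module is simply skipped on JDK 8 builds. The snippet below is a hypothetical sketch of that pattern (the module name `flink-connector-opensearch2` is illustrative, not necessarily what the PR uses):

```xml
<!-- Hypothetical parent-pom profile: the OpenSearch v2 module is only
     built when the running JDK is 11 or newer. -->
<profile>
  <id>opensearch-v2</id>
  <activation>
    <jdk>[11,)</jdk>
  </activation>
  <modules>
    <module>flink-connector-opensearch2</module>
  </modules>
</profile>
```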
[jira] [Commented] (FLINK-33862) Flink Unit Test Failures on 1.18.0
[ https://issues.apache.org/jira/browse/FLINK-33862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797908#comment-17797908 ] Martijn Visser commented on FLINK-33862: This isn't a command that you would use to build Flink normally. > Flink Unit Test Failures on 1.18.0 > -- > > Key: FLINK-33862 > URL: https://issues.apache.org/jira/browse/FLINK-33862 > Project: Flink > Issue Type: Bug >Affects Versions: 1.18.0, 1.19.0 >Reporter: Prabhu Joseph >Priority: Major > > Flink Unit Test Failures on 1.18.0. There are 100+ unit test cases failing > due to below common issues. > *Issue 1* > {code:java} > ./mvnw -DfailIfNoTests=false -Dmaven.test.failure.ignore=true > -Dtest=ExecutionPlanAfterExecutionTest test > [INFO] Running org.apache.flink.client.program.ExecutionPlanAfterExecutionTest > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > at > java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47) > at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310) > at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307) > at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234) > at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) > at > org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288) > at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629) > at > org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34) > at > 
org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33) > at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536) > at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) > at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) > at > org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73) > at > org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110) > at >
[jira] [Created] (FLINK-33865) exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration
Rui Fan created FLINK-33865: --- Summary: exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration Key: FLINK-33865 URL: https://issues.apache.org/jira/browse/FLINK-33865 Project: Flink Issue Type: Bug Components: Runtime / Coordination Reporter: Rui Fan Assignee: Rui Fan Attachments: image-2023-12-17-17-56-59-138.png exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration. Reason: When exponential-delay.attempts-before-reset-backoff is set via the job Configuration instead of the cluster configuration, ExecutionConfig#configure will call RestartStrategies#parseConfiguration to create the RestartStrategyConfiguration. And then RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will create the ExponentialDelayRestartBackoffTimeStrategyFactory from the RestartStrategyConfiguration. Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, so it doesn't support exponential-delay.attempts-before-reset-backoff. I had a misunderstanding during FLINK-32895: I thought RestartBackoffTimeStrategyFactoryLoader#createRestartBackoffTimeStrategyFactory would create the ExponentialDelayRestartBackoffTimeStrategyFactory from the clusterConfiguration. !image-2023-12-17-17-56-59-138.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Update content.zh/docs/dev/table/overview.md [flink]
flinkbot commented on PR #23941: URL: https://github.com/apache/flink/pull/23941#issuecomment-1859082438 ## CI report: * 797c7d77b1b08af37e5e8ac3609cfee16f61cf2a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Update content.zh/docs/dev/table/overview.md [flink]
shalk opened a new pull request, #23941: URL: https://github.com/apache/flink/pull/23941 translate content.zh/docs/dev/table/overview.md ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org