Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]

2023-12-17 Thread via GitHub


yunfengzhou-hub commented on code in PR #23944:
URL: https://github.com/apache/flink/pull/23944#discussion_r1429620891


##
flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java:
##
@@ -40,8 +43,16 @@ public static JobGraph getJobGraph(
         FlinkPipelineTranslator pipelineTranslator =
                 getPipelineTranslator(userClassloader, pipeline);
 
-        return pipelineTranslator.translateToJobGraph(
-                pipeline, optimizerConfiguration, defaultParallelism);
+        JobGraph jobGraph =
+                pipelineTranslator.translateToJobGraph(
+                        pipeline, optimizerConfiguration, defaultParallelism);
+
+        Map<String, String> parallelismOverrides =
+                optimizerConfiguration.get(PipelineOptions.PARALLELISM_OVERRIDES);
+        jobGraph.getJobConfiguration()
+                .set(PipelineOptions.PARALLELISM_OVERRIDES, parallelismOverrides);
+

Review Comment:
   Thanks for the suggestion @JunRuiLee. I agree it better distinguishes
between an unset configuration and a configured but empty map. I have updated
the PR accordingly.
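
   For reference, a minimal sketch of that distinction (a hypothetical snippet
using Flink's `Configuration` API; `getOptional` returns an empty `Optional`
for an unset key, while an explicitly configured empty map is still present):

       import java.util.HashMap;
       import org.apache.flink.configuration.Configuration;
       import org.apache.flink.configuration.PipelineOptions;

       Configuration conf = new Configuration();
       // Unset: nothing would be copied into the job configuration.
       assert !conf.getOptional(PipelineOptions.PARALLELISM_OVERRIDES).isPresent();
       // Explicitly configured, even as an empty map: the value is present.
       conf.set(PipelineOptions.PARALLELISM_OVERRIDES, new HashMap<>());
       assert conf.getOptional(PipelineOptions.PARALLELISM_OVERRIDES).isPresent();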






Re: [PR] [FLINK-33704][BP 1.18][Filesytems] Update GCS filesystems to latest available versions [flink]

2023-12-17 Thread via GitHub


czchen commented on PR #23935:
URL: https://github.com/apache/flink/pull/23935#issuecomment-1859660606

   Should be https://github.com/apache/flink/pull/23837





[jira] [Created] (FLINK-33870) Split the HighAvailabilityServices into LeaderServices and PersistentServices

2023-12-17 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-33870:
--

 Summary: Split the HighAvailabilityServices into LeaderServices 
and PersistentServices
 Key: FLINK-33870
 URL: https://issues.apache.org/jira/browse/FLINK-33870
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Yangze Guo
Assignee: Yangze Guo








[jira] [Updated] (FLINK-32667) Enable users to independently adjust the high availability strategies related to jobs through configuration

2023-12-17 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo updated FLINK-32667:
---
Description: 
In OLAP scenarios, we only require the leader election services for the 
Dispatcher / ResourceManager and RestEndpoint in the JobManager process. Leader 
election services and persistent services are redundant for jobs and may impact 
cluster performance.
To generate HA services suitable for OLAP scenarios, we introduce the 
high-availability.enable-job-recovery parameter. When users enable HA with 
Kubernetes or ZooKeeper and set this option to false, we will select the 
combination of DefaultLeaderServices and EmbeddedPersistentServices. 
Additionally, we will set the JobMaster's LeaderElectionService and 
LeaderRetrieverService to the Standalone version.
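
For illustration, a minimal sketch of the intended setup, assuming ZooKeeper HA
(high-availability.enable-job-recovery is the option proposed by this ticket;
the other keys are standard Flink options, and the quorum address is made up):

    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    conf.setString("high-availability", "zookeeper");
    conf.setString("high-availability.zookeeper.quorum", "zk-host:2181"); // hypothetical address
    // Proposed option: keep leader election, but skip job persistence/recovery for OLAP jobs.
    conf.setBoolean("high-availability.enable-job-recovery", false);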

  was:When a flink session cluster use zk or k8s high availability service, it 
will store jobs in zk or ConfigMap. When we submit flink olap jobs to the 
session cluster, they always turn off restart strategy. These jobs with 
no-restart-strategy should not be stored in zk or ConfigMap in k8s


> Enable users to independently adjust the high availability strategies related 
> to jobs through configuration
> ---
>
> Key: FLINK-32667
> URL: https://issues.apache.org/jira/browse/FLINK-32667
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Yangze Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> In OLAP scenarios, we only require the leader election services for the 
> Dispatcher / ResourceManager and RestEndpoint in the JobManager process. 
> Leader election services and persistent services are redundant for jobs and 
> may impact cluster performance.
> To generate HA services suitable for OLAP scenarios, we introduce the 
> high-availability.enable-job-recovery parameter. When users enable HA with 
> Kubernetes or ZooKeeper and set this option to false, we will select the 
> combination of DefaultLeaderServices and EmbeddedPersistentServices. 
> Additionally, we will set the JobMaster's LeaderElectionService and 
> LeaderRetrieverService to the Standalone version.





[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-12-17 Thread Dong Lin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798026#comment-17798026
 ] 

Dong Lin commented on FLINK-25509:
--

[~ruanhang1993] I think we need to update the flink-kafka-connector's master
branch to depend on Flink 1.18.0 (it currently depends on Flink 1.17.0) before
putting in your feature. A new branch/version of flink-kafka-connector will
then be created in its upcoming release.

> FLIP-208: Add RecordEvaluator to dynamically stop source based on 
> de-serialized records
> ---
>
> Key: FLINK-25509
> URL: https://issues.apache.org/jira/browse/FLINK-25509
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common, Connectors / Kafka
>Reporter: Dong Lin
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> This feature is needed to migrate applications which uses 
> KafkaDeserializationSchema::isEndOfStream() from using FlinkKafkaConsumer to 
> using KafkaSource.
> Please checkout 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records
>  for the motivation and the proposed changes.





[jira] [Updated] (FLINK-32667) Enable users to independently adjust the high availability strategies related to jobs through configuration

2023-12-17 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo updated FLINK-32667:
---
Summary: Enable users to independently adjust the high availability 
strategies related to jobs through configuration  (was: Use standalone store 
and embedding writer for jobs with no-restart-strategy in session cluster)

> Enable users to independently adjust the high availability strategies related 
> to jobs through configuration
> ---
>
> Key: FLINK-32667
> URL: https://issues.apache.org/jira/browse/FLINK-32667
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Yangze Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> When a Flink session cluster uses the zk or k8s high availability service, it
> will store jobs in zk or a ConfigMap. When we submit Flink OLAP jobs to the
> session cluster, they always turn off the restart strategy. These jobs with
> no restart strategy should not be stored in zk, or in a ConfigMap in k8s.





Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]

2023-12-17 Thread via GitHub


JunRuiLee commented on code in PR #23944:
URL: https://github.com/apache/flink/pull/23944#discussion_r1429569157


##
flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java:
##
@@ -40,8 +43,16 @@ public static JobGraph getJobGraph(
         FlinkPipelineTranslator pipelineTranslator =
                 getPipelineTranslator(userClassloader, pipeline);
 
-        return pipelineTranslator.translateToJobGraph(
-                pipeline, optimizerConfiguration, defaultParallelism);
+        JobGraph jobGraph =
+                pipelineTranslator.translateToJobGraph(
+                        pipeline, optimizerConfiguration, defaultParallelism);
+
+        Map<String, String> parallelismOverrides =
+                optimizerConfiguration.get(PipelineOptions.PARALLELISM_OVERRIDES);
+        jobGraph.getJobConfiguration()
+                .set(PipelineOptions.PARALLELISM_OVERRIDES, parallelismOverrides);
+

Review Comment:
   I suggest using `getOptional` instead of `get` when fetching
`PARALLELISM_OVERRIDES`, to ensure that the `jobConfiguration` only contains
what is explicitly set by users, like below:

       optimizerConfiguration
               .getOptional(PipelineOptions.PARALLELISM_OVERRIDES)
               .ifPresent(
                       map ->
                               jobGraph.getJobConfiguration()
                                       .set(PipelineOptions.PARALLELISM_OVERRIDES, map));

   Although the current jobConfiguration does not yet include all job-level
configurations, in the long term this jobConfiguration field should contain,
and only contain, the job-level configuration items explicitly configured by
the user.
   






[jira] [Comment Edited] (FLINK-33588) Fix Flink Checkpointing Statistics Bug

2023-12-17 Thread Tongtong Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798025#comment-17798025
 ] 

Tongtong Zhu edited comment on FLINK-33588 at 12/18/23 6:52 AM:


[~jingge] I have made the necessary modifications as per your request. Could
this PR be included in the 1.18.1 release?

https://github.com/apache/flink/pull/23931


was (Author: JIRAUSER301126):
[~jingge] I have made the necessary modifications as per your request. Can this 
PR be released in version 1.18.1?

> Fix Flink Checkpointing Statistics Bug
> --
>
> Key: FLINK-33588
> URL: https://issues.apache.org/jira/browse/FLINK-33588
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.5, 1.16.0, 1.17.0, 1.15.2, 1.14.6, 1.18.0, 1.17.1
>Reporter: Tongtong Zhu
>Assignee: Tongtong Zhu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.18.1
>
> Attachments: FLINK-33588.patch, image-2023-12-11-17-35-23-391.png, 
> image-2023-12-13-11-35-43-780.png, image-2023-12-15-13-59-28-201.png, 
> newCommit-FLINK-33688.patch
>
>
> When the Flink task is first started, the checkpoint data is null due to the
> lack of data, and Percentile throws a null pointer exception when calculating
> the percentile. After multiple tests, I found that it is necessary to set an
> initial value for the checkpoint statistics when the checkpoint data is null
> (i.e., at the beginning of the task) to solve this problem.
> The following is the exception stack trace for this bug:
> 2023-09-13 15:02:54,608 ERROR 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler
>  [] - Unhandled exception.
> org.apache.commons.math3.exception.NullArgumentException: input array
>     at 
> org.apache.commons.math3.util.MathArrays.verifyValues(MathArrays.java:1650) 
> ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.commons.math3.stat.descriptive.AbstractUnivariateStatistic.test(AbstractUnivariateStatistic.java:158)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:272)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:241)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics$CommonMetricsSnapshot.getPercentile(DescriptiveStatisticsHistogramStatistics.java:159)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics.getQuantile(DescriptiveStatisticsHistogramStatistics.java:53)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.checkpoint.StatsSummarySnapshot.getQuantile(StatsSummarySnapshot.java:108)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.messages.checkpoints.StatsSummaryDto.valueOf(StatsSummaryDto.java:81)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:129)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:84)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:58)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler.handleRequest(AbstractAccessExecutionGraphHandler.java:68)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) 
> [?:1.8.0_151]
>     at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>  [?:1.8.0_151]
>     at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  [?:1.8.0_151]
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_151]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_151]
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  [?:1.8.0_151]
>     at 
> 

[jira] [Commented] (FLINK-33588) Fix Flink Checkpointing Statistics Bug

2023-12-17 Thread Tongtong Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798025#comment-17798025
 ] 

Tongtong Zhu commented on FLINK-33588:
--

[~jingge] I have made the necessary modifications as per your request. Could
this PR be included in the 1.18.1 release?

> Fix Flink Checkpointing Statistics Bug
> --
>
> Key: FLINK-33588
> URL: https://issues.apache.org/jira/browse/FLINK-33588
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.14.5, 1.16.0, 1.17.0, 1.15.2, 1.14.6, 1.18.0, 1.17.1
>Reporter: Tongtong Zhu
>Assignee: Tongtong Zhu
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.19.0, 1.18.1
>
> Attachments: FLINK-33588.patch, image-2023-12-11-17-35-23-391.png, 
> image-2023-12-13-11-35-43-780.png, image-2023-12-15-13-59-28-201.png, 
> newCommit-FLINK-33688.patch
>
>
> When the Flink task is first started, the checkpoint data is null due to the
> lack of data, and Percentile throws a null pointer exception when calculating
> the percentile. After multiple tests, I found that it is necessary to set an
> initial value for the checkpoint statistics when the checkpoint data is null
> (i.e., at the beginning of the task) to solve this problem.
> The following is the exception stack trace for this bug:
> 2023-09-13 15:02:54,608 ERROR 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler
>  [] - Unhandled exception.
> org.apache.commons.math3.exception.NullArgumentException: input array
>     at 
> org.apache.commons.math3.util.MathArrays.verifyValues(MathArrays.java:1650) 
> ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.commons.math3.stat.descriptive.AbstractUnivariateStatistic.test(AbstractUnivariateStatistic.java:158)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:272)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.commons.math3.stat.descriptive.rank.Percentile.evaluate(Percentile.java:241)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics$CommonMetricsSnapshot.getPercentile(DescriptiveStatisticsHistogramStatistics.java:159)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogramStatistics.getQuantile(DescriptiveStatisticsHistogramStatistics.java:53)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.checkpoint.StatsSummarySnapshot.getQuantile(StatsSummarySnapshot.java:108)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.messages.checkpoints.StatsSummaryDto.valueOf(StatsSummaryDto.java:81)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.createCheckpointingStatistics(CheckpointingStatisticsHandler.java:129)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:84)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:58)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.handler.job.AbstractAccessExecutionGraphHandler.handleRequest(AbstractAccessExecutionGraphHandler.java:68)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87)
>  ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) 
> [?:1.8.0_151]
>     at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>  [?:1.8.0_151]
>     at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  [?:1.8.0_151]
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_151]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_151]
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  [?:1.8.0_151]
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  [?:1.8.0_151]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_151]
>     at 
> 

[jira] [Updated] (FLINK-32667) Use standalone store and embedding writer for jobs with no-restart-strategy in session cluster

2023-12-17 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo updated FLINK-32667:
---
Parent: FLINK-33852
Issue Type: Sub-task  (was: Improvement)

> Use standalone store and embedding writer for jobs with no-restart-strategy 
> in session cluster
> --
>
> Key: FLINK-32667
> URL: https://issues.apache.org/jira/browse/FLINK-32667
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Yangze Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> When a Flink session cluster uses the zk or k8s high availability service, it
> will store jobs in zk or a ConfigMap. When we submit Flink OLAP jobs to the
> session cluster, they always turn off the restart strategy. These jobs with
> no restart strategy should not be stored in zk, or in a ConfigMap in k8s.





[jira] [Updated] (FLINK-32667) Use standalone store and embedding writer for jobs with no-restart-strategy in session cluster

2023-12-17 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo updated FLINK-32667:
---
Parent: (was: FLINK-25318)
Issue Type: Improvement  (was: Sub-task)

> Use standalone store and embedding writer for jobs with no-restart-strategy 
> in session cluster
> --
>
> Key: FLINK-32667
> URL: https://issues.apache.org/jira/browse/FLINK-32667
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Assignee: Yangze Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> When a Flink session cluster uses the zk or k8s high availability service, it
> will store jobs in zk or a ConfigMap. When we submit Flink OLAP jobs to the
> session cluster, they always turn off the restart strategy. These jobs with
> no restart strategy should not be stored in zk, or in a ConfigMap in k8s.





Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]

2023-12-17 Thread via GitHub


X-czh commented on code in PR #23447:
URL: https://github.com/apache/flink/pull/23447#discussion_r1429559942


##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##
@@ -126,9 +140,14 @@ public synchronized void updateCurrentExecutionAttempts(Collection<JobDetails> jobs)
 
                                 taskMetricStore.getSubtaskMetricStore(
                                         subtaskIndex))
                         .ifPresent(
-                                subtaskMetricStore ->
-                                        subtaskMetricStore.retainAttempts(
-                                                attempts.getCurrentAttempts()));
+                                subtaskMetricStore -> {
+                                        subtaskMetricStore.retainAttempts(

Review Comment:
   Good catch! But I'm wondering whether this is a good approach here: it
complicates the code and makes maintenance more difficult. Since the
duplication was introduced to work around the fact that WebInterface task
metric queries currently do not account for subtasks, how about leaving it
unremoved for now and creating a new JIRA for updating the WebInterface to
account for subtasks? I'd be willing to help with that as well. cc @JunRuiLee
@wanglijie95






Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]

2023-12-17 Thread via GitHub


yunfengzhou-hub commented on PR #23944:
URL: https://github.com/apache/flink/pull/23944#issuecomment-1859639546

   Thanks for the comments @reswqa. I have updated the PR according to the 
comments.





Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]

2023-12-17 Thread via GitHub


yunfengzhou-hub commented on PR #23944:
URL: https://github.com/apache/flink/pull/23944#issuecomment-1859639249

   @flinkbot run azure





Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]

2023-12-17 Thread via GitHub


yunfengzhou-hub commented on code in PR #23944:
URL: https://github.com/apache/flink/pull/23944#discussion_r1429556103


##
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java:
##
@@ -153,6 +154,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 
     private final boolean useLargeRecordHandler;
 
+    private final Map<String, String> parallelismOverrides;

Review Comment:
   Thanks for pointing this out. It seems that neither `JobGraphGenerator` nor
`StreamingJobGraphGenerator` has applied this logic. I'll move this to
`FlinkPipelineTranslationUtil.getJobGraph`, which applies the logic regardless
of whether the pipeline is a Plan or a StreamGraph.






Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-17 Thread via GitHub


liming30 commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1429554092


##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +361,50 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        final EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend<Integer> keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());
+
+        String relocatedDbLogPrefix =
+                RocksDBResourceContainer.resolveRelocatedDbLogPrefix(
+                        instanceRocksDBPath.getAbsolutePath());
+        java.nio.file.Path[] relocatedDbLogs;
+        try {
+            relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath());
+            assertTrue(relocatedDbLogs[0].getFileName().startsWith(relocatedDbLogPrefix));
+            // add a rolled log file
+            Files.createTempFile(relocatedDBLogDir.toPath(), relocatedDbLogPrefix, ".suffix");

Review Comment:
   Resolved.






Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]

2023-12-17 Thread via GitHub


reswqa commented on code in PR #23944:
URL: https://github.com/apache/flink/pull/23944#discussion_r1429533482


##
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java:
##
@@ -153,6 +154,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 
     private final boolean useLargeRecordHandler;
 
+    private final Map<String, String> parallelismOverrides;

Review Comment:
   IIRC, this (`JobGraphGenerator`) is only used by the `DataSet` API. I wonder
whether we really need to support this configuration for it, since this part of
the API has been deprecated since 1.18.



##
flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java:
##
@@ -153,6 +154,8 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 
     private final boolean useLargeRecordHandler;
 
+    private final Map<String, String> parallelismOverrides;

Review Comment:
   Another question: have we already added the same support for the DataStream
API (i.e., in `StreamingJobGraphGenerator`)?



##
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##
@@ -1151,6 +1151,43 @@ public void testOverridingJobVertexParallelisms() throws Exception {
 
         Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 42);
     }
 
+    @Test
+    public void testOverridingJobVertexParallelismsWithJobSubmission() throws Exception {
+        JobVertex v1 = new JobVertex("v1");
+        v1.setParallelism(1);
+        JobVertex v2 = new JobVertex("v2");
+        v2.setParallelism(2);
+        JobVertex v3 = new JobVertex("v3");
+        v3.setParallelism(3);
+        jobGraph = new JobGraph(jobGraph.getJobID(), "job", v1, v2, v3);
+        jobGraph.getJobConfiguration()

Review Comment:
   It seems that configuration passed through the `JobGraph` has a higher
priority than the `Dispatcher`'s own configuration, so we may not need to
introduce a new test. Just let `testOverridingJobVertexParallelisms` exercise
both configurations at the same time; that also tests their relative priority.
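
   For illustration, a minimal sketch of such a combined check (a hypothetical
test fragment reusing the vertices above; the expected precedence of the
`JobGraph`-level value is an assumption based on this discussion):

       // Override v1 on the cluster/Dispatcher configuration...
       configuration.set(
               PipelineOptions.PARALLELISM_OVERRIDES,
               Collections.singletonMap(v1.getID().toHexString(), "7"));
       // ...and with a conflicting value on the JobGraph itself.
       jobGraph.getJobConfiguration()
               .set(
                       PipelineOptions.PARALLELISM_OVERRIDES,
                       Collections.singletonMap(v1.getID().toHexString(), "42"));
       dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
       // Assumed outcome per this discussion: the JobGraph-level value wins.
       Assert.assertEquals(42, jobGraph.findVertexByID(v1.getID()).getParallelism());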






[jira] [Commented] (FLINK-32346) JdbcNumericBetweenParametersProvider Sharding key boundaries large storage long integer overflow, use BigDecimal instead Long

2023-12-17 Thread zhilinli (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798019#comment-17798019
 ] 

zhilinli commented on FLINK-32346:
--

Hi community, could this feature be assigned to me? Once it is supported, all
integer types will be supported, with no limit on the length. [~libenchao]
[~martijnvisser]

> JdbcNumericBetweenParametersProvider  Sharding key boundaries large storage 
> long integer overflow, use BigDecimal instead Long
> --
>
> Key: FLINK-32346
> URL: https://issues.apache.org/jira/browse/FLINK-32346
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: zhilinli
>Priority: Major
> Attachments: image-2023-06-15-16-42-16-773.png, 
> image-2023-06-15-16-46-13-188.png
>
>
> *JdbcNumericBetweenParametersProvider.class*
> Large sharding key boundaries overflow when stored as a long integer; use
> BigDecimal instead of Long, so that wide types such as DecimalType(30,0),
> which cannot be stored in a LONG, are also compatible. Can this be assigned
> to me? I would like to complete it.
>  





Re: [PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]

2023-12-17 Thread via GitHub


flinkbot commented on PR #23944:
URL: https://github.com/apache/flink/pull/23944#issuecomment-1859594720

   
   ## CI report:
   
   * baad6c06ef71afca753ef0c5f8fb2af0a3e8c2ce UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-33534) PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission request

2023-12-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33534:
---
Labels: pull-request-available  (was: )

> PipelineOptions.PARALLELISM_OVERRIDES is not picked up from jar submission 
> request
> --
>
> Key: FLINK-33534
> URL: https://issues.apache.org/jira/browse/FLINK-33534
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Configuration, Runtime / REST
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Gyula Fora
>Assignee: Yunfeng Zhou
>Priority: Major
>  Labels: pull-request-available
>
> PARALLELISM_OVERRIDES are currently only applied when they are part of the
> JobManager / cluster configuration.
> When this config is provided as part of the JarRunRequestBody, it is
> completely ignored and does not take effect.
> The main reason is that the dispatcher reads this value from its own
> configuration object and does not include the extra configs passed through
> the REST request.
> This is a blocker for properly supporting FlinkSessionJobs in the autoscaler.





[PR] [FLINK-33534][runtime] Support configuring PARALLELISM_OVERRIDES during job submission [flink]

2023-12-17 Thread via GitHub


yunfengzhou-hub opened a new pull request, #23944:
URL: https://github.com/apache/flink/pull/23944

   ## What is the purpose of the change
   
   This pull request supports configuring the PARALLELISM_OVERRIDES
configuration during the job submission process. Before this PR, Flink only
allowed configuring this parameter before the Dispatcher (i.e., the cluster)
starts.
   
   
   ## Brief change log
   
   - Pass PARALLELISM_OVERRIDES from the environment to the job graph.
   - Make the dispatcher use PARALLELISM_OVERRIDES from the job graph before
submitting the job (a sketch of this lookup follows below).
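
   A minimal sketch of that dispatcher-side lookup (hypothetical variable
names; the job-graph value taking precedence is the intended behavior):

       Map<String, String> overrides = new HashMap<>();
       // Start from the cluster-level configuration, if set...
       clusterConfiguration
               .getOptional(PipelineOptions.PARALLELISM_OVERRIDES)
               .ifPresent(overrides::putAll);
       // ...then let values carried on the JobGraph take precedence.
       jobGraph.getJobConfiguration()
               .getOptional(PipelineOptions.PARALLELISM_OVERRIDES)
               .ifPresent(overrides::putAll);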
   
   
   ## Verifying this change
   
   This change is covered by newly added tests in JarRunHandlerParameterTest 
and DispatcherTest.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]

2023-12-17 Thread via GitHub


wanglijie95 commented on code in PR #23447:
URL: https://github.com/apache/flink/pull/23447#discussion_r1429465647


##
flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobDetails.java:
##
@@ -355,16 +356,21 @@ public String toString() {
 
     /**
      * The CurrentAttempts holds the attempt number of the current representative execution attempt,
-     * and the attempt numbers of all the running attempts.
+     * the attempt numbers of all the running attempts, and whether the current execution has
+     * reached terminal state.
      */
     public static final class CurrentAttempts implements Serializable {
         private final int representativeAttempt;
 
         private final Set<Integer> currentAttempts;
 
-        public CurrentAttempts(int representativeAttempt, Set<Integer> currentAttempts) {
+        private final boolean terminal;

Review Comment:
   Maybe `terminal` -> `reachTerminalState` or `isTerminalState`



##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##
@@ -57,6 +60,17 @@
 public class MetricStore {
     private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class);
 
+    /**
+     * The set holds the names of the transient metrics which are no longer useful after a subtask
+     * reaches terminal state and shall be removed to avoid misleading users.
+     */

Review Comment:
   Maybe add one more line: "Note that there may be other transient metrics, we 
currently only support cleaning these three"






Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]

2023-12-17 Thread via GitHub


JunRuiLee commented on code in PR #23447:
URL: https://github.com/apache/flink/pull/23447#discussion_r1429512642


##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java:
##
@@ -126,9 +140,14 @@ public synchronized void updateCurrentExecutionAttempts(Collection<JobDetails> jobs)
 
                                 taskMetricStore.getSubtaskMetricStore(
                                         subtaskIndex))
                         .ifPresent(
-                                subtaskMetricStore ->
-                                        subtaskMetricStore.retainAttempts(
-                                                attempts.getCurrentAttempts()));
+                                subtaskMetricStore -> {
+                                        subtaskMetricStore.retainAttempts(

Review Comment:
   Since the metrics for subtasks also exist in the taskMetricStore in the form
of taskInfo.subtaskIndex + "." + name, it is also necessary to clean up the
transient metrics stored in the taskMetricStore. Otherwise, it could lead to
inconsistent behavior that may confuse users, as depicted in the screenshot
below.
   
![image](https://github.com/apache/flink/assets/107924572/ac8de662-7129-4445-a793-f0facca352e8)
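
   For illustration, a hypothetical sketch of the extra cleanup being suggested
(the helper and accessor names here are invented; the store keys subtask
metrics as `<subtaskIndex>.<metricName>`):

       // Drop transient metrics mirrored into the task-level store under
       // the "<subtaskIndex>.<metricName>" naming scheme.
       String prefix = subtaskIndex + ".";
       taskMetricStore
               .getMetrics() // hypothetical accessor for the metric name -> value map
               .keySet()
               .removeIf(
                       name ->
                               name.startsWith(prefix)
                                       && TRANSIENT_METRICS.contains(
                                               name.substring(prefix.length())));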
   






[jira] [Comment Edited] (FLINK-33565) The concurrentExceptions doesn't work

2023-12-17 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798003#comment-17798003
 ] 

Rui Fan edited comment on FLINK-33565 at 12/18/23 4:21 AM:
---

Hi [~mapohl], I have finished the POC and want to check whether my solution is
fine with you.

The POC can be found in this [PR|https://github.com/apache/flink/pull/23867];
this PR will be updated in the future, and I have backed up a permanent branch
here: [https://github.com/1996fanrui/flink/commits/33565/permanent-backup1/]

The core idea is: for each failure, the RestartStrategy will return whether the
current failure is a new attempt (based on FLIP-364); a minimal sketch follows
after this list.
 * If it's a new attempt, it's a root exception, and it will become the latest
root exception.
 * If it's not a new attempt, it will be a concurrentException and will be
added to the latest RootException.
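
A minimal pseudocode sketch of this idea (all names here are invented for
illustration; the real changes live in the commits listed below):

    // For each reported failure:
    if (restartStrategy.isNewAttempt(failure)) {
        // New attempt: this failure becomes the latest root exception.
        exceptionHistory.add(RootExceptionEntry.of(failure));
    } else {
        // Same attempt: merge into the latest root as a concurrent exception.
        exceptionHistory.latest().addConcurrentException(failure);
    }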

Core changes:

My poc branch has 3 commits related to this JIRA:
 # [FLINK-33565][Exception] Archive exceptions into the exception history
immediately when they occur, instead of archiving them when restarting
 # [FLINK-33565][Exception] Restart strategy checks whether the current failure
is a new attempt
 # [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging

The first commit is a refactoring. Actually, I don't know why exceptions were
archived when restarting the task instead of immediately; it means that when
one task fails, we can only see the exception history after Flink restarts the
task. So the first commit is only a refactoring: it archives exceptions into
the exception history immediately when they occur, instead of archiving them
when restarting.

The second commit is related to the restart strategy, adding a return value
that indicates whether the current failure is a new attempt.

The third commit is the core solution of this JIRA:
 * If it's a new attempt, it's a root exception, and it will become the latest
root exception.
 * If it's not a new attempt, it will be a concurrentException and will be
added to the latest RootException.

In the last commit, I added a demo job with 6 regions; all tasks fail when
processing the first record, and the demo job can be run directly. Here is the
result; we can see all the failed tasks in the WebUI.

If you agree with my solution, I can go ahead. If not, we can discuss first.
Looking forward to your opinions, thanks~

!screenshot-1.png|width=923,height=468!


was (Author: fanrui):
Hi [~mapohl], I have finished the POC, and wanna to check whether my solution 
is fine for you.

POC can be found in this [PR|https://github.com/apache/flink/pull/23867],  this 
PR will be updated in the future, and I have backup a permanent branch here: 
https://github.com/1996fanrui/flink/commits/33565/permanent-backup1/

Core idea is: For each failure, RestartStrategy will return whether current 
failure is a new attempt(Based on FLIP-364).
* If it's a new attempt, it's root exception
* If it's not a new attempt, it will be a concurrentException and it will be 
added to the latest RootException.

Core changes:

My poc branch has 3 commits related to this JIRA:

#  [FLINK-33565][Exception] Archive exceptions into the exception history 
immediately when they occur, instead of archiving them when restarting
# [FLINK-33565][Exception] Restart strategy checks whether current failure is a 
new attempt
# [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging

The first commit is refactoring, actually, I don't know why archiving exception 
when restarting task instead of immediately. It means, when one task failure, 
we can see the exception history after flink restart this task. So the first 
commit is only a refactoring. It archives exceptions into the exception history 
immediately when they occur, instead of archiving them when restarting.

The second commit is related to restart strategy, adding a return value 
indicates whether current failure is a new attempt.

The third commit is core solution of this JIRA:
* If it's a new attempt, it's root exception
* If it's not a new attempt, it will be a concurrentException and it will be 
added to the latest RootException.

The last commit, I added a job demo with 6 regions, all tasks will fail when 
processing the first record, this demo job can be run directly. Here is the 
result, we can see all failed tasks in the WebUI. 

If you agree with my solution, I can go ahead. If not, we can discuss first. 
Looking forward to your opinions, thanks~

 !screenshot-1.png! 

> The concurrentExceptions doesn't work
> -
>
> Key: FLINK-33565
> URL: https://issues.apache.org/jira/browse/FLINK-33565
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: 1.19.0
>
>

[jira] [Commented] (FLINK-33565) The concurrentExceptions doesn't work

2023-12-17 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798003#comment-17798003
 ] 

Rui Fan commented on FLINK-33565:
-

Hi [~mapohl], I have finished the POC and want to check whether my solution is
fine with you.

The POC can be found in this [PR|https://github.com/apache/flink/pull/23867];
this PR will be updated in the future, and I have backed up a permanent branch
here: https://github.com/1996fanrui/flink/commits/33565/permanent-backup1/

The core idea is: for each failure, the RestartStrategy will return whether the
current failure is a new attempt (based on FLIP-364).
* If it's a new attempt, it's a root exception
* If it's not a new attempt, it will be a concurrentException and will be added
to the latest RootException.

Core changes:

My poc branch has 3 commits related to this JIRA:

# [FLINK-33565][Exception] Archive exceptions into the exception history
immediately when they occur, instead of archiving them when restarting
# [FLINK-33565][Exception] Restart strategy checks whether the current failure
is a new attempt
# [FLINK-33565][Scheduler] ConcurrentExceptions works with exception merging

The first commit is a refactoring. Actually, I don't know why exceptions were
archived when restarting the task instead of immediately; it means that when
one task fails, we can only see the exception history after Flink restarts the
task. So the first commit is only a refactoring: it archives exceptions into
the exception history immediately when they occur, instead of archiving them
when restarting.

The second commit is related to the restart strategy, adding a return value
that indicates whether the current failure is a new attempt.

The third commit is the core solution of this JIRA:
* If it's a new attempt, it's a root exception
* If it's not a new attempt, it will be a concurrentException and will be added
to the latest RootException.

In the last commit, I added a demo job with 6 regions; all tasks fail when
processing the first record, and the demo job can be run directly. Here is the
result; we can see all the failed tasks in the WebUI.

If you agree with my solution, I can go ahead. If not, we can discuss first.
Looking forward to your opinions, thanks~

 !screenshot-1.png! 

> The concurrentExceptions doesn't work
> -
>
> Key: FLINK-33565
> URL: https://issues.apache.org/jira/browse/FLINK-33565
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>
> First of all, thanks to [~mapohl] for helping double-check in advance that
> this was indeed a bug.
> Displaying the exception history in the WebUI is supported in FLINK-6042.
> h1. What's the concurrentExceptions?
> When an execution fails due to an exception, other executions in the same
> region will also restart, and the first exception is the rootException. If
> other restarted executions also report exceptions at this time, we hope to
> collect these exceptions and display them to the user as concurrentExceptions.
> h2. What's this bug?
> The concurrentExceptions is always empty in production, even if other
> executions report exceptions at very close times.
> h1. Why doesn't it work?
> If one job has an all-to-all shuffle, the job has only one region, and this
> region has a lot of executions. If one execution throws an exception:
>  * The JobMaster will mark the state as FAILED for this execution.
>  * The rest of the executions of this region will be marked as CANCELING.
>  ** This call stack can be found at FLIP-364
> [part-4.2.3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy#FLIP364:Improvetherestartstrategy-4.2.3Detailedcodeforfull-failover]
>
> When these executions throw exceptions as well, the JobMaster will mark their
> state from CANCELING to CANCELED instead of FAILED.
> CANCELED executions won't go through the FAILED logic, so their exceptions
> are ignored.
> Note: all reports are executed inside the JobMaster RPC thread, which is a
> single thread, so these reports are executed serially. Only one execution is
> marked as FAILED, and the rest of the executions will be marked as CANCELED
> later.
> h1. How to fix it?
> After an offline discussion with [~mapohl], we first need to discuss with the
> community whether we should keep the concurrentExceptions.
>  * If no, we can remove the related logic directly
>  * If yes, we discuss how to fix it later.





[jira] [Updated] (FLINK-33565) The concurrentExceptions doesn't work

2023-12-17 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33565:

Attachment: screenshot-1.png

> The concurrentExceptions doesn't work
> -
>
> Key: FLINK-33565
> URL: https://issues.apache.org/jira/browse/FLINK-33565
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Fix For: 1.19.0
>
> Attachments: screenshot-1.png
>
>
> First of all, thanks to [~mapohl] for helping double-check in advance that
> this was indeed a bug.
> Displaying the exception history in the WebUI is supported in FLINK-6042.
> h1. What's the concurrentExceptions?
> When an execution fails due to an exception, other executions in the same
> region will also restart, and the first exception is the rootException. If
> other restarted executions also report exceptions at this time, we hope to
> collect these exceptions and display them to the user as concurrentExceptions.
> h2. What's this bug?
> The concurrentExceptions is always empty in production, even if other
> executions report exceptions at very close times.
> h1. Why doesn't it work?
> If one job has an all-to-all shuffle, the job has only one region, and this
> region has a lot of executions. If one execution throws an exception:
>  * The JobMaster will mark the state as FAILED for this execution.
>  * The rest of the executions of this region will be marked as CANCELING.
>  ** This call stack can be found at FLIP-364
> [part-4.2.3|https://cwiki.apache.org/confluence/display/FLINK/FLIP-364%3A+Improve+the+restart-strategy#FLIP364:Improvetherestartstrategy-4.2.3Detailedcodeforfull-failover]
>
> When these executions throw exceptions as well, the JobMaster will mark their
> state from CANCELING to CANCELED instead of FAILED.
> CANCELED executions won't go through the FAILED logic, so their exceptions
> are ignored.
> Note: all reports are executed inside the JobMaster RPC thread, which is a
> single thread, so these reports are executed serially. Only one execution is
> marked as FAILED, and the rest of the executions will be marked as CANCELED
> later.
> h1. How to fix it?
> After an offline discussion with [~mapohl], we first need to discuss with the
> community whether we should keep the concurrentExceptions.
>  * If no, we can remove the related logic directly
>  * If yes, we discuss how to fix it later.





[jira] [Commented] (FLINK-25509) FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-12-17 Thread Hang Ruan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798000#comment-17798000
 ] 

Hang Ruan commented on FLINK-25509:
---

Hi, [~martijnvisser] & [~lindong].

Flink 1.18 has been released, and I think we can push this feature forward now.

Which Kafka connector version should we put it into? This feature relies on an
interface introduced in 1.18 and is not compatible with 1.17 or earlier.

> FLIP-208: Add RecordEvaluator to dynamically stop source based on 
> de-serialized records
> ---
>
> Key: FLINK-25509
> URL: https://issues.apache.org/jira/browse/FLINK-25509
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Common, Connectors / Kafka
>Reporter: Dong Lin
>Assignee: Hang Ruan
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> This feature is needed to migrate applications which uses 
> KafkaDeserializationSchema::isEndOfStream() from using FlinkKafkaConsumer to 
> using KafkaSource.
> Please checkout 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records
>  for the motivation and the proposed changes.





Re: [PR] [FLINK-33795] Add a new config to forbid autoscale execution in the configured excluded periods [flink-kubernetes-operator]

2023-12-17 Thread via GitHub


flashJd commented on code in PR #728:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/728#discussion_r1429453681


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java:
##
@@ -94,4 +99,106 @@ public static boolean excludeVerticesFromScaling(
         conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds));
         return anyAdded;
     }
 
+    /** Quartz doesn't have the invertTimeRange flag so rewrite this method. */
+    static boolean isTimeIncluded(CronCalendar cron, long timeInMillis) {
+        if (cron.getBaseCalendar() != null
+                && !cron.getBaseCalendar().isTimeIncluded(timeInMillis)) {
+            return false;
+        } else {
+            return cron.getCronExpression().isSatisfiedBy(new Date(timeInMillis));
+        }
+    }
+
+    static Optional<DailyCalendar> interpretAsDaily(String subExpression) {
+        String[] splits = subExpression.split("-");
+        if (splits.length != 2) {
+            return Optional.empty();
+        }
+        try {
+            DailyCalendar daily = new DailyCalendar(splits[0], splits[1]);
+            daily.setInvertTimeRange(true);
+            return Optional.of(daily);
+        } catch (Exception e) {
+            return Optional.empty();
+        }
+    }
+
+    static Optional<CronCalendar> interpretAsCron(String subExpression) {
+        try {
+            return Optional.of(new CronCalendar(subExpression));
+        } catch (Exception e) {
+            return Optional.empty();

Review Comment:
   An incorrect expression config will first be validated in `DefaultValidator`
and an exception reported there; I've added extra tests in
`DefaultValidatorTest`.
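
   As a usage illustration of the helpers in the diff above (the time-range
string is a made-up example; `DailyCalendar` and `isTimeIncluded(long)` are
standard Quartz APIs):

       import java.util.Optional;
       import org.quartz.impl.calendar.DailyCalendar;

       // Interpret "02:00:00-04:00:00" as a daily excluded period.
       Optional<DailyCalendar> daily = interpretAsDaily("02:00:00-04:00:00");
       // With invertTimeRange set, isTimeIncluded returns true *inside* the
       // range, i.e. the current instant falls into the excluded period.
       boolean excludedNow =
               daily.map(cal -> cal.isTimeIncluded(System.currentTimeMillis()))
                       .orElse(false);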






Re: [PR] [FLINK-33795] Add a new config to forbid autoscale execution in the configured excluded periods [flink-kubernetes-operator]

2023-12-17 Thread via GitHub


flashJd commented on code in PR #728:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/728#discussion_r1429452589


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java:
##
@@ -94,4 +99,108 @@ public static boolean excludeVerticesFromScaling(
         conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds));
         return anyAdded;
     }
 
+    /** Quartz doesn't have the invertTimeRange flag so rewrite this method. */
+    static boolean isTimeIncluded(CronCalendar cron, long timeInMillis) {

Review Comment:
   Moved to `CalendarUtils`, and the corresponding tests moved to
`CalendarUtilsTest`.






Re: [PR] [FLINK-31650][metrics][rest] Remove transient metrics for subtasks in terminal state [flink]

2023-12-17 Thread via GitHub


X-czh commented on PR #23447:
URL: https://github.com/apache/flink/pull/23447#issuecomment-1859486000

   @wanglijie95 Kind reminder~ Could you help take a look when you have time?





[jira] [Created] (FLINK-33868) Add checkpoint metrics: the latency to write the file

2023-12-17 Thread Jufang He (Jira)
Jufang He created FLINK-33868:
-

 Summary: Add checkpoint metrics: the latency to write the file
 Key: FLINK-33868
 URL: https://issues.apache.org/jira/browse/FLINK-33868
 Project: Flink
  Issue Type: Sub-task
Reporter: Jufang He








[jira] [Created] (FLINK-33869) Add checkpoint metrics: the latency to close the file

2023-12-17 Thread Jufang He (Jira)
Jufang He created FLINK-33869:
-

 Summary: Add checkpoint metrics: the latency to close the file
 Key: FLINK-33869
 URL: https://issues.apache.org/jira/browse/FLINK-33869
 Project: Flink
  Issue Type: Sub-task
Reporter: Jufang He






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33867) Add checkpoint metrics: the rate of file write

2023-12-17 Thread Jufang He (Jira)
Jufang He created FLINK-33867:
-

 Summary: Add checkpoint metrics: the rate of file write
 Key: FLINK-33867
 URL: https://issues.apache.org/jira/browse/FLINK-33867
 Project: Flink
  Issue Type: Sub-task
Reporter: Jufang He






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33856) Add metrics to monitor the interaction performance between task and external storage system in the process of checkpoint making

2023-12-17 Thread Jufang He (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jufang He updated FLINK-33856:
--
Description: 
When Flink takes a checkpoint, the interaction performance with the external 
file system has a great impact on the overall checkpoint duration. Adding 
performance metrics where the task interacts with the external file storage 
system therefore makes it easy to spot bottlenecks. The metrics include: the 
file write rate, the latency to write the file, and the latency to close the 
file.

Adding these metrics on the Flink side has two advantages: it makes it 
convenient to compare E2E checkpoint durations across tasks, and it does not 
need to distinguish between types of external storage systems, since the 
metrics can be collected uniformly in FsCheckpointStreamFactory.

  was:
When Flink makes a Checkpoint, the interaction performance with the external 
file system has a great impact on the overall time-consuming of the checkpoint. 
Therefore, it is easy to observe the bottleneck point by adding performance 
indicators when the task interacts with the external file storage system. These 
include: the rate of file write , the latency to write the file, the latency to 
close the file.

In flink side add the above metrics has the following advantages: convenient 
statistical different task E2E time-consuming; do not need to distinguish the 
type of external storage system, can be unified in the 
FsCheckpointStreamFactory.


> Add metrics to monitor the interaction performance between task and external 
> storage system in the process of checkpoint making
> ---
>
> Key: FLINK-33856
> URL: https://issues.apache.org/jira/browse/FLINK-33856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.18.0
>Reporter: Jufang He
>Priority: Major
>
> When Flink takes a checkpoint, the interaction performance with the external 
> file system has a great impact on the overall checkpoint duration. Adding 
> performance metrics where the task interacts with the external file storage 
> system therefore makes it easy to spot bottlenecks. The metrics include: the 
> file write rate, the latency to write the file, and the latency to close the 
> file.
> Adding these metrics on the Flink side has two advantages: it makes it 
> convenient to compare E2E checkpoint durations across tasks, and it does not 
> need to distinguish between types of external storage systems, since the 
> metrics can be collected uniformly in FsCheckpointStreamFactory.
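
A minimal sketch of the kind of instrumentation described (hypothetical class and metric names, not the actual patch):

```java
import java.io.IOException;
import java.io.OutputStream;

import org.apache.flink.metrics.Histogram;

/** Hypothetical sketch: wrap the checkpoint output stream to record latencies. */
final class MeteredCheckpointStream extends OutputStream {
    private final OutputStream delegate;
    private final Histogram writeLatencyNanos; // "the latency to write the file"
    private final Histogram closeLatencyNanos; // "the latency to close the file"

    MeteredCheckpointStream(OutputStream delegate, Histogram write, Histogram close) {
        this.delegate = delegate;
        this.writeLatencyNanos = write;
        this.closeLatencyNanos = close;
    }

    @Override
    public void write(int b) throws IOException {
        long start = System.nanoTime();
        delegate.write(b);
        writeLatencyNanos.update(System.nanoTime() - start);
    }

    @Override
    public void close() throws IOException {
        long start = System.nanoTime();
        delegate.close();
        closeLatencyNanos.update(System.nanoTime() - start);
    }
}
```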



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-30960] Fix a corner case oom error [flink-connector-jdbc]

2023-12-17 Thread via GitHub


bs352 commented on code in PR #77:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/77#discussion_r1429434411


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java:
##
@@ -153,6 +153,16 @@ public void open(int taskNumber, int numTasks) throws 
IOException {
 if (!closed) {
 try {
 flush();
+} catch (FlushingRuntimeException e) {
+/*
+ * We ignore this FlushingRuntimeException, as it is
+ * only thrown when flushingException was assigned to,
+ * in a former run of this scheduler thread, in the next
+ * catch clause. In that case, we already have
+ * flushException cached, waiting for the next task
+ * manager thread's flush call which would throw a new
+ * FlushingRuntimeException causing job failure.

Review Comment:
   I see, the close method calls flush as well. In this case, the scheduler is already shut down before calling flush, so it will never loop.
   
   It won't hurt if we throw the new FlushingRuntimeException here instead, as it will propagate up to be handled by the TaskManager the same way as a regular RuntimeException.
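   
   A compact sketch of the close-path ordering being discussed (simplified; field and helper names are assumptions, not the actual JdbcOutputFormat code):
   
   ```java
   // Simplified close(): the background flusher is stopped before the final
   // flush, so a FlushingRuntimeException thrown here can no longer loop.
   public synchronized void close() {
       if (!closed) {
           closed = true;
           scheduler.shutdown();    // no further scheduled flushes will run
           try {
               flush();             // final flush; failures propagate to the TaskManager
           } finally {
               closeDbConnection(); // hypothetical cleanup helper
           }
       }
   }
   ```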



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]

2023-12-17 Thread via GitHub


Myasuka commented on code in PR #83:
URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1429398494


##
src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState;
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+
+/** Implementation for list state benchmark testing. */
+public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
+private final String STATE_NAME = "listState";
+private final ListStateDescriptor<Long> STATE_DESC =
+configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
+private ListState<Long> listState;
+private List<Long> dummyLists;
+
+public static void main(String[] args) throws RunnerException {
+Options opt =
+new OptionsBuilder()
+.verbosity(VerboseMode.NORMAL)
+.include(".*" + 
TtlListStateBenchmark.class.getCanonicalName() + ".*")
+.build();
+
+new Runner(opt).run();
+}
+
+@Setup
+public void setUp() throws Exception {
+keyedStateBackend = createKeyedStateBackend();
+listState = getListState(keyedStateBackend, STATE_DESC);
+dummyLists = new ArrayList<>(listValueCount);
+for (int i = 0; i < listValueCount; ++i) {
+dummyLists.add(random.nextLong());
+}
+keyIndex = new AtomicInteger();
+}
+
+@Setup(Level.Iteration)
+public void setUpPerIteration() throws Exception {
+for (int i = 0; i < setupKeyCount; ++i) {
+keyedStateBackend.setCurrentKey((long) i);
+listState.add(random.nextLong());
+}
+// make sure only one sst file is left, so all get invocations will access this single file,
+// to prevent spikes caused by different key distributions across multiple sst files;
+// the more accesses to the older sst file, the lower the throughput will be.
+if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {

Review Comment:
   I'm not sure whether such refactor could impact the performance results 
significantly, could you please provide some data if possible? If so, I think 
we can refactor the tests with FLINK-33864.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]

2023-12-17 Thread via GitHub


liyubin117 commented on code in PR #23678:
URL: https://github.com/apache/flink/pull/23678#discussion_r1429393701


##
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java:
##
@@ -141,15 +143,15 @@ private DataGeneratorContainer createContainer(
 }
 
 private void validateFieldOptions(String name, DataType type, ReadableConfig options) {
-ConfigOption<Boolean> lenOption =
+ConfigOption<Boolean> varLenOption =
 key(DataGenConnectorOptionsUtil.FIELDS
 + "."
 + name
 + "."
 + DataGenConnectorOptionsUtil.VAR_LEN)
 .booleanType()
 .defaultValue(false);
-options.getOptional(lenOption)
+options.getOptional(varLenOption)

Review Comment:
   Sorry for the misunderstanding. I invoke this to filter for tables that contain `'fields.#.var-len' = 'true'`; then we could use an `ifPresent` check to decide whether it is sufficient to use such an option.
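   
   For reference, a minimal sketch of a table exercising the option under discussion (using the `fields.#.var-len` key from this PR; table and field names are placeholders):
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class DatagenVarLenExample {
       public static void main(String[] args) {
           TableEnvironment tEnv =
                   TableEnvironment.create(EnvironmentSettings.inStreamingMode());
           // With var-len enabled, VARCHAR(10) values are generated with a
           // variable length up to the declared limit instead of a fixed length.
           tEnv.executeSql(
                   "CREATE TABLE t ("
                           + "  f VARCHAR(10)"
                           + ") WITH ("
                           + "  'connector' = 'datagen',"
                           + "  'fields.f.var-len' = 'true'"
                           + ")");
       }
   }
   ```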



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-32721] [planner] Support CharType for the MaxAggFunction [flink]

2023-12-17 Thread via GitHub


flinkbot commented on PR #23943:
URL: https://github.com/apache/flink/pull/23943#issuecomment-1859429375

   
   ## CI report:
   
   * ffc32e900cfb448b602f9134d6ea3e247564d722 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-32721] [planner] Support CharType for the MaxAggFunction [flink]

2023-12-17 Thread via GitHub


liuyongvs opened a new pull request, #23943:
URL: https://github.com/apache/flink/pull/23943

   
   
   ## What is the purpose of the change
   
   Support CharType for the MaxAggFunction.
   
   
   ## Brief change log
   
   * Support CharType for the MaxAggFunction, in the same way as the MinAggFunction
   
   
   ## Verifying this change
   Covered by the original unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33719) Cleanup the usage of deprecated StreamTableEnvironment#toRetractStream

2023-12-17 Thread Jane Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797988#comment-17797988
 ] 

Jane Chan commented on FLINK-33719:
---

[~jackylau] assigned to you.

> Cleanup the usage of deprecated StreamTableEnvironment#toRetractStream
> --
>
> Key: FLINK-33719
> URL: https://issues.apache.org/jira/browse/FLINK-33719
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jane Chan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-33719) Cleanup the usage of deprecated StreamTableEnvironment#toRetractStream

2023-12-17 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan reassigned FLINK-33719:
-

Assignee: Jacky Lau

> Cleanup the usage of deprecated StreamTableEnvironment#toRetractStream
> --
>
> Key: FLINK-33719
> URL: https://issues.apache.org/jira/browse/FLINK-33719
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jane Chan
>Assignee: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32721) agg max/min supports char type

2023-12-17 Thread Jacky Lau (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797987#comment-17797987
 ] 

Jacky Lau commented on FLINK-32721:
---

Hi [~lsy], sorry for the late response; I will modify it.

> agg max/min supports char type
> --
>
> Key: FLINK-32721
> URL: https://issues.apache.org/jira/browse/FLINK-32721
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: Jacky Lau
>Assignee: Jacky Lau
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.19.0
>
>
> {code:java}
> // flink
> Flink SQL> CREATE TABLE Orders (
> >     name char(10),
> >     price        DECIMAL(32,2),
> >     buyer        ROW,
> >     order_time   TIMESTAMP(3)
> > ) WITH (
> >   'connector' = 'datagen'
> > );
> [INFO] Execute statement succeed.
> Flink SQL> select max(name) from Orders;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Max aggregate function does not 
> support type: ''CHAR''.
> Please re-check the data type. {code}
> {code:java}
> // mysql
> CREATE TABLE IF NOT EXISTS `docs` (
>   `id` int(6) unsigned NOT NULL,
>   `rev` int(3) unsigned NOT NULL,
>   `content` char(200) NOT NULL,
>   PRIMARY KEY (`id`,`rev`)
> ) DEFAULT CHARSET=utf8;
> INSERT INTO `docs` (`id`, `rev`, `content`) VALUES
>   ('1', '1', 'The earth is flat'),
>   ('2', '1', 'One hundred angels can dance on the head of a pin'),
>   ('1', '2', 'The earth is flat and rests on a bull\'s horn'),
>   ('1', '3', 'The earth is like a ball.');
> select max(content) from docs;
> // result 
> |max(content)|
> The earth is like a ball.{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32993][table] Datagen connector handles length-constrained fields according to the schema definition by default [flink]

2023-12-17 Thread via GitHub


LadyForest commented on code in PR #23678:
URL: https://github.com/apache/flink/pull/23678#discussion_r1429375718


##
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java:
##
@@ -141,15 +143,15 @@ private DataGeneratorContainer createContainer(
 }
 
 private void validateFieldOptions(String name, DataType type, 
ReadableConfig options) {
-ConfigOption lenOption =
+ConfigOption varLenOption =
 key(DataGenConnectorOptionsUtil.FIELDS
 + "."
 + name
 + "."
 + DataGenConnectorOptionsUtil.VAR_LEN)
 .booleanType()
 .defaultValue(false);
-options.getOptional(lenOption)
+options.getOptional(varLenOption)

Review Comment:
   > the prior name is not clear enough, so I use a new one instead.
   
   I mean L#155
   `filter(option -> option)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]

2023-12-17 Thread via GitHub


xuyangzhong commented on code in PR #23827:
URL: https://github.com/apache/flink/pull/23827#discussion_r1429367900


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java:
##
@@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable 
others) throws Exceptio
 assertKeyNotPresent(acc, key);
 acc.map.put(key, other.map.get(key));
 }
+for (final StringData key : other.retractMap.keys()) {

Review Comment:
   IIUC, for retract streams, if -U comes before +U, +U might also be 
eliminated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-33866) KafkaSinkBuilder in flink-connector-kafka references DeliveryGuarantee in flink-connector-base

2023-12-17 Thread Kurt Ostfeld (Jira)
Kurt Ostfeld created FLINK-33866:


 Summary: KafkaSinkBuilder in flink-connector-kafka references 
DeliveryGuarantee in flink-connector-base
 Key: FLINK-33866
 URL: https://issues.apache.org/jira/browse/FLINK-33866
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: kafka-3.0.2
Reporter: Kurt Ostfeld


I have a Flink project that has code like:

```
KafkaSink.builder().setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
```
 
This worked with flink-connector-kafka 3.0.1 as well as past versions of Flink.
 
This fails to compile with flink-connector-kafka 3.0.2 because that release 
changed flink-connector-base to a provided dependency so the reference to the 
DeliveryGuarantee class becomes a compiler error.
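 
For reference, a minimal sketch of the affected usage (topic and bootstrap values are placeholders); the `DeliveryGuarantee` import is what no longer resolves when flink-connector-base is only a provided dependency:
 
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee; // lives in flink-connector-base
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KafkaSinkExample {
    public static void main(String[] args) {
        KafkaSink<String> sink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("localhost:9092") // placeholder
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("output-topic") // placeholder
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        // compiles against 3.0.1; fails with 3.0.2 unless the
                        // application declares flink-connector-base itself
                        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                        .build();
    }
}
```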
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33764) Incorporate GC / Heap metrics in autoscaler decisions

2023-12-17 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-33764.
--
Resolution: Fixed

merged to main f6adb400e1c87f06faec948379c264eebba71166

> Incorporate GC / Heap metrics in autoscaler decisions
> -
>
> Key: FLINK-33764
> URL: https://issues.apache.org/jira/browse/FLINK-33764
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
>
> The autoscaler currently doesn't use any GC/HEAP metrics as part of the 
> scaling decisions. 
> While the long-term goal may be to support vertical scaling (increasing TM 
> sizes), this is currently out of scope for the autoscaler.
> However it is very important to detect cases where the throughput of certain 
> vertices or the entire pipeline is critically affected by long GC pauses. In 
> these cases the current autoscaler logic would wrongly assume a low true 
> processing rate and scale the pipeline too high, ramping up costs and causing 
> further issues.
> Using the improved GC metrics introduced in 
> https://issues.apache.org/jira/browse/FLINK-33318 we should measure the GC 
> pauses and simply block scaling decisions if the pipeline spends too much 
> time garbage collecting and notify the user about the required action to 
> increase memory.
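
A minimal sketch of the guard described above (option and parameter names are assumptions, not necessarily the merged API):

```java
import org.apache.flink.autoscaler.config.AutoScalerOptions; // option names assumed
import org.apache.flink.configuration.Configuration;

/** Hypothetical guard: block scaling decisions while the job is GC-bound. */
final class GcPressureGuard {
    static boolean blockScaling(double gcPressure, double heapUsage, Configuration conf) {
        // gcPressure: fraction of wall-clock time spent in GC over the metric window.
        // heapUsage: observed heap utilization, in [0, 1].
        return gcPressure > conf.get(AutoScalerOptions.GC_PRESSURE_THRESHOLD)
                || heapUsage > conf.get(AutoScalerOptions.HEAP_USAGE_THRESHOLD);
    }
}
```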



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-17 Thread via GitHub


gyfora merged PR #726:
URL: https://github.com/apache/flink-kubernetes-operator/pull/726


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]

2023-12-17 Thread via GitHub


Zakelly commented on code in PR #83:
URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1429208690


##
src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState;
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+
+/** Implementation for list state benchmark testing. */
+public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
+private final String STATE_NAME = "listState";
+private final ListStateDescriptor<Long> STATE_DESC =
+configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
+private ListState<Long> listState;
+private List<Long> dummyLists;
+
+public static void main(String[] args) throws RunnerException {
+Options opt =
+new OptionsBuilder()
+.verbosity(VerboseMode.NORMAL)
+.include(".*" + 
TtlListStateBenchmark.class.getCanonicalName() + ".*")
+.build();
+
+new Runner(opt).run();
+}
+
+@Setup
+public void setUp() throws Exception {
+keyedStateBackend = createKeyedStateBackend();
+listState = getListState(keyedStateBackend, STATE_DESC);
+dummyLists = new ArrayList<>(listValueCount);
+for (int i = 0; i < listValueCount; ++i) {
+dummyLists.add(random.nextLong());
+}
+keyIndex = new AtomicInteger();
+}
+
+@Setup(Level.Iteration)
+public void setUpPerIteration() throws Exception {
+for (int i = 0; i < setupKeyCount; ++i) {
+keyedStateBackend.setCurrentKey((long) i);
+listState.add(random.nextLong());
+}
+// make sure only one sst file is left, so all get invocations will access this single file,
+// to prevent spikes caused by different key distributions across multiple sst files;
+// the more accesses to the older sst file, the lower the throughput will be.
+if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {

Review Comment:
   Actually, I purposely did not use class abstraction, in order to prevent small changes from affecting the existing test results. I suggest we refactor the existing tests along with [FLINK-33864](https://issues.apache.org/jira/browse/FLINK-33864), since such a refactor can greatly affect test results. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]

2023-12-17 Thread via GitHub


Zakelly commented on code in PR #83:
URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1429207983


##
src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java:
##
@@ -0,0 +1,48 @@
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Param;
+
+import java.util.concurrent.TimeUnit;
+
+/** The base class for state tests with ttl. */
+public class TtlStateBenchmarkBase extends StateBenchmarkBase {
+
+/** The expired time of ttl. */
+public enum ExpiredTimeOptions {
+
+/** 5 seconds. */
+Seconds5(5000),

Review Comment:
   Considering that the warm-up will take `10` seconds, `5` seconds is enough 
for the test to enter the key expiration phase, and it is not too short. I will 
give a comparison of the results under different configurations.
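   
   For reference, a 5-second TTL along these lines could be wired as follows (a sketch; the benchmark's actual `configTtl` may differ in update type and visibility):
   
   ```java
   import org.apache.flink.api.common.state.StateTtlConfig;
   import org.apache.flink.api.common.state.ValueStateDescriptor;
   import org.apache.flink.api.common.time.Time;
   
   static ValueStateDescriptor<Long> ttlDescriptor() {
       ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("value", Long.class);
       StateTtlConfig ttl =
               StateTtlConfig.newBuilder(Time.seconds(5)) // Seconds5(5000) above
                       .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                       .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                       .build();
       desc.enableTimeToLive(ttl);
       return desc;
   }
   ```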



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33859] Support OpenSearch v2 [flink-connector-opensearch]

2023-12-17 Thread via GitHub


reta commented on PR #38:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/38#issuecomment-1859212485

   > this is what I tried to do and this is how it looks like with this PR
   
   Yep. it looks reasonable


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-30535] Introduce TTL state based benchmarks [flink-benchmarks]

2023-12-17 Thread via GitHub


Myasuka commented on code in PR #83:
URL: https://github.com/apache/flink-benchmarks/pull/83#discussion_r1429199281


##
src/main/java/org/apache/flink/state/benchmark/ttl/TtlListStateBenchmark.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.applyToAllKeys;
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.compactState;
+import static 
org.apache.flink.state.benchmark.StateBackendBenchmarkUtils.getListState;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
+import static 
org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;
+
+/** Implementation for list state benchmark testing. */
+public class TtlListStateBenchmark extends TtlStateBenchmarkBase {
+private final String STATE_NAME = "listState";
+private final ListStateDescriptor<Long> STATE_DESC =
+configTtl(new ListStateDescriptor<>(STATE_NAME, Long.class));
+private ListState<Long> listState;
+private List<Long> dummyLists;
+
+public static void main(String[] args) throws RunnerException {
+Options opt =
+new OptionsBuilder()
+.verbosity(VerboseMode.NORMAL)
+.include(".*" + 
TtlListStateBenchmark.class.getCanonicalName() + ".*")
+.build();
+
+new Runner(opt).run();
+}
+
+@Setup
+public void setUp() throws Exception {
+keyedStateBackend = createKeyedStateBackend();
+listState = getListState(keyedStateBackend, STATE_DESC);
+dummyLists = new ArrayList<>(listValueCount);
+for (int i = 0; i < listValueCount; ++i) {
+dummyLists.add(random.nextLong());
+}
+keyIndex = new AtomicInteger();
+}
+
+@Setup(Level.Iteration)
+public void setUpPerIteration() throws Exception {
+for (int i = 0; i < setupKeyCount; ++i) {
+keyedStateBackend.setCurrentKey((long) i);
+listState.add(random.nextLong());
+}
+// make sure only one sst file is left, so all get invocations will access this single file,
+// to prevent spikes caused by different key distributions across multiple sst files;
+// the more accesses to the older sst file, the lower the throughput will be.
+if (keyedStateBackend instanceof RocksDBKeyedStateBackend) {

Review Comment:
   I noticed that there is a lot of duplicated code here; can we reduce it via generics or a more abstract class?



##
src/main/java/org/apache/flink/state/benchmark/ttl/TtlStateBenchmarkBase.java:
##
@@ -0,0 +1,48 @@
+package org.apache.flink.state.benchmark.ttl;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.state.benchmark.StateBenchmarkBase;
+import org.openjdk.jmh.annotations.Param;
+
+import java.util.concurrent.TimeUnit;
+
+/** The base class for state tests with ttl. */
+public class TtlStateBenchmarkBase extends StateBenchmarkBase {
+
+/** The expired time of ttl. */
+public enum ExpiredTimeOptions {
+
+/** 5 seconds. */
+Seconds5(5000),

Review Comment:
   

Re: [PR] [FLINK-33433][rest] Introduce async-profiler to support profiling Job… [flink]

2023-12-17 Thread via GitHub


Myasuka commented on code in PR #23820:
URL: https://github.com/apache/flink/pull/23820#discussion_r1429195464


##
flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ProfilingService}. */
+public class ProfilingServiceTest {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ProfilingServiceTest.class);
+private static final Configuration configs = new Configuration();
+private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf 
events.";
+private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer 
symbols found.";
+private static final String resourceID = "TestJobManager";
+private static final long profilingDuration = 3L;
+private static final int historySizeLimit = 3;
+
+private ProfilingService profilingService;
+
+@BeforeAll
+static void beforeAll() {
+configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit);
+}
+
+@BeforeEach
+void setUp(@TempDir Path tempDir) {
+configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
+profilingService = ProfilingService.getInstance(configs);
+verifyConfigsWorks(profilingService, tempDir);

Review Comment:
   The `setUp` method, run with `@BeforeEach`, should not include any test behavior. If you want to verify the configs, please add a separate unit test for that.
   
   For the `tempDir` problem, you can introduce `@TempDir Path tempDir` as a field in the `ProfilingServiceTest` class to avoid such a variable in the `setUp` method.
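   
   A minimal sketch of the suggested shape (note that JUnit 5 requires `@TempDir` fields to be non-private; the verification test body is illustrative and reuses the existing `verifyConfigsWorks` helper):
   
   ```java
   import java.nio.file.Path;
   
   import org.junit.jupiter.api.BeforeEach;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.io.TempDir;
   // Configuration, RestOptions, ProfilingService imports as in the original test.
   
   class ProfilingServiceTest {
       private static final Configuration configs = new Configuration();
   
       @TempDir Path tempDir; // injected by JUnit 5; must not be private
   
       private ProfilingService profilingService;
   
       @BeforeEach
       void setUp() {
           configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
           profilingService = ProfilingService.getInstance(configs);
       }
   
       @Test
       void testProfilingResultDirConfigured() {
           // the verification formerly in setUp() becomes its own test
           verifyConfigsWorks(profilingService, tempDir);
       }
   }
   ```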



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33433][rest] Introduce async-profiler to support profiling Job… [flink]

2023-12-17 Thread via GitHub


Myasuka commented on code in PR #23820:
URL: https://github.com/apache/flink/pull/23820#discussion_r1429195243


##
flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ProfilingService}. */
+public class ProfilingServiceTest {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ProfilingServiceTest.class);
+private static final Configuration configs = new Configuration();
+private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf 
events.";
+private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer 
symbols found.";
+private static final String resourceID = "TestJobManager";
+private static final long profilingDuration = 3L;
+private static final int historySizeLimit = 3;
+
+private ProfilingService profilingService;
+
+@BeforeAll
+static void beforeAll() {
+configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit);
+}
+
+@BeforeEach
+void setUp(@TempDir Path tempDir) {
+configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
+profilingService = ProfilingService.getInstance(configs);
+verifyConfigsWorks(profilingService, tempDir);
+}
+
+@AfterEach
+void tearDown() throws IOException {
+profilingService.close();
+}
+
+@Test
+public void testSingleInstance() throws IOException {
+ProfilingService instance = ProfilingService.getInstance(configs);
+Assertions.assertEquals(profilingService, instance);
+instance.close();
+}
+
+@Test
+void testFailedRequestUnderProfiling() throws ExecutionException, 
InterruptedException {
+ProfilingInfo profilingInfo =
+profilingService
+.requestProfiling(resourceID, 10, 
ProfilingInfo.ProfilingMode.ITIMER)
+.get();
+Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, 
profilingInfo.getStatus());
+try {
+profilingService
+.requestProfiling(
+resourceID, profilingDuration, 
ProfilingInfo.ProfilingMode.ITIMER)
+.get();
+Assertions.fail("Duplicate profiling request should throw with 
IllegalStateException.");
+} catch (Exception e) {
+Assertions.assertTrue(e.getCause() instanceof 
IllegalStateException);
+}
+}
+
+@Test
+@Timeout(value = 1, unit = TimeUnit.MINUTES)
+public void testAllProfilingMode() throws ExecutionException, 
InterruptedException {
+for (ProfilingInfo.ProfilingMode mode : 
ProfilingInfo.ProfilingMode.values()) {
+ProfilingInfo profilingInfo =
+profilingService.requestProfiling(resourceID, 
profilingDuration, mode).get();
+if (isNoPermissionOrAllocateSymbol(profilingInfo)) {
+LOG.warn(
+"Ignoring failed profiling instance in {} mode, which 
caused by no permission.",
+profilingInfo.getProfilingMode());
+continue;
+}
+Assertions.assertEquals(
+ProfilingInfo.ProfilingStatus.RUNNING,
+profilingInfo.getStatus(),
+String.format(
+"Submitting profiling request should be succeed or 
no permission, but got 

[jira] [Updated] (FLINK-33865) exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration

2023-12-17 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33865:

Description: 
exponential-delay.attempts-before-reset-backoff doesn't work when it's set in 
Job Configuration.

h2. Reason: 

when exponential-delay.attempts-before-reset-backoff is set by job 
Configuration instead of cluster configuration.

ExecutionConfig#configure will call RestartStrategies#parseConfiguration to 
create the ExponentialDelayRestartStrategyConfiguration. And then 
RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will 
create the ExponentialDelayRestartBackoffTimeStrategyFactory by the 
ExponentialDelayRestartStrategyConfiguration.

Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, 
so ExponentialDelayRestartStrategyConfiguration doesn't support 
exponential-delay.attempts-before-reset-backoff. So if we set 
exponential-delay.attempts-before-reset-backoff at the job level, it won't take 
effect.

h2. Solution

If we use the ExponentialDelayRestartStrategyConfiguration to save 
restartStrategy-related options in the ExecutionConfig, all new options set at 
the job level will be missed. 
So we can use the Configuration to save the restartStrategy options inside the 
ExecutionConfig.


 !image-2023-12-17-17-56-59-138.png! 

  was:
exponential-delay.attempts-before-reset-backoff doesn't work when it's set in 
Job Configuration.

Reason: when exponential-delay.attempts-before-reset-backoff is set by job 
Configuration instead of cluster configuration.

ExecutionConfig#configure will call RestartStrategies#parseConfiguration to 
create the RestartStrategyConfiguration. And then 
RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will 
create the ExponentialDelayRestartBackoffTimeStrategyFactory by the 
RestartStrategyConfiguration.

Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, 
so it doesn't support exponential-delay.attempts-before-reset-backoff.

I had a misunderstanding during FLINK-32895: I thought 
RestartBackoffTimeStrategyFactoryLoader#createRestartBackoffTimeStrategyFactory 
would create the ExponentialDelayRestartBackoffTimeStrategyFactory from the 
clusterConfiguration.


 !image-2023-12-17-17-56-59-138.png! 


> exponential-delay.attempts-before-reset-backoff doesn't work when it's set in 
> Job Configuration
> ---
>
> Key: FLINK-33865
> URL: https://issues.apache.org/jira/browse/FLINK-33865
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-12-17-17-56-59-138.png
>
>
> exponential-delay.attempts-before-reset-backoff doesn't work when it's set in 
> Job Configuration.
> h2. Reason: 
> when exponential-delay.attempts-before-reset-backoff is set by job 
> Configuration instead of cluster configuration.
> ExecutionConfig#configure will call RestartStrategies#parseConfiguration to 
> create the ExponentialDelayRestartStrategyConfiguration. And then 
> RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will 
> create the ExponentialDelayRestartBackoffTimeStrategyFactory by the 
> ExponentialDelayRestartStrategyConfiguration.
> Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, 
> so ExponentialDelayRestartStrategyConfiguration doesn't support 
> exponential-delay.attempts-before-reset-backoff. So if we set 
> exponential-delay.attempts-before-reset-backoff at the job level, it won't 
> take effect.
> h2. Solution
> If we use the ExponentialDelayRestartStrategyConfiguration to save 
> restartStrategy-related options in the ExecutionConfig, all new options set 
> at the job level will be missed. 
> So we can use the Configuration to save the restartStrategy options inside 
> the ExecutionConfig.
>  !image-2023-12-17-17-56-59-138.png! 
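
For reference, the job-level setup that hits this looks roughly like the following (a sketch; the full option key is assumed to follow the restart-strategy.* naming):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobLevelRestartStrategyExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("restart-strategy.type", "exponential-delay");
        // Set at the JOB level: before the fix this value was silently dropped,
        // because it never survived the detour through the deprecated
        // RestartStrategyConfiguration objects.
        conf.setString(
                "restart-strategy.exponential-delay.attempts-before-reset-backoff", "10");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job with env ...
    }
}
```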



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33865][runtime] Support setting `exponential-delay.attempts-before-reset-backoff` when it's set in Job Configuration [flink]

2023-12-17 Thread via GitHub


flinkbot commented on PR #23942:
URL: https://github.com/apache/flink/pull/23942#issuecomment-1859194965

   
   ## CI report:
   
   * fce08fa4150978ab8c7ea743ac17cacc6d565d26 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-33865) exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration

2023-12-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33865:
---
Labels: pull-request-available  (was: )

> exponential-delay.attempts-before-reset-backoff doesn't work when it's set in 
> Job Configuration
> ---
>
> Key: FLINK-33865
> URL: https://issues.apache.org/jira/browse/FLINK-33865
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-12-17-17-56-59-138.png
>
>
> exponential-delay.attempts-before-reset-backoff doesn't work when it's set in 
> Job Configuration.
> Reason: when exponential-delay.attempts-before-reset-backoff is set by job 
> Configuration instead of cluster configuration.
> ExecutionConfig#configure will call RestartStrategies#parseConfiguration to 
> create the RestartStrategyConfiguration. And then 
> RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will 
> create the ExponentialDelayRestartBackoffTimeStrategyFactory by the 
> RestartStrategyConfiguration.
> Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, 
> so it doesn't support exponential-delay.attempts-before-reset-backoff.
> I had a misunderstanding during FLINK-32895: I thought 
> RestartBackoffTimeStrategyFactoryLoader#createRestartBackoffTimeStrategyFactory
>  would create the ExponentialDelayRestartBackoffTimeStrategyFactory from the 
> clusterConfiguration.
>  !image-2023-12-17-17-56-59-138.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-33865][runtime] Support setting `exponential-delay.attempts-before-reset-backoff` when it's set in Job Configuration [flink]

2023-12-17 Thread via GitHub


1996fanrui opened a new pull request, #23942:
URL: https://github.com/apache/flink/pull/23942

   Draft PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33859] Support OpenSearch v2 [flink-connector-opensearch]

2023-12-17 Thread via GitHub


snuyanzin commented on PR #38:
URL: 
https://github.com/apache/flink-connector-opensearch/pull/38#issuecomment-1859140916

   At least we could build/test with what is allowed. 
   This is what I tried to do, and this is how it looks with this PR:
   |  Opensearch|  jdk8 | jdk11  | jdk17 (Flink1.18+)   | jdk21 (Flink 1.19+)  |
   |---|---|---|---|---|
   | v1  | ✅   |✅   | ✅   |✅   |
   | v2  | no since OS v2 baseline is jdk11  | ✅  |✅   |✅   |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-33862) Flink Unit Test Failures on 1.18.0

2023-12-17 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17797908#comment-17797908
 ] 

Martijn Visser commented on FLINK-33862:


This isn't a command that you would use to build Flink normally.

> Flink Unit Test Failures on 1.18.0
> --
>
> Key: FLINK-33862
> URL: https://issues.apache.org/jira/browse/FLINK-33862
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.18.0, 1.19.0
>Reporter: Prabhu Joseph
>Priority: Major
>
> Flink Unit Test Failures on 1.18.0. There are 100+ unit test cases failing 
> due to the common issues below.
> *Issue 1*
> {code:java}
> ./mvnw -DfailIfNoTests=false -Dmaven.test.failure.ignore=true 
> -Dtest=ExecutionPlanAfterExecutionTest test
> [INFO] Running org.apache.flink.client.program.ExecutionPlanAfterExecutionTest
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>   at 
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
>   at 
> org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>   at 
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
>   at 
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1267)
>   at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93)
>   at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>   at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
>   at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>   at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>   at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>   at 
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
>   at 
> org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$1.onComplete(ScalaFutureUtils.java:47)
>   at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:310)
>   at org.apache.pekko.dispatch.OnComplete.internal(Future.scala:307)
>   at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:234)
>   at org.apache.pekko.dispatch.japi$CallbackBridge.apply(Future.scala:231)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>   at 
> org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils$DirectExecutionContext.execute(ScalaFutureUtils.java:65)
>   at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
>   at 
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
>   at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:629)
>   at 
> org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:34)
>   at 
> org.apache.pekko.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:33)
>   at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:536)
>   at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
>   at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>   at 
> org.apache.pekko.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:73)
>   at 
> org.apache.pekko.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:110)
>   at 
> 

[jira] [Created] (FLINK-33865) exponential-delay.attempts-before-reset-backoff doesn't work when it's set in Job Configuration

2023-12-17 Thread Rui Fan (Jira)
Rui Fan created FLINK-33865:
---

 Summary: exponential-delay.attempts-before-reset-backoff doesn't 
work when it's set in Job Configuration
 Key: FLINK-33865
 URL: https://issues.apache.org/jira/browse/FLINK-33865
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Reporter: Rui Fan
Assignee: Rui Fan
 Attachments: image-2023-12-17-17-56-59-138.png

exponential-delay.attempts-before-reset-backoff doesn't work when it's set in 
Job Configuration.

Reason: when exponential-delay.attempts-before-reset-backoff is set by job 
Configuration instead of cluster configuration.

ExecutionConfig#configure will call RestartStrategies#parseConfiguration to 
create the RestartStrategyConfiguration. And then 
RestartBackoffTimeStrategyFactoryLoader#getJobRestartStrategyFactory will 
create the ExponentialDelayRestartBackoffTimeStrategyFactory by the 
RestartStrategyConfiguration.

Since 1.19, RestartStrategies and RestartStrategyConfiguration are deprecated, 
so it doesn't support exponential-delay.attempts-before-reset-backoff.

I had a misunderstanding during FLINK-32895: I thought 
RestartBackoffTimeStrategyFactoryLoader#createRestartBackoffTimeStrategyFactory 
would create the ExponentialDelayRestartBackoffTimeStrategyFactory from the 
clusterConfiguration.


 !image-2023-12-17-17-56-59-138.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Update content.zh/docs/dev/table/overview.md [flink]

2023-12-17 Thread via GitHub


flinkbot commented on PR #23941:
URL: https://github.com/apache/flink/pull/23941#issuecomment-1859082438

   
   ## CI report:
   
   * 797c7d77b1b08af37e5e8ac3609cfee16f61cf2a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Update content.zh/docs/dev/table/overview.md [flink]

2023-12-17 Thread via GitHub


shalk opened a new pull request, #23941:
URL: https://github.com/apache/flink/pull/23941

   Translate content.zh/docs/dev/table/overview.md.
   
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org