[jira] [Updated] (FLINK-32881) Client supports making savepoints in detach mode

2023-08-17 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu updated FLINK-32881:
-
Component/s: Runtime / Checkpointing
 (was: API / State Processor)

> Client supports making savepoints in detach mode
> 
>
> Key: FLINK-32881
> URL: https://issues.apache.org/jira/browse/FLINK-32881
> Project: Flink
>  Issue Type: Improvement
>  Components: Client / Job Submission, Runtime / Checkpointing
>Affects Versions: 1.19.0
>Reporter: Renxiang Zhou
>Priority: Major
>  Labels: detach-savepoint
> Fix For: 1.19.0
>
> Attachments: image-2023-08-16-17-14-34-740.png, 
> image-2023-08-16-17-14-44-212.png
>
>
> When triggering a savepoint using the command-line tool, the client needs to 
> wait for the job to finish creating the savepoint before it can exit. For 
> jobs with large state, the savepoint creation process can be time-consuming, 
> leading to the following problems:
>  # Platform users may need to manage thousands of Flink tasks on a single 
> client machine. With the current savepoint triggering mode, all savepoint 
> creation threads on that machine have to wait for the job to finish the 
> snapshot, resulting in significant resource waste;
>  # If the savepoint producing time exceeds the client's timeout duration, the 
> client will throw a timeout exception and report that the triggering 
> savepoint process fails. Since different jobs have varying savepoint 
> durations, it is difficult to adjust the timeout parameter on the client side.
> Therefore, we propose adding a detach mode to trigger savepoints on the 
> client side, similar to the detach mode behavior when submitting jobs. 
> Here are some specific details:
>  # The savepoint UUID will be generated on the client side. After 
> successfully triggering the savepoint, the client immediately returns the 
> UUID information and exits.
>  # Add a "dump-pending-savepoints" API that allows the client to check 
> whether the triggered savepoint has been successfully created.
> By implementing these changes, the client can detach from the savepoint 
> creation process, reducing resource waste, and providing a way to check the 
> status of savepoint creation.
> !image-2023-08-16-17-14-34-740.png|width=2129,height=625!!image-2023-08-16-17-14-44-212.png|width=1530,height=445!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32881) Client supports making savepoints in detach mode

2023-08-17 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755391#comment-17755391
 ] 

Hangxiang Yu commented on FLINK-32881:
--

Hi, [~zhourenxiang] 
We often use an async thread to trigger savepoints via the REST API to resolve this.

Of course, I think it's still useful for users to trigger savepoints via the CLI.
Would you like to contribute your PR?
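
For reference, the REST-based approach can be sketched roughly like this (a minimal, illustrative sketch: the REST address, job id, and target directory are placeholder assumptions, not part of this ticket):

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Triggers a savepoint via the REST API and returns immediately. */
public class DetachedSavepointTrigger {
    public static void main(String[] args) throws Exception {
        String restAddress = "http://localhost:8081"; // JobManager REST endpoint (assumption)
        String jobId = args[0];                       // the job to savepoint
        HttpClient client = HttpClient.newHttpClient();

        // 1) POST /jobs/:jobid/savepoints returns right away with a trigger id;
        //    the savepoint itself completes asynchronously on the cluster.
        HttpRequest trigger = HttpRequest.newBuilder()
                .uri(URI.create(restAddress + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"target-directory\": \"hdfs:///savepoints\", \"cancel-job\": false}"))
                .build();
        String response = client.send(trigger, HttpResponse.BodyHandlers.ofString()).body();
        System.out.println("Trigger response: " + response); // contains the request-id

        // 2) The client can exit here. The status can be polled later via
        //    GET /jobs/:jobid/savepoints/:triggerid using the returned request-id.
    }
}
{code}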

> Client supports making savepoints in detach mode
> 
>
> Key: FLINK-32881
> URL: https://issues.apache.org/jira/browse/FLINK-32881
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor, Client / Job Submission
>Affects Versions: 1.19.0
>Reporter: Renxiang Zhou
>Priority: Major
>  Labels: detach-savepoint
> Fix For: 1.19.0
>
> Attachments: image-2023-08-16-17-14-34-740.png, 
> image-2023-08-16-17-14-44-212.png
>
>
> When triggering a savepoint using the command-line tool, the client needs to 
> wait for the job to finish creating the savepoint before it can exit. For 
> jobs with large state, the savepoint creation process can be time-consuming, 
> leading to the following problems:
>  # Platform users may need to manage thousands of Flink tasks on a single 
> client machine. With the current savepoint triggering mode, all savepoint 
> creation threads on that machine have to wait for the job to finish the 
> snapshot, resulting in significant resource waste;
>  # If the savepoint producing time exceeds the client's timeout duration, the 
> client will throw a timeout exception and report that the triggering 
> savepoint process fails. Since different jobs have varying savepoint 
> durations, it is difficult to adjust the timeout parameter on the client side.
> Therefore, we propose adding a detach mode to trigger savepoints on the 
> client side, similar to the detach mode behavior when submitting jobs. 
> Here are some specific details:
>  # The savepoint UUID will be generated on the client side. After 
> successfully triggering the savepoint, the client immediately returns the 
> UUID information and exits.
>  # Add a "dump-pending-savepoints" API that allows the client to check 
> whether the triggered savepoint has been successfully created.
> By implementing these changes, the client can detach from the savepoint 
> creation process, reducing resource waste, and providing a way to check the 
> status of savepoint creation.
> !image-2023-08-16-17-14-34-740.png|width=2129,height=625!!image-2023-08-16-17-14-44-212.png|width=1530,height=445!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #652: [docs][autoscaler] Autoscaler docs and default config improvement

2023-08-17 Thread via GitHub


1996fanrui commented on code in PR #652:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/652#discussion_r1296769538


##
docs/content/docs/custom-resource/autoscaler.md:
##
@@ -35,26 +35,78 @@ Key benefits to the user:
  - Automatic adaptation to changing load patterns
  - Detailed utilization metrics for performance debugging
 
-Job requirements:
- - The autoscaler currently only works with the latest [Flink 
1.17](https://hub.docker.com/_/flink) or after backporting the following fixes 
to your 1.15/1.16 Flink image
-   - Job vertex parallelism overrides (must have)
- - [Add option to override job vertex parallelisms during job 
submission](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
- - [Change ForwardPartitioner to RebalancePartitioner on parallelism 
changes](https://github.com/apache/flink/pull/21443) (consists of 5 commits)
- - [Fix logic for determining downstream subtasks for partitioner 
replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
-   - [Support timespan for busyTime 
metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35)
 (good to have)
- - Source scaling only supports modern sources which
-   - use the new [Source 
API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
 that exposes the busy time metric (must have, most common connectors already 
do)
-   - expose the [standardized connector 
metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics)
 for accessing backlog information (good to have, extra capacity will be added 
for catching up with backlog)
+## Overview
 
-In the current state the autoscaler works best with Kafka sources, as they 
expose all the standardized metrics. It also comes with some additional 
benefits when using Kafka such as automatically detecting and limiting source 
max parallelism to the number of Kafka partitions.
+The autoscaler relies on the metrics exposed by the Flink metric system for 
the individual tasks. The metrics are queried directly from the Flink job.
 
-{{< hint info >}}
-The autoscaler also supports a passive/metrics-only mode where it only 
collects and evaluates scaling related performance metrics but does not trigger 
any job upgrades.
-This can be used to gain confidence in the module without any impact on the 
running applications.
+Collected metrics:
+ - Backlog information at each source
+ - Incoming data rate at the sources (e.g. records/sec written into the Kafka 
topic)
+ - Number of records processed per second in each job vertex
+ - Busy time per second of each job vertex (current utilization)
 
-To disable scaling actions, set: 
`kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< hint info >}}
+Please note that we are not using any container memory / CPU utilization 
metrics directly here. High utilization will be reflected in the processing 
rate and busy time metrics of the individual job vertexes.
 {{< /hint >}}
 
+The algorithm starts from the sources and recursively computes the required 
processing capacity (target data rate) for each operator in the pipeline. At 
the source vertices, target data rate is equal to incoming data rate (from the 
Kafka topic).
+
+For downstream operators we compute the target data rate as the sum of the 
input (upstream) operators' output data rates along the given edge in the 
processing graph.
+
+{{< img src="/img/custom-resource/autoscaler_fig1.png" alt="Computing Target 
Data Rates" >}}
+
+Users configure the target utilization percentage of the operators in the 
pipeline, e.g. keep all operators between 60% - 80% busy. The autoscaler 
then finds a parallelism configuration such that the output rates of all 
operators match the input rates of all their downstream operators at the 
targeted utilization.
+
+In this example we see an upscale operation:
+
+{{< img src="/img/custom-resource/autoscaler_fig2.png" alt="Scaling Up" >}}
+
+Similarly, as load decreases, the autoscaler adjusts individual operator 
parallelism levels to match the current rate over time.
+
+{{< img src="/img/custom-resource/autoscaler_fig3.png" alt="Scaling Down" >}}
+
+The autoscaler approach is based on [Three steps is all you need: fast, 
accurate, automatic scaling decisions for distributed streaming 
dataflows](https://www.usenix.org/system/files/osdi18-kalavri.pdf) by Kalavri 
et al.
+
+## Executing rescaling operations
+
+By default the autoscaler uses the built-in job upgrade mechanism from the 
operator to perform the rescaling as detailed in [Job Management and Stateful 
upgrades]({{< ref "docs/custom-resource/job-management" >}}).
+
+### Flink 1.18 and in-place scaling support
+
+The upcoming Flink 1.18 release brings very significant improvements to the 
speed of scaling operations through the new resource requirements rest endpoint.
+This allows the autos
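
To make the target-rate propagation described in this diff concrete, here is a minimal sketch under assumed numbers (toy topology, illustrative rates and utilization; not the operator's actual implementation):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy version of the target-rate propagation described in the docs above. */
public class TargetRateSketch {
    public static void main(String[] args) {
        // Topology (in topological order): two sources feed a map, which feeds a sink.
        Map<String, List<String>> downstream = Map.of(
                "source1", List.of("map"),
                "source2", List.of("map"),
                "map", List.of("sink"));
        // Incoming data rates at the sources (records/sec), e.g. read from Kafka metrics.
        Map<String, Double> targetRate = new HashMap<>(Map.of("source1", 800.0, "source2", 200.0));

        // Each operator's target rate is the sum of its upstream operators' output
        // rates (assuming one output record per input record for simplicity).
        for (String vertex : List.of("source1", "source2", "map")) {
            for (String out : downstream.getOrDefault(vertex, List.of())) {
                targetRate.merge(out, targetRate.get(vertex), Double::sum);
            }
        }

        // Parallelism needed to handle the target rate at the target utilization,
        // given the measured per-subtask capacity (records/sec at 100% busy).
        double perSubtaskCapacity = 300.0;
        double targetUtilization = 0.7;
        double mapRate = targetRate.get("map"); // 1000.0
        int parallelism = (int) Math.ceil(mapRate / (perSubtaskCapacity * targetUtilization));
        System.out.println("map target rate = " + mapRate + ", parallelism = " + parallelism); // 5
    }
}
```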

[jira] [Assigned] (FLINK-32886) Issue with volumeMounts when creating OLM for Flink Operator 1.6.0

2023-08-17 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-32886:
--

Assignee: James Busche

> Issue with volumeMounts when creating OLM for Flink Operator 1.6.0
> --
>
> Key: FLINK-32886
> URL: https://issues.apache.org/jira/browse/FLINK-32886
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: James Busche
>Assignee: James Busche
>Priority: Minor
>
> I noticed a volumeMount problem when trying to deploy the OLM CSV for the 
> 1.6.0 Flink Kubernetes Operator. (Following the directions from [OLM 
> Verification of a Flink Kubernetes Operator 
> Release|https://cwiki.apache.org/confluence/display/FLINK/OLM+Verification+of+a+Flink+Kubernetes+Operator+Release])
>  
> {{oc describe csv}}
> {{...}}
> {{Warning  InstallComponentFailed  46s (x7 over 49s)  operator-lifecycle-manager  
> install strategy failed: Deployment.apps "flink-kubernetes-operator" is invalid: 
> [spec.template.spec.volumes[2].name: Duplicate value: "keystore", 
> spec.template.spec.containers[0].volumeMounts[1].name: Not found: 
> "flink-artifacts-volume"]}}
>  
> My current workaround is to change [line 
> 88|https://github.com/apache/flink-kubernetes-operator/blob/main/tools/olm/docker-entry.sh#L88]
>  to look like this:
>  
> {{yq ea -i '.spec.install.spec.deployments[0].spec.template.spec.volumes[1] = \{"name": "flink-artifacts-volume","emptyDir": {}}' "${CSV_FILE}"}}
> {{yq ea -i '.spec.install.spec.deployments[0].spec.template.spec.volumes[2] = \{"name": "keystore","emptyDir": {}}' "${CSV_FILE}"}}
>  
> And then the operator deploys without error:
> {{oc get csv}}
> NAME                               DISPLAY                     VERSION   REPLACES                           PHASE
> flink-kubernetes-operator.v1.6.0   Flink Kubernetes Operator   1.6.0     flink-kubernetes-operator.v1.5.0   Succeeded



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #652: [docs][autoscaler] Autoscaler docs and default config improvement

2023-08-17 Thread via GitHub


gyfora commented on code in PR #652:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/652#discussion_r1296783335


##
docs/content/docs/custom-resource/autoscaler.md:
##
@@ -35,26 +35,78 @@ Key benefits to the user:
  - Automatic adaptation to changing load patterns
  - Detailed utilization metrics for performance debugging
 
-Job requirements:
- - The autoscaler currently only works with the latest [Flink 
1.17](https://hub.docker.com/_/flink) or after backporting the following fixes 
to your 1.15/1.16 Flink image
-   - Job vertex parallelism overrides (must have)
- - [Add option to override job vertex parallelisms during job 
submission](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
- - [Change ForwardPartitioner to RebalancePartitioner on parallelism 
changes](https://github.com/apache/flink/pull/21443) (consists of 5 commits)
- - [Fix logic for determining downstream subtasks for partitioner 
replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
-   - [Support timespan for busyTime 
metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35)
 (good to have)
- - Source scaling only supports modern sources which
-   - use the new [Source 
API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
 that exposes the busy time metric (must have, most common connectors already 
do)
-   - expose the [standardized connector 
metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics)
 for accessing backlog information (good to have, extra capacity will be added 
for catching up with backlog)
+## Overview
 
-In the current state the autoscaler works best with Kafka sources, as they 
expose all the standardized metrics. It also comes with some additional 
benefits when using Kafka such as automatically detecting and limiting source 
max parallelism to the number of Kafka partitions.
+The autoscaler relies on the metrics exposed by the Flink metric system for 
the individual tasks. The metrics are queried directly from the Flink job.
 
-{{< hint info >}}
-The autoscaler also supports a passive/metrics-only mode where it only 
collects and evaluates scaling related performance metrics but does not trigger 
any job upgrades.
-This can be used to gain confidence in the module without any impact on the 
running applications.
+Collected metrics:
+ - Backlog information at each source
+ - Incoming data rate at the sources (e.g. records/sec written into the Kafka 
topic)
+ - Number of records processed per second in each job vertex
+ - Busy time per second of each job vertex (current utilization)
 
-To disable scaling actions, set: 
`kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< hint info >}}
+Please note that we are not using any container memory / CPU utilization 
metrics directly here. High utilization will be reflected in the processing 
rate and busy time metrics of the individual job vertexes.
 {{< /hint >}}
 
+The algorithm starts from the sources and recursively computes the required 
processing capacity (target data rate) for each operator in the pipeline. At 
the source vertices, target data rate is equal to incoming data rate (from the 
Kafka topic).
+
+For downstream operators we compute the target data rate as the sum of the 
input (upstream) operators' output data rates along the given edge in the 
processing graph.
+
+{{< img src="/img/custom-resource/autoscaler_fig1.png" alt="Computing Target 
Data Rates" >}}
+
+Users configure the target utilization percentage of the operators in the 
pipeline, e.g. keep all operators between 60% - 80% busy. The autoscaler 
then finds a parallelism configuration such that the output rates of all 
operators match the input rates of all their downstream operators at the 
targeted utilization.
+
+In this example we see an upscale operation:
+
+{{< img src="/img/custom-resource/autoscaler_fig2.png" alt="Scaling Up" >}}
+
+Similarly, as load decreases, the autoscaler adjusts individual operator 
parallelism levels to match the current rate over time.
+
+{{< img src="/img/custom-resource/autoscaler_fig3.png" alt="Scaling Down" >}}
+
+The autoscaler approach is based on [Three steps is all you need: fast, 
accurate, automatic scaling decisions for distributed streaming 
dataflows](https://www.usenix.org/system/files/osdi18-kalavri.pdf) by Kalavri 
et al.
+
+## Executing rescaling operations
+
+By default the autoscaler uses the built-in job upgrade mechanism from the 
operator to perform the rescaling as detailed in [Job Management and Stateful 
upgrades]({{< ref "docs/custom-resource/job-management" >}}).
+
+### Flink 1.18 and in-place scaling support
+
+The upcoming Flink 1.18 release brings very significant improvements to the 
speed of scaling operations through the new resource requirements rest endpoint.
+This allows the autoscale

[GitHub] [flink-ml] Fanoid commented on a diff in pull request #250: [FLINK-32810] Improve memory allocation in ListStateWithCache

2023-08-17 Thread via GitHub


Fanoid commented on code in PR #250:
URL: https://github.com/apache/flink-ml/pull/250#discussion_r1296795610


##
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java:
##
@@ -62,14 +63,32 @@ public class ListStateWithCache implements ListState {
 /** The data cache writer for the received records. */
 private final DataCacheWriter dataCacheWriter;
 
-@SuppressWarnings("unchecked")
 public ListStateWithCache(
 TypeSerializer serializer,
 StreamTask containingTask,
 StreamingRuntimeContext runtimeContext,
 StateInitializationContext stateInitializationContext,
 OperatorID operatorID)
 throws IOException {
+this(
+serializer,
+containingTask,
+runtimeContext,
+stateInitializationContext,
+operatorID,
+0.);

Review Comment:
   @jiangxin369 and @zhipeng93 , thanks for your comments. 
   
   After an offline discussion, I found it is necessary to check the 
sub-fractions from all usages of the operator-scoped managed memory. So I 
changed the constructor of `ListStateWithCache`.
   
   Please check the latest changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] Fanoid commented on pull request #250: [FLINK-32810] Improve memory allocation in ListStateWithCache

2023-08-17 Thread via GitHub


Fanoid commented on PR #250:
URL: https://github.com/apache/flink-ml/pull/250#issuecomment-1681787714

   Hi @jiangxin369 and @zhipeng93, I've updated the PR. Could you have another 
look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wanglijie95 commented on pull request #23216: [FLINK-32831][table-planner] RuntimeFilterProgram should aware join type when looking for the build side

2023-08-17 Thread via GitHub


wanglijie95 commented on PR #23216:
URL: https://github.com/apache/flink/pull/23216#issuecomment-1681809261

   Thanks for the review @lsyldliu, I've addressed or replied to your comments, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wanglijie95 commented on a diff in pull request #23216: [FLINK-32831][table-planner] RuntimeFilterProgram should aware join type when looking for the build side

2023-08-17 Thread via GitHub


wanglijie95 commented on code in PR #23216:
URL: https://github.com/apache/flink/pull/23216#discussion_r1296819120


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java:
##
@@ -202,6 +202,32 @@ public void testBuildSideIsJoinWithoutExchange() throws 
Exception {
 util.verifyPlan(query);
 }
 
+@Test
+public void testBuildSideIsLeftJoinWithoutExchange() throws Exception {

Review Comment:
   After an offline discussion with @lsyldliu, we think it is meaningful to 
directly inject a runtime filter for fact2 (letting the output of the 
`LeftOuterJoin` act as the build side), even if it will affect the creation of 
`MultipleInput`. We will keep it as a future optimization; no changes will be 
made in this version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-30234) SourceReaderBase should provide an option to disable numRecordsIn metric registration

2023-08-17 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren reassigned FLINK-30234:
-

Assignee: Hongshun Wang  (was: Wencong Liu)

> SourceReaderBase should provide an option to disable numRecordsIn metric 
> registration
> -
>
> Key: FLINK-30234
> URL: https://issues.apache.org/jira/browse/FLINK-30234
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common
>Affects Versions: 1.17.0
>Reporter: Qingsheng Ren
>Assignee: Hongshun Wang
>Priority: Major
>
> Currently the numRecordsIn metric is pre-registered for all sources in 
> SourceReaderBase. Considering the different implementations of source 
> readers, the definition of "record" might differ from the one we use in 
> SourceReaderBase, hence numRecordsIn might be inaccurate.
> We could introduce an option in SourceReader to disable the registration of 
> numRecordsIn in SourceReaderBase and let the actual implementation report 
> the metric instead. 
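
A hypothetical sketch of what such an opt-out could look like (the hook name below is illustrative only, not the actual SourceReaderBase API):

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

/** Illustrative base class: numRecordsIn is only registered when the
 *  concrete reader does not take over the reporting itself. */
abstract class SketchSourceReaderBase {
    protected final Counter numRecordsIn;

    SketchSourceReaderBase(MetricGroup metricGroup) {
        // Hypothetical option: skip the pre-registered counter so the concrete
        // implementation can report its own notion of a "record".
        this.numRecordsIn =
                reportsOwnNumRecordsIn() ? null : metricGroup.counter("numRecordsIn");
    }

    /** Override and return true to disable the pre-registered counter. */
    protected boolean reportsOwnNumRecordsIn() {
        return false;
    }
}
{code}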



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #652: [docs][autoscaler] Autoscaler docs and default config improvement

2023-08-17 Thread via GitHub


gyfora merged PR #652:
URL: https://github.com/apache/flink-kubernetes-operator/pull/652


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh

2023-08-17 Thread via GitHub


XComp commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1296771740


##
tools/ci/verify_scala_suffixes.sh:
##
@@ -37,20 +37,33 @@
 #
 # The script uses 'mvn dependency:tree -Dincludes=org.scala-lang' to list Scala
 # dependent modules.
-CI_DIR=$1
-FLINK_ROOT=$2
+
+
+usage() {
+  echo "Usage: $0 "
+  echo ""
+  echo "The environment variable MVN is used to specify the Maven binaries; 
defaults to 'mvnw'."
+  echo "See further details in the JavaDoc of ScalaSuffixChecker."
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+h)
+  usage
+  exit 0
+  ;;
+  esac
+done
+
+MVN=${MVN:-./mvnw}
 
 echo "--- Flink Scala Dependency Analyzer ---"
 echo "Analyzing modules for Scala dependencies using 'mvn dependency:tree'."
 echo "If you haven't built the project, please do so first by running \"mvn 
clean install -DskipTests\""
 
-source "${CI_DIR}/maven-utils.sh"
+dependency_plugin_output=/tmp/dep.txt
 
-cd "$FLINK_ROOT" || exit
-
-dependency_plugin_output=${CI_DIR}/dep.txt
-
-run_mvn dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} 
>> "${dependency_plugin_output}"
+$MVN dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} > 
"${dependency_plugin_output}"

Review Comment:
   ```suggestion
   $MVN -T1 dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: 
${MAVEN_ARGUMENTS} > "${dependency_plugin_output}"
   ```



##
tools/ci/verify_scala_suffixes.sh:
##
@@ -37,20 +37,33 @@
 #
 # The script uses 'mvn dependency:tree -Dincludes=org.scala-lang' to list Scala
 # dependent modules.
-CI_DIR=$1
-FLINK_ROOT=$2
+
+
+usage() {
+  echo "Usage: $0 "
+  echo ""
+  echo "The environment variable MVN is used to specify the Maven binaries; 
defaults to 'mvnw'."
+  echo "See further details in the JavaDoc of ScalaSuffixChecker."
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+h)
+  usage
+  exit 0
+  ;;
+  esac
+done
+
+MVN=${MVN:-./mvnw}
 
 echo "--- Flink Scala Dependency Analyzer ---"
 echo "Analyzing modules for Scala dependencies using 'mvn dependency:tree'."
 echo "If you haven't built the project, please do so first by running \"mvn 
clean install -DskipTests\""
 
-source "${CI_DIR}/maven-utils.sh"
+dependency_plugin_output=/tmp/dep.txt

Review Comment:
   ```suggestion
   dependency_plugin_output=/tmp/dependency-tree-including-scala.txt
   ```
   nit: We might want to give it a more meaningful name to differentiate it 
from the other dependency:tree temporary file, even if it's only a temporary 
file. That makes it easier to debug issues.



##
tools/ci/verify_scala_suffixes.sh:
##
@@ -37,20 +37,33 @@
 #
 # The script uses 'mvn dependency:tree -Dincludes=org.scala-lang' to list Scala
 # dependent modules.
-CI_DIR=$1
-FLINK_ROOT=$2
+
+
+usage() {
+  echo "Usage: $0 "

Review Comment:
   ```suggestion
 echo "Usage: $0"
   ```
   nit: just to avoid someone coming up with a hotfix PR



##
tools/ci/verify_bundled_optional.sh:
##
@@ -17,24 +17,51 @@
 # limitations under the License.
 #
 
+usage() {
+  echo "Usage: $0 "
+  echo " A file containing the output 
of the Maven build."
+  echo ""
+  echo "mvnw clean package > "
+  echo ""
+  echo "The environment variable MVN is used to specify the Maven binaries; 
defaults to 'mvnw'."
+  echo "See further details in the JavaDoc of ShadeOptionalChecker."
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+h)
+  usage
+  exit 0
+  ;;
+  esac
+done
+
+if [[ "$#" != "1" ]]; then
+  usage
+  exit 1
+fi
+
 ## Checks that all bundled dependencies are marked as optional in the poms
 MVN_CLEAN_COMPILE_OUT=$1
-CI_DIR=$2
-FLINK_ROOT=$3
 
-source "${CI_DIR}/maven-utils.sh"
+MVN=${MVN:-./mvnw}
 
-cd "$FLINK_ROOT" || exit
+dependency_plugin_output=/tmp/optional_dep.txt

Review Comment:
   ```suggestion
   dependency_plugin_output=/tmp/dependency-tree.txt
   ```
   nit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh

2023-08-17 Thread via GitHub


XComp commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1296771740


##
tools/ci/verify_scala_suffixes.sh:
##
@@ -37,20 +37,33 @@
 #
 # The script uses 'mvn dependency:tree -Dincludes=org.scala-lang' to list Scala
 # dependent modules.
-CI_DIR=$1
-FLINK_ROOT=$2
+
+
+usage() {
+  echo "Usage: $0 "
+  echo ""
+  echo "The environment variable MVN is used to specify the Maven binaries; 
defaults to 'mvnw'."
+  echo "See further details in the JavaDoc of ScalaSuffixChecker."
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+h)
+  usage
+  exit 0
+  ;;
+  esac
+done
+
+MVN=${MVN:-./mvnw}
 
 echo "--- Flink Scala Dependency Analyzer ---"
 echo "Analyzing modules for Scala dependencies using 'mvn dependency:tree'."
 echo "If you haven't built the project, please do so first by running \"mvn 
clean install -DskipTests\""
 
-source "${CI_DIR}/maven-utils.sh"
+dependency_plugin_output=/tmp/dep.txt
 
-cd "$FLINK_ROOT" || exit
-
-dependency_plugin_output=${CI_DIR}/dep.txt
-
-run_mvn dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} 
>> "${dependency_plugin_output}"
+$MVN dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} > 
"${dependency_plugin_output}"

Review Comment:
   ```suggestion
   $MVN -T1 dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: 
${MAVEN_ARGUMENTS} > "${dependency_plugin_output}"
   ```
   Here we have to add `-T1` as well to make sure that the output is actually 
parseable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh

2023-08-17 Thread via GitHub


XComp commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1296832114


##
tools/ci/verify_bundled_optional.sh:
##
@@ -17,24 +17,51 @@
 # limitations under the License.
 #
 
+usage() {
+  echo "Usage: $0 "
+  echo " A file containing the output 
of the Maven build."
+  echo ""
+  echo "mvnw clean package > "
+  echo ""
+  echo "The environment variable MVN is used to specify the Maven binaries; 
defaults to 'mvnw'."
+  echo "See further details in the JavaDoc of ShadeOptionalChecker."
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+h)
+  usage
+  exit 0
+  ;;
+  esac
+done
+
+if [[ "$#" != "1" ]]; then
+  usage
+  exit 1
+fi
+
 ## Checks that all bundled dependencies are marked as optional in the poms
 MVN_CLEAN_COMPILE_OUT=$1
-CI_DIR=$2
-FLINK_ROOT=$3
 
-source "${CI_DIR}/maven-utils.sh"
+MVN=${MVN:-./mvnw}
 
-cd "$FLINK_ROOT" || exit
+dependency_plugin_output=/tmp/optional_dep.txt
 
-dependency_plugin_output=${CI_DIR}/optional_dep.txt
+$MVN dependency:tree -B > "${dependency_plugin_output}"

Review Comment:
   I did another pass over the code and identified one other location where we 
should add the `-T1` parameter. I commented on it below in a separate review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #653: Backport recent autoscaler docs changes

2023-08-17 Thread via GitHub


gyfora commented on PR #653:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/653#issuecomment-1681842989

   cc @1996fanrui 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #653: Backport recent autoscaler docs changes

2023-08-17 Thread via GitHub


gyfora opened a new pull request, #653:
URL: https://github.com/apache/flink-kubernetes-operator/pull/653

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui opened a new pull request, #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

2023-08-17 Thread via GitHub


1996fanrui opened a new pull request, #23228:
URL: https://github.com/apache/flink/pull/23228

   [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the 
FileSystemBlobStore
   
   The `private boolean put(File fromFile, String toBlobPath)` already calls 
`createBasePathIfNeeded`, so the `public boolean put(File localFile, JobID 
jobId, BlobKey blobKey)` doesn't need to call it again.
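
   For context, a simplified sketch of the call chain (not the full `FileSystemBlobStore` code):

```java
import java.io.File;

// Simplified sketch of the call chain: the public put() delegates to the
// private put(), which already ensures the base path exists, so a second
// createBasePathIfNeeded() call in the public method would be redundant.
class FileSystemBlobStoreSketch {
    private boolean basePathCreated;

    public boolean put(File localFile, String jobId, String blobKey) {
        // No createBasePathIfNeeded() here ...
        return put(localFile, "/blobs/" + jobId + "/" + blobKey);
    }

    private boolean put(File fromFile, String toBlobPath) {
        createBasePathIfNeeded(); // ... because it is already called here.
        // copy fromFile to toBlobPath (elided)
        return true;
    }

    private void createBasePathIfNeeded() {
        if (!basePathCreated) {
            // create the base directory once (elided)
            basePathCreated = true;
        }
    }
}
```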


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

2023-08-17 Thread via GitHub


1996fanrui commented on PR #23228:
URL: https://github.com/apache/flink/pull/23228#issuecomment-1681845331

   Hi @gaborgsomogyi , would you mind taking a look at this hotfix? Thanks~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32668) fix up watchdog timeout error msg in common.sh(e2e test)

2023-08-17 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755421#comment-17755421
 ] 

Matthias Pohl commented on FLINK-32668:
---

If the test times out, the failure handling should be triggered (see 
[common.sh:957|https://github.com/apache/flink/blob/c8ae39d4ac73f81873e1d8ac37e17c29ae330b23/flink-end-to-end-tests/test-scripts/common.sh#L957])
 to print a meaningful output. Nonetheless, the subsequent kill command will 
cause the actual test run to return with a non-zero exit code. As a 
consequence, the script will fail. So your scenario should already be covered 
by the existing code.

The {{kill_test_watchdog}} covers a separate scenario: Killing the watchdog 
process if it's still around. Here, we shouldn't cover the corresponding test 
run anymore (and imply whether the test succeeded or not based on whether the 
watchdog process is still around). The error handling, as said in the previous 
paragraph, should be covered in the test execution itself.

Please correct me if I'm missing anything.

> fix up watchdog timeout error msg  in common.sh(e2e test) 
> --
>
> Key: FLINK-32668
> URL: https://issues.apache.org/jira/browse/FLINK-32668
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Minor
> Attachments: image-2023-07-25-15-27-37-441.png
>
>
> When running an e2e test, an error like this occurs:
> !image-2023-07-25-15-27-37-441.png|width=733,height=115!
>  
> The corresponding code:
> {code:java}
> kill_test_watchdog() {
>     local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid)
>     echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
>     kill $watchdog_pid
> }
> internal_run_with_timeout() {
>     local timeout_in_seconds="$1"
>     local on_failure="$2"
>     local command_label="$3"
>     local command="${@:4}"
>     on_exit kill_test_watchdog
>     (
>         command_pid=$BASHPID
>         (sleep "${timeout_in_seconds}" # set a timeout for this command
>          echo "${command_label:-"The command '${command}'"} (pid: $command_pid) did not finish after $timeout_in_seconds seconds."
>          eval "${on_failure}"
>          kill "$command_pid") & watchdog_pid=$!
>         echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid
>         # invoke
>         $command
>     )
> }{code}
>  
> When {{$command}} completes before the timeout, the watchdog process is 
> killed successfully. However, when {{$command}} times out, the watchdog 
> process kills {{$command}} and then exits itself, leaving behind an error 
> message when the cleanup later tries to kill its own process ID with 
> {{kill $watchdog_pid}}. This "no such process" error message is hard to understand.
>  
> So, I will modify it like this, with a better error message:
>  
> {code:java}
> kill_test_watchdog() {
>     local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid)
>     if kill -0 $watchdog_pid > /dev/null 2>&1; then
>         echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
>         kill $watchdog_pid
>     else
>         echo "[ERROR] Test is timeout"
>         exit 1
>     fi
> } {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

2023-08-17 Thread via GitHub


flinkbot commented on PR #23228:
URL: https://github.com/apache/flink/pull/23228#issuecomment-1681852112

   
   ## CI report:
   
   * 4724d64fec3af7b4e0b1a5b634fc2f0d19f5f3a0 UNKNOWN
   
   
   Bot commands: the @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32875) Enable limiting number of retained jobs for the local job archive directory in flink history server

2023-08-17 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-32875:
--
Fix Version/s: (was: 1.18.0)

> Enable limiting number of retained jobs for the local job archive directory 
> in flink history server
> ---
>
> Key: FLINK-32875
> URL: https://issues.apache.org/jira/browse/FLINK-32875
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.16.1
>Reporter: Allison Chang
>Priority: Not a Priority
>
> Currently the Flink history server limits the number of job archives stored 
> using the history.server.retained-jobs configuration, which sets the limit 
> for both the local cache as well as the remote directories. We want to 
> decouple the local cache limit from the remote directory which stores the 
> job archives, so that we are not limited by the size of the local filesystem 
> where the Flink history server is deployed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32888) File upload runs into EndOfDataDecoderException

2023-08-17 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-32888:


 Summary: File upload runs into EndOfDataDecoderException
 Key: FLINK-32888
 URL: https://issues.apache.org/jira/browse/FLINK-32888
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.17.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0, 1.17.2


With the right request, the FileUploadHandler runs into an 
EndOfDataDecoderException although everything is fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32889) BinaryClassificationEvaluator gives wrong weighted AUC value

2023-08-17 Thread Fan Hong (Jira)
Fan Hong created FLINK-32889:


 Summary: BinaryClassificationEvaluator gives wrong weighted AUC 
value
 Key: FLINK-32889
 URL: https://issues.apache.org/jira/browse/FLINK-32889
 Project: Flink
  Issue Type: Bug
  Components: Library / Machine Learning
Affects Versions: ml-2.3.0
Reporter: Fan Hong


BinaryClassificationEvaluator gives a wrong AUC value when a weight column is 
provided.

Here is a case from the unit test. The (score, label, weight) tuples of the data are:
{code:java}
(0.9, 1.0,  0.8),
(0.9, 1.0,  0.7),
(0.9, 1.0,  0.5),
(0.75, 0.0,  1.2),
(0.6, 0.0,  1.3),
(0.9, 1.0,  1.5),
(0.9, 1.0,  1.4),
(0.4, 0.0,  0.3),
(0.3, 0.0,  0.5),
(0.9, 1.0,  1.9),
(0.2, 0.0,  1.2),
(0.1, 1.0,  1.0)
{code}
PySpark and scikit-learn give an AUC score of 0.87179, while the Flink ML 
implementation gives 0.891168.
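
The reference value can be reproduced with the weighted pairwise definition of AUC (a verification sketch, not the Flink ML implementation):

{code:java}
/** Computes the weighted AUC of the data above via the pairwise definition. */
public class WeightedAucCheck {
    public static void main(String[] args) {
        double[] score  = {0.9, 0.9, 0.9, 0.75, 0.6, 0.9, 0.9, 0.4, 0.3, 0.9, 0.2, 0.1};
        double[] label  = {1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1};
        double[] weight = {0.8, 0.7, 0.5, 1.2, 1.3, 1.5, 1.4, 0.3, 0.5, 1.9, 1.2, 1.0};

        double numerator = 0, posWeight = 0, negWeight = 0;
        for (int i = 0; i < score.length; i++) {
            if (label[i] == 1) posWeight += weight[i]; else negWeight += weight[i];
            for (int j = 0; j < score.length; j++) {
                if (label[i] == 1 && label[j] == 0) {
                    double pair = weight[i] * weight[j];
                    if (score[i] > score[j]) numerator += pair;             // correctly ranked
                    else if (score[i] == score[j]) numerator += 0.5 * pair; // ties count half
                }
            }
        }
        // 30.6 / 35.1 ~= 0.87179, matching PySpark and scikit-learn.
        System.out.println(numerator / (posWeight * negWeight));
    }
}
{code}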

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #653: Backport recent autoscaler docs changes

2023-08-17 Thread via GitHub


gyfora merged PR #653:
URL: https://github.com/apache/flink-kubernetes-operator/pull/653


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor

2023-08-17 Thread via GitHub


XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1296902228


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##
@@ -376,7 +376,7 @@ private void confirmLeadership(
 LOG.debug("Confirm leadership {}.", 
leaderSessionId);
 
leaderElection.confirmLeadership(leaderSessionId, address);
 } else {
-LOG.trace(
+LOG.debug(
 "Ignore confirming leadership 
because the leader {} is no longer valid.",
 leaderSessionId);
 }

Review Comment:
   the entire callback can be replaced by `runIfValidLeader`



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##
@@ -262,34 +262,67 @@ public void grantLeadership(UUID leaderSessionID) {
 @GuardedBy("lock")
 private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
 sequentialOperation =
-sequentialOperation.thenRun(
-() ->
-runIfValidLeader(
-leaderSessionId,
-ThrowingRunnable.unchecked(
+sequentialOperation.thenCompose(
+unused ->
+supplyAsyncIfValidLeader(
+leaderSessionId,
 () ->
-
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-
leaderSessionId)),
-"verify job scheduling status and 
create JobMasterServiceProcess"));
-
+
jobResultStore.hasJobResultEntryAsync(
+getJobID()),
+() ->
+
FutureUtils.completedExceptionally(
+new 
LeadershipLostException(
+"The 
leadership is lost.")))
+.handle(
+(hasJobResult, throwable) -> {
+if (throwable
+instanceof 
LeadershipLostException) {
+
printLogIfNotValidLeader(
+"verify job 
result entry",
+
leaderSessionId);
+return null;
+} else if (throwable != 
null) {
+
ExceptionUtils.rethrow(throwable);
+}
+if (hasJobResult) {
+
handleJobAlreadyDoneIfValidLeader(
+
leaderSessionId);
+} else {
+
createNewJobMasterServiceProcessIfValidLeader(
+
leaderSessionId);
+}
+return null;
+}));
 handleAsyncOperationError(sequentialOperation, "Could not start the 
job manager.");
 }
 
-@GuardedBy("lock")
-private void 
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
-throws FlinkException {
-try {
-if (jobResultStore.hasJobResultEntry(getJobID())) {
-jobAlreadyDone(leaderSessionId);
-} else {
-createNewJobMasterServiceProcess(leaderSessionId);
-}
-} catch (IOException e) {
-throw new FlinkException(
-String.format(
-"Could not retrieve the job scheduling status for 
job %s.", getJobID()),
-e);
-}
+private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
+runIfValidLeader(
+

[GitHub] [flink] zentol opened a new pull request, #23229: [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException

2023-08-17 Thread via GitHub


zentol opened a new pull request, #23229:
URL: https://github.com/apache/flink/pull/23229

   HttpPostRequestDecoder#hasNext() throws an `EndOfDataDecoderException` when 
called if all attributes were already read. There is apparently an edge case 
where a non-empty HttpContent is received, but we already retrieved all 
attributes (so I guess it's just multipart metadata).
   
   We now assume that this isn't a problem; if it is, the application layer 
should notice it (e.g., if a file/attribute is missing).
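
   A minimal sketch of the catch-and-continue approach described above (error handling and part processing elided):

```java
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;

class MultipartDrain {
    /** Reads all remaining parts; a trailing EndOfDataDecoderException is benign. */
    static void drain(HttpPostRequestDecoder decoder) {
        try {
            while (decoder.hasNext()) {
                InterfaceHttpData data = decoder.next();
                // process the file / attribute (elided)
            }
        } catch (HttpPostRequestDecoder.EndOfDataDecoderException e) {
            // All parts were already consumed; treat this as the normal end of
            // the multipart body rather than an error. If something was actually
            // missing, the application layer will notice the absent file/attribute.
        }
    }
}
```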


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32888) File upload runs into EndOfDataDecoderException

2023-08-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32888:
---
Labels: pull-request-available  (was: )

> File upload runs into EndOfDataDecoderException
> ---
>
> Key: FLINK-32888
> URL: https://issues.apache.org/jira/browse/FLINK-32888
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / REST
>Affects Versions: 1.17.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0, 1.17.2
>
>
> With the right request, the FileUploadHandler runs into an 
> EndOfDataDecoderException although everything is fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] gaborgsomogyi commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

2023-08-17 Thread via GitHub


gaborgsomogyi commented on PR #23228:
URL: https://github.com/apache/flink/pull/23228#issuecomment-1681926573

   If you can elaborate on what this can cause, I can understand your issue 
better. Just to give some context: there are many usages of the class, and all 
of the execution paths must create the directory.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23229: [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException

2023-08-17 Thread via GitHub


flinkbot commented on PR #23229:
URL: https://github.com/apache/flink/pull/23229#issuecomment-1681935539

   
   ## CI report:
   
   * f962bdf1e05e41440f8996dcd0b6511104175371 UNKNOWN
   
   
   Bot commands: the @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

2023-08-17 Thread via GitHub


gaborgsomogyi commented on PR #23228:
URL: https://github.com/apache/flink/pull/23228#issuecomment-1681939446

   Now I see what you may mean. The `createBasePathIfNeeded` function is called 
twice in the call chain, so we're losing 2-3 processor ticks because the second 
call must double-check a boolean flag.
   If I understand correctly, this is more of a cosmetic change than a hotfix; 
just to double check, right?
   Correct me if I've understood something wrong.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] masteryhx commented on pull request #23123: [FLINK-32523] Fix Timeout and Assert Error for NotifyCheckpointAbortedITCase#testNotifyCheckpointAborted

2023-08-17 Thread via GitHub


masteryhx commented on PR #23123:
URL: https://github.com/apache/flink/pull/23123#issuecomment-1681940016

   @Myasuka @fredia 
   Could you help take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32523) NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails with timeout on AZP

2023-08-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32523:
---
Labels: pull-request-available test-stability  (was: test-stability)

> NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails with timeout 
> on AZP
> ---
>
> Key: FLINK-32523
> URL: https://issues.apache.org/jira/browse/FLINK-32523
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.2, 1.18.0
>Reporter: Sergey Nuyanzin
>Assignee: Hangxiang Yu
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Attachments: failure.log
>
>
> This build
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50795&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=8638
>  fails with timeout
> {noformat}
> Jul 03 01:26:35 org.junit.runners.model.TestTimedOutException: test timed out 
> after 10 milliseconds
> Jul 03 01:26:35   at java.lang.Object.wait(Native Method)
> Jul 03 01:26:35   at java.lang.Object.wait(Object.java:502)
> Jul 03 01:26:35   at 
> org.apache.flink.core.testutils.OneShotLatch.await(OneShotLatch.java:61)
> Jul 03 01:26:35   at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.verifyAllOperatorsNotifyAborted(NotifyCheckpointAbortedITCase.java:198)
> Jul 03 01:26:35   at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted(NotifyCheckpointAbortedITCase.java:189)
> Jul 03 01:26:35   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Jul 03 01:26:35   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Jul 03 01:26:35   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Jul 03 01:26:35   at java.lang.reflect.Method.invoke(Method.java:498)
> Jul 03 01:26:35   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Jul 03 01:26:35   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Jul 03 01:26:35   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Jul 03 01:26:35   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Jul 03 01:26:35   at java.lang.Thread.run(Thread.java:748)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh

2023-08-17 Thread via GitHub


zentol commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1296932433


##
tools/ci/license_check.sh:
##
@@ -17,6 +17,34 @@
 # limitations under the License.
 

 
+usage() {
+  if [[ "$#" != "2" ]]; then
+echo "Usage: $0  "
+echo " A file containing the 
output of the Maven build."
+echo "  A directory containing a 
Maven repository into which the Flink artifacts were deployed."
+echo ""
+echo "Example preparation:"
+echo "mvnw clean deploy 
-DaltDeploymentRepository=validation_repository::default::file:
 > "
+echo ""
+echo "The environment variable MVN is used to specify the Maven binaries; 
defaults to 'mvnw'."
+echo "See further details in the JavaDoc of LicenseChecker."
+  fi
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+h)
+  usage

Review Comment:
   doesn't work when used with brackets



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] gaborgsomogyi commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

2023-08-17 Thread via GitHub


gaborgsomogyi commented on PR #23228:
URL: https://github.com/apache/flink/pull/23228#issuecomment-1681945758

   The change itself looks good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-23636) NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails on azure

2023-08-17 Thread Hangxiang Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hangxiang Yu closed FLINK-23636.

Fix Version/s: (was: 1.13.7)
   Resolution: Duplicate

It's a duplicate of FLINK-32523.
All cases and reasons have been listed in the new ticket, so I closed this one.

> NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails on azure
> 
>
> Key: FLINK-23636
> URL: https://issues.apache.org/jira/browse/FLINK-23636
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.13.2
>Reporter: Xintong Song
>Priority: Major
>  Labels: test-stability
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=21558&view=logs&j=59c257d0-c525-593b-261d-e96a86f1926b&t=b93980e3-753f-5433-6a19-13747adae66a&l=9527
> {code}
> Aug 05 03:03:53 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 6.985 s <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase
> Aug 05 03:03:53 [ERROR] 
> testNotifyCheckpointAborted[unalignedCheckpointEnabled 
> =true](org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase)  
> Time elapsed: 4.995 s  <<< FAILURE!
> Aug 05 03:03:53 java.lang.AssertionError: expected:<1> but was:<2>
> Aug 05 03:03:53   at org.junit.Assert.fail(Assert.java:88)
> Aug 05 03:03:53   at org.junit.Assert.failNotEquals(Assert.java:834)
> Aug 05 03:03:53   at org.junit.Assert.assertEquals(Assert.java:645)
> Aug 05 03:03:53   at org.junit.Assert.assertEquals(Assert.java:631)
> Aug 05 03:03:53   at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.verifyAllOperatorsNotifyAbortedTimes(NotifyCheckpointAbortedITCase.java:210)
> Aug 05 03:03:53   at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted(NotifyCheckpointAbortedITCase.java:186)
> Aug 05 03:03:53   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Aug 05 03:03:53   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Aug 05 03:03:53   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Aug 05 03:03:53   at java.lang.reflect.Method.invoke(Method.java:498)
> Aug 05 03:03:53   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> Aug 05 03:03:53   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Aug 05 03:03:53   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> Aug 05 03:03:53   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Aug 05 03:03:53   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> Aug 05 03:03:53   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> Aug 05 03:03:53   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Aug 05 03:03:53   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui commented on a diff in pull request #23219: [FLINK-20681][yarn] Support specifying the hdfs path when ship archives or files

2023-08-17 Thread via GitHub


1996fanrui commented on code in PR #23219:
URL: https://github.com/apache/flink/pull/23219#discussion_r1296881231


##
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##
@@ -272,17 +272,24 @@ public class YarnConfigOptions {
 .noDefaultValue()
 .withDeprecatedKeys("yarn.ship-directories")
 .withDescription(
-"A semicolon-separated list of files and/or 
directories to be shipped to the YARN cluster.");
+"A semicolon-separated list of files and/or 
directories to be shipped to the YARN "
++ "cluster. These files/directories can 
come from the local client and/or remote "

Review Comment:
   ```suggestion
   + "cluster. These files/directories can 
come from the local path of flink client or remote "
   ```



##
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java:
##
@@ -272,17 +272,24 @@ public class YarnConfigOptions {
 .noDefaultValue()
 .withDeprecatedKeys("yarn.ship-directories")
 .withDescription(
-"A semicolon-separated list of files and/or 
directories to be shipped to the YARN cluster.");
+"A semicolon-separated list of files and/or 
directories to be shipped to the YARN "
++ "cluster. These files/directories can 
come from the local client and/or remote "
++ "file system. For example, 
\"/path/to/local/file;/path/to/local/directory;"
++ "hdfs://$namenode_address/path/of/file;"
++ 
"hdfs://$namenode_address/path/of/directory\"");
 
 public static final ConfigOption<List<String>> SHIP_ARCHIVES =
 key("yarn.ship-archives")
 .stringType()
 .asList()
 .noDefaultValue()
 .withDescription(
-"A semicolon-separated list of archives to be 
shipped to the YARN cluster."
-+ " These archives will be un-packed when 
localizing and they can be any of the following types: "
-+ "\".tar.gz\", \".tar\", \".tgz\", 
\".dst\", \".jar\", \".zip\".");
+"A semicolon-separated list of archives to be 
shipped to the YARN cluster. "
++ "These archives can come from the local 
client and/or remote file system. "
++ "They will be un-packed when localizing 
and they can be any of the following "
++ "types: \".tar.gz\", \".tar\", \".tgz\", 
\".dst\", \".jar\", \".zip\". "
++ "For example, 
\"/path/to/local/archive.jar;"
++ 
"hdfs://$namenode_address/path/to/archive.jar\"");

Review Comment:
   Same 2 comments as the last option.



##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -911,31 +929,21 @@ private ApplicationReport startAppMaster(
 final List systemClassPaths = fileUploader.registerProvidedLocalResources();
 final List uploadedDependencies =
 fileUploader.registerMultipleLocalResources(
-systemShipFiles.stream()
-.map(e -> new Path(e.toURI()))
-.collect(Collectors.toSet()),
-Path.CUR_DIR,
-LocalResourceType.FILE);
+systemShipFiles, Path.CUR_DIR, LocalResourceType.FILE);

Review Comment:
   `systemShipFiles` includes 3 types of files:
   
   - ShipFiles
   - logConfigFilePath
   - LibFolders
   
   After this PR, the latter two types do not add a scheme; does it work as expected?
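   
   For context, a minimal sketch of the kind of scheme check this local/remote distinction rests on; `isLocal`, the class name, and the sample paths are illustrative assumptions rather than the PR's actual helper:
   
   ```java
   import java.net.URI;
   
   // Illustrative only: treat a path with no URI scheme (or a "file" scheme) as local.
   final class ShipFileSchemeCheck {
       static boolean isLocal(String path) {
           String scheme = URI.create(path).getScheme();
           return scheme == null || "file".equals(scheme);
       }
   
       public static void main(String[] args) {
           System.out.println(isLocal("/path/to/local/file"));               // true
           System.out.println(isLocal("hdfs://namenode:8020/path/of/file")); // false
       }
   }
   ```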



##
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java:
##
@@ -202,16 +204,27 @@ public YarnClusterDescriptor(
 this.nodeLabel = 
flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
 }
 
-private Optional<List<File>> decodeFilesToShipToCluster(
+private Optional<List<Path>> decodeFilesToShipToCluster(
 final Configuration configuration, final ConfigOption<List<String>> configOption) {
 checkNotNull(configuration);
 checkNotNull(configOption);
 
-final List<File> files = ConfigUtils.decodeListFromConfig(configuration, configOption, File::new);
+List<Path> files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new);
+files =
+files.stream()
+.map(
+path ->
+isLoca

[jira] [Comment Edited] (FLINK-16050) Add Attempt Information

2023-08-17 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283595#comment-17283595
 ] 

Matthias Pohl edited comment on FLINK-16050 at 8/17/23 9:34 AM:


[~vthinkxie] [~lining] What's the progress on this issue? It's mentioned in 
FLIP-100 -which is labeled as {{released}} in {{1.11}}-. In contrast, the Jira 
issue itself is still open/in progress?


was (Author: mapohl):
[~vthinkxie] [~lining] What's the progress on this issue? It's mentioned in 
FLIP-100 which is labeled as {{released}} in {{1.11}}. In contrast, the Jira 
issue itself is still open/in progress?

> Add Attempt Information
> ---
>
> Key: FLINK-16050
> URL: https://issues.apache.org/jira/browse/FLINK-16050
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST, Runtime / Web Frontend
>Reporter: lining
>Priority: Minor
>  Labels: auto-deprioritized-major, stale-minor
>
> According to the 
> [docs|https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-vertices-vertexid-subtasks-subtaskindex],
>  there may exist more than one attempt in a subtask, but there is no way to 
> get the attempt history list in the REST API, users have no way to know if 
> the subtask has failed before. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-22090) Upload logs fails

2023-08-17 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl closed FLINK-22090.
-
Resolution: Cannot Reproduce

I'm closing this one. It hasn't reappeared, apparently, and all the builds have already been cleaned up.

> Upload logs fails
> -
>
> Key: FLINK-22090
> URL: https://issues.apache.org/jira/browse/FLINK-22090
> Project: Flink
>  Issue Type: Bug
>  Components: Test Infrastructure
>Reporter: Matthias Pohl
>Priority: Minor
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> auto-deprioritized-minor, stale-minor, test-stability
>
> [This 
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=382&view=logs&j=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267&t=599dab09-ab33-58b6-4804-349ab7dc2f73]
>  failed just because an {{upload logs}} step failed. It looks like this is an 
> AzureCI problem. Is this a known issue?
> The artifacts seem to be uploaded based on the logs. But [the download 
> link|https://dev.azure.com/mapohl/flink/_build/results?buildId=382&view=logs&j=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267]
>  does not show up.
> Another build that had the same issue: 
> [test_ci_blinkplanner|https://dev.azure.com/mapohl/flink/_build/results?buildId=383&view=logs&j=d1352042-8a7d-50b6-3946-a85d176b7981&t=7b7009bb-e6bf-5426-3d4b-20b25eada636&l=75]
>  and 
> [test_ci_build_core|https://dev.azure.com/mapohl/flink/_build/results?buildId=383&view=logs&j=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267&t=599dab09-ab33-58b6-4804-349ab7dc2f73&l=44]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] zentol commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh

2023-08-17 Thread via GitHub


zentol commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1296947756


##
tools/ci/verify_bundled_optional.sh:
##
@@ -17,24 +17,51 @@
 # limitations under the License.
 #
 
+usage() {
+  echo "Usage: $0 "
+  echo " A file containing the output 
of the Maven build."
+  echo ""
+  echo "mvnw clean package > "
+  echo ""
+  echo "The environment variable MVN is used to specify the Maven binaries; 
defaults to 'mvnw'."
+  echo "See further details in the JavaDoc of ShadeOptionalChecker."
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+h)
+  usage
+  exit 0
+  ;;
+  esac
+done
+
+if [[ "$#" != "1" ]]; then
+  usage
+  exit 1
+fi
+
 ## Checks that all bundled dependencies are marked as optional in the poms
 MVN_CLEAN_COMPILE_OUT=$1
-CI_DIR=$2
-FLINK_ROOT=$3
 
-source "${CI_DIR}/maven-utils.sh"
+MVN=${MVN:-./mvnw}
 
-cd "$FLINK_ROOT" || exit
+dependency_plugin_output=/tmp/optional_dep.txt
 
-dependency_plugin_output=${CI_DIR}/optional_dep.txt
+$MVN dependency:tree -B > "${dependency_plugin_output}"

Review Comment:
   _every_ call (apart from `mvn exec`) has to run with a parallelism of 1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

2023-08-17 Thread via GitHub


1996fanrui commented on PR #23228:
URL: https://github.com/apache/flink/pull/23228#issuecomment-1681973924

   Thanks @gaborgsomogyi  for the quick feedback.
   
   Sorry, I thought it was easy and clear enough, so I didn't explain the detailed background and marked it as a `hotfix`. Let me explain it here:
   
   1. The public `put()` method calls the private `put()`, and both of them call `createBasePathIfNeeded`, which is unnecessary.
   2. The private `put()` is only called by the public `put()`, so I removed the private `put()` in this PR.
   
   I checked why there were two `put` methods: the old interface had two public `put` methods. Now it just has one, so I think the private `put()` can be removed as well.
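   
   A minimal, self-contained sketch of the resulting shape; `SimpleBlobStore` and its members are illustrative stand-ins, not Flink's actual `FileSystemBlobStore`:
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   
   // Illustrative stand-in for the refactor: a single public put(), with
   // createBasePathIfNeeded() called exactly once per put.
   class SimpleBlobStore {
       private final Path basePath;
   
       SimpleBlobStore(Path basePath) {
           this.basePath = basePath;
       }
   
       public void put(Path localFile, String blobKey) throws IOException {
           createBasePathIfNeeded(); // previously invoked here and again in a private put()
           Files.copy(localFile, basePath.resolve(blobKey)); // body of the former private put()
       }
   
       private void createBasePathIfNeeded() throws IOException {
           if (!Files.exists(basePath)) {
               Files.createDirectories(basePath);
           }
       }
   }
   ```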
   
   (screenshot: https://github.com/apache/flink/assets/38427477/fa69f4f7-68aa-4325-a69f-2961e87d10f8)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] Zakelly commented on a diff in pull request #22890: [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices

2023-08-17 Thread via GitHub


Zakelly commented on code in PR #22890:
URL: https://github.com/apache/flink/pull/22890#discussion_r1296870428


##
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * There is one {@link FileMergingSnapshotManager} for each job per task manager. This class holds
+ * all {@link FileMergingSnapshotManager} objects for a task executor (manager).
+ */
+public class TaskExecutorFileMergingManager {
+/** Logger for this class. */
+private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorFileMergingManager.class);
+
+/**
+ * This map holds all FileMergingSnapshotManager for tasks running on this task
+ * manager (executor).
+ */
+@GuardedBy("lock")
+private final Map<JobID, FileMergingSnapshotManager> fileMergingSnapshotManagerByJobId;
+
+@GuardedBy("lock")
+private boolean closed;
+
+private final Object lock = new Object();
+
+/** Shutdown hook for this manager. */
+private final Thread shutdownHook;
+
+public TaskExecutorFileMergingManager() {
+this.fileMergingSnapshotManagerByJobId = new HashMap<>();
+this.closed = false;
+this.shutdownHook =
+ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);
+}
+
+/**
+ * Initialize the file merging snapshot manager for each job according to the configuration when {@link
+ * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask} is called.
+ */
+public FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
+@Nonnull JobID jobId,
+Configuration clusterConfiguration,
+Configuration jobConfiguration) {
+synchronized (lock) {
+if (closed) {
+throw new IllegalStateException(
+"TaskExecutorFileMergingManager is already closed and cannot "
++ "register a new FileMergingSnapshotManager.");
+}
+FileMergingSnapshotManager fileMergingSnapshotManager =
+fileMergingSnapshotManagerByJobId.get(jobId);
+if (fileMergingSnapshotManager == null) {
+// TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration

Review Comment:
   call ```FileMergingSnapshotManagerBuilder.build``` here?
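   
   In spirit, something like the following hedged sketch; `Manager` and `ManagerBuilder` are stand-in types, since the real `FileMergingSnapshotManagerBuilder` signature lives in the PR:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Stand-in types; the PR's FileMergingSnapshotManager(Builder) may differ.
   final class PerJobManagers {
       interface Manager {}
   
       static final class ManagerBuilder {
           private final String id;
           ManagerBuilder(String id) { this.id = id; }
           Manager build() { return new Manager() {}; }
       }
   
       private final Object lock = new Object();
       private final Map<String, Manager> managersByJobId = new HashMap<>();
   
       Manager managerForJob(String jobId) {
           synchronized (lock) {
               // Build lazily on first access instead of branching on null by hand.
               return managersByJobId.computeIfAbsent(jobId, id -> new ManagerBuilder(id).build());
           }
       }
   }
   ```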



##
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManage

[GitHub] [flink] gaborgsomogyi commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

2023-08-17 Thread via GitHub


gaborgsomogyi commented on PR #23228:
URL: https://github.com/apache/flink/pull/23228#issuecomment-1682013702

   So it's a cosmetic change, ok.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29546) UDF:Failed to compile split code, falling back to original code

2023-08-17 Thread yong yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755467#comment-17755467
 ] 

yong yang commented on FLINK-29546:
---

I have the same error in Flink 1.15.0.

 

CREATE TABLE t2
(
    `log` STRING,
    `uid` AS get_json_object(`log`,'$.data.uid'),
    `phoneCode` AS get_json_object(`log`,'$.data.phoneCode'),
    `isProduct` AS get_json_object(`log`, '$.data.isProduct'),
    `ct` AS  get_json_object(`log`,'$.data.dialogDO.ctime',0),
    `ct1` AS TO_TIMESTAMP_LTZ(
        get_json_object(`log`, '$.data.dialogDO.ctime',0)
        ,0
        ),
    proc_time AS PROCTIME()
    , WATERMARK FOR `ct1` AS `ct1` - INTERVAL '15' SECOND
) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'localhost:9092',
      'scan.startup.mode' = 'earliest-offset',
      'topic' = 'councilFeedbackRealtime',
      'properties.group.id' = 'aaa',
      'format' = 'raw'
      );

 

 
package com.tuya.tlink.udf.common

import com.alibaba.fastjson2.{JSONPath, JSONReader}
import org.apache.flink.table.functions.ScalarFunction

import scala.util.Try
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.row
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object GetJsonObject {
  def main(args: Array[String]): Unit = {

    val conf = new Configuration
    conf.setInteger(RestOptions.PORT, 38080)
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    val tEnv = StreamTableEnvironment.create(env)

    val table = tEnv.fromValues(
      DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
        DataTypes.FIELD("name", DataTypes.STRING())
      ),
      row(2L, """{"k1":"v111","k2":100}""")
    )

    tEnv.createTemporaryView("tb1", table)
    tEnv.createFunction("get_json_object", classOf[GetJsonObject])

    val tb = tEnv.executeSql(
      """
        | select id, get_json_object(name, '$.k3', 999) as v from tb1
        |""".stripMargin)

    println(tb.getTableSchema)
    tb.print()
  }
}

/**
 * Suitable for extracting a single JSON value.
 * Supports JSON containing extra escape characters (\).
 * Supports full JSONPath expressions, e.g. $.f1.f2.
 */
class GetJsonObject extends ScalarFunction {

  /**
   * @param json tolerates empty strings
   * @param path
   * @return
   */
  def eval(json: String, path: String): String = {
    // paths e.g.: $.f1.f2
    // Parse the path step by step: this supports JSON containing backslashes
    // and accepts both the $.f1.f2 form and the bare f1 form.
    val path_arr = path.replace("$.", "") // f1.f2 or f1
      .split('.') // [f1,f2] or [f1]
      .map(s => s"$$.${s}")

    var jsobj: String = json
    path_arr.foreach { path_part =>
      jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse("")
    }
    jsobj
  }

  def eval(json: String, path: String, defaultValue: String): String = {
    // Same step-by-step parsing as above, with a caller-provided default.
    val path_arr = path.replace("$.", "")
      .split('.')
      .map(s => s"$$.${s}")

    var jsobj: String = json
    path_arr.foreach { path_part =>
      jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse(defaultValue)
    }
    jsobj
  }

  def eval(json: String, path: String, defaultValue: Long): Long = {
    // Same step-by-step parsing as above, returning a Long.
    val path_arr = path.replace("$.", "")
      .split('.')
      .map(s => s"$$.${s}")

    var jsobj: String = json
    path_arr.foreach { path_part =>
      jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse(defaultValue.toString)
    }
    jsobj.toLong
  }
}
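For readers following along, here is the same step-by-step extraction as a compact Java sketch; it reuses the fastjson2 calls from the Scala code above (JSONPath.of, JSONReader.of), while the class name and the main method are illustrative:
{code:java}
import com.alibaba.fastjson2.JSONPath;
import com.alibaba.fastjson2.JSONReader;

public class GetJsonObjectSketch {
    // Resolve the JSONPath one segment at a time, falling back to defaultValue
    // whenever a segment cannot be extracted, mirroring the Scala eval above.
    static String extract(String json, String path, String defaultValue) {
        String current = json;
        for (String part : path.replace("$.", "").split("\\.")) {
            try {
                current = JSONPath.of("$." + part).extract(JSONReader.of(current)).toString();
            } catch (Exception e) {
                current = defaultValue;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(extract("{\"k1\":\"v111\",\"k2\":100}", "$.k3", "999")); // prints 999
    }
}
{code}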

> UDF:Failed to compile split code, falling back to original code
> ---
>
> Key: FLINK-29546
> URL: https://issues.apache.org/jira/browse/FLINK-29546
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.0
> Environment: my pom:
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-api-java-uber</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-runtime</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-planner-loader</artifactId>
> </dependency>
> jdk 1.8
>Reporter: Hui Wang
>Priority: Major
> Attachments: image-2023-08-17-18-18-50-937.png
>
>
> 2022-10-08 19:05:23 [GroupWindowAggregate[11] (1/1)#0] WARN  
> org.apache.flink.table.runtime.generated.GeneratedClass -Failed to compile 
> split code, falling back to original code
> org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     at 
> org.apache.flink.table.runtime.operators.wi

[jira] [Commented] (FLINK-29546) UDF:Failed to compile split code, falling back to original code

2023-08-17 Thread yong yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755468#comment-17755468
 ] 

yong yang commented on FLINK-29546:
---

!image-2023-08-17-18-18-50-937.png!

> UDF:Failed to compile split code, falling back to original code
> ---
>
> Key: FLINK-29546
> URL: https://issues.apache.org/jira/browse/FLINK-29546
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.0
> Environment: my pom:
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-api-java-uber</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-runtime</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-planner-loader</artifactId>
> </dependency>
> jdk 1.8
>Reporter: Hui Wang
>Priority: Major
> Attachments: image-2023-08-17-18-18-50-937.png
>
>
> 2022-10-08 19:05:23 [GroupWindowAggregate[11] (1/1)#0] WARN  
> org.apache.flink.table.runtime.generated.GeneratedClass -Failed to compile 
> split code, falling back to original code
> org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     at 
> org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.compileGeneratedCode(AggregateWindowOperator.java:148)
>     at 
> org.apache.flink.table.runtime.operators.window.WindowOperator.open(WindowOperator.java:274)
>     at 
> org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.open(AggregateWindowOperator.java:139)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 15 common frames omitted
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
>     ... 18 common frames omitted
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 
> 28: Cannot determine simple type name "org"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833)
>     at 
> org.codehaus.janino.UnitCompiler.getRefere

[jira] [Updated] (FLINK-29546) UDF:Failed to compile split code, falling back to original code

2023-08-17 Thread yong yang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yong yang updated FLINK-29546:
--
Attachment: image-2023-08-17-18-18-50-937.png

> UDF:Failed to compile split code, falling back to original code
> ---
>
> Key: FLINK-29546
> URL: https://issues.apache.org/jira/browse/FLINK-29546
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.0
> Environment: my pom:
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-api-java-uber</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-runtime</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-planner-loader</artifactId>
> </dependency>
> jdk 1.8
>Reporter: Hui Wang
>Priority: Major
> Attachments: image-2023-08-17-18-18-50-937.png
>
>
> 2022-10-08 19:05:23 [GroupWindowAggregate[11] (1/1)#0] WARN  
> org.apache.flink.table.runtime.generated.GeneratedClass -Failed to compile 
> split code, falling back to original code
> org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     at 
> org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.compileGeneratedCode(AggregateWindowOperator.java:148)
>     at 
> org.apache.flink.table.runtime.operators.window.WindowOperator.open(WindowOperator.java:274)
>     at 
> org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.open(AggregateWindowOperator.java:139)
>     at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 15 common frames omitted
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
>     ... 18 common frames omitted
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column 
> 28: Cannot determine simple type name "org"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833)
>     at 
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
>     at 
> 

[jira] [Commented] (FLINK-29546) UDF:Failed to compile split code, falling back to original code

2023-08-17 Thread yong yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755469#comment-17755469
 ] 

yong yang commented on FLINK-29546:
---

error log:
{code:java}
// code placeholder
2023-08-17 15:46:02
java.lang.RuntimeException: Could not instantiate generated class 'WatermarkGenerator$0'
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
    at org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62)
    at org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104)
    at org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
    ... 16 more
Caused by: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
    ... 18 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
    at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
    ... 21 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 30, Column 72: Cannot determine simple type name "org"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
    at org.codehaus.janino.UnitCompiler.getRefe

[jira] [Comment Edited] (FLINK-29546) UDF:Failed to compile split code, falling back to original code

2023-08-17 Thread yong yang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755467#comment-17755467
 ] 

yong yang edited comment on FLINK-29546 at 8/17/23 10:20 AM:
-

I have the same error in Flink 1.15.0.

 

 
{code:java}
// code placeholder
CREATE TABLE t2
(
    `log` STRING,
    `uid` AS get_json_object(`log`,'$.data.uid'),
    `phoneCode` AS get_json_object(`log`,'$.data.phoneCode'),
    `isProduct` AS get_json_object(`log`, '$.data.isProduct'),
    `ct` AS  get_json_object(`log`,'$.data.dialogDO.ctime',0),
    `ct1` AS TO_TIMESTAMP_LTZ(
        get_json_object(`log`, '$.data.dialogDO.ctime',0)
        ,0
        ),
    proc_time AS PROCTIME()
    , WATERMARK FOR `ct1` AS `ct1` - INTERVAL '15' SECOND
) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'localhost:9092',
      'scan.startup.mode' = 'earliest-offset',
      'topic' = 'councilFeedbackRealtime',
      'properties.group.id' = 'aaa',
      'format' = 'raw'
      ); {code}
 

 

 

 
{code:java}
// code placeholder
package com.aa.tlink.udf.common

import com.alibaba.fastjson2.{JSONPath, JSONReader}
import org.apache.flink.table.functions.ScalarFunction

import scala.util.Try
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.row
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object GetJsonObject {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration
    conf.setInteger(RestOptions.PORT, 38080)
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    val tEnv = StreamTableEnvironment.create(env)

    val table = tEnv.fromValues(
      DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)),
        DataTypes.FIELD("name", DataTypes.STRING())
      ),
      row(2L, """{"k1":"v111","k2":100}""")
    )

    tEnv.createTemporaryView("tb1", table)
    tEnv.createFunction("get_json_object", classOf[GetJsonObject])

    val tb = tEnv.executeSql(
      """
        | select id, get_json_object(name, '$.k3', 999) as v from tb1
        |""".stripMargin)

    println(tb.getTableSchema)
    tb.print()
  }
}

/**
 * Suitable for extracting a single JSON value.
 * Supports JSON containing extra escape characters (\).
 * Supports full JSONPath expressions, e.g. $.f1.f2.
 */
class GetJsonObject extends ScalarFunction {

  /**
   * @param json tolerates empty strings
   * @param path
   * @return
   */
  def eval(json: String, path: String): String = {
    // paths e.g.: $.f1.f2
    // Parse the path step by step: this supports JSON containing backslashes
    // and accepts both the $.f1.f2 form and the bare f1 form.
    val path_arr = path.replace("$.", "") // f1.f2 or f1
      .split('.') // [f1,f2] or [f1]
      .map(s => s"$$.${s}")

    var jsobj: String = json
    path_arr.foreach { path_part =>
      jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse("")
    }
    jsobj
  }

  def eval(json: String, path: String, defaultValue: String): String = {
    // Same step-by-step parsing as above, with a caller-provided default.
    val path_arr = path.replace("$.", "")
      .split('.')
      .map(s => s"$$.${s}")

    var jsobj: String = json
    path_arr.foreach { path_part =>
      jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse(defaultValue)
    }
    jsobj
  }

  def eval(json: String, path: String, defaultValue: Long): Long = {
    // Same step-by-step parsing as above, returning a Long.
    val path_arr = path.replace("$.", "")
      .split('.')
      .map(s => s"$$.${s}")

    var jsobj: String = json
    path_arr.foreach { path_part =>
      jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse(defaultValue.toString)
    }
    jsobj.toLong
  }
}
{code}
 

 


was (Author: JIRAUSER301658):
i hava same error in flink1.15.0

 

CREATE TABLE t2
(
    `log` STRING,
    `uid` AS get_json_object(`log`,'$.data.uid'),
    `phoneCode` AS get_json_object(`log`,'$.data.phoneCode'),
    `isProduct` AS get_json_object(`log`, '$.data.isProduct'),
    `ct` AS  get_json_object(`log`,'$.data.dialogDO.ctime',0),
    `ct1` AS TO_TIMESTAMP_LTZ(
        get_json_object(`log`, '$.data.dialogDO.ctime',0)
        ,0
        ),
    proc_time AS PROCTIME()
    , WATERMARK FOR `ct1` AS `ct1` - INTERVAL '15' SECOND
) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'localhost:9092',
      'scan.startup.mode' = 'earliest-offset',
      'topic' = 'councilFeedbackRealtime',
      'properties.group.id' = 'aaa',
      'format' = 'raw'
      );

 

 
package com.tuya.tlink.udf.common

import com.alibaba.fastjson2.\{JSONPath, JSONReader}
import org.apache.flink.table.functions.ScalarFunction

import scala.util.Try
import org.apache.flink.configuration.\{Configuration, RestOptions}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.row
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object GetJsonObject {
def main(args: Array[String]): Unit = {

val 

[jira] [Updated] (FLINK-28229) Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-28229:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Introduce Source API alternatives for 
> StreamExecutionEnvironment#fromCollection() methods
> -
>
> Key: FLINK-28229
> URL: https://issues.apache.org/jira/browse/FLINK-28229
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> * FromElementsFunction
>  * FromIteratorFunction
> are based on SourceFunction API



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28171) Adjust Job and Task manager port definitions to work with Istio+mTLS

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-28171:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Adjust Job and Task manager port definitions to work with Istio+mTLS
> 
>
> Key: FLINK-28171
> URL: https://issues.apache.org/jira/browse/FLINK-28171
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.14.4
> Environment: flink-kubernetes-operator 1.0.0
> Flink 1.14-java11
> Kubernetes v1.19.5
> Istio 1.7.6
>Reporter: Moshe Elisha
>Assignee: Moshe Elisha
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> Hello,
>  
> We are launching Flink deployments using the [Flink Kubernetes 
> Operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/]
>  on a Kubernetes cluster with Istio and mTLS enabled.
>  
> We found that the TaskManager is unable to communicate with the JobManager on 
> the jobmanager-rpc port:
>  
> {{2022-06-15 15:25:40,508 WARN  akka.remote.ReliableDeliverySupervisor [] -
> Association with remote system
> [akka.tcp://flink@amf-events-to-inference-and-central.nwdaf-edge:6123]
> has failed, address is now gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@amf-events-to-inference-and-central.nwdaf-edge:6123]]
> Caused by: [The remote system explicitly disassociated (reason unknown).]}}
>  
> The reason for the issue is that the JobManager service port definitions are 
> not following the Istio guidelines 
> [https://istio.io/latest/docs/ops/configuration/traffic-management/protocol-selection/]
>  (see example below).
>  
> There was also an email discussion around this topic in the users mailing 
> group under the subject "Flink Kubernetes Operator with K8S + Istio + mTLS - 
> port definitions".
> With the help of the community, we were able to work around the issue but it 
> was very hard and forced us to skip Istio proxy which is not ideal.
>  
> We would like you to consider changing the default port definitions, either
>  # Rename the ports – I understand it is Istio specific guideline but maybe 
> it is better to at least be aligned with one (popular) vendor guideline 
> instead of none at all.
>  # Add the “appProtocol” property[1] that is not specific to any vendor but 
> requires Kubernetes >= 1.19 where it was introduced as beta and moved to 
> stable in >= 1.20. The option to add appProtocol property was added only in 
> [https://github.com/fabric8io/kubernetes-client/releases/tag/v5.10.0] with 
> [#3570|https://github.com/fabric8io/kubernetes-client/issues/3570].
>  # Or allow a way to override the defaults.
>  
> [https://kubernetes.io/docs/concepts/services-networking/_print/#application-protocol]
>  
>  
> {{# k get service inference-results-to-analytics-engine -o yaml}}
> {{apiVersion: v1}}
> {{kind: Service}}
> {{...}}
> {{spec:}}
> {{  clusterIP: None}}
> {{  ports:}}
> {{  - name: jobmanager-rpc *# should start with “tcp-“ or add "appProtocol" 
> property*}}
> {{    port: 6123}}
> {{    protocol: TCP}}
> {{    targetPort: 6123}}
> {{  - name: blobserver *# should start with "tcp-" or add "appProtocol" 
> property*}}
> {{    port: 6124}}
> {{    protocol: TCP}}
> {{    targetPort: 6124}}
> {{...}}
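
As a rough illustration of option 2, a fabric8-based port definition carrying "appProtocol" might look like the sketch below; it assumes kubernetes-client >= 5.10.0, and the builder method names follow fabric8's generated-builder convention rather than a verified API:

{code:java}
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.ServicePortBuilder;

public class AppProtocolPortExample {
    public static void main(String[] args) {
        // Sketch only: an Istio-friendly port definition with appProtocol set.
        ServicePort jobManagerRpc =
                new ServicePortBuilder()
                        .withName("jobmanager-rpc")
                        .withPort(6123)
                        .withProtocol("TCP")
                        .withAppProtocol("tcp") // requires Kubernetes >= 1.19
                        .build();
        System.out.println(jobManagerRpc);
    }
}
{code}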



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27815) Improve the join reorder strategy for batch sql job

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27815:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Improve the join reorder strategy for batch sql job 
> 
>
> Key: FLINK-27815
> URL: https://issues.apache.org/jira/browse/FLINK-27815
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> Join is a heavy operation in the execution, and the join order in a query can have
> a significant impact on the query’s performance.
> Currently, the planner has one join reorder strategy, which is provided by
> Apache Calcite, and it strongly depends on the statistics.
> It would be better to provide different join reorder strategies for different
> situations, such as:
> 1. provide a join reorder strategy without statistics, e.g. eliminate cross
> joins
> 2. improve the current join reorder strategy with statistics
> 3. provide hints to allow users to choose the join order strategy
> 4. ...



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27939) [JUnit5 Migration] Module: flink-connector-hive

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27939:
---
Labels: stale-assigned starter  (was: starter)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> [JUnit5 Migration] Module: flink-connector-hive
> ---
>
> Key: FLINK-27939
> URL: https://issues.apache.org/jira/browse/FLINK-27939
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Alexander Preuss
>Assignee: hk__lrzy
>Priority: Minor
>  Labels: stale-assigned, starter
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27954) JobVertexFlameGraphHandler does not work on standby Dispatcher

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27954:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> JobVertexFlameGraphHandler does not work on standby Dispatcher
> --
>
> Key: FLINK-27954
> URL: https://issues.apache.org/jira/browse/FLINK-27954
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination, Runtime / REST
>Affects Versions: 1.15.0
>Reporter: Chesnay Schepler
>Assignee: Weijie Guo
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> The {{JobVertexFlameGraphHandler}} relies internally on the 
> {{JobVertexThreadInfoTracker}} which calls 
> {{ResourceManagerGateway#requestTaskExecutorThreadInfoGateway}} to get a 
> gateway for requesting the thread info from the task executors. Since this 
> gateway is not serializable it would categorically fail if called from a 
> standby dispatcher.
> Instead this should follow the logic of the {{MetricFetcherImpl}}, which 
> requests addresses instead and manually connects to the task executors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27882) [JUnit5 Migration] Module: flink-scala

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27882:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> [JUnit5 Migration] Module: flink-scala
> --
>
> Key: FLINK-27882
> URL: https://issues.apache.org/jira/browse/FLINK-27882
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-7697) Add metrics for Elasticsearch Sink

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-7697:
--
Labels: auto-deprioritized-major auto-unassigned pull-request-available 
stale-minor  (was: auto-deprioritized-major auto-unassigned 
pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither it nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Add metrics for Elasticsearch Sink
> --
>
> Key: FLINK-7697
> URL: https://issues.apache.org/jira/browse/FLINK-7697
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / ElasticSearch
>Reporter: Hai Zhou
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available, stale-minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> We should add metrics to track events written to ElasticsearchSink,
> e.g.:
> * number of successful bulk sends
> * number of documents inserted
> * number of documents updated
> * number of documents version conflicts



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28433) Allow connection to mysql through mariadb driver

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-28433:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Allow connection to mysql through mariadb driver
> 
>
> Key: FLINK-28433
> URL: https://issues.apache.org/jira/browse/FLINK-28433
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / JDBC
>Reporter: PengLei
>Assignee: bo zhao
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
> Attachments: image-2022-07-07-09-29-06-834.png
>
>
> The Flink JDBC connector supports connecting to MySQL, but the URL must start 
> with "jdbc:mysql". Some users need to use the MariaDB driver to connect to 
> MySQL. That can be achieved by setting the driver parameter in jdbcOptions; 
> unfortunately, the URL verification then fails.
>  
> as follows:
> !image-2022-07-07-09-29-06-834.png!
>  
>  
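
As a sketch of the combination the reporter wants to work (assuming the
flink-connector-jdbc builder API; host, database, and credentials are
placeholders), this passes the MariaDB driver class explicitly while keeping a
"jdbc:mysql" URL, which is exactly what the URL check currently rejects:

{code:java}
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

public class MariaDbDriverAgainstMySqlUrl {
    public static JdbcConnectionOptions options() {
        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/mydb") // MySQL URL...
                .withDriverName("org.mariadb.jdbc.Driver")   // ...but the MariaDB driver
                .withUsername("user")
                .withPassword("password")
                .build();
    }
}
{code}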



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-6935) Integration of SQL and CEP

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-6935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-6935:
--
Labels: auto-deprioritized-major auto-unassigned stale-minor  (was: 
auto-deprioritized-major auto-unassigned)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Integration of SQL and CEP
> --
>
> Key: FLINK-6935
> URL: https://issues.apache.org/jira/browse/FLINK-6935
> Project: Flink
>  Issue Type: New Feature
>  Components: Library / CEP, Table SQL / API
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, stale-minor
>
> Flink's CEP library is a great library for complex event processing, and more 
> and more customers are expressing their interest in it. But it also has a 
> limitation: users usually have to write a lot of code even for a very simple 
> pattern-match use case, as it currently only supports the Java API.
> CEP DSLs and SQL strongly resemble each other; CEP's additional features 
> compared to SQL boil down to pattern detection. So it would be great to 
> consolidate CEP and SQL: it makes SQL more powerful by supporting more usage 
> scenarios, and it gives users the ability to build CEP applications easily 
> and quickly.
> The FLIP can be found here:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
> This is an umbrella issue for the FLIP. We should wait for Calcite 1.13 to 
> start this work.
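
For context, this is the direction FLIP-20 eventually took: pattern detection
expressed in SQL via MATCH_RECOGNIZE, available in later Flink releases. A
sketch follows; the Ticker table and its columns are assumptions for
illustration:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CepInSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // assumes a table Ticker(symbol, price, rowtime) has been registered
        tableEnv.executeSql(
                "SELECT * FROM Ticker MATCH_RECOGNIZE ("
                        + " PARTITION BY symbol ORDER BY rowtime"
                        + " MEASURES A.price AS startPrice, C.price AS endPrice"
                        + " ONE ROW PER MATCH"
                        + " AFTER MATCH SKIP PAST LAST ROW"
                        + " PATTERN (A B* C)"   // a V-shaped price pattern
                        + " DEFINE B AS B.price < A.price, C AS C.price > A.price"
                        + ")");
    }
}
{code}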



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27970) [JUnit5 Migration] Module: flink-hadoop-bulk

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27970:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> [JUnit5 Migration] Module: flink-hadoop-bulk
> 
>
> Key: FLINK-27970
> URL: https://issues.apache.org/jira/browse/FLINK-27970
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Ryan Skraba
>Assignee: Ryan Skraba
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27805) Bump ORC version to 1.7.8

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27805:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Bump ORC version to 1.7.8
> -
>
> Key: FLINK-27805
> URL: https://issues.apache.org/jira/browse/FLINK-27805
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: jia liu
>Assignee: Panagiotis Garefalakis
>Priority: Minor
>  Labels: pull-request-available, stale-assigned
>
> The current ORC dependency version in Flink is 1.5.6, but the 1.7.x line has 
> been available for a long time.
> In order to use its new features (zstd compression, column encryption, etc.), 
> we should upgrade the ORC version.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-13698) Rework threading model of CheckpointCoordinator

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-13698:
---
Labels: auto-deprioritized-critical auto-deprioritized-major stale-minor  
(was: auto-deprioritized-critical auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Rework threading model of CheckpointCoordinator
> ---
>
> Key: FLINK-13698
> URL: https://issues.apache.org/jira/browse/FLINK-13698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Piotr Nowojski
>Priority: Minor
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> Currently the {{CheckpointCoordinator}} and {{CheckpointFailureManager}} code 
> is executed by multiple different threads (mostly {{ioExecutor}}, but not 
> only), which causes multiple concurrency issues, for example: 
> https://issues.apache.org/jira/browse/FLINK-13497
> The proper fix would be to rethink the threading model there. At first glance 
> this code does not seem to need multiple threads, except for the parts doing 
> the actual IO operations, so it should be possible to run everything in the 
> single ExecutionGraph thread and just run the necessary IO operations 
> asynchronously with some feedback loop ("mailbox style").
> I would strongly recommend fixing this issue before adding new features to 
> the {{CheckpointCoordinator}} component.
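
A minimal, illustrative sketch of the "mailbox style" idea (not Flink's actual
implementation): state is only touched from a single-threaded executor, and
blocking IO runs elsewhere with its result posted back as a new mailbox action.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MailboxStyleSketch {

    private final ExecutorService mailbox = Executors.newSingleThreadExecutor(); // owns all state
    private final ExecutorService ioExecutor = Executors.newCachedThreadPool();  // runs blocking IO

    private int pendingCheckpoints; // only ever accessed from the mailbox thread

    public void triggerCheckpoint() {
        mailbox.execute(() -> {
            pendingCheckpoints++; // safe: single-threaded mutation
            CompletableFuture.supplyAsync(this::writeSnapshot, ioExecutor)
                    // feedback loop: the IO result is processed back on the mailbox thread
                    .thenAcceptAsync(handle -> pendingCheckpoints--, mailbox);
        });
    }

    private String writeSnapshot() {
        return "state-handle"; // placeholder for the actual IO work
    }
}
{code}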



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-11141) Key generation for RocksDBMapState can theoretically be ambiguous

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-11141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-11141:
---
Labels: auto-deprioritized-critical auto-deprioritized-major 
auto-deprioritized-minor stale-minor  (was: auto-deprioritized-critical 
auto-deprioritized-major auto-deprioritized-minor)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Key generation for RocksDBMapState can theoretically be ambiguous
> -
>
> Key: FLINK-11141
> URL: https://issues.apache.org/jira/browse/FLINK-11141
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Priority: Minor
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> auto-deprioritized-minor, stale-minor
>
> RocksDBMapState stores values in RocksDB under a composite key built from the 
> serialized bytes of {{key-group-id|key|namespace|user-key}}. In this 
> composition, the key, namespace, and user-key can each have a fixed-size or a 
> variable-size serialization format. With at least 2 variable formats, 
> ambiguity is possible, e.g.:
> abcd <-> efg
> abc <-> defg
> Our code takes care of this for all other states, where composite keys only 
> consist of key and namespace by checking for 2x variable size and appending 
> the serialized length to each byte sequence.
> However, for map state there is no inclusion of the user-key in the check for 
> potential ambiguity, as well as for appending the size. This means that, in 
> theory, some combinations can produce colliding composite keys in RocksDB. 
> What is required is to include the user-key serializer in the check and 
> append the length there as well.
> Please notice that this cannot be simply changed because it has implications 
> for backwards compatibility and requires some form of migration for the state 
> keys on restore.
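
The ambiguity itself is easy to demonstrate outside of Flink. The following
standalone sketch shows how two distinct (key, user-key) pairs concatenate to
identical bytes, and how writing each part's length removes the collision:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CompositeKeyAmbiguity {

    static byte[] concat(String a, String b) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(a.getBytes(StandardCharsets.UTF_8));
        out.write(b.getBytes(StandardCharsets.UTF_8));
        return out.toByteArray();
    }

    static byte[] withLengths(String a, String b) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(a.length()); // the length disambiguates the variable-size parts
        out.writeBytes(a);
        out.writeInt(b.length());
        out.writeBytes(b);
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // "abcd"+"efg" and "abc"+"defg" collide without lengths...
        System.out.println(Arrays.equals(concat("abcd", "efg"), concat("abc", "defg"))); // true
        // ...but not once each part carries its length.
        System.out.println(Arrays.equals(
                withLengths("abcd", "efg"), withLengths("abc", "defg"))); // false
    }
}
{code}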



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-28022) Support Google Cloud PubSub connector in Python DataStream API

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-28022:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> Support Google Cloud PubSub connector in Python DataStream API
> --
>
> Key: FLINK-28022
> URL: https://issues.apache.org/jira/browse/FLINK-28022
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Python, Connectors / Google Cloud PubSub
>Reporter: pengmd
>Assignee: pengmd
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.18.0
>
>
> Support Google Cloud PubSub connector in Python DataStream API



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27885) [JUnit5 Migration] Module: flink-csv

2023-08-17 Thread Flink Jira Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-27885:
---
Labels: pull-request-available stale-assigned  (was: pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue is assigned but has not 
received an update in 30 days, so it has been labeled "stale-assigned".
If you are still working on the issue, please remove the label and add a 
comment updating the community on your progress.  If this issue is waiting on 
feedback, please consider this a reminder to the committer/reviewer. Flink is a 
very active project, and so we appreciate your patience.
If you are no longer working on the issue, please unassign yourself so someone 
else may work on it.


> [JUnit5 Migration] Module: flink-csv
> 
>
> Key: FLINK-27885
> URL: https://issues.apache.org/jira/browse/FLINK-27885
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Ryan Skraba
>Assignee: Ryan Skraba
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] xiangyuf opened a new pull request, #23230: [FLINK-32880][flink-runtime]Fulfill redundant taskmanagers periodical…

2023-08-17 Thread via GitHub


xiangyuf opened a new pull request, #23230:
URL: https://github.com/apache/flink/pull/23230

   ## What is the purpose of the change
   
   Fix the issue that redundant taskManagers will not be fulfilled in 
FineGrainedSlotManager periodically.
   
   ## Brief change log
   
   - Change ResourceAllocationStrategy#tryReleaseUnusedResources to 
ResourceAllocationStrategy#tryReconcileClusterResources
   - Change ResourceReleaseResult to ResourceReconcileResult and add a field 
pendingTaskManagersToAdd to ResourceReconcileResult
   - Make DefaultResourceAllocationStrategy#tryReconcileClusterResources 
not only release but also fulfill the redundant taskmanagers
   - Extract 
DefaultResourceAllocationStrategy::tryFulFillRedundantResourcesWithAction as a 
reusable function for both tryFulfillRequirements and tryReconcileClusterResources
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added unit test 
DefaultResourceAllocationStrategyTest#testRedundantResourceShouldBeFulfilled
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager 
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32880) Redundant taskManager should be replenished in FineGrainedSlotManager

2023-08-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32880:
---
Labels: pull-request-available  (was: )

> Redundant taskManager should be replenished in FineGrainedSlotManager
> -
>
> Key: FLINK-32880
> URL: https://issues.apache.org/jira/browse/FLINK-32880
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: xiangyu feng
>Assignee: xiangyu feng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, if you are using the FineGrainedSlotManager, when a redundant 
> taskmanager exits abnormally, no replacement taskmanager will be provisioned 
> during the periodic check in FineGrainedSlotManager.
>  
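
For context, redundant TaskManagers are requested via configuration. A minimal
sketch, assuming the ResourceManagerOptions option (the value is illustrative):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;

public class RedundantTaskManagersConfig {
    public static Configuration withRedundancy() {
        Configuration conf = new Configuration();
        // keep 2 standby TaskManagers beyond the current requirements
        conf.set(ResourceManagerOptions.REDUNDANT_TASK_MANAGER_NUM, 2);
        return conf;
    }
}
{code}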



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32880) Redundant taskManagers should always be fulfilled in FineGrainedSlotManager

2023-08-17 Thread xiangyu feng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiangyu feng updated FLINK-32880:
-
Summary: Redundant taskManagers should always be fulfilled in 
FineGrainedSlotManager  (was: Redundant taskManager should be replenished in 
FineGrainedSlotManager)

> Redundant taskManagers should always be fulfilled in FineGrainedSlotManager
> ---
>
> Key: FLINK-32880
> URL: https://issues.apache.org/jira/browse/FLINK-32880
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: xiangyu feng
>Assignee: xiangyu feng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, if you are using the FineGrainedSlotManager, when a redundant 
> taskmanager exits abnormally, no replacement taskmanager will be provisioned 
> during the periodic check in FineGrainedSlotManager.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #23230: [FLINK-32880][flink-runtime]Fulfill redundant taskmanagers periodical…

2023-08-17 Thread via GitHub


flinkbot commented on PR #23230:
URL: https://github.com/apache/flink/pull/23230#issuecomment-1682079861

   
   ## CI report:
   
   * 1344e4452bb0506b44e3413639350ec33472da76 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #21289: [FLINK-29452] Allow unit tests to be executed independently

2023-08-17 Thread via GitHub


XComp commented on code in PR #21289:
URL: https://github.com/apache/flink/pull/21289#discussion_r1297093524


##
flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureExtensionTest.java:
##
@@ -21,56 +21,69 @@
 import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
 
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link RetryOnFailure} annotation on JUnit5 {@link 
RetryExtension}. */
 @ExtendWith(RetryExtension.class)
 class RetryOnFailureExtensionTest {
 
-private static final int NUMBER_OF_RUNS = 5;
-
-private static int numberOfFailedRuns;
+private static final int NUMBER_OF_RETRIES = 5;
 
-private static int numberOfSuccessfulRuns;
+private static final HashMap<String, Integer> methodRunCount = new HashMap<>();
 
-private static boolean firstRun = true;
+@BeforeEach
+void incrementMethodRunCount(TestInfo testInfo) {
+// Set or increment the run count for the unit test method, by the 
method short name.
+// This starts at 1 and is incremented before the test starts.
+testInfo.getTestMethod()
+.ifPresent(
+method ->
+methodRunCount.compute(
+method.getName(), (k, v) -> (v == 
null) ? 1 : v + 1));
+}
 
 @AfterAll
 static void verify() {
-assertEquals(NUMBER_OF_RUNS + 1, numberOfFailedRuns);
-assertEquals(3, numberOfSuccessfulRuns);
+assertThat(methodRunCount.get("testRetryOnFailure"))

Review Comment:
   I think we can apply the same approach that I described above here. 



##
flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionExtensionTest.java:
##
@@ -37,53 +40,66 @@
 @ExtendWith(RetryExtension.class)
 class RetryOnExceptionExtensionTest {
 
-private static final int NUMBER_OF_RUNS = 3;
+private static final int NUMBER_OF_RETRIES = 3;
 
-private static int runsForSuccessfulTest = 0;
+private static final HashMap<String, Integer> methodRunCount = new HashMap<>();
 
-private static int runsForTestWithMatchingException = 0;
-
-private static int runsForTestWithSubclassException = 0;
-
-private static int runsForPassAfterOneFailure = 0;
+@BeforeEach
+void incrementMethodRunCount(TestInfo testInfo) {
+// Set or increment the run count for the unit test method, by the 
method short name.
+// This starts at 1 and is incremented before the test starts.
+testInfo.getTestMethod()
+.ifPresent(
+method ->
+methodRunCount.compute(
+method.getName(), (k, v) -> (v == 
null) ? 1 : v + 1));
+}
 
 @AfterAll
 static void verify() {
-assertThat(runsForTestWithMatchingException).isEqualTo(NUMBER_OF_RUNS 
+ 1);
-assertThat(runsForTestWithSubclassException).isEqualTo(NUMBER_OF_RUNS 
+ 1);
-assertThat(runsForSuccessfulTest).isOne();
-assertThat(runsForPassAfterOneFailure).isEqualTo(2);
+assertThat(methodRunCount.get("testSuccessfulTest"))

Review Comment:
   I'm wondering whether we could also implement this test without having plain 
method names that we would have to add everywhere. We could do that by 
utilizing the `TestInfo` parameter in every test method and registering the 
callbacks for the corresponding test method. That has the advantage that we 
have the test code in one location (the verification callback is defined within 
the test method instead of the `@AfterAll` annotated method). WDYT?
   
   We would have something like a `verificationCallbackRegistry` next to the 
`methodRunCount` and utilize methods like the following ones:
   ```
   private static int assertAndReturnRunCount(TestInfo testInfo) {
   return methodRunCount.get(assertAndReturnTestMethodName(testInfo));
   }
   
   private static void registerCallbackForTest(TestInfo testInfo, Consumer<Integer> verification) {
   verificationCallbackRegistry.putIfAbsent(
   assertAndReturnTestMethodName(testInfo),
   () -> 
verification.accept(assertAndReturnRunCount(testInfo)));
   }
   
   private static String assertAndReturnTestMethodName(TestInfo testInfo) {
   return testInfo.getTestMethod()
   .orElseThrow(() -> new AssertionError("No test method is 
provided."))
   .getName();
   }
   ```
   
   The testInfo comes from the test method's parameters. All test handlin

[GitHub] [flink] XComp commented on a diff in pull request #21289: [FLINK-29452] Allow unit tests to be executed independently

2023-08-17 Thread via GitHub


XComp commented on code in PR #21289:
URL: https://github.com/apache/flink/pull/21289#discussion_r1297093524


##
flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureExtensionTest.java:
##
@@ -21,56 +21,69 @@
 import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
 
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link RetryOnFailure} annotation on JUnit5 {@link 
RetryExtension}. */
 @ExtendWith(RetryExtension.class)
 class RetryOnFailureExtensionTest {
 
-private static final int NUMBER_OF_RUNS = 5;
-
-private static int numberOfFailedRuns;
+private static final int NUMBER_OF_RETRIES = 5;
 
-private static int numberOfSuccessfulRuns;
+private static final HashMap<String, Integer> methodRunCount = new HashMap<>();
 
-private static boolean firstRun = true;
+@BeforeEach
+void incrementMethodRunCount(TestInfo testInfo) {
+// Set or increment the run count for the unit test method, by the 
method short name.
+// This starts at 1 and is incremented before the test starts.
+testInfo.getTestMethod()
+.ifPresent(
+method ->
+methodRunCount.compute(
+method.getName(), (k, v) -> (v == 
null) ? 1 : v + 1));
+}
 
 @AfterAll
 static void verify() {
-assertEquals(NUMBER_OF_RUNS + 1, numberOfFailedRuns);
-assertEquals(3, numberOfSuccessfulRuns);
+assertThat(methodRunCount.get("testRetryOnFailure"))

Review Comment:
   I think we can apply the same approach [that I described 
above](https://github.com/apache/flink/pull/21289#discussion_r1297092550) here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh

2023-08-17 Thread via GitHub


XComp commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1297097220


##
tools/ci/license_check.sh:
##
@@ -17,6 +17,34 @@
 # limitations under the License.
 

 
+usage() {
+  if [[ "$#" != "2" ]]; then
+echo "Usage: $0  "
+echo " A file containing the 
output of the Maven build."
+echo "  A directory containing a 
Maven repository into which the Flink artifacts were deployed."
+echo ""
+echo "Example preparation:"
+echo "mvnw clean deploy 
-DaltDeploymentRepository=validation_repository::default::file:
 > "
+echo ""
+echo "The environment variable MVN is used to specify the Maven binaries; 
defaults to 'mvnw'."
+echo "See further details in the JavaDoc of LicenseChecker."
+  fi
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+h)
+  usage

Review Comment:
   :disappointed: true, I mixed that one up :facepalm: whatever :shrug: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] XComp commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh

2023-08-17 Thread via GitHub


XComp commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1297104015


##
tools/ci/compile.sh:
##
@@ -69,34 +80,38 @@ fi
 
 echo " Checking Javadocs "
 
+javadoc_output=/tmp/javadoc.out
+
 # use the same invocation as .github/workflows/docs.sh
-run_mvn javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \
+$MVN javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \

Review Comment:
   ```suggestion
   $MVN javadoc:aggregate -T1 -DadditionalJOption='-Xdoclint:none' \
   ```
   according to your statement "every call (apart from mvn exec has to run with 
a parallelism of 1.". But tbh, I don't see value here because we're not parsing 
the output. In this sense, it would be up for the user to decide. :shrug: 



##
tools/ci/compile.sh:
##
@@ -69,34 +80,38 @@ fi
 
 echo " Checking Javadocs "
 
+javadoc_output=/tmp/javadoc.out
+
 # use the same invocation as .github/workflows/docs.sh
-run_mvn javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \
+$MVN javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \
   -Dmaven.javadoc.failOnError=false -Dcheckstyle.skip=true 
-Denforcer.skip=true -Dspotless.skip=true -Drat.skip=true \
-  -Dheader=someTestHeader > javadoc.out
+  -Dheader=someTestHeader > ${javadoc_output}
 EXIT_CODE=$?
 if [ $EXIT_CODE != 0 ] ; then
   echo "ERROR in Javadocs. Printing full output:"
-  cat javadoc.out ; rm javadoc.out
+  cat ${javadoc_output}
   exit $EXIT_CODE
 fi
 
 echo " Checking Scaladocs "
 
-run_mvn scala:doc -Dcheckstyle.skip=true -Denforcer.skip=true 
-Dspotless.skip=true -pl flink-scala 2> scaladoc.out
+scaladoc_output=/tmp/scaladoc.out
+
+$MVN scala:doc -Dcheckstyle.skip=true -Denforcer.skip=true 
-Dspotless.skip=true -pl flink-scala 2> ${scaladoc_output}

Review Comment:
   ```suggestion
   $MVN scala:doc -T1 -Dcheckstyle.skip=true -Denforcer.skip=true 
-Dspotless.skip=true -pl flink-scala 2> ${scaladoc_output}
   ```
   according to your statement "every call (apart from mvn exec has to run with 
a parallelism of 1.". But here as well: We're not parsing the output. So it 
would be up to the user how to deal with it.



##
tools/ci/verify_bundled_optional.sh:
##
@@ -17,31 +17,58 @@
 # limitations under the License.
 #
 
+usage() {
+  echo "Usage: $0 "
+  echo " A file containing the output 
of the Maven build."
+  echo ""
+  echo "mvnw clean package > "
+  echo ""
+  echo "The environment variable MVN is used to specify the Maven binaries; 
defaults to 'mvnw'."
+  echo "See further details in the JavaDoc of ShadeOptionalChecker."
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+h)
+  usage
+  exit 0
+  ;;
+  esac
+done
+
+if [[ "$#" != "1" ]]; then
+  usage
+  exit 1
+fi
+
 ## Checks that all bundled dependencies are marked as optional in the poms
 MVN_CLEAN_COMPILE_OUT=$1
-CI_DIR=$2
-FLINK_ROOT=$3
 
-source "${CI_DIR}/maven-utils.sh"
+MVN=${MVN:-./mvnw}
 
-cd "$FLINK_ROOT" || exit
+dependency_plugin_output=/tmp/dependency_tree_optional.txt
 
-dependency_plugin_output=${CI_DIR}/optional_dep.txt
+$MVN dependency:tree -B -T1 > "${dependency_plugin_output}"

Review Comment:
   ```suggestion
   # dependency:tree needs to run without multi-threading support to ensure 
that the output stays parseable (the lines appear in a sequential order)
   $MVN dependency:tree -B -T1 > "${dependency_plugin_output}"
   ```
   Should we add a comment to ensure that there is no desire to "optimize" the 
maven call in the future?



##
tools/ci/verify_scala_suffixes.sh:
##
@@ -37,41 +37,52 @@
 #
 # The script uses 'mvn dependency:tree -Dincludes=org.scala-lang' to list Scala
 # dependent modules.
-CI_DIR=$1
-FLINK_ROOT=$2
+
+
+usage() {
+  echo "Usage: $0"
+  echo ""
+  echo "The environment variable MVN is used to specify the Maven binaries; 
defaults to 'mvnw'."
+  echo "See further details in the JavaDoc of ScalaSuffixChecker."
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+h)
+  usage
+  exit 0
+  ;;
+  esac
+done
+
+MVN=${MVN:-./mvnw}
 
 echo "--- Flink Scala Dependency Analyzer ---"
 echo "Analyzing modules for Scala dependencies using 'mvn dependency:tree'."
 echo "If you haven't built the project, please do so first by running \"mvn 
clean install -DskipTests\""
 
-source "${CI_DIR}/maven-utils.sh"
-
-cd "$FLINK_ROOT" || exit
+dependency_plugin_output=/tmp/dependency_tree_scala.txt
 
-dependency_plugin_output=${CI_DIR}/dep.txt
-
-run_mvn dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} 
>> "${dependency_plugin_output}"
+$MVN dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} 
-T1 > "${dependency_plugin_output}"

Review Comment:
   ```suggestion
   # dependency:tree needs to run without multi-threading support to ensure 
that the output stays parseable (the lines appear in a sequential order)
   $MVN dep

[GitHub] [flink] xiangyuf opened a new pull request, #23231: [FLINK-32880][flink-runtime]Ignore slotmanager.max-total-resource.cpu…

2023-08-17 Thread via GitHub


xiangyuf opened a new pull request, #23231:
URL: https://github.com/apache/flink/pull/23231

   ## What is the purpose of the change
   
   Just as with slotmanager.number-of-slots.max, we should also ignore the CPU 
and memory limits in standalone mode.
   
   ## Brief change log
   
   - Add StandaloneResourceManagerFactory#overwriteMaxResourceNumConfig to 
remove both max slot/cpu/memory limit option.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: 
JobManager(StandaloneResourceManagerFactory)
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23231: [FLINK-32880][flink-runtime]Ignore slotmanager.max-total-resource.cpu…

2023-08-17 Thread via GitHub


flinkbot commented on PR #23231:
URL: https://github.com/apache/flink/pull/23231#issuecomment-1682164857

   
   ## CI report:
   
   * 8e42b776f803b8f4dc6b82fd29b04c481da3d62b UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] KarmaGYZ commented on a diff in pull request #23230: [FLINK-32880][flink-runtime]Fulfill redundant taskmanagers periodical…

2023-08-17 Thread via GitHub


KarmaGYZ commented on code in PR #23230:
URL: https://github.com/apache/flink/pull/23230#discussion_r1297084113


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java:
##
@@ -48,15 +48,16 @@ ResourceAllocationResult tryFulfillRequirements(
 BlockedTaskManagerChecker blockedTaskManagerChecker);
 
 /**
- * Try to make a release decision to release unused PendingTaskManagers 
and TaskManagers. This
- * is more light weighted than {@link #tryFulfillRequirements}, only 
consider empty registered /
- * pending workers and assume all requirements are fulfilled by registered 
/ pending workers.
+ * Try to make a reconcile decision between release unused 
PendingTaskManagers/TaskManagers and
+ * add redundant PendingTaskManagers. This is more light weighted than 
{@link

Review Comment:
   Try to make a decision to reconcile the cluster resources.



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##
@@ -161,6 +162,7 @@ public ResourceReleaseResult tryReleaseUnusedResources(
 
 List pendingTaskManagersNonUse = new ArrayList<>();
 List pendingTaskManagersInuse = new ArrayList<>();
+

Review Comment:
   ```suggestion
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -224,7 +224,7 @@ public void start(
 if (resourceAllocator.isSupported()) {
 taskManagerReleasableCheck =

Review Comment:
   ```suggestion
   clusterReconciliationCheck =
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -224,7 +224,7 @@ public void start(
 if (resourceAllocator.isSupported()) {
 taskManagerReleasableCheck =
 scheduledExecutor.scheduleWithFixedDelay(
-() -> 
mainThreadExecutor.execute(this::tryReleaseUnusedTaskManagers),
+() -> 
mainThreadExecutor.execute(this::checkClusterResource),

Review Comment:
   ```suggestion
   () -> 
mainThreadExecutor.execute(this::checkClusterReconciliation),
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##
@@ -812,29 +812,34 @@ public Collection 
getAllocatedSlotsOf(InstanceID instanceID) {
 // Internal periodic check methods
 // 
-
 
-private void tryReleaseUnusedTaskManagers() {
-if (checkTaskManagerReleasable()) {
+private void checkClusterResource() {

Review Comment:
   ```suggestion
   private void checkClusterReconciliation() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

2023-08-17 Thread via GitHub


1996fanrui commented on PR #23228:
URL: https://github.com/apache/flink/pull/23228#issuecomment-1682167358

   Thanks @gaborgsomogyi for the review, CI is green, merging~


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui merged pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

2023-08-17 Thread via GitHub


1996fanrui merged PR #23228:
URL: https://github.com/apache/flink/pull/23228


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xiguashu opened a new pull request, #23232: [FLINK-32887][connector/common] ExecutorNotifier#notifyReadyAsync wil…

2023-08-17 Thread via GitHub


xiguashu opened a new pull request, #23232:
URL: https://github.com/apache/flink/pull/23232

   
   ## What is the purpose of the change
   To speed up the source split enumerator's discovery and assignment of splits, 
especially when the 'callable' execution duration (usually spent calculating 
splits) is much longer than the schedule period. In that case, 'callable' tasks 
(which discover splits) are continuously created and queued in the executor, 
blocking the subsequent tasks (which assign splits).
   
   ## Brief change log
   Change ExecutorNotifier#notifyReadyAsync implementation from 
'scheduleAtFixedRate' to 'scheduleWithFixedDelay'.
   
   
   ## Verifying this change
   
   This change is a trivial rework.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32887) SourceCoordinatorContext#workerExecutor may cause task initializing slowly

2023-08-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32887:
---
Labels: pull-request-available  (was: )

> SourceCoordinatorContext#workerExecutor may cause task initializing slowly 
> ---
>
> Key: FLINK-32887
> URL: https://issues.apache.org/jira/browse/FLINK-32887
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Common, Runtime / Coordination
>Affects Versions: 1.15.2
>Reporter: liang jie
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> SourceCoordinatorContext#workerExecutor is typically used to calculate the 
> partitions of a source task and is implemented by a ScheduledExecutorService 
> with only 1 thread (hard-coded). Tasks to calculate partitions are executed 
> through 'workerExecutor.scheduleAtFixedRate'.
> --- 
> In some cases, for example, the 'getSubscribedPartitions' method takes quite 
> a long time (e.g. 5 min) because many topics are included in the same task, 
> or requests to external systems time out, etc., while 
> partitionDiscoveryInterval is set to a short interval, e.g. 1 min.
> In such a case, 'getSubscribedPartitions' runnable tasks are triggered 
> repeatedly and queued in the workerExecutor's queue while the first 
> 'getSubscribedPartitions' task is still running, which causes 
> 'checkPartitionChanges' tasks to be queued as well. Each 
> 'checkPartitionChanges' task then needs to wait for 25 min (5 * the 
> 'getSubscribedPartitions' execution duration) before it is executed.
> ---
> In my view, workerExecutor tasks should be scheduled with a fixed delay 
> rather than at a fixed rate, because there is no point in repeatedly 
> executing 'getSubscribedPartitions' without a 'checkPartitionChanges' 
> execution in between.
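
A sketch of the behavioral difference (the interval and the discoverSplits
placeholder are illustrative): with scheduleAtFixedRate, missed runs of a slow
task accumulate and then fire back-to-back; with scheduleWithFixedDelay, the
next run starts only after the previous one finishes plus the delay.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DiscoverySchedulingSketch {

    private final ScheduledExecutorService workerExecutor =
            Executors.newSingleThreadScheduledExecutor();

    void currentBehavior(Runnable discoverSplits) {
        // fixed rate: while a 5-minute discovery runs, the 1-minute ticks pile
        // up and are executed back-to-back afterwards, starving other queued tasks
        workerExecutor.scheduleAtFixedRate(discoverSplits, 0, 1, TimeUnit.MINUTES);
    }

    void proposedBehavior(Runnable discoverSplits) {
        // fixed delay: the next discovery starts 1 minute after the previous
        // one completes, so nothing accumulates in the queue
        workerExecutor.scheduleWithFixedDelay(discoverSplits, 0, 1, TimeUnit.MINUTES);
    }
}
{code}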



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] KarmaGYZ commented on a diff in pull request #23231: [FLINK-32880][flink-runtime]Ignore slotmanager.max-total-resource.cpu…

2023-08-17 Thread via GitHub


KarmaGYZ commented on code in PR #23231:
URL: https://github.com/apache/flink/pull/23231#discussion_r1297125240


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java:
##
@@ -109,17 +109,35 @@ protected ResourceManager 
createResourceManager(
  * @return the configuration for standalone ResourceManager
  */
 @VisibleForTesting
-public static Configuration getConfigurationWithoutMaxSlotNumberIfSet(
+public static Configuration getConfigurationWithoutMaxResourceNumIfSet(

Review Comment:
   ```suggestion
   public static Configuration getConfigurationWithoutMaxResourceIfSet(
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java:
##
@@ -109,17 +109,35 @@ protected ResourceManager 
createResourceManager(
  * @return the configuration for standalone ResourceManager
  */
 @VisibleForTesting
-public static Configuration getConfigurationWithoutMaxSlotNumberIfSet(
+public static Configuration getConfigurationWithoutMaxResourceNumIfSet(
 Configuration configuration) {
 final Configuration copiedConfig = new Configuration(configuration);
-// The max slot limit should not take effect for standalone cluster, 
we overwrite the
+overwriteMaxResourceNumConfig(copiedConfig);
+
+return copiedConfig;
+}
+
+private static void overwriteMaxResourceNumConfig(Configuration 
configuration) {

Review Comment:
   ```suggestion
   private static void removeMaxResourceConfig(Configuration configuration) 
{
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #23232: [FLINK-32887][connector/common] ExecutorNotifier#notifyReadyAsync wil…

2023-08-17 Thread via GitHub


flinkbot commented on PR #23232:
URL: https://github.com/apache/flink/pull/23232#issuecomment-1682178223

   
   ## CI report:
   
   * 9a0a0d8b9d5ce7cc0bfcfe5281ed6bec14ccf638 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zentol commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh

2023-08-17 Thread via GitHub


zentol commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1297137606


##
tools/ci/compile.sh:
##
@@ -69,34 +80,38 @@ fi
 
 echo " Checking Javadocs "
 
+javadoc_output=/tmp/javadoc.out
+
 # use the same invocation as .github/workflows/docs.sh
-run_mvn javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \
+$MVN javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \

Review Comment:
   javadoc can run in parallel.



##
tools/ci/compile.sh:
##
@@ -69,34 +80,38 @@ fi
 
 echo " Checking Javadocs "
 
+javadoc_output=/tmp/javadoc.out
+
 # use the same invocation as .github/workflows/docs.sh
-run_mvn javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \
+$MVN javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \
   -Dmaven.javadoc.failOnError=false -Dcheckstyle.skip=true 
-Denforcer.skip=true -Dspotless.skip=true -Drat.skip=true \
-  -Dheader=someTestHeader > javadoc.out
+  -Dheader=someTestHeader > ${javadoc_output}
 EXIT_CODE=$?
 if [ $EXIT_CODE != 0 ] ; then
   echo "ERROR in Javadocs. Printing full output:"
-  cat javadoc.out ; rm javadoc.out
+  cat ${javadoc_output}
   exit $EXIT_CODE
 fi
 
 echo " Checking Scaladocs "
 
-run_mvn scala:doc -Dcheckstyle.skip=true -Denforcer.skip=true 
-Dspotless.skip=true -pl flink-scala 2> scaladoc.out
+scaladoc_output=/tmp/scaladoc.out
+
+$MVN scala:doc -Dcheckstyle.skip=true -Denforcer.skip=true 
-Dspotless.skip=true -pl flink-scala 2> ${scaladoc_output}

Review Comment:
   scaladoc can run in parallel.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] lincoln-lil commented on a diff in pull request #23209: [FLINK-32824] Port Calcite's fix for the sql like operator

2023-08-17 Thread via GitHub


lincoln-lil commented on code in PR #23209:
URL: https://github.com/apache/flink/pull/23209#discussion_r1296990792


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala:
##
@@ -793,4 +794,29 @@ class CalcITCase extends StreamingTestBase {
 val expected = List("2.0", "2.0", "2.0")
 assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
+
+  @Test
+  def testFilterOnString(): Unit = {

Review Comment:
   nit: we can simply extend the current 
`org.apache.flink.table.planner.runtime.batch.sql.CalcITCase#testFilterOnString`
 test case adding a new `checkResult` clause to cover the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32668) fix up watchdog timeout error msg in common.sh(e2e test)

2023-08-17 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755529#comment-17755529
 ] 

Hongshun Wang commented on FLINK-32668:
---

Thanks a lot, I get it. I will modify this as soon as possible.

> fix up watchdog timeout error msg  in common.sh(e2e test) 
> --
>
> Key: FLINK-32668
> URL: https://issues.apache.org/jira/browse/FLINK-32668
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Minor
> Attachments: image-2023-07-25-15-27-37-441.png
>
>
> When running an e2e test, an error like this occurs:
> !image-2023-07-25-15-27-37-441.png|width=733,height=115!
>  
> The corresponding code:
> {code:bash}
> kill_test_watchdog() {
>     local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid)
>     echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
>     kill $watchdog_pid
> }
> internal_run_with_timeout() {
>     local timeout_in_seconds="$1"
>     local on_failure="$2"
>     local command_label="$3"
>     local command="${@:4}"
>     on_exit kill_test_watchdog
>     (
>         command_pid=$BASHPID
>         (
>             sleep "${timeout_in_seconds}" # set a timeout for this command
>             echo "${command_label:-"The command '${command}'"} (pid: $command_pid) did not finish after $timeout_in_seconds seconds."
>             eval "${on_failure}"
>             kill "$command_pid"
>         ) & watchdog_pid=$!
>         echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid
>         # invoke
>         $command
>     )
> }{code}
>  
> When {{$command}} completes before the timeout, the watchdog process is 
> killed successfully. However, when {{$command}} times out, the watchdog 
> process kills {{$command}} and then exits itself, leaving behind an error 
> message when we later try to kill its own process ID with 
> {{kill $watchdog_pid}}. This "no such process" error message is hard to 
> understand.
>  
> So, I will modify it like this, with a better error message:
>  
> {code:bash}
> kill_test_watchdog() {
>     local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid)
>     if kill -0 $watchdog_pid > /dev/null 2>&1; then
>         echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
>         kill $watchdog_pid
>     else
>         echo "[ERROR] Test timed out"
>         exit 1
>     fi
> }{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32668) fix up watchdog timeout error msg in common.sh(e2e test)

2023-08-17 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755529#comment-17755529
 ] 

Hongshun Wang edited comment on FLINK-32668 at 8/17/23 12:35 PM:
-

I get it. Thanks a lot, [~mapohl]. I will modify this as soon as possible.


was (Author: JIRAUSER298968):
Thanks a lot, I get it. I will modify this as soon as possible.

> fix up watchdog timeout error msg  in common.sh(e2e test) 
> --
>
> Key: FLINK-32668
> URL: https://issues.apache.org/jira/browse/FLINK-32668
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Minor
> Attachments: image-2023-07-25-15-27-37-441.png
>
>
> When running an e2e test, an error like this occurs:
> !image-2023-07-25-15-27-37-441.png|width=733,height=115!
>  
> The corresponding code:
> {code:bash}
> kill_test_watchdog() {
>     local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid)
>     echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
>     kill $watchdog_pid
> }
> internal_run_with_timeout() {
>     local timeout_in_seconds="$1"
>     local on_failure="$2"
>     local command_label="$3"
>     local command="${@:4}"
>     on_exit kill_test_watchdog
>     (
>         command_pid=$BASHPID
>         (
>             sleep "${timeout_in_seconds}" # set a timeout for this command
>             echo "${command_label:-"The command '${command}'"} (pid: $command_pid) did not finish after $timeout_in_seconds seconds."
>             eval "${on_failure}"
>             kill "$command_pid"
>         ) & watchdog_pid=$!
>         echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid
>         # invoke
>         $command
>     )
> }{code}
>  
> When {{$command}} completes before the timeout, the watchdog process is 
> killed successfully. However, when {{$command}} times out, the watchdog 
> process kills {{$command}} and then exits itself, leaving behind an error 
> message when we later try to kill its own process ID with 
> {{kill $watchdog_pid}}. This "no such process" error message is hard to 
> understand.
>  
> So, I will modify it like this, with a better error message:
>  
> {code:bash}
> kill_test_watchdog() {
>     local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid)
>     if kill -0 $watchdog_pid > /dev/null 2>&1; then
>         echo "Stopping job timeout watchdog (with pid=$watchdog_pid)"
>         kill $watchdog_pid
>     else
>         echo "[ERROR] Test timed out"
>         exit 1
>     fi
> }{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28171) Adjust Job and Task manager port definitions to work with Istio+mTLS

2023-08-17 Thread sigalit (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755531#comment-17755531
 ] 

sigalit commented on FLINK-28171:
-

pull-request-available

> Adjust Job and Task manager port definitions to work with Istio+mTLS
> 
>
> Key: FLINK-28171
> URL: https://issues.apache.org/jira/browse/FLINK-28171
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.14.4
> Environment: flink-kubernetes-operator 1.0.0
> Flink 1.14-java11
> Kubernetes v1.19.5
> Istio 1.7.6
>Reporter: Moshe Elisha
>Assignee: Moshe Elisha
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> Hello,
>  
> We are launching Flink deployments using the [Flink Kubernetes 
> Operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/]
>  on a Kubernetes cluster with Istio and mTLS enabled.
>  
> We found that the TaskManager is unable to communicate with the JobManager on 
> the jobmanager-rpc port:
>  
> {{2022-06-15 15:25:40,508 WARN  akka.remote.ReliableDeliverySupervisor        
>                [] - Association with remote system 
> [akka.tcp://[flink@amf-events-to-inference-and-central.nwdaf-edge|mailto:flink@amf-events-to-inference-and-central.nwdaf-edge]:6123]
>  has failed, address is now gated for [50] ms. Reason: [Association failed 
> with 
> [akka.tcp://[flink@amf-events-to-inference-and-central.nwdaf-edge|mailto:flink@amf-events-to-inference-and-central.nwdaf-edge]:6123]]
>  Caused by: [The remote system explicitly disassociated (reason unknown).]}}
>  
> The reason for the issue is that the JobManager service port definitions are 
> not following the Istio guidelines 
> [https://istio.io/latest/docs/ops/configuration/traffic-management/protocol-selection/]
>  (see example below).
>  
> There was also an email discussion around this topic in the users mailing 
> group under the subject "Flink Kubernetes Operator with K8S + Istio + mTLS - 
> port definitions".
> With the help of the community, we were able to work around the issue but it 
> was very hard and forced us to skip Istio proxy which is not ideal.
>  
> We would like you to consider changing the default port definitions; either:
>  # Rename the ports – I understand this is an Istio-specific guideline, but 
> maybe it is better to at least be aligned with one (popular) vendor's 
> guideline instead of none at all.
>  # Add the “appProtocol” property[1] that is not specific to any vendor but 
> requires Kubernetes >= 1.19 where it was introduced as beta and moved to 
> stable in >= 1.20. The option to add appProtocol property was added only in 
> [https://github.com/fabric8io/kubernetes-client/releases/tag/v5.10.0] with 
> [#3570|https://github.com/fabric8io/kubernetes-client/issues/3570].
>  # Or allow a way to override the defaults.
>  
> [https://kubernetes.io/docs/concepts/services-networking/_print/#application-protocol]
>  
>  
> {{# k get service inference-results-to-analytics-engine -o yaml}}
> {{apiVersion: v1}}
> {{kind: Service}}
> {{...}}
> {{spec:}}
> {{  clusterIP: None}}
> {{  ports:}}
> {{  - name: jobmanager-rpc *# should start with “tcp-“ or add "appProtocol" 
> property*}}
> {{    port: 6123}}
> {{    protocol: TCP}}
> {{    targetPort: 6123}}
> {{  - name: blobserver *# should start with "tcp-" or add "appProtocol" 
> property*}}
> {{    port: 6124}}
> {{    protocol: TCP}}
> {{    targetPort: 6124}}
> {{...}}
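> 
> For illustration, a minimal sketch of what option 2 could look like with the 
> fabric8 kubernetes-client builder API (a hypothetical snippet, assuming 
> kubernetes-client >= 5.10.0; the names and values mirror the service dump 
> above):
> {code:java}
> import io.fabric8.kubernetes.api.model.IntOrString;
> import io.fabric8.kubernetes.api.model.ServicePort;
> import io.fabric8.kubernetes.api.model.ServicePortBuilder;
> 
> // Options 1 and 2 combined: a "tcp-" prefixed port name plus appProtocol.
> ServicePort jobManagerRpc = new ServicePortBuilder()
>         .withName("tcp-jobmanager-rpc")
>         .withPort(6123)
>         .withTargetPort(new IntOrString(6123))
>         .withProtocol("TCP")
>         .withAppProtocol("tcp") // requires Kubernetes >= 1.19
>         .build();
> {code}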



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on a diff in pull request #23229: [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException

2023-08-17 Thread via GitHub


XComp commented on code in PR #23229:
URL: https://github.com/apache/flink/pull/23229#discussion_r1297168003


##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadExtension.java:
##
@@ -119,6 +120,12 @@ public void before(ExtensionContext context) throws Exception {
         }
         file2 = TempDirUtils.newFile(tmpDirectory);
         Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
+        file3 = TempDirUtils.newFile(tmpDirectory);
+        try (RandomAccessFile rw = new RandomAccessFile(file3, "rw")) {
+            // magic value that reliably reproduced EndOfDataDecoderException in hasNext()
+            rw.setLength(1410);

Review Comment:
   The test doesn't fail if I revert the fix. That makes me believe that this 
magic value doesn't reproduce the scenario reliably. How did you come up with 
the test scenario? :thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xiangyuf commented on pull request #23231: [FLINK-32880][flink-runtime]Ignore slotmanager.max-total-resource.cpu…

2023-08-17 Thread via GitHub


xiangyuf commented on PR #23231:
URL: https://github.com/apache/flink/pull/23231#issuecomment-1682223131

   Hi @KarmaGYZ , all comments have been resolved, thx again for reviewing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32821) Streaming examples failed to execute due to error in packaging

2023-08-17 Thread Alexander Fedulov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755534#comment-17755534
 ] 

Alexander Fedulov commented on FLINK-32821:
---

[~Zhanghao Chen] I am still not sure why you faced this issue. Bundling datagen 
was part of the original PR: 
[https://github.com/apache/flink/pull/23079/files#diff-c7da2a9d0258717dbd97328195d580aedc3fe51240aeb4fc0f292d946dcdb4af]

Maybe there was some unlucky timing: Leonard might have merged the commits 
separately, and maybe you pulled right before this one got integrated: 
[https://github.com/apache/flink/commit/dfb9cb851dc1f0908ea6c3ce1230dd8ca2b48733]
Anyhow, I believe we can close this issue.

> Streaming examples failed to execute due to error in packaging
> --
>
> Key: FLINK-32821
> URL: https://issues.apache.org/jira/browse/FLINK-32821
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.18.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
>
> 7 out of the 8 streaming examples failed to run:
>  * Iteration & SessionWindowing & SocketWindowWordCount & WindowJoin failed 
> to run due to java.lang.NoClassDefFoundError: 
> org/apache/flink/streaming/examples/utils/ParameterTool
>  * MatrixVectorMul & TopSpeedWindowing & StateMachineExample failed to run 
> due to: Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.datagen.source.GeneratorFunction
> The NoClassDefFoundError with ParameterTool is introduced by FLINK-32558 
> (Properly deprecate DataSet API), and we'd better resolve FLINK-32820 
> (ParameterTool is mistakenly marked as deprecated) first before we come to a 
> fix for this problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on a diff in pull request #23229: [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException

2023-08-17 Thread via GitHub


XComp commented on code in PR #23229:
URL: https://github.com/apache/flink/pull/23229#discussion_r1297168003


##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadExtension.java:
##
@@ -119,6 +120,12 @@ public void before(ExtensionContext context) throws Exception {
         }
         file2 = TempDirUtils.newFile(tmpDirectory);
         Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET));
+        file3 = TempDirUtils.newFile(tmpDirectory);
+        try (RandomAccessFile rw = new RandomAccessFile(file3, "rw")) {
+            // magic value that reliably reproduced EndOfDataDecoderException in hasNext()
+            rw.setLength(1410);

Review Comment:
   The test doesn't fail on my machine if I revert the fix. That makes me 
believe that this magic value doesn't reproduce the scenario reliably. How did 
you come up with the test scenario? :thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32890) Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between

2023-08-17 Thread Nicolas Fraison (Jira)
Nicolas Fraison created FLINK-32890:
---

 Summary: Flink app rolled back with old savepoints (3 hours back 
in time) while some checkpoints have been taken in between
 Key: FLINK-32890
 URL: https://issues.apache.org/jira/browse/FLINK-32890
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Nicolas Fraison


Here are all details about the issue (based on an app/scenario reproducing the 
issue):
 * Deployed a new release of a flink app with a new operator
 * Flink Operator set the app as stable
 * After some time the app failed and stayed in a failed state (due to some 
issue with our kafka clusters)
 * Sadly the team in charge of this flink app decided to roll back the app, 
thinking the failure was linked to this new deployment
 * Operator detects: {{Job is not running but HA metadata is available for last 
state restore, ready for upgrade, Deleting JobManager deployment while 
preserving HA metadata.}}  -> relies on last-state (as we do not disable 
fallback), no savepoint taken
 * Flink starts the JM and deploys the app. It correctly finds the 3 checkpoints

 * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as 
Zookeeper namespace.}}
 * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
 * {{Recovering checkpoints from 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
 * {{Found 3 checkpoints in 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
 * {{Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ 
1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at 
s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19.}}

 * Job failed because of the missing operator

Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: There is no operator for the state 
f298e8715b4d85e6f965b60e1c848cbe
 * {{Job 6b24a364c1905e924a69f3dbff0d26a6 has been registered for cleanup in 
the JobResultStore after reaching a terminal state.}}
 * {{Clean up the high availability data for job 
6b24a364c1905e924a69f3dbff0d26a6.}}
 * {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}}

 * The JobManager restarts and tries to resubmit the job, but the job had 
already been submitted and reached a terminal state

 * {{Received JobGraph submission 'flink-kafka-job' 
(6b24a364c1905e924a69f3dbff0d26a6).}}
 * {{Ignoring JobGraph submission 'flink-kafka-job' 
(6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a 
globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous 
execution.}}
 * {{Application completed SUCCESSFULLY}}

 * Finally the operator rolls back the deployment and still indicates that {{Job 
is not running but HA metadata is available for last state restore, ready for 
upgrade}}
 * But the job metadata is no longer there (it was cleaned up previously)

(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
Path 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
 doesn't exist
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs

(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico
jobgraphs
jobs
leader
 * The rolled-back app from the flink operator finally takes the last provided 
savepoint, as no metadata/checkpoints are available. But this last savepoint is 
an old one because during the upgrade the operator decided to rely on last-state



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32890) Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between

2023-08-17 Thread Nicolas Fraison (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Fraison updated FLINK-32890:

Description: 
Here are all details about the issue (based on an app/scenario reproducing the 
issue):
 * Deployed a new release of a flink app with a new operator
 * Flink Operator set the app as stable
 * After some time the app failed and stayed in a failed state (due to some 
issue with our kafka clusters)
 * Sadly the team in charge of this flink app decided to roll back the app, 
thinking the failure was linked to this new deployment
 * Operator detects: {{Job is not running but HA metadata is available for last 
state restore, ready for upgrade, Deleting JobManager deployment while 
preserving HA metadata.}}  -> relies on last-state (as we do not disable 
fallback), no savepoint taken
 * Flink starts the JM and deploys the app. It correctly finds the 3 checkpoints

 * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as 
Zookeeper namespace.}}
 * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
 * {{Recovering checkpoints from 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
 * {{Found 3 checkpoints in 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
 * {{Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ 
1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at 
s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19.}}

 * Job failed because of the missing operator

{code:java}
Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: There is no operator for the state 
f298e8715b4d85e6f965b60e1c848cbe{code}
 * {{Job 6b24a364c1905e924a69f3dbff0d26a6 has been registered for cleanup in 
the JobResultStore after reaching a terminal state.}}
 * {{Clean up the high availability data for job 
6b24a364c1905e924a69f3dbff0d26a6.}}
 * {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}}

 * The JobManager restarts and tries to resubmit the job, but the job had 
already been submitted and reached a terminal state

 * {{Received JobGraph submission 'flink-kafka-job' 
(6b24a364c1905e924a69f3dbff0d26a6).}}
 * {{Ignoring JobGraph submission 'flink-kafka-job' 
(6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a 
globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous 
execution.}}
 * {{Application completed SUCCESSFULLY}}

 * Finally the operator rolls back the deployment and still indicates that {{Job 
is not running but HA metadata is available for last state restore, ready for 
upgrade}}
 * But the job metadata is no longer there (it was cleaned up previously)

 
{code:java}
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
Path 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
 doesn't exist
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico
jobgraphs
jobs
leader
{code}
 

The rolled-back app from the flink operator finally takes the last provided 
savepoint, as no metadata/checkpoints are available. But this last savepoint is 
an old one because during the upgrade the operator decided to rely on last-state

  was:
Here are all details about the issue (based on an app/scenario reproducing the 
issue):
 * Deployed a new release of a flink app with a new operator
 * Flink Operator set the app as stable
 * After some time the app failed and stayed in a failed state (due to some 
issue with our kafka clusters)
 * Sadly the team in charge of this flink app decided to roll back the app, 
thinking the failure was linked to this new deployment
 * Operator detects: {{Job is not running but HA metadata is available for last 
state restore, ready for upgrade, Deleting JobManager deployment while 
preserving HA metadata.}}  -> relies on last-state (as we do not disable 
fallback), no savepoint taken
 * Flink starts the JM and deploys the app. It correctly finds the 3 checkpoints

 * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as 
Zookeeper namespace.}}
 * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).

[jira] [Updated] (FLINK-32890) Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between

2023-08-17 Thread Nicolas Fraison (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Fraison updated FLINK-32890:

Issue Type: Bug  (was: Improvement)

> Flink app rolled back with old savepoints (3 hours back in time) while some 
> checkpoints have been taken in between
> --
>
> Key: FLINK-32890
> URL: https://issues.apache.org/jira/browse/FLINK-32890
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Nicolas Fraison
>Priority: Major
>
> Here are all details about the issue (based on an app/scenario reproducing 
> the issue):
>  * Deployed a new release of a flink app with a new operator
>  * Flink Operator set the app as stable
>  * After some time the app failed and stayed in a failed state (due to some 
> issue with our kafka clusters)
>  * Sadly the team in charge of this flink app decided to roll back the app, 
> thinking the failure was linked to this new deployment
>  * Operator detects: {{Job is not running but HA metadata is available for 
> last state restore, ready for upgrade, Deleting JobManager deployment while 
> preserving HA metadata.}}  -> relies on last-state (as we do not disable 
> fallback), no savepoint taken
>  * Flink starts the JM and deploys the app. It correctly finds the 3 checkpoints
>  * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as 
> Zookeeper namespace.}}
>  * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
>  * {{Recovering checkpoints from 
> ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
>  * {{Found 3 checkpoints in 
> ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
>  * {{Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ 
> 1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at 
> s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19.}}
>  * Job failed because of the missing operator
> {code:java}
> Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED.
> org.apache.flink.runtime.client.JobInitializationException: Could not start 
> the JobMaster.
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.IllegalStateException: There is no operator for the state 
> f298e8715b4d85e6f965b60e1c848cbe{code}
>  * {{Job 6b24a364c1905e924a69f3dbff0d26a6 has been registered for cleanup in 
> the JobResultStore after reaching a terminal state.}}
>  * {{Clean up the high availability data for job 
> 6b24a364c1905e924a69f3dbff0d26a6.}}
>  * {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from 
> ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}}
>  * The JobManager restarts and tries to resubmit the job, but the job had 
> already been submitted and reached a terminal state
>  * {{Received JobGraph submission 'flink-kafka-job' 
> (6b24a364c1905e924a69f3dbff0d26a6).}}
>  * {{Ignoring JobGraph submission 'flink-kafka-job' 
> (6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a 
> globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous 
> execution.}}
>  * {{Application completed SUCCESSFULLY}}
>  * Finally the operator rolls back the deployment and still indicates that {{Job 
> is not running but HA metadata is available for last state restore, ready for 
> upgrade}}
>  * But the job metadata is no longer there (it was cleaned up previously)
>  
> {code:java}
> (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
> /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
> Path 
> /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
>  doesn't exist
> (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
> /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs
> (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
> /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico
> jobgraphs
> jobs
> leader
> {code}
>  
> The rolled-back app from the flink operator finally takes the last provided 
> savepoint, as no metadata/checkpoints are available. But this last savepoint 
> is an old one because during the upgrade the operator decided to rely on last-state



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] 1996fanrui commented on a diff in pull request #23198: [FLINK-32856][JUnit5 Migration] Migrate the testutils, throughput and throwable packages of flink-runtime module to junit5

2023-08-17 Thread via GitHub


1996fanrui commented on code in PR #23198:
URL: https://github.com/apache/flink/pull/23198#discussion_r1297192247


##
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java:
##
@@ -316,18 +317,16 @@ public static void waitForMarkerFile(File file, long timeoutMillis)
             Thread.sleep(10);
         }
 
-        if (!exists) {
-            fail("The marker file was not found within " + timeoutMillis + " msecs");
-        }
+        assertThat(exists)
+                .as("The marker file was not found within " + timeoutMillis + " msecs")

Review Comment:
   Thanks @X-czh 's review, updated!
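   
   For reference, the quoted hunk above is cut off; a minimal sketch of the 
usual AssertJ completion (assuming the standard `Assertions.assertThat` static 
import, with `exists` and `timeoutMillis` coming from the surrounding method) 
would be:
{code:java}
import static org.assertj.core.api.Assertions.assertThat;

assertThat(exists)
        .as("The marker file was not found within " + timeoutMillis + " msecs")
        .isTrue(); // the terminal isTrue() is what actually performs the assertion
{code}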



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32880) Redundant taskManagers should always be fulfilled in FineGrainedSlotManager

2023-08-17 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo updated FLINK-32880:
---
Affects Version/s: 1.18.0

> Redundant taskManagers should always be fulfilled in FineGrainedSlotManager
> ---
>
> Key: FLINK-32880
> URL: https://issues.apache.org/jira/browse/FLINK-32880
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: xiangyu feng
>Assignee: xiangyu feng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, if you are using FineGrainedSlotManager, when a redundant 
> taskmanager exits abnormally, no extra taskmanager will be replenished during 
> the periodic check in FineGrainedSlotManager.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32880) Redundant taskManagers should always be fulfilled in FineGrainedSlotManager

2023-08-17 Thread Yangze Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yangze Guo updated FLINK-32880:
---
Priority: Critical  (was: Major)

> Redundant taskManagers should always be fulfilled in FineGrainedSlotManager
> ---
>
> Key: FLINK-32880
> URL: https://issues.apache.org/jira/browse/FLINK-32880
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.18.0
>Reporter: xiangyu feng
>Assignee: xiangyu feng
>Priority: Critical
>  Labels: pull-request-available
>
> Currently, if you are using FineGrainedSlotManager, when a redundant 
> taskmanager exits abnormally, no extra taskmanager will be replenished during 
> the periodic check in FineGrainedSlotManager.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32821) Streaming examples failed to execute due to error in packaging

2023-08-17 Thread Weihua Hu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755548#comment-17755548
 ] 

Weihua Hu commented on FLINK-32821:
---

Hi, [~afedulov] Thanks for your attention. 
The change in 
[https://github.com/apache/flink/pull/23079/files#diff-c7da2a9d0258717dbd97328195d580aedc3fe51240aeb4fc0f292d946dcdb4af]
 only bundles the datagen connector into flink-example-streaming.jar, but that 
is not the final jar we use. You can check the final jar in 
target/TopSpeedWindowing.jar; there are no datagen-related classes in it.

To reproduce this problem, we need a standalone cluster and have to use 
'./bin/flink run examples/streaming/TopSpeedWindowing.jar' to run this example.

This issue cannot be reproduced from the IDE, because the IDE adds the 
dependency to the classpath automatically.

> Streaming examples failed to execute due to error in packaging
> --
>
> Key: FLINK-32821
> URL: https://issues.apache.org/jira/browse/FLINK-32821
> Project: Flink
>  Issue Type: Bug
>  Components: Examples
>Affects Versions: 1.18.0
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>  Labels: pull-request-available
>
> 7 out of the 8 streaming examples failed to run:
>  * Iteration & SessionWindowing & SocketWindowWordCount & WindowJoin failed 
> to run due to java.lang.NoClassDefFoundError: 
> org/apache/flink/streaming/examples/utils/ParameterTool
>  * MatrixVectorMul & TopSpeedWindowing & StateMachineExample failed to run 
> due to: Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.connector.datagen.source.GeneratorFunction
> The NoClassDefFoundError with ParameterTool is introduced by FLINK-32558 
> (Properly deprecate DataSet API), and we'd better resolve FLINK-32820 
> (ParameterTool is mistakenly marked as deprecated) first before we come to a 
> fix for this problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32890) Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between

2023-08-17 Thread Nicolas Fraison (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Fraison updated FLINK-32890:

Description: 
Here are all details about the issue:
 * Deployed a new release of a flink app with a new operator
 * Flink Operator set the app as stable
 * After some time the app failed and stayed in a failed state (due to some 
issue with our kafka clusters)
 * Sadly the team in charge of this flink app decided to roll back the app, 
thinking the failure was linked to this new deployment
 * Operator detects: {{Job is not running but HA metadata is available for last 
state restore, ready for upgrade, Deleting JobManager deployment while 
preserving HA metadata.}}  -> relies on last-state (as we do not disable 
fallback), no savepoint taken
 * Flink starts the JM and deploys the app. It correctly finds the 3 checkpoints

 * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as 
Zookeeper namespace.}}
 * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
 * {{Recovering checkpoints from 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
 * {{Found 3 checkpoints in 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
 * {{Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ 
1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at 
s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19.}}

 * Job failed because of the missing operator

{code:java}
Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: There is no operator for the state 
f298e8715b4d85e6f965b60e1c848cbe{code}
 * {{Job 6b24a364c1905e924a69f3dbff0d26a6 has been registered for cleanup in 
the JobResultStore after reaching a terminal state.}}
 * {{Clean up the high availability data for job 
6b24a364c1905e924a69f3dbff0d26a6.}}
 * {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}}

 * The JobManager restarts and tries to resubmit the job, but the job had 
already been submitted and reached a terminal state

 * {{Received JobGraph submission 'flink-kafka-job' 
(6b24a364c1905e924a69f3dbff0d26a6).}}
 * {{Ignoring JobGraph submission 'flink-kafka-job' 
(6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a 
globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous 
execution.}}
 * {{Application completed SUCCESSFULLY}}

 * Finally the operator rolls back the deployment and still indicates that {{Job 
is not running but HA metadata is available for last state restore, ready for 
upgrade}}
 * But the job metadata is no longer there (it was cleaned up previously)

 
{code:java}
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
Path 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
 doesn't exist
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico
jobgraphs
jobs
leader
{code}
 

The rolled-back app from the flink operator finally takes the last provided 
savepoint, as no metadata/checkpoints are available. But this last savepoint is 
an old one because during the upgrade the operator decided to rely on last-state

  was:
Here are all details about the issue (based on an app/scenario reproducing the 
issue):
 * Deployed a new release of a flink app with a new operator
 * Flink Operator set the app as stable
 * After some time the app failed and stayed in a failed state (due to some 
issue with our kafka clusters)
 * Sadly the team in charge of this flink app decided to roll back the app, 
thinking the failure was linked to this new deployment
 * Operator detects: {{Job is not running but HA metadata is available for last 
state restore, ready for upgrade, Deleting JobManager deployment while 
preserving HA metadata.}}  -> relies on last-state (as we do not disable 
fallback), no savepoint taken
 * Flink starts the JM and deploys the app. It correctly finds the 3 checkpoints

 * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as 
Zookeeper namespace.}}
 * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
 * {{Recovering checkpoints from 
ZooKeeperSta

[jira] [Updated] (FLINK-32890) Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between

2023-08-17 Thread Nicolas Fraison (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Fraison updated FLINK-32890:

Description: 
Here are all details about the issue:
 * Deployed a new release of a flink app with a new operator
 * Flink Operator set the app as stable
 * After some time the app failed and stayed in a failed state (due to some 
issue with our kafka clusters)
 * Finally decided to roll back the new release just in case it was the root 
cause of the issue on kafka
 * Operator detects: {{Job is not running but HA metadata is available for last 
state restore, ready for upgrade, Deleting JobManager deployment while 
preserving HA metadata.}}  -> relies on last-state (as we do not disable 
fallback), no savepoint taken
 * Flink starts the JM and deploys the app. It correctly finds the 3 checkpoints

 * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as 
Zookeeper namespace.}}
 * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
 * {{Recovering checkpoints from 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
 * {{Found 3 checkpoints in 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
 * {{Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ 
1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at 
s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19.}}

 * Job failed because of the missing operator

{code:java}
Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: There is no operator for the state 
f298e8715b4d85e6f965b60e1c848cbe{code}
 * {{Job 6b24a364c1905e924a69f3dbff0d26a6 has been registered for cleanup in 
the JobResultStore after reaching a terminal state.}}
 * {{Clean up the high availability data for job 
6b24a364c1905e924a69f3dbff0d26a6.}}
 * {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from 
ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}}

 * The JobManager restarts and tries to resubmit the job, but the job had 
already been submitted and reached a terminal state

 * {{Received JobGraph submission 'flink-kafka-job' 
(6b24a364c1905e924a69f3dbff0d26a6).}}
 * {{Ignoring JobGraph submission 'flink-kafka-job' 
(6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a 
globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous 
execution.}}
 * {{Application completed SUCCESSFULLY}}

 * Finally the operator rolls back the deployment and still indicates that {{Job 
is not running but HA metadata is available for last state restore, ready for 
upgrade}}
 * But the job metadata is no longer there (it was cleaned up previously)

 
{code:java}
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
Path 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
 doesn't exist
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls 
/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico
jobgraphs
jobs
leader
{code}
 

The rolled-back app from the flink operator finally takes the last provided 
savepoint, as no metadata/checkpoints are available. But this last savepoint is 
an old one because during the upgrade the operator decided to rely on 
last-state (the old savepoint taken is a scheduled one).

  was:
Here are all details about the issue:
 * Deployed a new release of a flink app with a new operator
 * Flink Operator set the app as stable
 * After some time the app failed and stayed in a failed state (due to some 
issue with our kafka clusters)
 * Sadly the team in charge of this flink app decided to roll back the app, 
thinking the failure was linked to this new deployment
 * Operator detects: {{Job is not running but HA metadata is available for last 
state restore, ready for upgrade, Deleting JobManager deployment while 
preserving HA metadata.}}  -> relies on last-state (as we do not disable 
fallback), no savepoint taken
 * Flink starts the JM and deploys the app. It correctly finds the 3 checkpoints

 * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as 
Zookeeper namespace.}}
 * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
 * {{Recovering checkpoints from 
ZooKeeperStateHandleStore\{namespace='f

[jira] [Updated] (FLINK-13698) Rework threading model of CheckpointCoordinator

2023-08-17 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-13698:
---
Priority: Not a Priority  (was: Minor)

> Rework threading model of CheckpointCoordinator
> ---
>
> Key: FLINK-13698
> URL: https://issues.apache.org/jira/browse/FLINK-13698
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.10.0
>Reporter: Piotr Nowojski
>Priority: Not a Priority
>  Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> Currently {{CheckpointCoordinator}} and {{CheckpointFailureManager}} code is 
> executed by multiple different threads (mostly {{ioExecutor}}, but not only). 
> It's causing multiple concurrency issues, for example: 
> https://issues.apache.org/jira/browse/FLINK-13497
> The proper fix would be to rethink the threading model there. At first glance 
> it doesn't seem that this code should be multi-threaded, except for the parts 
> doing the actual IO operations, so it should be possible to run everything in 
> one single ExecutionGraph thread and just run the necessary IO operations 
> asynchronously with some feedback loop ("mailbox style").
> I would strongly recommend fixing this issue before adding new features in 
> the {{CheckpointCoordinator}} component.
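> 
> For illustration, a minimal sketch of such a "mailbox style" model (not the 
> actual Flink implementation; the class and method names below are made up):
> {code:java}
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.LinkedBlockingQueue;
> 
> public class MailboxSketch {
>     private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
> 
>     // The IO itself runs on a separate thread pool, but its completion is
>     // re-enqueued into the mailbox, so all state mutation happens in the
>     // single mailbox thread and no locks are needed.
>     public void triggerIo() {
>         CompletableFuture
>                 .supplyAsync(() -> "io-result")
>                 .thenAccept(result -> mailbox.add(() -> handleResult(result)));
>     }
> 
>     private void handleResult(String result) {
>         // single-threaded state update
>         System.out.println("handled " + result);
>     }
> 
>     public void runMailboxLoop() throws InterruptedException {
>         while (true) {
>             mailbox.take().run();
>         }
>     }
> }
> {code}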



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] XComp commented on a diff in pull request #23229: [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException

2023-08-17 Thread via GitHub


XComp commented on code in PR #23229:
URL: https://github.com/apache/flink/pull/23229#discussion_r1297229877


##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java:
##
@@ -212,6 +212,16 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
 }
 }
 
+    private static boolean hasNext(HttpPostRequestDecoder decoder) {
+        try {
+            return decoder.hasNext();
+        } catch (HttpPostRequestDecoder.EndOfDataDecoderException e) {
+            // this can occur if the final chunk wasn't empty, but didn't
+            // contain any attribute data;
+            // unfortunately the Netty APIs don't give us any way to check this

Review Comment:
   We could maintain the size of 
`currentHttpPostRequestDecoder.getBodyHttpDatas()` (that's essentially what the 
`hasNext()`/`next()` methods access, and it increases with every `offer(..)` 
call): only if the size of the returned List increases do we know that there is 
more data to read. 
   
   That's the only workaround I could come up with based on reading through the 
`HttpPostMultipartRequestDecoder` iteration code and on what I found in 
https://github.com/netty/netty/issues/7869. Not sure whether that's a nicer 
approach, though. :thinking: 
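   
   A minimal sketch of that idea (hypothetical code, not a tested patch; it 
only relies on Netty's public `getBodyHttpDatas()` accessor, and the class and 
field names are made up):
{code:java}
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;

final class BodyDataTracker {
    // how many body parts the decoder had produced at the last check
    private int knownBodyDataCount = 0;

    // returns true only if getBodyHttpDatas() has grown since the last call,
    // i.e. a preceding offer(..) actually contributed new attribute data
    boolean hasNewBodyData(HttpPostRequestDecoder decoder) {
        int currentCount = decoder.getBodyHttpDatas().size();
        boolean hasNew = currentCount > knownBodyDataCount;
        knownBodyDataCount = currentCount;
        return hasNew;
    }
}
{code}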



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #248: [FLINK-32704] Supports spilling to disk when feedback channel memory buffer is full

2023-08-17 Thread via GitHub


jiangxin369 commented on code in PR #248:
URL: https://github.com/apache/flink-ml/pull/248#discussion_r1297235534


##
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializer.java:
##
@@ -106,7 +106,7 @@ public int getLength() {
 @Override
 public void serialize(IterationRecord record, DataOutputView target) 
throws IOException {
 target.writeByte((byte) record.getType().ordinal());
-serializerNumber(record.getEpoch(), target);
+target.writeInt(record.getEpoch());

Review Comment:
   The original `serializerNumber` cannot serialize `Integer.MAX_VALUE + 1`, 
which is the epoch of the feedback record in the last iteration. In the 
previous implementation, the feedback records were just passed in memory and 
didn't need to be serialized/deserialized, so the bug didn't surface.
   
   I also did a benchmark serializing 1 billion integers with 
`serializerNumber` and `DataOutputSerializer#writeInt`; their performance is 
quite close, so I think we can remove the current `serializerNumber` 
implementation.
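   
   To illustrate the range issue (a self-contained sketch, not the actual 
Flink ML code; note that in Java `Integer.MAX_VALUE + 1` wraps around to 
`Integer.MIN_VALUE`):
{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class EpochWriteIntDemo {
    public static void main(String[] args) throws IOException {
        // Integer.MAX_VALUE + 1 overflows to Integer.MIN_VALUE (-2147483648),
        // a value that a positive-only variable-length encoding cannot handle.
        int lastIterationEpoch = Integer.MAX_VALUE + 1;

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(lastIterationEpoch); // fixed 4 bytes; any int is valid
        }
        System.out.println(bytes.size()); // prints 4
    }
}
{code}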



##
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java:
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.feedback;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.disk.SpillingBuffer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A queue that can spill the items to disk automatically when the memory
+ * buffer is full.
+ *
+ * @param <T> The element type.
+ */
+@Internal
+final class SpillableFeedbackQueue<T> {
+private final DataOutputSerializer output = new DataOutputSerializer(256);
+private final TypeSerializer<T> serializer;
+private final IOManager ioManager;
+private final MemoryManager memoryManager;
+private final int numPages;
+
+private List<MemorySegment> segments;
+private ListMemorySegmentPool segmentPool;
+
+private SpillingBuffer target;
+private long size = 0L;
+
+SpillableFeedbackQueue(
+IOManager ioManager,
+MemoryManager memoryManager,
+TypeSerializer<T> serializer,
+long inMemoryBufferSize,
+long pageSize)
+throws MemoryAllocationException {
+this.serializer = Objects.requireNonNull(serializer);
+this.ioManager = Objects.requireNonNull(ioManager);
+this.memoryManager = Objects.requireNonNull(memoryManager);
+
+this.numPages = (int) (inMemoryBufferSize / pageSize);
+resetSpillingBuffer();
+}
+
+void add(T item) {
+try {
+output.clear();
+serializer.serialize(item, output);
+target.write(output.getSharedBuffer(), 0, output.length());
+size++;
+} catch (IOException e) {
+throw new IllegalStateException(e);
+}
+}
+
+MutableObjectIterator<T> iterate() {
+try {
+DataInputView input = target.flip();
+return new InputViewIterator<>(input, this.serializer);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+long size() {
+return size;
+}
+
+void reset() throws Exception {
+size = 0;
+close();
+resetSpillingBuffer();
+}
+
+void close() throws IOException {
+output.clear();
+List<MemorySegment> toRelease = target.close();
+toRelease.addAll(seg

  1   2   3   >