[jira] [Updated] (FLINK-32881) Client supports making savepoints in detach mode
[ https://issues.apache.org/jira/browse/FLINK-32881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hangxiang Yu updated FLINK-32881:
---------------------------------
    Component/s: Runtime / Checkpointing
                     (was: API / State Processor)

> Client supports making savepoints in detach mode
> ------------------------------------------------
>
>                 Key: FLINK-32881
>                 URL: https://issues.apache.org/jira/browse/FLINK-32881
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client / Job Submission, Runtime / Checkpointing
>    Affects Versions: 1.19.0
>            Reporter: Renxiang Zhou
>            Priority: Major
>              Labels: detach-savepoint
>             Fix For: 1.19.0
>
>         Attachments: image-2023-08-16-17-14-34-740.png, image-2023-08-16-17-14-44-212.png
>
> When triggering a savepoint using the command-line tool, the client needs to wait for the job to finish creating the savepoint before it can exit. For jobs with large state, the savepoint creation process can be time-consuming, leading to the following problems:
> # Platform users may need to manage thousands of Flink tasks on a single client machine. With the current savepoint triggering mode, all savepoint creation threads on that machine have to wait for the jobs to finish their snapshots, resulting in significant resource waste.
> # If the savepoint production time exceeds the client's timeout duration, the client throws a timeout exception and reports that triggering the savepoint failed. Since different jobs have varying savepoint durations, it is difficult to tune the timeout parameter on the client side.
> Therefore, we propose adding a detach mode for triggering savepoints from the client, similar to the detach-mode behavior when submitting jobs. Here are the specific details:
> # The savepoint UUID is generated on the client side. After successfully triggering the savepoint, the client immediately returns the UUID and exits.
> # Add a "dump-pending-savepoints" API that allows the client to check whether a triggered savepoint has been successfully created.
> By implementing these changes, the client can detach from the savepoint creation process, reducing resource waste and providing a way to check the status of savepoint creation.
> !image-2023-08-16-17-14-34-740.png|width=2129,height=625!
> !image-2023-08-16-17-14-44-212.png|width=1530,height=445!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
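The client-side flow proposed in this issue can be sketched abstractly as follows. This is an illustration of the proposal only, not Flink's actual client code; all names here (`DetachedSavepointClient`, `dump_pending_savepoints`, and so on) are hypothetical:

```python
import uuid

# Hypothetical in-memory model of the proposed detach-mode client flow:
# the client generates the savepoint ID itself, hands the trigger off to
# the cluster, and returns immediately instead of blocking until completion.
class DetachedSavepointClient:
    def __init__(self):
        # savepoint_id -> "PENDING" | "COMPLETED"
        self._savepoints = {}

    def trigger_savepoint_detached(self, job_id):
        """Generate the savepoint UUID client-side and return right away."""
        savepoint_id = str(uuid.uuid4())
        self._savepoints[savepoint_id] = "PENDING"
        # In the real proposal the trigger request would be sent to the
        # cluster here; the client then exits without waiting.
        return savepoint_id

    def dump_pending_savepoints(self):
        """Model of the proposed 'dump-pending-savepoints' check."""
        return [sp for sp, state in self._savepoints.items() if state == "PENDING"]

    def mark_completed(self, savepoint_id):
        # Stand-in for the cluster finishing the snapshot asynchronously.
        self._savepoints[savepoint_id] = "COMPLETED"
```

The key property is that `trigger_savepoint_detached` returns as soon as the UUID is generated, so thousands of trigger calls no longer each hold a thread for the duration of a snapshot.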
[jira] [Commented] (FLINK-32881) Client supports making savepoints in detach mode
[ https://issues.apache.org/jira/browse/FLINK-32881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755391#comment-17755391 ]

Hangxiang Yu commented on FLINK-32881:
--------------------------------------
Hi, [~zhourenxiang]
We often use an async thread to trigger the savepoint via the REST API to resolve this. Of course, I think it's still useful for users to trigger savepoints via the CLI. Would you like to contribute your PR?

> Client supports making savepoints in detach mode
> ------------------------------------------------
>
>                 Key: FLINK-32881
>                 URL: https://issues.apache.org/jira/browse/FLINK-32881
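The workaround described in the comment above, triggering the savepoint from an async thread via the REST API and polling for completion, can be sketched like this. The endpoint paths follow Flink's documented savepoint REST API (`POST /jobs/:jobid/savepoints` returning a trigger id, `GET /jobs/:jobid/savepoints/:triggerid` for the status); the `http_post`/`http_get` callables are injected placeholders for a real HTTP client, and the JobManager address is assumed:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def trigger_savepoint_async(job_id, target_dir, http_post, http_get,
                            poll_interval=0.0, base="http://jobmanager:8081"):
    """Trigger a savepoint without blocking the caller.

    http_post(url, body) / http_get(url) stand in for a real HTTP client.
    The POST returns a trigger id; the status endpoint is polled until the
    asynchronous operation reports COMPLETED.
    """
    def run():
        resp = http_post(f"{base}/jobs/{job_id}/savepoints",
                         {"target-directory": target_dir, "cancel-job": False})
        trigger_id = resp["request-id"]
        while True:
            status = http_get(f"{base}/jobs/{job_id}/savepoints/{trigger_id}")
            if status["status"]["id"] == "COMPLETED":
                # On success the operation result carries the savepoint path.
                return status["operation"]["location"]
            time.sleep(poll_interval)

    executor = ThreadPoolExecutor(max_workers=1)
    # The caller gets a future immediately; the polling happens in the
    # background thread, which is exactly the "async thread" workaround.
    return executor.submit(run)
```

Because the future resolves in the background, a platform process managing many jobs can fire off all triggers first and collect the savepoint locations later, instead of blocking one CLI invocation per job.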
[GitHub] [flink-kubernetes-operator] 1996fanrui commented on a diff in pull request #652: [docs][autoscaler] Autoscaler docs and default config improvement
1996fanrui commented on code in PR #652:
URL: https://github.com/apache/flink-kubernetes-operator/pull/652#discussion_r1296769538

##########
docs/content/docs/custom-resource/autoscaler.md:
##########
@@ -35,26 +35,78 @@ Key benefits to the user:
 - Automatic adaptation to changing load patterns
 - Detailed utilization metrics for performance debugging

-Job requirements:
- - The autoscaler currently only works with the latest [Flink 1.17](https://hub.docker.com/_/flink) or after backporting the following fixes to your 1.15/1.16 Flink image
-   - Job vertex parallelism overrides (must have)
-     - [Add option to override job vertex parallelisms during job submission](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
-     - [Change ForwardPartitioner to RebalancePartitioner on parallelism changes](https://github.com/apache/flink/pull/21443) (consists of 5 commits)
-     - [Fix logic for determining downstream subtasks for partitioner replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
-   - [Support timespan for busyTime metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35) (good to have)
- - Source scaling only supports modern sources which
-   - use the new [Source API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) that exposes the busy time metric (must have, most common connectors already do)
-   - expose the [standardized connector metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics) for accessing backlog information (good to have, extra capacity will be added for catching up with backlog)
+## Overview

-In the current state the autoscaler works best with Kafka sources, as they expose all the standardized metrics. It also comes with some additional benefits when using Kafka such as automatically detecting and limiting source max parallelism to the number of Kafka partitions.
+The autoscaler relies on the metrics exposed by the Flink metric system for the individual tasks. The metrics are queried directly from the Flink job.

-{{< hint info >}}
-The autoscaler also supports a passive/metrics-only mode where it only collects and evaluates scaling related performance metrics but does not trigger any job upgrades.
-This can be used to gain confidence in the module without any impact on the running applications.
+Collected metrics:
+ - Backlog information at each source
+ - Incoming data rate at the sources (e.g. records/sec written into the Kafka topic)
+ - Number of records processed per second in each job vertex
+ - Busy time per second of each job vertex (current utilization)

-To disable scaling actions, set: `kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< hint info >}}
+Please note that we are not using any container memory / CPU utilization metrics directly here. High utilization will be reflected in the processing rate and busy time metrics of the individual job vertexes.
 {{< /hint >}}

+The algorithm starts from the sources and recursively computes the required processing capacity (target data rate) for each operator in the pipeline. At the source vertices, the target data rate is equal to the incoming data rate (from the Kafka topic).
+
+For downstream operators we compute the target data rate as the sum of the upstream operators' output data rates along the incoming edges in the processing graph.
+
+{{< img src="/img/custom-resource/autoscaler_fig1.png" alt="Computing Target Data Rates" >}}
+
+Users configure the target utilization percentage of the operators in the pipeline, e.g. keep all operators between 60% - 80% busy. The autoscaler then finds a parallelism configuration such that the output rates of all operators match the input rates of all their downstream operators at the targeted utilization.
+
+In this example we see an upscale operation:
+
+{{< img src="/img/custom-resource/autoscaler_fig2.png" alt="Scaling Up" >}}
+
+Similarly, as load decreases, the autoscaler adjusts individual operator parallelism levels to match the current rate over time.
+
+{{< img src="/img/custom-resource/autoscaler_fig3.png" alt="Scaling Down" >}}
+
+The autoscaler approach is based on [Three steps is all you need: fast, accurate, automatic scaling decisions for distributed streaming dataflows](https://www.usenix.org/system/files/osdi18-kalavri.pdf) by Kalavri et al.
+
+## Executing rescaling operations
+
+By default the autoscaler uses the built-in job upgrade mechanism from the operator to perform the rescaling as detailed in [Job Management and Stateful upgrades]({{< ref "docs/custom-resource/job-management" >}}).
+
+### Flink 1.18 and in-place scaling support
+
+The upcoming Flink 1.18 release brings very significant improvements to the speed of scaling operations through the new resource requirements rest endpoint.
+This allows the autos
[jira] [Assigned] (FLINK-32886) Issue with volumeMounts when creating OLM for Flink Operator 1.6.0
[ https://issues.apache.org/jira/browse/FLINK-32886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gyula Fora reassigned FLINK-32886:
----------------------------------
    Assignee: James Busche

> Issue with volumeMounts when creating OLM for Flink Operator 1.6.0
> ------------------------------------------------------------------
>
>                 Key: FLINK-32886
>                 URL: https://issues.apache.org/jira/browse/FLINK-32886
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.6.0
>            Reporter: James Busche
>            Assignee: James Busche
>            Priority: Minor
>
> I noticed a volume-mount problem when trying to deploy the OLM CSV for the 1.6.0 Flink Kubernetes Operator (following the directions from [OLM Verification of a Flink Kubernetes Operator Release|https://cwiki.apache.org/confluence/display/FLINK/OLM+Verification+of+a+Flink+Kubernetes+Operator+Release]):
>
> {{oc describe csv}}
> {{...}}
> {{Warning InstallComponentFailed 46s (x7 over 49s) operator-lifecycle-manager install strategy failed: Deployment.apps "flink-kubernetes-operator" is invalid: [spec.template.spec.volumes[2].name: Duplicate value: "keystore", spec.template.spec.containers[0].volumeMounts[1].name: Not found: "flink-artifacts-volume"]}}
>
> My current workaround is to change [line 88|https://github.com/apache/flink-kubernetes-operator/blob/main/tools/olm/docker-entry.sh#L88] to look like this:
>
> {{yq ea -i '.spec.install.spec.deployments[0].spec.template.spec.volumes[1] = {"name": "flink-artifacts-volume","emptyDir": {}}' "${CSV_FILE}"}}
> {{yq ea -i '.spec.install.spec.deployments[0].spec.template.spec.volumes[2] = {"name": "keystore","emptyDir": {}}' "${CSV_FILE}"}}
>
> And then the operator deploys without error:
>
> {{oc get csv}}
> {{NAME                               DISPLAY                     VERSION   REPLACES                           PHASE}}
> {{flink-kubernetes-operator.v1.6.0   Flink Kubernetes Operator   1.6.0     flink-kubernetes-operator.v1.5.0   Succeeded}}
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #652: [docs][autoscaler] Autoscaler docs and default config improvement
gyfora commented on code in PR #652:
URL: https://github.com/apache/flink-kubernetes-operator/pull/652#discussion_r1296783335
[GitHub] [flink-ml] Fanoid commented on a diff in pull request #250: [FLINK-32810] Improve memory allocation in ListStateWithCache
Fanoid commented on code in PR #250:
URL: https://github.com/apache/flink-ml/pull/250#discussion_r1296795610

##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/datacache/nonkeyed/ListStateWithCache.java:
##########
@@ -62,14 +63,32 @@ public class ListStateWithCache implements ListState {

     /** The data cache writer for the received records. */
     private final DataCacheWriter dataCacheWriter;

-    @SuppressWarnings("unchecked")
     public ListStateWithCache(
             TypeSerializer serializer,
             StreamTask containingTask,
             StreamingRuntimeContext runtimeContext,
             StateInitializationContext stateInitializationContext,
             OperatorID operatorID)
             throws IOException {
+        this(
+                serializer,
+                containingTask,
+                runtimeContext,
+                stateInitializationContext,
+                operatorID,
+                0.);

Review Comment:
   @jiangxin369 and @zhipeng93, thanks for your comments. After an offline discussion, I found it necessary to check the sub-fractions from all usages of the operator-scope managed memory. So I changed the constructor of `ListStateWithCache`. Please check the latest changes.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
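The "check the sub-fractions" idea mentioned in this review can be illustrated in the abstract: each consumer of the operator-scope managed memory declares a fraction of the whole, and the declared fractions must not sum to more than 1.0. This is a simplified standalone model, not the actual Flink ML code:

```python
class ManagedMemoryBudget:
    """Toy model: operator-scope managed memory split into sub-fractions."""

    def __init__(self):
        # usage name -> declared fraction of the operator's managed memory
        self._fractions = {}

    def declare(self, usage_name, fraction):
        """Declare (or re-declare) a usage's fraction, rejecting over-commitment."""
        if not 0.0 <= fraction <= 1.0:
            raise ValueError(f"fraction must be in [0, 1], got {fraction}")
        total = (sum(self._fractions.values())
                 - self._fractions.get(usage_name, 0.0) + fraction)
        if total > 1.0 + 1e-9:
            raise ValueError(
                f"sub-fractions would sum to {total:.2f} > 1.0 "
                "for operator-scope managed memory")
        self._fractions[usage_name] = fraction

    def remaining(self):
        return 1.0 - sum(self._fractions.values())
```

The point of checking across all usages (rather than per caller) is that each `ListStateWithCache`-style consumer only knows its own fraction; only a shared budget can detect that the combination over-commits the operator's memory.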
[GitHub] [flink-ml] Fanoid commented on pull request #250: [FLINK-32810] Improve memory allocation in ListStateWithCache
Fanoid commented on PR #250:
URL: https://github.com/apache/flink-ml/pull/250#issuecomment-1681787714

Hi @jiangxin369 and @zhipeng93, I've updated the PR. Could you take another look?
[GitHub] [flink] wanglijie95 commented on pull request #23216: [FLINK-32831][table-planner] RuntimeFilterProgram should aware join type when looking for the build side
wanglijie95 commented on PR #23216:
URL: https://github.com/apache/flink/pull/23216#issuecomment-1681809261

Thanks for the review @lsyldliu. I've addressed or replied to your comments, PTAL.
[GitHub] [flink] wanglijie95 commented on a diff in pull request #23216: [FLINK-32831][table-planner] RuntimeFilterProgram should aware join type when looking for the build side
wanglijie95 commented on code in PR #23216:
URL: https://github.com/apache/flink/pull/23216#discussion_r1296819120

##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java:
##########
@@ -202,6 +202,32 @@ public void testBuildSideIsJoinWithoutExchange() throws Exception {
         util.verifyPlan(query);
     }

+    @Test
+    public void testBuildSideIsLeftJoinWithoutExchange() throws Exception {

Review Comment:
   After an offline discussion with @lsyldliu, we think it is meaningful to directly inject a runtime filter for fact2 (letting the output of the `LeftOuterJoin` act as the build side), even if it will affect the creation of `MultipleInput`. We will keep it as a future optimization; no changes will be made in this version.
[jira] [Assigned] (FLINK-30234) SourceReaderBase should provide an option to disable numRecordsIn metric registration
[ https://issues.apache.org/jira/browse/FLINK-30234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qingsheng Ren reassigned FLINK-30234:
-------------------------------------
    Assignee: Hongshun Wang  (was: Wencong Liu)

> SourceReaderBase should provide an option to disable numRecordsIn metric registration
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-30234
>                 URL: https://issues.apache.org/jira/browse/FLINK-30234
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Common
>    Affects Versions: 1.17.0
>            Reporter: Qingsheng Ren
>            Assignee: Hongshun Wang
>            Priority: Major
>
> Currently the numRecordsIn metric is pre-registered for all sources in SourceReaderBase. Considering the different implementations of source readers, the definition of "record" might differ from the one used in SourceReaderBase, hence numRecordsIn might be inaccurate. We could introduce an option in SourceReader to disable the registration of numRecordsIn in SourceReaderBase and let the actual implementation report the metric instead.
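The opt-out described in this issue could look roughly like the following. This is a language-neutral sketch of the idea, not the actual `SourceReaderBase` API; the flag name and the tiny metric classes are hypothetical:

```python
class Counter:
    def __init__(self):
        self.count = 0

    def inc(self, n=1):
        self.count += n

class MetricGroup:
    def __init__(self):
        self.counters = {}

    def counter(self, name):
        return self.counters.setdefault(name, Counter())

class SourceReaderBase:
    """Registers numRecordsIn unless the concrete reader opts out."""

    def __init__(self, metric_group, register_num_records_in=True):
        # When the subclass's notion of a "record" differs from the base
        # class's, it opts out here and reports the metric itself.
        self._num_records_in = (
            metric_group.counter("numRecordsIn")
            if register_num_records_in else None)

    def emit_record(self, record):
        if self._num_records_in is not None:
            self._num_records_in.inc()
```

With the flag disabled, the base class never registers (or increments) the counter, so the concrete implementation can register a `numRecordsIn` that matches its own record semantics without producing a duplicate or inaccurate metric.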
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #652: [docs][autoscaler] Autoscaler docs and default config improvement
gyfora merged PR #652:
URL: https://github.com/apache/flink-kubernetes-operator/pull/652
[GitHub] [flink] XComp commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh
XComp commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1296771740

##########
tools/ci/verify_scala_suffixes.sh:
##########
@@ -37,20 +37,33 @@
 #
 # The script uses 'mvn dependency:tree -Dincludes=org.scala-lang' to list Scala
 # dependent modules.
-CI_DIR=$1
-FLINK_ROOT=$2
+
+usage() {
+  echo "Usage: $0 "
+  echo ""
+  echo "The environment variable MVN is used to specify the Maven binaries; defaults to 'mvnw'."
+  echo "See further details in the JavaDoc of ScalaSuffixChecker."
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+    h)
+      usage
+      exit 0
+      ;;
+  esac
+done
+
+MVN=${MVN:-./mvnw}

 echo "--- Flink Scala Dependency Analyzer ---"
 echo "Analyzing modules for Scala dependencies using 'mvn dependency:tree'."
 echo "If you haven't built the project, please do so first by running \"mvn clean install -DskipTests\""

-source "${CI_DIR}/maven-utils.sh"
+dependency_plugin_output=/tmp/dep.txt

-cd "$FLINK_ROOT" || exit
-
-dependency_plugin_output=${CI_DIR}/dep.txt
-
-run_mvn dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} >> "${dependency_plugin_output}"
+$MVN dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} > "${dependency_plugin_output}"

Review Comment:
   ```suggestion
   $MVN -T1 dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} > "${dependency_plugin_output}"
   ```

##########
tools/ci/verify_scala_suffixes.sh:
##########

+dependency_plugin_output=/tmp/dep.txt

Review Comment:
   ```suggestion
   dependency_plugin_output=/tmp/dependency-tree-including-scala.txt
   ```
   nit: We might want to give it a more meaningful name to differentiate it from the other dependency:tree temporary file. ...even if it's only a temporary file. That makes it easier to debug issues.

##########
tools/ci/verify_scala_suffixes.sh:
##########

+usage() {
+  echo "Usage: $0 "

Review Comment:
   ```suggestion
   echo "Usage: $0"
   ```
   nit: just to avoid someone coming up with a hotfix PR

##########
tools/ci/verify_bundled_optional.sh:
##########
@@ -17,24 +17,51 @@
 # limitations under the License.
 #

+usage() {
+  echo "Usage: $0 "
+  echo "  A file containing the output of the Maven build."
+  echo ""
+  echo "mvnw clean package > "
+  echo ""
+  echo "The environment variable MVN is used to specify the Maven binaries; defaults to 'mvnw'."
+  echo "See further details in the JavaDoc of ShadeOptionalChecker."
+}
+
+while getopts 'h' o; do
+  case "${o}" in
+    h)
+      usage
+      exit 0
+      ;;
+  esac
+done
+
+if [[ "$#" != "1" ]]; then
+  usage
+  exit 1
+fi
+
 ## Checks that all bundled dependencies are marked as optional in the poms
 MVN_CLEAN_COMPILE_OUT=$1
-CI_DIR=$2
-FLINK_ROOT=$3

-source "${CI_DIR}/maven-utils.sh"
+MVN=${MVN:-./mvnw}

-cd "$FLINK_ROOT" || exit
+dependency_plugin_output=/tmp/optional_dep.txt

Review Comment:
   ```suggestion
   dependency_plugin_output=/tmp/dependency-tree.txt
   ```
   nit
[GitHub] [flink] XComp commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh
XComp commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1296771740

##########
tools/ci/verify_scala_suffixes.sh:
##########

+$MVN dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} > "${dependency_plugin_output}"

Review Comment:
   ```suggestion
   $MVN -T1 dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} > "${dependency_plugin_output}"
   ```
   Here we have to add `-T1` as well to make sure that the output is actually parseable.
[GitHub] [flink] XComp commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh
XComp commented on code in PR #23195:
URL: https://github.com/apache/flink/pull/23195#discussion_r1296832114

##########
tools/ci/verify_bundled_optional.sh:
##########

-dependency_plugin_output=${CI_DIR}/optional_dep.txt
+dependency_plugin_output=/tmp/optional_dep.txt

+$MVN dependency:tree -B > "${dependency_plugin_output}"

Review Comment:
   I did another pass over the code and identified one other location where we should add the `-T1` parameter. I commented on it below in a separate review.
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #653: Backport recent autoscaler docs changes
gyfora commented on PR #653:
URL: https://github.com/apache/flink-kubernetes-operator/pull/653#issuecomment-1681842989

cc @1996fanrui
[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #653: Backport recent autoscaler docs changes
gyfora opened a new pull request, #653:
URL: https://github.com/apache/flink-kubernetes-operator/pull/653

(no comment)
[GitHub] [flink] 1996fanrui opened a new pull request, #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore
1996fanrui opened a new pull request, #23228:
URL: https://github.com/apache/flink/pull/23228

[hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore

The `private boolean put(File fromFile, String toBlobPath)` already calls `createBasePathIfNeeded`, so the `public boolean put(File localFile, JobID jobId, BlobKey blobKey)` doesn't need to call `createBasePathIfNeeded` itself.
[GitHub] [flink] 1996fanrui commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore
1996fanrui commented on PR #23228: URL: https://github.com/apache/flink/pull/23228#issuecomment-1681845331 Hi @gaborgsomogyi, would you mind taking a look at this hotfix? Thanks~
[jira] [Commented] (FLINK-32668) fix up watchdog timeout error msg in common.sh(e2e test)
[ https://issues.apache.org/jira/browse/FLINK-32668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755421#comment-17755421 ] Matthias Pohl commented on FLINK-32668: --- If the test times out, the failure handling should be triggered (see [common.sh:957|https://github.com/apache/flink/blob/c8ae39d4ac73f81873e1d8ac37e17c29ae330b23/flink-end-to-end-tests/test-scripts/common.sh#L957]) to print a meaningful output. Nonetheless, the following kill command will cause the actual test run to return with a non-zero exit code. As a consequence, the script will fail. So, your scenario should be already covered by the existing code. The {{kill_test_watchdog}} covers a separate scenario: Killing the watchdog process if it's still around. Here, we shouldn't cover the corresponding test run anymore (and imply whether the test succeeded or not based on whether the watchdog process is still around). The error handling, as said in the previous paragraph, should be covered in the test execution itself. Please correct me if I'm missing anything. > fix up watchdog timeout error msg in common.sh(e2e test) > -- > > Key: FLINK-32668 > URL: https://issues.apache.org/jira/browse/FLINK-32668 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.16.2, 1.18.0, 1.17.1 >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Minor > Attachments: image-2023-07-25-15-27-37-441.png > > > When run e2e test, an error like this occrurs: > !image-2023-07-25-15-27-37-441.png|width=733,height=115! 
> > The corresponding code: > {code:java} > kill_test_watchdog() { > local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid) > echo "Stopping job timeout watchdog (with pid=$watchdog_pid)" > kill $watchdog_pid > } > internal_run_with_timeout() { > local timeout_in_seconds="$1" > local on_failure="$2" > local command_label="$3" > local command="${@:4}" > on_exit kill_test_watchdog > ( > command_pid=$BASHPID > (sleep "${timeout_in_seconds}" # set a timeout for this command > echo "${command_label:-"The command '${command}'"} (pid: > $command_pid) did not finish after $timeout_in_seconds seconds." > eval "${on_failure}" > kill "$command_pid") & watchdog_pid=$! > echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid > # invoke > $command > ) > }{code} > > When {{$command}} completes before the timeout, the watchdog process is > killed successfully. However, when {{$command}} times out, the watchdog > process kills {{$command}} and then exits itself, leaving behind an error > message when trying to kill its own process ID with {{{}kill > $watchdog_pid{}}}.This error msg "no such process" is hard to understand. > > So, I will modify like this with better error message: > > {code:java} > kill_test_watchdog() { > local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid) > if kill -0 $watchdog_pid > /dev/null 2>&1; then > echo "Stopping job timeout watchdog (with pid=$watchdog_pid)" > kill $watchdog_pid > else > echo "[ERROR] Test is timeout" > exit 1 > fi > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore
flinkbot commented on PR #23228: URL: https://github.com/apache/flink/pull/23228#issuecomment-1681852112 ## CI report: * 4724d64fec3af7b4e0b1a5b634fc2f0d19f5f3a0 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-32875) Enable limiting number of retained jobs for the local job archive directory in flink history server
[ https://issues.apache.org/jira/browse/FLINK-32875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-32875: -- Fix Version/s: (was: 1.18.0) > Enable limiting number of retained jobs for the local job archive directory > in flink history server > --- > > Key: FLINK-32875 > URL: https://issues.apache.org/jira/browse/FLINK-32875 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.16.1 >Reporter: Allison Chang >Priority: Not a Priority > > Currently the Flink history server limits the number of job archives stored > using the history.server.retained-jobs configuration, which sets the limit > for both the local cache as well as remote directories. We want to decouple > the local cache limit from the remote directory which stores the job archives > so that we are not limited by the size of a local filesystem where Flink > History server is deployed -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-32888) File upload runs into EndOfDataDecoderException
Chesnay Schepler created FLINK-32888: Summary: File upload runs into EndOfDataDecoderException Key: FLINK-32888 URL: https://issues.apache.org/jira/browse/FLINK-32888 Project: Flink Issue Type: Bug Components: Runtime / REST Affects Versions: 1.17.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Fix For: 1.18.0, 1.17.2 With the right request, the FileUploadHandler runs into an EndOfDataDecoderException although everything is fine.
[jira] [Created] (FLINK-32889) BinaryClassificationEvaluator gives wrong weighted AUC value
Fan Hong created FLINK-32889: Summary: BinaryClassificationEvaluator gives wrong weighted AUC value Key: FLINK-32889 URL: https://issues.apache.org/jira/browse/FLINK-32889 Project: Flink Issue Type: Bug Components: Library / Machine Learning Affects Versions: ml-2.3.0 Reporter: Fan Hong BinaryClassificationEvaluator gives a wrong AUC value when a weight column is provided. Here is a case from the unit test. The (score, label, weight) triples of the data are: {code:java} (0.9, 1.0, 0.8), (0.9, 1.0, 0.7), (0.9, 1.0, 0.5), (0.75, 0.0, 1.2), (0.6, 0.0, 1.3), (0.9, 1.0, 1.5), (0.9, 1.0, 1.4), (0.4, 0.0, 0.3), (0.3, 0.0, 0.5), (0.9, 1.0, 1.9), (0.2, 0.0, 1.2), (0.1, 1.0, 1.0) {code} PySpark and scikit-learn give an AUC score of 0.87179, while the Flink ML implementation gives 0.891168.
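The 0.87179 reference value can be reproduced with the standard weighted pairwise AUC formula: sum w_i * w_j over all (positive i, negative j) pairs where the positive scores higher (ties count 0.5), divided by the product of the total positive and negative weights. The sketch below is only an illustration of that formula on the ticket's data, not the Flink ML or scikit-learn implementation:

```java
// Weighted AUC via the pairwise formula:
//   AUC = sum over (positive i, negative j) of w_i * w_j * indicator / (W+ * W-)
// where indicator is 1 if s_i > s_j and 0.5 if s_i == s_j.
public class WeightedAuc {
    static double weightedAuc(double[] scores, double[] labels, double[] weights) {
        double num = 0, wPos = 0, wNeg = 0;
        for (int i = 0; i < scores.length; i++) {
            if (labels[i] == 1.0) { wPos += weights[i]; } else { wNeg += weights[i]; }
        }
        for (int i = 0; i < scores.length; i++) {
            if (labels[i] != 1.0) { continue; }          // i ranges over positives
            for (int j = 0; j < scores.length; j++) {
                if (labels[j] == 1.0) { continue; }      // j ranges over negatives
                if (scores[i] > scores[j]) {
                    num += weights[i] * weights[j];
                } else if (scores[i] == scores[j]) {
                    num += 0.5 * weights[i] * weights[j];
                }
            }
        }
        return num / (wPos * wNeg);
    }

    public static void main(String[] args) {
        // The (score, label, weight) triples from the ticket, column by column.
        double[] scores  = {0.9, 0.9, 0.9, 0.75, 0.6, 0.9, 0.9, 0.4, 0.3, 0.9, 0.2, 0.1};
        double[] labels  = {1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1};
        double[] weights = {0.8, 0.7, 0.5, 1.2, 1.3, 1.5, 1.4, 0.3, 0.5, 1.9, 1.2, 1.0};
        System.out.printf("%.5f%n", weightedAuc(scores, labels, weights)); // 0.87179
    }
}
```

On this data the numerator is 30.6 (all positives scored 0.9 outrank every negative) and W+ * W- = 7.8 * 4.5 = 35.1, giving 0.871794..., which matches the PySpark/scikit-learn value.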
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #653: Backport recent autoscaler docs changes
gyfora merged PR #653: URL: https://github.com/apache/flink-kubernetes-operator/pull/653
[GitHub] [flink] XComp commented on a diff in pull request #22341: [FLINK-27204][flink-runtime] Refract FileSystemJobResultStore to execute I/O operations on the ioExecutor
XComp commented on code in PR #22341: URL: https://github.com/apache/flink/pull/22341#discussion_r1296902228 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java: ## @@ -376,7 +376,7 @@ private void confirmLeadership( LOG.debug("Confirm leadership {}.", leaderSessionId); leaderElection.confirmLeadership(leaderSessionId, address); } else { -LOG.trace( +LOG.debug( "Ignore confirming leadership because the leader {} is no longer valid.", leaderSessionId); } Review Comment: the entire callback can be replaced by `runIfValidLeader` ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java: ## @@ -262,34 +262,67 @@ public void grantLeadership(UUID leaderSessionID) { @GuardedBy("lock") private void startJobMasterServiceProcessAsync(UUID leaderSessionId) { sequentialOperation = -sequentialOperation.thenRun( -() -> -runIfValidLeader( -leaderSessionId, -ThrowingRunnable.unchecked( +sequentialOperation.thenCompose( +unused -> +supplyAsyncIfValidLeader( +leaderSessionId, () -> - verifyJobSchedulingStatusAndCreateJobMasterServiceProcess( - leaderSessionId)), -"verify job scheduling status and create JobMasterServiceProcess")); - + jobResultStore.hasJobResultEntryAsync( +getJobID()), +() -> + FutureUtils.completedExceptionally( +new LeadershipLostException( +"The leadership is lost."))) +.handle( +(hasJobResult, throwable) -> { +if (throwable +instanceof LeadershipLostException) { + printLogIfNotValidLeader( +"verify job result entry", + leaderSessionId); +return null; +} else if (throwable != null) { + ExceptionUtils.rethrow(throwable); +} +if (hasJobResult) { + handleJobAlreadyDoneIfValidLeader( + leaderSessionId); +} else { + createNewJobMasterServiceProcessIfValidLeader( + leaderSessionId); +} +return null; +})); handleAsyncOperationError(sequentialOperation, "Could not start the job manager."); } -@GuardedBy("lock") -private void 
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId) -throws FlinkException { -try { -if (jobResultStore.hasJobResultEntry(getJobID())) { -jobAlreadyDone(leaderSessionId); -} else { -createNewJobMasterServiceProcess(leaderSessionId); -} -} catch (IOException e) { -throw new FlinkException( -String.format( -"Could not retrieve the job scheduling status for job %s.", getJobID()), -e); -} +private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) { +runIfValidLeader( +
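The leadership-guarded async pattern discussed in the review above can be sketched in isolation with plain `CompletableFuture`s. The names `supplyAsyncIfValidLeader` and `LeadershipLostException` are taken from the PR diff, but the class below is a standalone illustration under assumed semantics, not Flink code:

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Standalone sketch: an async action runs only while the given leader session
// is still valid; a stale session short-circuits the chain with a marker
// exception that handle() later swallows instead of failing the pipeline.
public class LeadershipSketch {

    static class LeadershipLostException extends Exception {
        LeadershipLostException(String msg) { super(msg); }
    }

    // The session id currently holding leadership; stale callers are rejected.
    volatile UUID currentLeaderSessionId;

    <T> CompletableFuture<T> supplyAsyncIfValidLeader(
            UUID leaderSessionId, Supplier<CompletableFuture<T>> action) {
        if (leaderSessionId.equals(currentLeaderSessionId)) {
            return action.get();
        }
        CompletableFuture<T> lost = new CompletableFuture<>();
        lost.completeExceptionally(new LeadershipLostException("The leadership is lost."));
        return lost;
    }

    public static void main(String[] args) {
        LeadershipSketch runner = new LeadershipSketch();
        UUID leader = UUID.randomUUID();
        runner.currentLeaderSessionId = leader;

        // Valid leader: the (simulated) job-result lookup runs and is acted upon.
        String valid = runner
                .supplyAsyncIfValidLeader(leader, () -> CompletableFuture.completedFuture(true))
                .handle((hasJobResult, t) -> {
                    if (t instanceof LeadershipLostException) {
                        return "ignored"; // lost leadership: just log and bail out
                    }
                    return hasJobResult ? "job already done" : "start job master";
                })
                .join();
        System.out.println(valid); // job already done

        // Stale session: the marker exception is swallowed in handle().
        String stale = runner
                .supplyAsyncIfValidLeader(UUID.randomUUID(),
                        () -> CompletableFuture.completedFuture(true))
                .handle((hasJobResult, t) ->
                        t instanceof LeadershipLostException ? "ignored" : "ran")
                .join();
        System.out.println(stale); // ignored
    }
}
```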
[GitHub] [flink] zentol opened a new pull request, #23229: [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException
zentol opened a new pull request, #23229: URL: https://github.com/apache/flink/pull/23229 HttpPostRequestDecoder#hasNext() throws an `EndOfDataDecoderException` when called if all attributes were already read. There is apparently an edge case where a non-empty HttpContent is received, but we already retrieved all attributes (so I guess it's just multipart metadata). We now assume that this isn't a problem; if it is, the application layer should notice it (e.g., if a file/attribute is missing).
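The defensive handling described in the PR can be sketched with a stub in place of Netty's `HttpPostRequestDecoder`. The stub only mimics the one behavior that matters here, namely `hasNext()` throwing once all attributes were consumed; the actual fix in the PR may differ in detail:

```java
import java.util.Iterator;
import java.util.List;

// Sketch: treat EndOfDataDecoderException from hasNext() as "no more data"
// instead of letting it fail the upload. StubDecoder stands in for Netty's
// HttpPostRequestDecoder and only reproduces the throwing hasNext() behavior.
public class DecoderSketch {
    static class EndOfDataDecoderException extends RuntimeException {}

    static class StubDecoder {
        private final Iterator<String> attrs;
        StubDecoder(List<String> attrs) { this.attrs = attrs.iterator(); }

        // Mimics the decoder: hasNext() throws once the data is exhausted.
        boolean hasNext() {
            if (!attrs.hasNext()) {
                throw new EndOfDataDecoderException();
            }
            return true;
        }

        String next() { return attrs.next(); }
    }

    static int drain(StubDecoder decoder) {
        int read = 0;
        try {
            while (decoder.hasNext()) {
                decoder.next();
                read++;
            }
        } catch (EndOfDataDecoderException e) {
            // Expected once all attributes were read; not an error.
        }
        return read;
    }

    public static void main(String[] args) {
        System.out.println(drain(new StubDecoder(List.of("file", "metadata")))); // 2
    }
}
```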
[jira] [Updated] (FLINK-32888) File upload runs into EndOfDataDecoderException
[ https://issues.apache.org/jira/browse/FLINK-32888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32888: --- Labels: pull-request-available (was: ) > File upload runs into EndOfDataDecoderException > --- > > Key: FLINK-32888 > URL: https://issues.apache.org/jira/browse/FLINK-32888 > Project: Flink > Issue Type: Bug > Components: Runtime / REST >Affects Versions: 1.17.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.18.0, 1.17.2 > > > With the right request the FIleUploadHandler runs into a > EndOfDataDecoderException although everything is fine. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] gaborgsomogyi commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore
gaborgsomogyi commented on PR #23228: URL: https://github.com/apache/flink/pull/23228#issuecomment-1681926573 If you can elaborate on what this can cause, I can understand your issue better. Just to give some context: there are many usages of the class, and all of the execution paths must create the directory.
[GitHub] [flink] flinkbot commented on pull request #23229: [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException
flinkbot commented on PR #23229: URL: https://github.com/apache/flink/pull/23229#issuecomment-1681935539 ## CI report: * f962bdf1e05e41440f8996dcd0b6511104175371 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] gaborgsomogyi commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore
gaborgsomogyi commented on PR #23228: URL: https://github.com/apache/flink/pull/23228#issuecomment-1681939446 Now I see what you may mean. The `createBasePathIfNeeded` function is called 2 times in the call chain, so we're losing 2-3 processor ticks because the second call must double-check a boolean flag. If I understand correctly, this is more of a cosmetic change than a hotfix, just to double-check, right? Correct me if I've understood something wrong.
[GitHub] [flink] masteryhx commented on pull request #23123: [FLINK-32523] Fix Timeout and Assert Error for NotifyCheckpointAbortedITCase#testNotifyCheckpointAborted
masteryhx commented on PR #23123: URL: https://github.com/apache/flink/pull/23123#issuecomment-1681940016 @Myasuka @fredia Could you help take a look?
[jira] [Updated] (FLINK-32523) NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails with timeout on AZP
[ https://issues.apache.org/jira/browse/FLINK-32523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32523: --- Labels: pull-request-available test-stability (was: test-stability) > NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails with timeout > on AZP > --- > > Key: FLINK-32523 > URL: https://issues.apache.org/jira/browse/FLINK-32523 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.16.2, 1.18.0 >Reporter: Sergey Nuyanzin >Assignee: Hangxiang Yu >Priority: Critical > Labels: pull-request-available, test-stability > Attachments: failure.log > > > This build > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=50795&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=0c010d0c-3dec-5bf1-d408-7b18988b1b2b&l=8638 > fails with timeout > {noformat} > Jul 03 01:26:35 org.junit.runners.model.TestTimedOutException: test timed out > after 10 milliseconds > Jul 03 01:26:35 at java.lang.Object.wait(Native Method) > Jul 03 01:26:35 at java.lang.Object.wait(Object.java:502) > Jul 03 01:26:35 at > org.apache.flink.core.testutils.OneShotLatch.await(OneShotLatch.java:61) > Jul 03 01:26:35 at > org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.verifyAllOperatorsNotifyAborted(NotifyCheckpointAbortedITCase.java:198) > Jul 03 01:26:35 at > org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted(NotifyCheckpointAbortedITCase.java:189) > Jul 03 01:26:35 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Jul 03 01:26:35 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Jul 03 01:26:35 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Jul 03 01:26:35 at java.lang.reflect.Method.invoke(Method.java:498) > Jul 03 01:26:35 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > Jul 03 
01:26:35 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Jul 03 01:26:35 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > Jul 03 01:26:35 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Jul 03 01:26:35 at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > Jul 03 01:26:35 at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > Jul 03 01:26:35 at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > Jul 03 01:26:35 at java.lang.Thread.run(Thread.java:748) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] zentol commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh
zentol commented on code in PR #23195: URL: https://github.com/apache/flink/pull/23195#discussion_r1296932433 ## tools/ci/license_check.sh: ## @@ -17,6 +17,34 @@ # limitations under the License. +usage() { + if [[ "$#" != "2" ]]; then +echo "Usage: $0 " +echo " A file containing the output of the Maven build." +echo " A directory containing a Maven repository into which the Flink artifacts were deployed." +echo "" +echo "Example preparation:" +echo "mvnw clean deploy -DaltDeploymentRepository=validation_repository::default::file: > " +echo "" +echo "The environment variable MVN is used to specify the Maven binaries; defaults to 'mvnw'." +echo "See further details in the JavaDoc of LicenseChecker." + fi +} + +while getopts 'h' o; do + case "${o}" in +h) + usage Review Comment: doesnt work when used with brackets -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] gaborgsomogyi commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore
gaborgsomogyi commented on PR #23228: URL: https://github.com/apache/flink/pull/23228#issuecomment-1681945758 The change itself looks good.
[jira] [Closed] (FLINK-23636) NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails on azure
[ https://issues.apache.org/jira/browse/FLINK-23636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hangxiang Yu closed FLINK-23636. Fix Version/s: (was: 1.13.7) Resolution: Duplicate It's duplicated by FLINK-32523 All cases and reasons have been listed in the new ticket, so I closed this. > NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted fails on azure > > > Key: FLINK-23636 > URL: https://issues.apache.org/jira/browse/FLINK-23636 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.13.2 >Reporter: Xintong Song >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=21558&view=logs&j=59c257d0-c525-593b-261d-e96a86f1926b&t=b93980e3-753f-5433-6a19-13747adae66a&l=9527 > {code} > Aug 05 03:03:53 [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, > Time elapsed: 6.985 s <<< FAILURE! - in > org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase > Aug 05 03:03:53 [ERROR] > testNotifyCheckpointAborted[unalignedCheckpointEnabled > =true](org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase) > Time elapsed: 4.995 s <<< FAILURE! 
> Aug 05 03:03:53 java.lang.AssertionError: expected:<1> but was:<2> > Aug 05 03:03:53 at org.junit.Assert.fail(Assert.java:88) > Aug 05 03:03:53 at org.junit.Assert.failNotEquals(Assert.java:834) > Aug 05 03:03:53 at org.junit.Assert.assertEquals(Assert.java:645) > Aug 05 03:03:53 at org.junit.Assert.assertEquals(Assert.java:631) > Aug 05 03:03:53 at > org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.verifyAllOperatorsNotifyAbortedTimes(NotifyCheckpointAbortedITCase.java:210) > Aug 05 03:03:53 at > org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted(NotifyCheckpointAbortedITCase.java:186) > Aug 05 03:03:53 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > Aug 05 03:03:53 at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > Aug 05 03:03:53 at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > Aug 05 03:03:53 at java.lang.reflect.Method.invoke(Method.java:498) > Aug 05 03:03:53 at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > Aug 05 03:03:53 at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > Aug 05 03:03:53 at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > Aug 05 03:03:53 at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > Aug 05 03:03:53 at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > Aug 05 03:03:53 at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > Aug 05 03:03:53 at > java.util.concurrent.FutureTask.run(FutureTask.java:266) > Aug 05 03:03:53 at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] 1996fanrui commented on a diff in pull request #23219: [FLINK-20681][yarn] Support specifying the hdfs path when ship archives or files
1996fanrui commented on code in PR #23219: URL: https://github.com/apache/flink/pull/23219#discussion_r1296881231 ## flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java: ## @@ -272,17 +272,24 @@ public class YarnConfigOptions { .noDefaultValue() .withDeprecatedKeys("yarn.ship-directories") .withDescription( -"A semicolon-separated list of files and/or directories to be shipped to the YARN cluster."); +"A semicolon-separated list of files and/or directories to be shipped to the YARN " ++ "cluster. These files/directories can come from the local client and/or remote " Review Comment: ```suggestion + "cluster. These files/directories can come from the local path of flink client or remote " ``` ## flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java: ## @@ -272,17 +272,24 @@ public class YarnConfigOptions { .noDefaultValue() .withDeprecatedKeys("yarn.ship-directories") .withDescription( -"A semicolon-separated list of files and/or directories to be shipped to the YARN cluster."); +"A semicolon-separated list of files and/or directories to be shipped to the YARN " ++ "cluster. These files/directories can come from the local client and/or remote " ++ "file system. For example, \"/path/to/local/file;/path/to/local/directory;" ++ "hdfs://$namenode_address/path/of/file;" ++ "hdfs://$namenode_address/path/of/directory\""); public static final ConfigOption> SHIP_ARCHIVES = key("yarn.ship-archives") .stringType() .asList() .noDefaultValue() .withDescription( -"A semicolon-separated list of archives to be shipped to the YARN cluster." -+ " These archives will be un-packed when localizing and they can be any of the following types: " -+ "\".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\"."); +"A semicolon-separated list of archives to be shipped to the YARN cluster. " ++ "These archives can come from the local client and/or remote file system. 
" ++ "They will be un-packed when localizing and they can be any of the following " ++ "types: \".tar.gz\", \".tar\", \".tgz\", \".dst\", \".jar\", \".zip\". " ++ "For example, \"/path/to/local/archive.jar;" ++ "hdfs://$namenode_address/path/to/archive.jar\""); Review Comment: Same 2 comments as the last option. ## flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java: ## @@ -911,31 +929,21 @@ private ApplicationReport startAppMaster( final List systemClassPaths = fileUploader.registerProvidedLocalResources(); final List uploadedDependencies = fileUploader.registerMultipleLocalResources( -systemShipFiles.stream() -.map(e -> new Path(e.toURI())) -.collect(Collectors.toSet()), -Path.CUR_DIR, -LocalResourceType.FILE); +systemShipFiles, Path.CUR_DIR, LocalResourceType.FILE); Review Comment: `systemShipFiles` includes 3 types of files: - ShipFiles - logConfigFilePath - LibFolders After this PR, the latter two types do not add schema, does it work as expected? ## flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java: ## @@ -202,16 +204,27 @@ public YarnClusterDescriptor( this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL); } -private Optional> decodeFilesToShipToCluster( +private Optional> decodeFilesToShipToCluster( final Configuration configuration, final ConfigOption> configOption) { checkNotNull(configuration); checkNotNull(configOption); -final List files = -ConfigUtils.decodeListFromConfig(configuration, configOption, File::new); +List files = ConfigUtils.decodeListFromConfig(configuration, configOption, Path::new); +files = +files.stream() +.map( +path -> +isLoca
[jira] [Comment Edited] (FLINK-16050) Add Attempt Information
[ https://issues.apache.org/jira/browse/FLINK-16050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283595#comment-17283595 ] Matthias Pohl edited comment on FLINK-16050 at 8/17/23 9:34 AM: [~vthinkxie] [~lining] What's the progress on this issue? It's mentioned in FLIP-100 -which is labeled as {{released}} in {{1.11}}-. In contrast, the Jira issue itself is still open/in progress? was (Author: mapohl): [~vthinkxie] [~lining] What's the progress on this issue? It's mentioned in FLIP-100 which is labeled as {{released}} in {{1.11}}. In contrast, the Jira issue itself is still open/in progress? > Add Attempt Information > --- > > Key: FLINK-16050 > URL: https://issues.apache.org/jira/browse/FLINK-16050 > Project: Flink > Issue Type: Improvement > Components: Runtime / REST, Runtime / Web Frontend >Reporter: lining >Priority: Minor > Labels: auto-deprioritized-major, stale-minor > > According to the > [docs|https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-vertices-vertexid-subtasks-subtaskindex], > there may exist more than one attempt in a subtask, but there is no way to > get the attempt history list in the REST API, users have no way to know if > the subtask has failed before. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-22090) Upload logs fails
[ https://issues.apache.org/jira/browse/FLINK-22090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl closed FLINK-22090. - Resolution: Cannot Reproduce I'm closing this one. It hasn't reappeared, apparently, and all the builds are cleaned up already. > Upload logs fails > - > > Key: FLINK-22090 > URL: https://issues.apache.org/jira/browse/FLINK-22090 > Project: Flink > Issue Type: Bug > Components: Test Infrastructure >Reporter: Matthias Pohl >Priority: Minor > Labels: auto-deprioritized-critical, auto-deprioritized-major, > auto-deprioritized-minor, stale-minor, test-stability > > [This > build|https://dev.azure.com/mapohl/flink/_build/results?buildId=382&view=logs&j=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267&t=599dab09-ab33-58b6-4804-349ab7dc2f73] > failed just because an {{upload logs}} step failed. It looks like this is an > AzureCI problem. Is this a known issue? > The artifacts seem to be uploaded based on the logs. But [the download > link|https://dev.azure.com/mapohl/flink/_build/results?buildId=382&view=logs&j=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267] > does not show up. > Another build that had the same issue: > [test_ci_blinkplanner|https://dev.azure.com/mapohl/flink/_build/results?buildId=383&view=logs&j=d1352042-8a7d-50b6-3946-a85d176b7981&t=7b7009bb-e6bf-5426-3d4b-20b25eada636&l=75] > and > [test_ci_build_core|https://dev.azure.com/mapohl/flink/_build/results?buildId=383&view=logs&j=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267&t=599dab09-ab33-58b6-4804-349ab7dc2f73&l=44]
[GitHub] [flink] zentol commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh
zentol commented on code in PR #23195: URL: https://github.com/apache/flink/pull/23195#discussion_r1296947756 ## tools/ci/verify_bundled_optional.sh: ## @@ -17,24 +17,51 @@ # limitations under the License. # +usage() { + echo "Usage: $0 " + echo " A file containing the output of the Maven build." + echo "" + echo "mvnw clean package > " + echo "" + echo "The environment variable MVN is used to specify the Maven binaries; defaults to 'mvnw'." + echo "See further details in the JavaDoc of ShadeOptionalChecker." +} + +while getopts 'h' o; do + case "${o}" in +h) + usage + exit 0 + ;; + esac +done + +if [[ "$#" != "1" ]]; then + usage + exit 1 +fi + ## Checks that all bundled dependencies are marked as optional in the poms MVN_CLEAN_COMPILE_OUT=$1 -CI_DIR=$2 -FLINK_ROOT=$3 -source "${CI_DIR}/maven-utils.sh" +MVN=${MVN:-./mvnw} -cd "$FLINK_ROOT" || exit +dependency_plugin_output=/tmp/optional_dep.txt -dependency_plugin_output=${CI_DIR}/optional_dep.txt +$MVN dependency:tree -B > "${dependency_plugin_output}" Review Comment: _every_ call (apart from `mvn exec` has to run with a parallelism of 1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore
1996fanrui commented on PR #23228: URL: https://github.com/apache/flink/pull/23228#issuecomment-1681973924 Thanks @gaborgsomogyi for the quick feedback. Sorry, I thought it was easy and clear enough, so I didn't explain the detailed background and marked it as a `hotfix`. Let me explain it here: 1. The `public put()` method calls the `private put()`, and both of these `put` methods call `createBasePathIfNeeded`, which is unnecessary. 2. The `private put()` is only called by the `public put()`, so I removed the `private put()` in this PR. I checked why 2 `put` methods were added: the old interface had 2 `public put` methods. Now it just has one, so I think the `private put()` can be removed as well. https://github.com/apache/flink/assets/38427477/fa69f4f7-68aa-4325-a69f-2961e87d10f8
[GitHub] [flink] Zakelly commented on a diff in pull request #22890: [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with TaskManagerServices
Zakelly commented on code in PR #22890: URL: https://github.com/apache/flink/pull/22890#discussion_r1296870428 ## flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ShutdownHookUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.concurrent.GuardedBy; + +import java.util.HashMap; +import java.util.Map; + +/** + * There is one {@link FileMergingSnapshotManager} for each job per task manager. This class holds + * all {@link FileMergingSnapshotManager} objects for a task executor (manager). + */ +public class TaskExecutorFileMergingManager { +/** Logger for this class. 
*/ +private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorFileMergingManager.class); + +/** + * This map holds all FileMergingSnapshotManager for tasks running on this task + * manager(executor). + */ +@GuardedBy("lock") +private final Map fileMergingSnapshotManagerByJobId; + +@GuardedBy("lock") +private boolean closed; + +private final Object lock = new Object(); + +/** Shutdown hook for this manager. */ +private final Thread shutdownHook; + +public TaskExecutorFileMergingManager() { +this.fileMergingSnapshotManagerByJobId = new HashMap<>(); +this.closed = false; +this.shutdownHook = +ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG); +} + +/** + * Initialize file merging snapshot manager for each job according configurations when {@link + * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}. + */ +public FileMergingSnapshotManager fileMergingSnapshotManagerForJob( +@Nonnull JobID jobId, +Configuration clusterConfiguration, +Configuration jobConfiguration) { +synchronized (lock) { +if (closed) { +throw new IllegalStateException( +"TaskExecutorFileMergingManager is already closed and cannot " ++ "register a new FileMergingSnapshotManager."); +} +FileMergingSnapshotManager fileMergingSnapshotManager = +fileMergingSnapshotManagerByJobId.get(jobId); +if (fileMergingSnapshotManager == null) { +// TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration Review Comment: call ```FileMergingSnapshotManagerBuilder.build``` here? ## flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManage
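The class under review keeps one `FileMergingSnapshotManager` per job on a task executor, guarded by a lock, and refuses new registrations once closed. A minimal generic sketch of that registry pattern (the `PerJobRegistry` name and the `Function`-based factory are assumptions for illustration; the real class also wires Flink configurations and a shutdown hook):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Minimal generic sketch of the per-job registry pattern in the class under
// review: one manager instance per job id, created lazily under a lock, with
// registration rejected after close(). Names here are illustrative only.
class PerJobRegistry<K, V> {
    private final Object lock = new Object();

    @SuppressWarnings("FieldMayBeFinal")
    private final Map<K, V> managersByJob = new HashMap<>();

    private boolean closed = false;

    // Returns the existing manager for jobId, or creates one via the factory.
    V forJob(K jobId, Function<K, V> factory) {
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException(
                        "Registry is closed and cannot register a new manager.");
            }
            return managersByJob.computeIfAbsent(jobId, factory);
        }
    }

    // After close(), further registrations are rejected.
    void close() {
        synchronized (lock) {
            closed = true;
            managersByJob.clear();
        }
    }
}
```

`computeIfAbsent` under the lock mirrors the null-check-then-create branch in the quoted diff, which is also where Zakelly suggests calling `FileMergingSnapshotManagerBuilder.build`.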
[GitHub] [flink] gaborgsomogyi commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore
gaborgsomogyi commented on PR #23228: URL: https://github.com/apache/flink/pull/23228#issuecomment-1682013702 So it's a cosmetic change, ok.
[jira] [Commented] (FLINK-29546) UDF:Failed to compile split code, falling back to original code
[ https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755467#comment-17755467 ] yong yang commented on FLINK-29546: --- i hava same error in flink1.15.0 CREATE TABLE t2 ( `log` STRING, `uid` AS get_json_object(`log`,'$.data.uid'), `phoneCode` AS get_json_object(`log`,'$.data.phoneCode'), `isProduct` AS get_json_object(`log`, '$.data.isProduct'), `ct` AS get_json_object(`log`,'$.data.dialogDO.ctime',0), `ct1` AS TO_TIMESTAMP_LTZ( get_json_object(`log`, '$.data.dialogDO.ctime',0) ,0 ), proc_time AS PROCTIME() , WATERMARK FOR `ct1` AS `ct1` - INTERVAL '15' SECOND ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'localhost:9092', 'scan.startup.mode' = 'earliest-offset', 'topic' = 'councilFeedbackRealtime', 'properties.group.id' = 'aaa', 'format' = 'raw' ); package com.tuya.tlink.udf.common import com.alibaba.fastjson2.\{JSONPath, JSONReader} import org.apache.flink.table.functions.ScalarFunction import scala.util.Try import org.apache.flink.configuration.\{Configuration, RestOptions} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.Expressions.row import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment object GetJsonObject { def main(args: Array[String]): Unit = { val conf = new Configuration conf.setInteger(RestOptions.PORT, 38080) val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf) val tEnv = StreamTableEnvironment.create(env) val table = tEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)) , DataTypes.FIELD("name", DataTypes.STRING()) ), row(2L, """\{"k1":"v111","k2":100}""") ) tEnv.createTemporaryView("tb1", table) tEnv.createFunction("get_json_object", classOf[GetJsonObject]) val tb = tEnv.executeSql( """ | select id,get_json_object(name,'$.k3',999) as v from tb1 |""".stripMargin) println(tb.getTableSchema) 
tb.print() } } /** * 适合获取一个json值 * 支持 多余转义字符 \ 的json * 支持完全jsonpath eg: $.f1.f2 */ class GetJsonObject extends ScalarFunction { /** * @param json 兼容 空字符串 * @param path * @return */ def eval(json: String, path: String): String = { // paths eg: $.f1.f2 // path分步解析,实现支持 反斜杠的json 同时兼容 $.f1.f2 或者 f1 val path_arr = path.replace("$.", "") // f1.f2 或者f1 .split('.') // [f1,f2] 或者 [f1] .map(s => s"$$.${s}") var jsobj: String = json path_arr.foreach { path_part => { jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse("") } } jsobj } def eval(json: String, path: String,defaultValue:String): String = { // paths eg: $.f1.f2 // path分步解析,实现支持 反斜杠的json 同时兼容 $.f1.f2 或者 f1 val path_arr = path.replace("$.", "") // f1.f2 或者f1 .split('.') // [f1,f2] 或者 [f1] .map(s => s"$$.${s}") var jsobj: String = json path_arr.foreach { path_part => { jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse(defaultValue) } } jsobj } def eval(json: String, path: String, defaultValue: Long): Long = { // paths eg: $.f1.f2 // path分步解析,实现支持 反斜杠的json 同时兼容 $.f1.f2 或者 f1 val path_arr = path.replace("$.", "") // f1.f2 或者f1 .split('.') // [f1,f2] 或者 [f1] .map(s => s"$$.${s}") var jsobj: String = json path_arr.foreach { path_part => { jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse(defaultValue.toString) } } jsobj.toLong } } > UDF:Failed to compile split code, falling back to original code > --- > > Key: FLINK-29546 > URL: https://issues.apache.org/jira/browse/FLINK-29546 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.0 > Environment: my pom: > > org.apache.flink > flink-table-api-java-uber > > > org.apache.flink > flink-table-runtime > > > org.apache.flink > flink-table-planner-loader > > jdk 1.8 >Reporter: Hui Wang >Priority: Major > Attachments: image-2023-08-17-18-18-50-937.png > > > 2022-10-08 19:05:23 [GroupWindowAggregate[11] (1/1)#0] WARN > 
org.apache.flink.table.runtime.generated.GeneratedClass -Failed to compile > split code, falling back to original code > org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > at > org.apache.flink.table.runtime.operators.wi
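The `GetJsonObject` UDF above splits a full JSONPath such as `$.f1.f2` into single-level steps and extracts one level at a time, so nested JSON that was re-escaped into a string can be parsed layer by layer. The splitting step can be sketched in stand-alone Java (the `JsonPathSplit` class is an illustration only; the reported code is Scala and uses fastjson2 for the actual extraction):

```java
import java.util.Arrays;

// Stand-alone sketch of the path handling in the GetJsonObject UDF above.
// A full path like "$.f1.f2" is broken into single-level steps
// ["$.f1", "$.f2"] so each level can be extracted separately.
class JsonPathSplit {
    static String[] splitPath(String path) {
        // "$.f1.f2" -> "f1.f2" -> ["f1", "f2"] -> ["$.f1", "$.f2"]
        String[] parts = path.replace("$.", "").split("\\.");
        String[] steps = new String[parts.length];
        for (int i = 0; i < parts.length; i++) {
            steps[i] = "$." + parts[i];
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(splitPath("$.f1.f2"))); // [$.f1, $.f2]
        System.out.println(Arrays.toString(splitPath("f1")));      // [$.f1]
    }
}
```

Each step is then fed back through the JSON reader in the UDF's loop, falling back to the default value when extraction fails.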
[jira] [Commented] (FLINK-29546) UDF:Failed to compile split code, falling back to original code
[ https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755468#comment-17755468 ] yong yang commented on FLINK-29546: --- !image-2023-08-17-18-18-50-937.png! > UDF:Failed to compile split code, falling back to original code > --- > > Key: FLINK-29546 > URL: https://issues.apache.org/jira/browse/FLINK-29546 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.0 > Environment: my pom: > > org.apache.flink > flink-table-api-java-uber > > > org.apache.flink > flink-table-runtime > > > org.apache.flink > flink-table-planner-loader > > jdk 1.8 >Reporter: Hui Wang >Priority: Major > Attachments: image-2023-08-17-18-18-50-937.png > > > 2022-10-08 19:05:23 [GroupWindowAggregate[11] (1/1)#0] WARN > org.apache.flink.table.runtime.generated.GeneratedClass -Failed to compile > split code, falling back to original code > org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > at > org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.compileGeneratedCode(AggregateWindowOperator.java:148) > at > org.apache.flink.table.runtime.operators.window.WindowOperator.open(WindowOperator.java:274) > at > org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.open(AggregateWindowOperator.java:139) > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) > ... 15 common frames omitted > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) > ... 18 common frames omitted > Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column > 28: Cannot determine simple type name "org" > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) > at > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833) > at > org.codehaus.janino.UnitCompiler.getRefere
[jira] [Updated] (FLINK-29546) UDF:Failed to compile split code, falling back to original code
[ https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yong yang updated FLINK-29546: -- Attachment: image-2023-08-17-18-18-50-937.png > UDF:Failed to compile split code, falling back to original code > --- > > Key: FLINK-29546 > URL: https://issues.apache.org/jira/browse/FLINK-29546 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.0 > Environment: my pom: > > org.apache.flink > flink-table-api-java-uber > > > org.apache.flink > flink-table-runtime > > > org.apache.flink > flink-table-planner-loader > > jdk 1.8 >Reporter: Hui Wang >Priority: Major > Attachments: image-2023-08-17-18-18-50-937.png > > > 2022-10-08 19:05:23 [GroupWindowAggregate[11] (1/1)#0] WARN > org.apache.flink.table.runtime.generated.GeneratedClass -Failed to compile > split code, falling back to original code > org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. 
[jira] [Commented] (FLINK-29546) UDF:Failed to compile split code, falling back to original code
[ https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755469#comment-17755469 ] yong yang commented on FLINK-29546:
---
error log:
{code:java}
// code placeholder
2023-08-17 15:46:02
java.lang.RuntimeException: Could not instantiate generated class 'WatermarkGenerator$0'
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
    at org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62)
    at org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104)
    at org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
    at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
    ... 16 more
Caused by: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
    ... 18 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
    at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
    ... 21 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 30, Column 72: Cannot determine simple type name "org"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
    at org.codehaus.janino.UnitCompiler.getRefe
[jira] [Comment Edited] (FLINK-29546) UDF:Failed to compile split code, falling back to original code
[ https://issues.apache.org/jira/browse/FLINK-29546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755467#comment-17755467 ] yong yang edited comment on FLINK-29546 at 8/17/23 10:20 AM: - i hava same error in flink1.15.0 {code:java} //代码占位符 CREATE TABLE t2 ( `log` STRING, `uid` AS get_json_object(`log`,'$.data.uid'), `phoneCode` AS get_json_object(`log`,'$.data.phoneCode'), `isProduct` AS get_json_object(`log`, '$.data.isProduct'), `ct` AS get_json_object(`log`,'$.data.dialogDO.ctime',0), `ct1` AS TO_TIMESTAMP_LTZ( get_json_object(`log`, '$.data.dialogDO.ctime',0) ,0 ), proc_time AS PROCTIME() , WATERMARK FOR `ct1` AS `ct1` - INTERVAL '15' SECOND ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'localhost:9092', 'scan.startup.mode' = 'earliest-offset', 'topic' = 'councilFeedbackRealtime', 'properties.group.id' = 'aaa', 'format' = 'raw' ); {code} {code:java} //代码占位符 package com.aa.tlink.udf.common import com.alibaba.fastjson2.{JSONPath, JSONReader} import org.apache.flink.table.functions.ScalarFunction import scala.util.Try import org.apache.flink.configuration.{Configuration, RestOptions} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.Expressions.row import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment object GetJsonObject { def main(args: Array[String]): Unit = { val conf = new Configuration conf.setInteger(RestOptions.PORT, 38080) val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf) val tEnv = StreamTableEnvironment.create(env) val table = tEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("id", DataTypes.DECIMAL(10, 2)) , DataTypes.FIELD("name", DataTypes.STRING()) ), row(2L, """\{"k1":"v111","k2":100} """) ) tEnv.createTemporaryView("tb1", table) tEnv.createFunction("get_json_object", classOf[GetJsonObject]) val tb = tEnv.executeSql( """ select 
id,get_json_object(name,'$.k3',999) as v from tb1 """.stripMargin) println(tb.getTableSchema) tb.print() } } /** 适合获取一个json值 支持 多余转义字符 \ 的json 支持完全jsonpath eg: $.f1.f2 */ class GetJsonObject extends ScalarFunction { /** @param json 兼容 空字符串 @param path @return */ def eval(json: String, path: String): String = { // paths eg: $.f1.f2 // path分步解析,实现支持 反斜杠的json 同时兼容 $.f1.f2 或者 f1 val path_arr = path.replace("$.", "") // f1.f2 或者f1 .split('.') // [f1,f2] 或者 [f1] .map(s => s"$$.${s}") var jsobj: String = json path_arr.foreach { path_part => { jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse("") } } jsobj } def eval(json: String, path: String,defaultValue:String): String = { // paths eg: $.f1.f2 // path分步解析,实现支持 反斜杠的json 同时兼容 $.f1.f2 或者 f1 val path_arr = path.replace("$.", "") // f1.f2 或者f1 .split('.') // [f1,f2] 或者 [f1] .map(s => s"$$.${s}") var jsobj: String = json path_arr.foreach { path_part => { jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse(defaultValue) } } jsobj } def eval(json: String, path: String, defaultValue: Long): Long = { // paths eg: $.f1.f2 // path分步解析,实现支持 反斜杠的json 同时兼容 $.f1.f2 或者 f1 val path_arr = path.replace("$.", "") // f1.f2 或者f1 .split('.') // [f1,f2] 或者 [f1] .map(s => s"$$.${s}") var jsobj: String = json path_arr.foreach { path_part => { jsobj = Try(JSONPath.of(path_part).extract(JSONReader.of(jsobj)).toString).getOrElse(defaultValue.toString) } } jsobj.toLong } } {code} was (Author: JIRAUSER301658): i hava same error in flink1.15.0 CREATE TABLE t2 ( `log` STRING, `uid` AS get_json_object(`log`,'$.data.uid'), `phoneCode` AS get_json_object(`log`,'$.data.phoneCode'), `isProduct` AS get_json_object(`log`, '$.data.isProduct'), `ct` AS get_json_object(`log`,'$.data.dialogDO.ctime',0), `ct1` AS TO_TIMESTAMP_LTZ( get_json_object(`log`, '$.data.dialogDO.ctime',0) ,0 ), proc_time AS PROCTIME() , WATERMARK FOR `ct1` AS `ct1` - INTERVAL '15' SECOND ) WITH ( 'connector' = 'kafka', 
'properties.bootstrap.servers' = 'localhost:9092', 'scan.startup.mode' = 'earliest-offset', 'topic' = 'councilFeedbackRealtime', 'properties.group.id' = 'aaa', 'format' = 'raw' ); package com.tuya.tlink.udf.common import com.alibaba.fastjson2.\{JSONPath, JSONReader} import org.apache.flink.table.functions.ScalarFunction import scala.util.Try import org.apache.flink.configuration.\{Configuration, RestOptions} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.Expressions.row import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment object GetJsonObject { def main(args: Array[String]): Unit = { val
[jira] [Updated] (FLINK-28229) Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods
[ https://issues.apache.org/jira/browse/FLINK-28229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-28229: --- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > Introduce Source API alternatives for > StreamExecutionEnvironment#fromCollection() methods > - > > Key: FLINK-28229 > URL: https://issues.apache.org/jira/browse/FLINK-28229 > Project: Flink > Issue Type: Sub-task >Reporter: Alexander Fedulov >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available, stale-assigned > > * FromElementsFunction > * FromIteratorFunction > are based on SourceFunction API -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-28171) Adjust Job and Task manager port definitions to work with Istio+mTLS
[ https://issues.apache.org/jira/browse/FLINK-28171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-28171: --- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > Adjust Job and Task manager port definitions to work with Istio+mTLS > > > Key: FLINK-28171 > URL: https://issues.apache.org/jira/browse/FLINK-28171 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes >Affects Versions: 1.14.4 > Environment: flink-kubernetes-operator 1.0.0 > Flink 1.14-java11 > Kubernetes v1.19.5 > Istio 1.7.6 >Reporter: Moshe Elisha >Assignee: Moshe Elisha >Priority: Major > Labels: pull-request-available, stale-assigned > > Hello, > > We are launching Flink deployments using the [Flink Kubernetes Operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/] on a Kubernetes cluster with Istio and mTLS enabled. > > We found that the TaskManager is unable to communicate with the JobManager on the jobmanager-rpc port: > > {{2022-06-15 15:25:40,508 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@amf-events-to-inference-and-central.nwdaf-edge:6123] has failed, address is now gated for [50] ms.
Reason: [Association failed with [akka.tcp://flink@amf-events-to-inference-and-central.nwdaf-edge:6123]] Caused by: [The remote system explicitly disassociated (reason unknown).]}} > > The reason for the issue is that the JobManager service port definitions do not follow the Istio guidelines [https://istio.io/latest/docs/ops/configuration/traffic-management/protocol-selection/] (see example below). > > There was also an email discussion around this topic in the users mailing group under the subject "Flink Kubernetes Operator with K8S + Istio + mTLS - port definitions". > With the help of the community, we were able to work around the issue, but it was very hard and forced us to skip the Istio proxy, which is not ideal. > > We would like you to consider changing the default port definitions, either: > # Rename the ports – I understand it is an Istio-specific guideline, but maybe it is better to at least be aligned with one (popular) vendor guideline instead of none at all. > # Add the "appProtocol" property[1], which is not specific to any vendor but requires Kubernetes >= 1.19, where it was introduced as beta; it moved to stable in >= 1.20. The option to add the appProtocol property was only added to the fabric8 kubernetes-client in [https://github.com/fabric8io/kubernetes-client/releases/tag/v5.10.0] with [#3570|https://github.com/fabric8io/kubernetes-client/issues/3570]. > # Or allow a way to override the defaults.
> [1] [https://kubernetes.io/docs/concepts/services-networking/_print/#application-protocol]
>
> {{# k get service inference-results-to-analytics-engine -o yaml}}
> {{apiVersion: v1}}
> {{kind: Service}}
> {{...}}
> {{spec:}}
> {{  clusterIP: None}}
> {{  ports:}}
> {{  - name: jobmanager-rpc *# should start with "tcp-" or add "appProtocol" property*}}
> {{    port: 6123}}
> {{    protocol: TCP}}
> {{    targetPort: 6123}}
> {{  - name: blobserver *# should start with "tcp-" or add "appProtocol" property*}}
> {{    port: 6124}}
> {{    protocol: TCP}}
> {{    targetPort: 6124}}
> {{...}}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
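The renaming option above (following Istio's protocol-selection convention) amounts to prefixing plain TCP port names with "tcp-". A minimal illustrative sketch of that rule — class and method names here are hypothetical, not taken from the operator's code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch (hypothetical names): Istio-compliant port naming. */
public class IstioPortNames {

    /**
     * Without an appProtocol, Istio infers the protocol from the port name
     * prefix ("tcp-", "http-", ...). Prefix plain names with "tcp-".
     */
    public static String istioCompliantName(String portName) {
        return portName.startsWith("tcp-") ? portName : "tcp-" + portName;
    }

    public static void main(String[] args) {
        Map<String, Integer> ports = new LinkedHashMap<>();
        ports.put("jobmanager-rpc", 6123);
        ports.put("blobserver", 6124);
        ports.forEach((name, port) ->
                System.out.println(istioCompliantName(name) + " -> " + port));
    }
}
```

The equivalent fabric8 change would instead set the `appProtocol` field on each `ServicePort`, which (per the ticket) is only available from kubernetes-client v5.10.0 on.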
[jira] [Updated] (FLINK-27815) Improve the join reorder strategy for batch sql job
[ https://issues.apache.org/jira/browse/FLINK-27815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27815: --- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > Improve the join reorder strategy for batch sql job > > > Key: FLINK-27815 > URL: https://issues.apache.org/jira/browse/FLINK-27815 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available, stale-assigned > > Join is heavy operation in the execution, the join order in a query can have > a significant impact on the query’s performance. > Currently, the planner has one join reorder strategy which is provided by > Apache Calcite, and it strongly depends on the statistics. > It's better we can provide different join reorder strategies for different > situations, such as: > 1. provide a join reorder strategy without statistics, e.g. eliminate cross > joins > 2. improve current join reorders strategy with statistics > 3. provide hints to allow users to choose join order strategy > 4. ... -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27939) [JUnit5 Migration] Module: flink-connector-hive
[ https://issues.apache.org/jira/browse/FLINK-27939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27939: --- Labels: stale-assigned starter (was: starter) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > [JUnit5 Migration] Module: flink-connector-hive > --- > > Key: FLINK-27939 > URL: https://issues.apache.org/jira/browse/FLINK-27939 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Hive >Affects Versions: 1.16.0 >Reporter: Alexander Preuss >Assignee: hk__lrzy >Priority: Minor > Labels: stale-assigned, starter > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27954) JobVertexFlameGraphHandler does not work on standby Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-27954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27954: --- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > JobVertexFlameGraphHandler does not work on standby Dispatcher > -- > > Key: FLINK-27954 > URL: https://issues.apache.org/jira/browse/FLINK-27954 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination, Runtime / REST >Affects Versions: 1.15.0 >Reporter: Chesnay Schepler >Assignee: Weijie Guo >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.18.0, 1.16.3, 1.17.2 > > > The {{JobVertexFlameGraphHandler}} relies internally on the > {{JobVertexThreadInfoTracker}} which calls > {{ResourceManagerGateway#requestTaskExecutorThreadInfoGateway}} to get a > gateway for requesting the thread info from the task executors. Since this > gateway is not serializable it would categorically fail if called from a > standby dispatcher. > Instead this should follow the logic of the {{MetricFetcherImpl}}, which > requests addresses instead and manually connects to the task executors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27882) [JUnit5 Migration] Module: flink-scala
[ https://issues.apache.org/jira/browse/FLINK-27882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27882: --- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > [JUnit5 Migration] Module: flink-scala > -- > > Key: FLINK-27882 > URL: https://issues.apache.org/jira/browse/FLINK-27882 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available, stale-assigned > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-7697) Add metrics for Elasticsearch Sink
[ https://issues.apache.org/jira/browse/FLINK-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-7697: -- Labels: auto-deprioritized-major auto-unassigned pull-request-available stale-minor (was: auto-deprioritized-major auto-unassigned pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned, and neither it nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Add metrics for Elasticsearch Sink > -- > > Key: FLINK-7697 > URL: https://issues.apache.org/jira/browse/FLINK-7697 > Project: Flink > Issue Type: Improvement > Components: Connectors / ElasticSearch >Reporter: Hai Zhou >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, pull-request-available, stale-minor > Time Spent: 10m > Remaining Estimate: 0h > > We should add metrics to track events written to the ElasticsearchSink, e.g.: > * number of successful bulk sends > * number of documents inserted > * number of documents updated > * number of document version conflicts -- This message was sent by Atlassian Jira (v8.20.10#820010)
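A minimal sketch of the counters the ticket asks for, using plain JDK atomics rather than Flink's metric API; the class name and the bulk-response callback are hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative sketch (hypothetical names): counters for an Elasticsearch sink. */
public class ElasticsearchSinkMetrics {
    public final AtomicLong successfulBulkSends = new AtomicLong();
    public final AtomicLong documentsInserted = new AtomicLong();
    public final AtomicLong documentsUpdated = new AtomicLong();
    public final AtomicLong versionConflicts = new AtomicLong();

    /** Would be invoked from the sink's bulk-response callback. */
    public void onBulkResponse(int inserted, int updated, int conflicts) {
        successfulBulkSends.incrementAndGet();
        documentsInserted.addAndGet(inserted);
        documentsUpdated.addAndGet(updated);
        versionConflicts.addAndGet(conflicts);
    }
}
```

In a real implementation these would be registered as Flink `Counter`s on the operator's metric group so they show up in the web UI and metric reporters.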
[jira] [Updated] (FLINK-28433) Allow connection to mysql through mariadb driver
[ https://issues.apache.org/jira/browse/FLINK-28433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-28433: --- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > Allow connection to mysql through mariadb driver > > > Key: FLINK-28433 > URL: https://issues.apache.org/jira/browse/FLINK-28433 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Reporter: PengLei >Assignee: bo zhao >Priority: Minor > Labels: pull-request-available, stale-assigned > Attachments: image-2022-07-07-09-29-06-834.png > > > The Flink JDBC connector supports connecting to MySQL, but the URL must start with "jdbc:mysql". > Some users need to use the MariaDB driver to connect to MySQL. This can be achieved by setting the driver parameter in jdbcOptions; unfortunately, the URL verification then fails, > as follows: > !image-2022-07-07-09-29-06-834.png! > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
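The requested change boils down to accepting more than one URL prefix for the same dialect. A minimal sketch of the idea — illustrative only, not the connector's actual validation code:

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch (not the actual connector code): accept a MySQL
 *  endpoint reached through either the MySQL or the MariaDB driver. */
public class JdbcUrlCheck {
    private static final List<String> SUPPORTED_PREFIXES =
            Arrays.asList("jdbc:mysql:", "jdbc:mariadb:");

    public static boolean isSupportedUrl(String url) {
        return SUPPORTED_PREFIXES.stream().anyMatch(url::startsWith);
    }
}
```

The driver class itself would still come from the user-supplied driver option; only the prefix check is relaxed here.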
[jira] [Updated] (FLINK-6935) Integration of SQL and CEP
[ https://issues.apache.org/jira/browse/FLINK-6935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-6935: -- Labels: auto-deprioritized-major auto-unassigned stale-minor (was: auto-deprioritized-major auto-unassigned) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned, and neither it nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Integration of SQL and CEP > -- > > Key: FLINK-6935 > URL: https://issues.apache.org/jira/browse/FLINK-6935 > Project: Flink > Issue Type: New Feature > Components: Library / CEP, Table SQL / API >Reporter: Jark Wu >Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, stale-minor > > Flink's CEP library is a great library for complex event processing, and more and more customers are expressing their interest in it. But it also has a limitation: users usually have to write a lot of code even for a very simple pattern-match use case, as it currently only supports the Java API. > CEP DSLs and SQL strongly resemble each other; CEP's additional features compared to SQL boil down to pattern detection. So it would be great to consolidate CEP and SQL: it would make SQL powerful enough to support more usage scenarios and give users the ability to build CEP applications easily and quickly. > The FLIP can be found here: https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP > This is an umbrella issue for the FLIP. We should wait for Calcite 1.13 to start this work. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27970) [JUnit5 Migration] Module: flink-hadoop-buik
[ https://issues.apache.org/jira/browse/FLINK-27970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27970: --- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > [JUnit5 Migration] Module: flink-hadoop-buik > > > Key: FLINK-27970 > URL: https://issues.apache.org/jira/browse/FLINK-27970 > Project: Flink > Issue Type: Sub-task >Reporter: Ryan Skraba >Assignee: Ryan Skraba >Priority: Major > Labels: pull-request-available, stale-assigned > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27805) Bump ORC version to 1.7.8
[ https://issues.apache.org/jira/browse/FLINK-27805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27805: --- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > Bump ORC version to 1.7.8 > - > > Key: FLINK-27805 > URL: https://issues.apache.org/jira/browse/FLINK-27805 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Reporter: jia liu >Assignee: Panagiotis Garefalakis >Priority: Minor > Labels: pull-request-available, stale-assigned > > The current ORC dependency version of flink is 1.5.6, but the latest ORC > version 1.7.x has been released for a long time. > In order to use these new features (zstd compression, column encryption > etc.), we should upgrade the orc version. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-13698) Rework threading model of CheckpointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-13698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-13698: --- Labels: auto-deprioritized-critical auto-deprioritized-major stale-minor (was: auto-deprioritized-critical auto-deprioritized-major) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned, and neither it nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Rework threading model of CheckpointCoordinator > --- > > Key: FLINK-13698 > URL: https://issues.apache.org/jira/browse/FLINK-13698 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.10.0 >Reporter: Piotr Nowojski >Priority: Minor > Labels: auto-deprioritized-critical, auto-deprioritized-major, stale-minor > > Currently, {{CheckpointCoordinator}} and {{CheckpointFailureManager}} code is executed by multiple different threads (mostly {{ioExecutor}}, but not only). This causes multiple concurrency issues, for example: https://issues.apache.org/jira/browse/FLINK-13497 > A proper fix would be to rethink the threading model there. At first glance, this code does not need to be multi-threaded except for the parts doing the actual IO operations, so it should be possible to run everything in the single ExecutionGraph thread and only run the necessary IO operations asynchronously, with some feedback loop ("mailbox style"). > I would strongly recommend fixing this issue before adding new features in the {{CheckpointCoordinator}} component. -- This message was sent by Atlassian Jira (v8.20.10#820010)
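The "mailbox style" idea described in the ticket — a single thread owning all coordinator state, with blocking IO offloaded and its result fed back as a message — could look roughly like this. This is an illustrative sketch with hypothetical names, not Flink's actual implementation:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative "mailbox style" sketch: coordinator state is touched only by
 *  mainThread; IO runs on ioExecutor and its result re-enters the mailbox. */
public class MailboxSketch {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);
    private int completedCheckpoints; // only ever accessed from mainThread

    public CompletableFuture<Integer> triggerCheckpoint() {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        mainThread.execute(() ->
                // Blocking IO (e.g. writing checkpoint metadata) is offloaded...
                CompletableFuture.runAsync(() -> { /* blocking IO here */ }, ioExecutor)
                        // ...and the feedback loop re-enters the mailbox thread,
                        // so the state mutation is always single-threaded.
                        .thenRunAsync(
                                () -> result.complete(++completedCheckpoints),
                                mainThread));
        return result;
    }

    public void shutdown() {
        mainThread.shutdown();
        ioExecutor.shutdown();
    }
}
```

Because every mutation of `completedCheckpoints` is enqueued onto the same single-threaded executor, no locks are needed even though the IO itself is concurrent.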
[jira] [Updated] (FLINK-11141) Key generation for RocksDBMapState can theoretically be ambiguous
[ https://issues.apache.org/jira/browse/FLINK-11141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-11141: --- Labels: auto-deprioritized-critical auto-deprioritized-major auto-deprioritized-minor stale-minor (was: auto-deprioritized-critical auto-deprioritized-major auto-deprioritized-minor) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor but is unassigned, and neither it nor its Sub-Tasks have been updated for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is still Minor, please either assign yourself or give an update. Afterwards, please remove the label or in 7 days the issue will be deprioritized. > Key generation for RocksDBMapState can theoretically be ambiguous > - > > Key: FLINK-11141 > URL: https://issues.apache.org/jira/browse/FLINK-11141 > Project: Flink > Issue Type: Bug > Components: Runtime / State Backends >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Stefan Richter >Priority: Minor > Labels: auto-deprioritized-critical, auto-deprioritized-major, auto-deprioritized-minor, stale-minor > > RocksDBMapState stores values in RocksDB under a composite key built from the serialized bytes of {{key-group-id|key|namespace|user-key}}. In this composition, key, namespace, and user-key can each have a fixed-size or variable-size serialization format. With at least two variable-size formats, ambiguity is possible, e.g.: > abcd <-> efg > abc <-> defg > Our code takes care of this for all other states, where composite keys consist only of key and namespace, by checking for two variable-size formats and appending the serialized length to each byte sequence. > However, for map state the user-key is neither included in the check for potential ambiguity nor has its length appended. This means that, in theory, some combinations can produce colliding composite keys in RocksDB. > What is required is to include the user-key serializer in the check and append the length there as well. > Please note that this cannot simply be changed, because it has implications for backwards compatibility and requires some form of migration for the state keys on restore. -- This message was sent by Atlassian Jira (v8.20.10#820010)
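The ambiguity and the length-appending fix can be illustrated with a toy sketch. This is not Flink's serialization code — a real implementation would write varint lengths rather than a single length byte:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Toy sketch: naive concatenation of two variable-size parts can collide;
 *  prefixing each part with its length disambiguates the composite key. */
public class CompositeKeySketch {

    /** "abcd" + "efg" and "abc" + "defg" produce identical bytes. */
    public static byte[] naive(String a, String b) {
        return (a + b).getBytes(StandardCharsets.UTF_8);
    }

    /** Length-prefixed variant: the boundaries are encoded, so no collision. */
    public static byte[] lengthPrefixed(String a, String b) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String part : new String[] {a, b}) {
            byte[] bytes = part.getBytes(StandardCharsets.UTF_8);
            out.write(bytes.length); // toy 1-byte length; real code uses varints
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }
}
```

The ticket's fix is exactly the second shape, applied to the user-key part of the RocksDB composite key when its serializer is variable-size.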
[jira] [Updated] (FLINK-28022) Support Google Cloud PubSub connector in Python DataStream API
[ https://issues.apache.org/jira/browse/FLINK-28022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-28022: --- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > Support Google Cloud PubSub connector in Python DataStream API > -- > > Key: FLINK-28022 > URL: https://issues.apache.org/jira/browse/FLINK-28022 > Project: Flink > Issue Type: Improvement > Components: API / Python, Connectors / Google Cloud PubSub >Reporter: pengmd >Assignee: pengmd >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.18.0 > > > Support Google Cloud PubSub connector in Python DataStream API -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-27885) [JUnit5 Migration] Module: flink-csv
[ https://issues.apache.org/jira/browse/FLINK-27885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-27885: --- Labels: pull-request-available stale-assigned (was: pull-request-available) I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue is assigned but has not received an update in 30 days, so it has been labeled "stale-assigned". If you are still working on the issue, please remove the label and add a comment updating the community on your progress. If this issue is waiting on feedback, please consider this a reminder to the committer/reviewer. Flink is a very active project, and so we appreciate your patience. If you are no longer working on the issue, please unassign yourself so someone else may work on it. > [JUnit5 Migration] Module: flink-csv > > > Key: FLINK-27885 > URL: https://issues.apache.org/jira/browse/FLINK-27885 > Project: Flink > Issue Type: Sub-task > Components: Tests >Reporter: Ryan Skraba >Assignee: Ryan Skraba >Priority: Major > Labels: pull-request-available, stale-assigned > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] xiangyuf opened a new pull request, #23230: [FLINK-32880][flink-runtime]Fulfill redundant taskmanagers periodical…
xiangyuf opened a new pull request, #23230: URL: https://github.com/apache/flink/pull/23230 ## What is the purpose of the change Fix the issue that redundant taskManagers will not be fulfilled in FineGrainedSlotManager periodically. ## Brief change log - Change ResourceAllocationStrategy#tryReleaseUnusedResources to ResourceAllocationStrategy#tryReconcileClusterResources - Change ResourceReleaseResult to ResourceReconcileResult and add field pendingTaskManagersToAdd to ResourceReconcileResult - make the DefaultResourceAllocationStrategy#tryReconcileClusterResources not only release but also fulfill the redundant taskmanagers - extract DefaultResourceAllocationStrategy::tryFulFillRedundantResourcesWithAction as a reused function for both tryFulfillRequirements and tryReconcileClusterResources ## Verifying this change This change added tests and can be verified as follows: - Added unit test DefaultResourceAllocationStrategyTest#testRedundantResourceShouldBeFulfilled ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
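The reconcile step this PR describes — releasing surplus task managers *and* replenishing missing redundant ones, instead of only releasing — can be sketched roughly as follows. Names here are hypothetical and only echo the PR's terminology (e.g. `pendingTaskManagersToAdd`); they are not the actual Flink classes:

```java
/** Illustrative sketch (hypothetical names): a reconcile that can both
 *  release surplus task managers and replenish missing redundant ones. */
public class ReconcileSketch {

    public static final class Plan {
        public final int taskManagersToRelease;
        public final int pendingTaskManagersToAdd;

        Plan(int release, int add) {
            this.taskManagersToRelease = release;
            this.pendingTaskManagersToAdd = add;
        }
    }

    /** target = task managers needed for requirements + configured redundancy. */
    public static Plan reconcile(int currentTaskManagers, int required, int redundant) {
        int target = required + redundant;
        int diff = currentTaskManagers - target;
        // Surplus: release it. Shortfall (e.g. a redundant TM died): add it back.
        return diff >= 0 ? new Plan(diff, 0) : new Plan(0, -diff);
    }
}
```

Under the old release-only behavior, the shortfall branch did not exist, which is why a redundant task manager that exited abnormally was never replaced by the periodic check.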
[jira] [Updated] (FLINK-32880) Redundant taskManager should be replenished in FineGrainedSlotManager
[ https://issues.apache.org/jira/browse/FLINK-32880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32880: --- Labels: pull-request-available (was: ) > Redundant taskManager should be replenished in FineGrainedSlotManager > - > > Key: FLINK-32880 > URL: https://issues.apache.org/jira/browse/FLINK-32880 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: xiangyu feng >Assignee: xiangyu feng >Priority: Major > Labels: pull-request-available > > Currently, if you are using FineGrainedSlotManager and a redundant taskmanager exits abnormally, no extra taskmanager will be replenished during the periodic check in FineGrainedSlotManager. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32880) Redundant taskManagers should always be fulfilled in FineGrainedSlotManager
[ https://issues.apache.org/jira/browse/FLINK-32880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiangyu feng updated FLINK-32880: - Summary: Redundant taskManagers should always be fulfilled in FineGrainedSlotManager (was: Redundant taskManager should be replenished in FineGrainedSlotManager) > Redundant taskManagers should always be fulfilled in FineGrainedSlotManager > --- > > Key: FLINK-32880 > URL: https://issues.apache.org/jira/browse/FLINK-32880 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: xiangyu feng >Assignee: xiangyu feng >Priority: Major > Labels: pull-request-available > > Currently, if you are using FineGrainedSlotManager and a redundant taskmanager exits abnormally, no extra taskmanager will be replenished during the periodic check in FineGrainedSlotManager. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] flinkbot commented on pull request #23230: [FLINK-32880][flink-runtime]Fulfill redundant taskmanagers periodical…
flinkbot commented on PR #23230: URL: https://github.com/apache/flink/pull/23230#issuecomment-1682079861 ## CI report: * 1344e4452bb0506b44e3413639350ec33472da76 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a diff in pull request #21289: [FLINK-29452] Allow unit tests to be executed independently
XComp commented on code in PR #21289: URL: https://github.com/apache/flink/pull/21289#discussion_r1297093524 ## flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureExtensionTest.java: ## @@ -21,56 +21,69 @@ import org.apache.flink.testutils.junit.extensions.retry.RetryExtension; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; -import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.HashMap; + +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link RetryOnFailure} annotation on JUnit5 {@link RetryExtension}. */ @ExtendWith(RetryExtension.class) class RetryOnFailureExtensionTest { -private static final int NUMBER_OF_RUNS = 5; - -private static int numberOfFailedRuns; +private static final int NUMBER_OF_RETRIES = 5; -private static int numberOfSuccessfulRuns; +private static final HashMap methodRunCount = new HashMap<>(); -private static boolean firstRun = true; +@BeforeEach +void incrementMethodRunCount(TestInfo testInfo) { +// Set or increment the run count for the unit test method, by the method short name. +// This starts at 1 and is incremented before the test starts. +testInfo.getTestMethod() +.ifPresent( +method -> +methodRunCount.compute( +method.getName(), (k, v) -> (v == null) ? 1 : v + 1)); +} @AfterAll static void verify() { -assertEquals(NUMBER_OF_RUNS + 1, numberOfFailedRuns); -assertEquals(3, numberOfSuccessfulRuns); +assertThat(methodRunCount.get("testRetryOnFailure")) Review Comment: I think we can apply the same approach that I described above here. 
## flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnExceptionExtensionTest.java: ## @@ -37,53 +40,66 @@ @ExtendWith(RetryExtension.class) class RetryOnExceptionExtensionTest { -private static final int NUMBER_OF_RUNS = 3; +private static final int NUMBER_OF_RETRIES = 3; -private static int runsForSuccessfulTest = 0; +private static final HashMap methodRunCount = new HashMap<>(); -private static int runsForTestWithMatchingException = 0; - -private static int runsForTestWithSubclassException = 0; - -private static int runsForPassAfterOneFailure = 0; +@BeforeEach +void incrementMethodRunCount(TestInfo testInfo) { +// Set or increment the run count for the unit test method, by the method short name. +// This starts at 1 and is incremented before the test starts. +testInfo.getTestMethod() +.ifPresent( +method -> +methodRunCount.compute( +method.getName(), (k, v) -> (v == null) ? 1 : v + 1)); +} @AfterAll static void verify() { -assertThat(runsForTestWithMatchingException).isEqualTo(NUMBER_OF_RUNS + 1); -assertThat(runsForTestWithSubclassException).isEqualTo(NUMBER_OF_RUNS + 1); -assertThat(runsForSuccessfulTest).isOne(); -assertThat(runsForPassAfterOneFailure).isEqualTo(2); +assertThat(methodRunCount.get("testSuccessfulTest")) Review Comment: I'm wondering whether we could also implement this test without having plain method names that we would have to add everywhere. We could do that by utilizing the `TestInfo` parameter in every test method and register the callbacks for the corresponding test method. That has the advantage that we have the test code in one location (the verification callback is defined within the test method instead of the `@AfterAll` annotated method). WDYT? 
We would have something like a `verificationCallbackRegistry` next to the `methodRunCount` and utilize methods like the following ones: ``` private static int assertAndReturnRunCount(TestInfo testInfo) { return methodRunCount.get(assertAndReturnTestMethodName(testInfo)); } private static void registerCallbackForTest(TestInfo testInfo, Consumer verification) { verificationCallbackRegistry.putIfAbsent( assertAndReturnTestMethodName(testInfo), () -> verification.accept(assertAndReturnRunCount(testInfo))); } private static String assertAndReturnTestMethodName(TestInfo testInfo) { return testInfo.getTestMethod() .orElseThrow(() -> new AssertionError("No test method is provided.")) .getName(); } ``` The testInfo comes from the test method's parameters. All test handlin
[GitHub] [flink] XComp commented on a diff in pull request #21289: [FLINK-29452] Allow unit tests to be executed independently
XComp commented on code in PR #21289: URL: https://github.com/apache/flink/pull/21289#discussion_r1297093524 ## flink-test-utils-parent/flink-test-utils-junit/src/test/java/org/apache/flink/testutils/junit/RetryOnFailureExtensionTest.java: ## @@ -21,56 +21,69 @@ import org.apache.flink.testutils.junit.extensions.retry.RetryExtension; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; -import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.HashMap; + +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link RetryOnFailure} annotation on JUnit5 {@link RetryExtension}. */ @ExtendWith(RetryExtension.class) class RetryOnFailureExtensionTest { -private static final int NUMBER_OF_RUNS = 5; - -private static int numberOfFailedRuns; +private static final int NUMBER_OF_RETRIES = 5; -private static int numberOfSuccessfulRuns; +private static final HashMap methodRunCount = new HashMap<>(); -private static boolean firstRun = true; +@BeforeEach +void incrementMethodRunCount(TestInfo testInfo) { +// Set or increment the run count for the unit test method, by the method short name. +// This starts at 1 and is incremented before the test starts. +testInfo.getTestMethod() +.ifPresent( +method -> +methodRunCount.compute( +method.getName(), (k, v) -> (v == null) ? 1 : v + 1)); +} @AfterAll static void verify() { -assertEquals(NUMBER_OF_RUNS + 1, numberOfFailedRuns); -assertEquals(3, numberOfSuccessfulRuns); +assertThat(methodRunCount.get("testRetryOnFailure")) Review Comment: I think we can apply the same approach [that I described above](https://github.com/apache/flink/pull/21289#discussion_r1297092550) here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh
XComp commented on code in PR #23195: URL: https://github.com/apache/flink/pull/23195#discussion_r1297097220 ## tools/ci/license_check.sh: ## @@ -17,6 +17,34 @@ # limitations under the License. +usage() { + if [[ "$#" != "2" ]]; then +echo "Usage: $0 " +echo " A file containing the output of the Maven build." +echo " A directory containing a Maven repository into which the Flink artifacts were deployed." +echo "" +echo "Example preparation:" +echo "mvnw clean deploy -DaltDeploymentRepository=validation_repository::default::file: > " +echo "" +echo "The environment variable MVN is used to specify the Maven binaries; defaults to 'mvnw'." +echo "See further details in the JavaDoc of LicenseChecker." + fi +} + +while getopts 'h' o; do + case "${o}" in +h) + usage Review Comment: :disappointed: true, I mixed that one up :facepalm: whatever :shrug: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh
XComp commented on code in PR #23195: URL: https://github.com/apache/flink/pull/23195#discussion_r1297104015 ## tools/ci/compile.sh: ## @@ -69,34 +80,38 @@ fi echo " Checking Javadocs " +javadoc_output=/tmp/javadoc.out + # use the same invocation as .github/workflows/docs.sh -run_mvn javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \ +$MVN javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \ Review Comment: ```suggestion $MVN javadoc:aggregate -T1 -DadditionalJOption='-Xdoclint:none' \ ``` according to your statement "every call (apart from mvn exec has to run with a parallelism of 1.". But tbh, I don't see value here because we're not parsing the output. In this sense, it would be up for the user to decide. :shrug: ## tools/ci/compile.sh: ## @@ -69,34 +80,38 @@ fi echo " Checking Javadocs " +javadoc_output=/tmp/javadoc.out + # use the same invocation as .github/workflows/docs.sh -run_mvn javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \ +$MVN javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \ -Dmaven.javadoc.failOnError=false -Dcheckstyle.skip=true -Denforcer.skip=true -Dspotless.skip=true -Drat.skip=true \ - -Dheader=someTestHeader > javadoc.out + -Dheader=someTestHeader > ${javadoc_output} EXIT_CODE=$? if [ $EXIT_CODE != 0 ] ; then echo "ERROR in Javadocs. Printing full output:" - cat javadoc.out ; rm javadoc.out + cat ${javadoc_output} exit $EXIT_CODE fi echo " Checking Scaladocs " -run_mvn scala:doc -Dcheckstyle.skip=true -Denforcer.skip=true -Dspotless.skip=true -pl flink-scala 2> scaladoc.out +scaladoc_output=/tmp/scaladoc.out + +$MVN scala:doc -Dcheckstyle.skip=true -Denforcer.skip=true -Dspotless.skip=true -pl flink-scala 2> ${scaladoc_output} Review Comment: ```suggestion $MVN scala:doc -T1 -Dcheckstyle.skip=true -Denforcer.skip=true -Dspotless.skip=true -pl flink-scala 2> ${scaladoc_output} ``` according to your statement "every call (apart from mvn exec has to run with a parallelism of 1.". 
But here as well: We're not parsing the output. So it would be up to the user how to deal with it. ## tools/ci/verify_bundled_optional.sh: ## @@ -17,31 +17,58 @@ # limitations under the License. # +usage() { + echo "Usage: $0 " + echo " A file containing the output of the Maven build." + echo "" + echo "mvnw clean package > " + echo "" + echo "The environment variable MVN is used to specify the Maven binaries; defaults to 'mvnw'." + echo "See further details in the JavaDoc of ShadeOptionalChecker." +} + +while getopts 'h' o; do + case "${o}" in +h) + usage + exit 0 + ;; + esac +done + +if [[ "$#" != "1" ]]; then + usage + exit 1 +fi + ## Checks that all bundled dependencies are marked as optional in the poms MVN_CLEAN_COMPILE_OUT=$1 -CI_DIR=$2 -FLINK_ROOT=$3 -source "${CI_DIR}/maven-utils.sh" +MVN=${MVN:-./mvnw} -cd "$FLINK_ROOT" || exit +dependency_plugin_output=/tmp/dependency_tree_optional.txt -dependency_plugin_output=${CI_DIR}/optional_dep.txt +$MVN dependency:tree -B -T1 > "${dependency_plugin_output}" Review Comment: ```suggestion # dependency:tree needs to run without multi-threading support to ensure that the output stays parseable (the lines appear in a sequential order) $MVN dependency:tree -B -T1 > "${dependency_plugin_output}" ``` Should we add a comment to ensure that there is no desire to "optimize" the maven call in the future? ## tools/ci/verify_scala_suffixes.sh: ## @@ -37,41 +37,52 @@ # # The script uses 'mvn dependency:tree -Dincludes=org.scala-lang' to list Scala # dependent modules. -CI_DIR=$1 -FLINK_ROOT=$2 + + +usage() { + echo "Usage: $0" + echo "" + echo "The environment variable MVN is used to specify the Maven binaries; defaults to 'mvnw'." + echo "See further details in the JavaDoc of ScalaSuffixChecker." +} + +while getopts 'h' o; do + case "${o}" in +h) + usage + exit 0 + ;; + esac +done + +MVN=${MVN:-./mvnw} echo "--- Flink Scala Dependency Analyzer ---" echo "Analyzing modules for Scala dependencies using 'mvn dependency:tree'." 
echo "If you haven't built the project, please do so first by running \"mvn clean install -DskipTests\"" -source "${CI_DIR}/maven-utils.sh" - -cd "$FLINK_ROOT" || exit +dependency_plugin_output=/tmp/dependency_tree_scala.txt -dependency_plugin_output=${CI_DIR}/dep.txt - -run_mvn dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} >> "${dependency_plugin_output}" +$MVN dependency:tree -Dincludes=org.scala-lang,:*_2.1*:: ${MAVEN_ARGUMENTS} -T1 > "${dependency_plugin_output}" Review Comment: ```suggestion # dependency:tree needs to run without multi-threading support to ensure that the output stays parseable (the lines appear in a sequential order) $MVN dep
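The rationale for pinning `dependency:tree` to `-T1` is that a line-by-line parser of the output relies on lines appearing in a sequential, contiguous order, which parallel producers do not guarantee. A small Java sketch of the single-threaded half of that argument (an analogy, not Maven itself; all names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SequentialOutputSketch {
    // Simulates build output collected from a single-threaded producer,
    // as with `mvn -T1`: tasks on one thread run in submission order,
    // so output lines stay contiguous and parseable.
    public static List<String> singleThreadedOutput() {
        List<String> lines = new ArrayList<>();
        ExecutorService oneThread = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            final int module = i;
            oneThread.submit(() -> lines.add("[INFO] module-" + module));
        }
        oneThread.shutdown();
        try {
            oneThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return lines;
    }
}
```

With multiple worker threads the same loop could emit the lines in any interleaving, which is exactly why the scripts force `-T1` for the calls whose output is parsed.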
[GitHub] [flink] xiangyuf opened a new pull request, #23231: [FLINK-32880][flink-runtime]Ignore slotmanager.max-total-resource.cpu…
xiangyuf opened a new pull request, #23231: URL: https://github.com/apache/flink/pull/23231 ## What is the purpose of the change Just as slotmanager.number-of-slots.max, we should also ignore the cpu and memory limitation in standalone mode. ## Brief change log - Add StandaloneResourceManagerFactory#overwriteMaxResourceNumConfig to remove both max slot/cpu/memory limit option. ## Verifying this change This change added tests and can be verified as follows: ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager(StandaloneResourceManagerFactory) - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #23231: [FLINK-32880][flink-runtime]Ignore slotmanager.max-total-resource.cpu…
flinkbot commented on PR #23231: URL: https://github.com/apache/flink/pull/23231#issuecomment-1682164857 ## CI report: * 8e42b776f803b8f4dc6b82fd29b04c481da3d62b UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] KarmaGYZ commented on a diff in pull request #23230: [FLINK-32880][flink-runtime]Fulfill redundant taskmanagers periodical…
KarmaGYZ commented on code in PR #23230: URL: https://github.com/apache/flink/pull/23230#discussion_r1297084113 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocationStrategy.java: ## @@ -48,15 +48,16 @@ ResourceAllocationResult tryFulfillRequirements( BlockedTaskManagerChecker blockedTaskManagerChecker); /** - * Try to make a release decision to release unused PendingTaskManagers and TaskManagers. This - * is more light weighted than {@link #tryFulfillRequirements}, only consider empty registered / - * pending workers and assume all requirements are fulfilled by registered / pending workers. + * Try to make a reconcile decision between release unused PendingTaskManagers/TaskManagers and + * add redundant PendingTaskManagers. This is more light weighted than {@link Review Comment: Try to make a decision to reconcile the cluster resources. ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java: ## @@ -161,6 +162,7 @@ public ResourceReleaseResult tryReleaseUnusedResources( List pendingTaskManagersNonUse = new ArrayList<>(); List pendingTaskManagersInuse = new ArrayList<>(); + Review Comment: ```suggestion ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java: ## @@ -224,7 +224,7 @@ public void start( if (resourceAllocator.isSupported()) { taskManagerReleasableCheck = Review Comment: ```suggestion clusterReconciliationCheck = ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java: ## @@ -224,7 +224,7 @@ public void start( if (resourceAllocator.isSupported()) { taskManagerReleasableCheck = scheduledExecutor.scheduleWithFixedDelay( -() -> mainThreadExecutor.execute(this::tryReleaseUnusedTaskManagers), +() -> mainThreadExecutor.execute(this::checkClusterResource), Review Comment: ```suggestion () -> 
mainThreadExecutor.execute(this::checkClusterReconciliation), ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java: ## @@ -812,29 +812,34 @@ public Collection getAllocatedSlotsOf(InstanceID instanceID) { // Internal periodic check methods // - -private void tryReleaseUnusedTaskManagers() { -if (checkTaskManagerReleasable()) { +private void checkClusterResource() { Review Comment: ```suggestion private void checkClusterReconciliation() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore
1996fanrui commented on PR #23228: URL: https://github.com/apache/flink/pull/23228#issuecomment-1682167358 Thanks @gaborgsomogyi for the review, CI is green, merging~
[GitHub] [flink] 1996fanrui merged pull request #23228: [hotfix][runtime] Remove the repeated createBasePathIfNeeded in the FileSystemBlobStore
1996fanrui merged PR #23228: URL: https://github.com/apache/flink/pull/23228
[GitHub] [flink] xiguashu opened a new pull request, #23232: [FLINK-32887][connector/common] ExecutorNotifier#notifyReadyAsync wil…
xiguashu opened a new pull request, #23232: URL: https://github.com/apache/flink/pull/23232 ## What is the purpose of the change To speed up the procedure of source split enumerator discovering and assigning splits, especially in the case when the 'callable' execution duration (usually for calculating splits) is much longer than the schedule period. Because in this case, 'callable' tasks (to discover tasks) will be continuously created and queued in the executor, and will block the following tasks (to assign splits). ## Brief change log Change ExecutorNotifier#notifyReadyAsync implementation from 'scheduleAtFixedRate' to 'scheduleWithFixedDelay'. ## Verifying this change This change is a trivial rework. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-32887) SourceCoordinatorContext#workerExecutor may cause task initializing slowly
[ https://issues.apache.org/jira/browse/FLINK-32887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-32887: --- Labels: pull-request-available (was: ) > SourceCoordinatorContext#workerExecutor may cause task initializing slowly > --- > > Key: FLINK-32887 > URL: https://issues.apache.org/jira/browse/FLINK-32887 > Project: Flink > Issue Type: Improvement > Components: Connectors / Common, Runtime / Coordination >Affects Versions: 1.15.2 >Reporter: liang jie >Priority: Major > Labels: pull-request-available > Original Estimate: 24h > Remaining Estimate: 24h > > SourceCoordinatorContext#workerExecutor is typically used to calculate > partitions of a source task and is implemented by a ScheduledExecutorService > with only 1 core (hard coded).Tasks to calculate partitions with be executed > through the function 'workerExecutor.scheduleAtFixedRate'. > --- > In some case, for example, 'getSubscribedPartitions' method will take quite a > long time(e.g. 5min) because of lots of topics are included in the same task > or requests to outer systems timeout etc. And partitionDiscoveryInterval is > set to a short intervel e.g. 1min. > In this case, 'getSubscribedPartitions' runnable tasks will be triggered > repeatedly and be queued in the queue of workerExecutor, during the first > 'getSubscribedPartitions' task running duration, which causing > 'checkPartitionChanges' tasks will be queued too. Each > 'checkPartitionChanges' task needs to wait for 25mins(5 * > 'getSubscribedPartitions' task execution duration) before it was executed. > --- > In my view, tasks of workerExecutor should be scheduled with fix deley rather > than at fixed rate. Because there is no meaning that > 'getSubscribedPartitions' tasks being repeatedly executed without a > 'checkPartitionChanges' execution. -- This message was sent by Atlassian Jira (v8.20.10#820010)
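The difference between the two scheduling modes is easy to demonstrate with the JDK's `ScheduledExecutorService` (a minimal sketch of the general JDK behavior, not of `ExecutorNotifier` itself; names are illustrative): `scheduleAtFixedRate` lets a task that outruns its period pile up missed firings and re-execute back-to-back, whereas `scheduleWithFixedDelay` always waits the full delay after one execution finishes before starting the next, so slow discovery runs cannot queue up ahead of other work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelaySketch {
    // Runs a slow task with the given delay between runs and returns true
    // if it completed at least two executions within a generous timeout.
    public static boolean ranTwice(long sleepMillis, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch twoRuns = new CountDownLatch(2);
        // scheduleWithFixedDelay: the next run starts delayMillis AFTER the
        // previous run finished, even when the task outruns the delay.
        executor.scheduleWithFixedDelay(() -> {
            try {
                Thread.sleep(sleepMillis); // stands in for a slow split-discovery call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            twoRuns.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            return twoRuns.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With `scheduleAtFixedRate` and the same parameters, each completed run would immediately be followed by the next because the period elapsed while the task was still sleeping, which is the pile-up the Jira issue describes.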
[GitHub] [flink] KarmaGYZ commented on a diff in pull request #23231: [FLINK-32880][flink-runtime]Ignore slotmanager.max-total-resource.cpu…
KarmaGYZ commented on code in PR #23231: URL: https://github.com/apache/flink/pull/23231#discussion_r1297125240 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java: ## @@ -109,17 +109,35 @@ protected ResourceManager createResourceManager( * @return the configuration for standalone ResourceManager */ @VisibleForTesting -public static Configuration getConfigurationWithoutMaxSlotNumberIfSet( +public static Configuration getConfigurationWithoutMaxResourceNumIfSet( Review Comment: ```suggestion public static Configuration getConfigurationWithoutMaxResourceIfSet( ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java: ## @@ -109,17 +109,35 @@ protected ResourceManager createResourceManager( * @return the configuration for standalone ResourceManager */ @VisibleForTesting -public static Configuration getConfigurationWithoutMaxSlotNumberIfSet( +public static Configuration getConfigurationWithoutMaxResourceNumIfSet( Configuration configuration) { final Configuration copiedConfig = new Configuration(configuration); -// The max slot limit should not take effect for standalone cluster, we overwrite the +overwriteMaxResourceNumConfig(copiedConfig); + +return copiedConfig; +} + +private static void overwriteMaxResourceNumConfig(Configuration configuration) { Review Comment: ```suggestion private static void removeMaxResourceConfig(Configuration configuration) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #23232: [FLINK-32887][connector/common] ExecutorNotifier#notifyReadyAsync wil…
flinkbot commented on PR #23232: URL: https://github.com/apache/flink/pull/23232#issuecomment-1682178223 ## CI report: * 9a0a0d8b9d5ce7cc0bfcfe5281ed6bec14ccf638 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] zentol commented on a diff in pull request #23195: [FLINK-32834] Ease local use of tools/ci/compile.sh
zentol commented on code in PR #23195: URL: https://github.com/apache/flink/pull/23195#discussion_r1297137606 ## tools/ci/compile.sh: ## @@ -69,34 +80,38 @@ fi echo " Checking Javadocs " +javadoc_output=/tmp/javadoc.out + # use the same invocation as .github/workflows/docs.sh -run_mvn javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \ +$MVN javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \ Review Comment: javadoc can run in parallel. ## tools/ci/compile.sh: ## @@ -69,34 +80,38 @@ fi echo " Checking Javadocs " +javadoc_output=/tmp/javadoc.out + # use the same invocation as .github/workflows/docs.sh -run_mvn javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \ +$MVN javadoc:aggregate -DadditionalJOption='-Xdoclint:none' \ -Dmaven.javadoc.failOnError=false -Dcheckstyle.skip=true -Denforcer.skip=true -Dspotless.skip=true -Drat.skip=true \ - -Dheader=someTestHeader > javadoc.out + -Dheader=someTestHeader > ${javadoc_output} EXIT_CODE=$? if [ $EXIT_CODE != 0 ] ; then echo "ERROR in Javadocs. Printing full output:" - cat javadoc.out ; rm javadoc.out + cat ${javadoc_output} exit $EXIT_CODE fi echo " Checking Scaladocs " -run_mvn scala:doc -Dcheckstyle.skip=true -Denforcer.skip=true -Dspotless.skip=true -pl flink-scala 2> scaladoc.out +scaladoc_output=/tmp/scaladoc.out + +$MVN scala:doc -Dcheckstyle.skip=true -Denforcer.skip=true -Dspotless.skip=true -pl flink-scala 2> ${scaladoc_output} Review Comment: scaladoc can run in parallel. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] lincoln-lil commented on a diff in pull request #23209: [FLINK-32824] Port Calcite's fix for the sql like operator
lincoln-lil commented on code in PR #23209: URL: https://github.com/apache/flink/pull/23209#discussion_r1296990792 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala: ## @@ -793,4 +794,29 @@ class CalcITCase extends StreamingTestBase { val expected = List("2.0", "2.0", "2.0") assertEquals(expected.sorted, sink.getAppendResults.sorted) } + + @Test + def testFilterOnString(): Unit = { Review Comment: nit: we can simply extend the current `org.apache.flink.table.planner.runtime.batch.sql.CalcITCase#testFilterOnString` test case adding a new `checkResult` clause to cover the change.
[jira] [Commented] (FLINK-32668) fix up watchdog timeout error msg in common.sh(e2e test)
[ https://issues.apache.org/jira/browse/FLINK-32668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755529#comment-17755529 ] Hongshun Wang commented on FLINK-32668: --- Thanks a lot, I get it. I will modify this as soon as possible. > fix up watchdog timeout error msg in common.sh(e2e test) > -- > > Key: FLINK-32668 > URL: https://issues.apache.org/jira/browse/FLINK-32668 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.16.2, 1.18.0, 1.17.1 >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Minor > Attachments: image-2023-07-25-15-27-37-441.png > > > When run e2e test, an error like this occrurs: > !image-2023-07-25-15-27-37-441.png|width=733,height=115! > > The corresponding code: > {code:java} > kill_test_watchdog() { > local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid) > echo "Stopping job timeout watchdog (with pid=$watchdog_pid)" > kill $watchdog_pid > } > internal_run_with_timeout() { > local timeout_in_seconds="$1" > local on_failure="$2" > local command_label="$3" > local command="${@:4}" > on_exit kill_test_watchdog > ( > command_pid=$BASHPID > (sleep "${timeout_in_seconds}" # set a timeout for this command > echo "${command_label:-"The command '${command}'"} (pid: > $command_pid) did not finish after $timeout_in_seconds seconds." > eval "${on_failure}" > kill "$command_pid") & watchdog_pid=$! > echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid > # invoke > $command > ) > }{code} > > When {{$command}} completes before the timeout, the watchdog process is > killed successfully. However, when {{$command}} times out, the watchdog > process kills {{$command}} and then exits itself, leaving behind an error > message when trying to kill its own process ID with {{{}kill > $watchdog_pid{}}}.This error msg "no such process" is hard to understand. 
> > So, I will modify like this with better error message: > > {code:java} > kill_test_watchdog() { > local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid) > if kill -0 $watchdog_pid > /dev/null 2>&1; then > echo "Stopping job timeout watchdog (with pid=$watchdog_pid)" > kill $watchdog_pid > else > echo "[ERROR] Test is timeout" > exit 1 > fi > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
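The `kill -0 $watchdog_pid` guard in the proposed fix checks whether the process still exists before trying to kill it. The same pattern can be expressed in Java with `ProcessHandle` (an analogy for illustration only, not part of the Flink scripts; the `demo` method assumes a POSIX `sleep` binary is on the PATH):

```java
import java.util.Optional;

public class WatchdogGuardSketch {
    // Mirrors the proposed bash guard: only `kill` the watchdog if the
    // `kill -0`-style existence check succeeds; otherwise the watchdog
    // already fired, which means the test itself timed out.
    public static String stopIfAlive(long pid) {
        Optional<ProcessHandle> handle = ProcessHandle.of(pid); // analog of `kill -0 $pid`
        if (handle.isPresent() && handle.get().isAlive()) {
            handle.get().destroy(); // analog of `kill $watchdog_pid`
            return "Stopping job timeout watchdog (with pid=" + pid + ")";
        }
        return "[ERROR] Test is timeout";
    }

    // Demonstrates both branches with a throwaway child process.
    public static boolean demo() {
        try {
            Process watchdog = new ProcessBuilder("sleep", "60").start();
            boolean stopped = stopIfAlive(watchdog.pid()).startsWith("Stopping");
            watchdog.waitFor(); // the watchdog is gone after this point
            boolean timedOut = stopIfAlive(watchdog.pid()).startsWith("[ERROR]");
            return stopped && timedOut;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```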
[jira] [Comment Edited] (FLINK-32668) fix up watchdog timeout error msg in common.sh(e2e test)
[ https://issues.apache.org/jira/browse/FLINK-32668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755529#comment-17755529 ] Hongshun Wang edited comment on FLINK-32668 at 8/17/23 12:35 PM: - I get it.Thanks a lot, [~mapohl] .I will modify this as soon as possible. was (Author: JIRAUSER298968): Thanks a lot, I get it. I will modify this as soon as possible. > fix up watchdog timeout error msg in common.sh(e2e test) > -- > > Key: FLINK-32668 > URL: https://issues.apache.org/jira/browse/FLINK-32668 > Project: Flink > Issue Type: Bug > Components: Build System / CI >Affects Versions: 1.16.2, 1.18.0, 1.17.1 >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Minor > Attachments: image-2023-07-25-15-27-37-441.png > > > When run e2e test, an error like this occrurs: > !image-2023-07-25-15-27-37-441.png|width=733,height=115! > > The corresponding code: > {code:java} > kill_test_watchdog() { > local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid) > echo "Stopping job timeout watchdog (with pid=$watchdog_pid)" > kill $watchdog_pid > } > internal_run_with_timeout() { > local timeout_in_seconds="$1" > local on_failure="$2" > local command_label="$3" > local command="${@:4}" > on_exit kill_test_watchdog > ( > command_pid=$BASHPID > (sleep "${timeout_in_seconds}" # set a timeout for this command > echo "${command_label:-"The command '${command}'"} (pid: > $command_pid) did not finish after $timeout_in_seconds seconds." > eval "${on_failure}" > kill "$command_pid") & watchdog_pid=$! > echo $watchdog_pid > $TEST_DATA_DIR/job_watchdog.pid > # invoke > $command > ) > }{code} > > When {{$command}} completes before the timeout, the watchdog process is > killed successfully. 
However, when {{$command}} times out, the watchdog > process kills {{$command}} and then exits itself, leaving behind an error > message when trying to kill its own process ID with {{{}kill > $watchdog_pid{}}}.This error msg "no such process" is hard to understand. > > So, I will modify like this with better error message: > > {code:java} > kill_test_watchdog() { > local watchdog_pid=$(cat $TEST_DATA_DIR/job_watchdog.pid) > if kill -0 $watchdog_pid > /dev/null 2>&1; then > echo "Stopping job timeout watchdog (with pid=$watchdog_pid)" > kill $watchdog_pid > else > echo "[ERROR] Test is timeout" > exit 1 > fi > } {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28171) Adjust Job and Task manager port definitions to work with Istio+mTLS
[ https://issues.apache.org/jira/browse/FLINK-28171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755531#comment-17755531 ] sigalit commented on FLINK-28171: - pull-request-available > Adjust Job and Task manager port definitions to work with Istio+mTLS > > > Key: FLINK-28171 > URL: https://issues.apache.org/jira/browse/FLINK-28171 > Project: Flink > Issue Type: Improvement > Components: Deployment / Kubernetes >Affects Versions: 1.14.4 > Environment: flink-kubernetes-operator 1.0.0 > Flink 1.14-java11 > Kubernetes v1.19.5 > Istio 1.7.6 >Reporter: Moshe Elisha >Assignee: Moshe Elisha >Priority: Major > Labels: pull-request-available, stale-assigned > > Hello, > > We are launching Flink deployments using the [Flink Kubernetes > Operator|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/] > on a Kubernetes cluster with Istio and mTLS enabled. > > We found that the TaskManager is unable to communicate with the JobManager on > the jobmanager-rpc port: > > {{2022-06-15 15:25:40,508 WARN akka.remote.ReliableDeliverySupervisor > [] - Association with remote system > [akka.tcp://[flink@amf-events-to-inference-and-central.nwdaf-edge|mailto:flink@amf-events-to-inference-and-central.nwdaf-edge]:6123] > has failed, address is now gated for [50] ms. Reason: [Association failed > with > [akka.tcp://[flink@amf-events-to-inference-and-central.nwdaf-edge|mailto:flink@amf-events-to-inference-and-central.nwdaf-edge]:6123]] > Caused by: [The remote system explicitly disassociated (reason unknown).]}} > > The reason for the issue is that the JobManager service port definitions are > not following the Istio guidelines > [https://istio.io/latest/docs/ops/configuration/traffic-management/protocol-selection/] > (see example below). > > There was also an email discussion around this topic in the users mailing > group under the subject "Flink Kubernetes Operator with K8S + Istio + mTLS - > port definitions". 
> With the help of the community, we were able to work around the issue but it > was very hard and forced us to skip Istio proxy which is not ideal. > > We would like you to consider changing the default port definitions, either > # Rename the ports – I understand it is Istio specific guideline but maybe > it is better to at least be aligned with one (popular) vendor guideline > instead of none at all. > # Add the “appProtocol” property[1] that is not specific to any vendor but > requires Kubernetes >= 1.19 where it was introduced as beta and moved to > stable in >= 1.20. The option to add appProtocol property was added only in > [https://github.com/fabric8io/kubernetes-client/releases/tag/v5.10.0] with > [#3570|https://github.com/fabric8io/kubernetes-client/issues/3570]. > # Or allow a way to override the defaults. > > [https://kubernetes.io/docs/concepts/services-networking/_print/#application-protocol] > > > {{# k get service inference-results-to-analytics-engine -o yaml}} > {{apiVersion: v1}} > {{kind: Service}} > {{...}} > {{spec:}} > {{ clusterIP: None}} > {{ ports:}} > {{ - name: jobmanager-rpc *# should start with “tcp-“ or add "appProtocol" > property*}} > {{ port: 6123}} > {{ protocol: TCP}} > {{ targetPort: 6123}} > {{ - name: blobserver *# should start with "tcp-" or add "appProtocol" > property*}} > {{ port: 6124}} > {{ protocol: TCP}} > {{ targetPort: 6124}} > {{...}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] XComp commented on a diff in pull request #23229: [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException
XComp commented on code in PR #23229: URL: https://github.com/apache/flink/pull/23229#discussion_r1297168003 ## flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadExtension.java: ## @@ -119,6 +120,12 @@ public void before(ExtensionContext context) throws Exception { } file2 = TempDirUtils.newFile(tmpDirectory); Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET)); +file3 = TempDirUtils.newFile(tmpDirectory); +try (RandomAccessFile rw = new RandomAccessFile(file3, "rw")) { +// magic value that reliably reproduced EndOfDataDecoderException in hasNext() +rw.setLength(1410); Review Comment: The test doesn't fail if I revert the fix. That makes me believe that this magic value doesn't reproduce the scenario reliably. How did you come up with the test scenario? :thinking: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
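For context on the fixture under discussion: `RandomAccessFile.setLength` is a standard way to materialize a zero-filled file of an exact size. The 1410-byte value comes from the diff and, per the review comment, is not guaranteed to reproduce the decoder bug. A minimal stdlib sketch (class and method names here are illustrative, not Flink code):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class FixedSizeFile {
    // Creates a file of exactly `length` zero bytes, mirroring the fixture in
    // MultipartUploadExtension that tries to hit the boundary condition behind
    // EndOfDataDecoderException in hasNext().
    static File create(long length) throws IOException {
        File f = File.createTempFile("upload", ".bin");
        try (RandomAccessFile rw = new RandomAccessFile(f, "rw")) {
            rw.setLength(length); // extends the empty file with zero-filled bytes
        }
        return f;
    }
}
```

Whether 1410 bytes deterministically triggers the Netty multipart decoder's end-of-data path is exactly what the reviewer is questioning; the sketch only shows how the fixture is built.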
[GitHub] [flink] xiangyuf commented on pull request #23231: [FLINK-32880][flink-runtime]Ignore slotmanager.max-total-resource.cpu…
xiangyuf commented on PR #23231: URL: https://github.com/apache/flink/pull/23231#issuecomment-1682223131 Hi @KarmaGYZ , all comments have been resolved, thx again for reviewing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-32821) Streaming examples failed to execute due to error in packaging
[ https://issues.apache.org/jira/browse/FLINK-32821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755534#comment-17755534 ] Alexander Fedulov commented on FLINK-32821: --- [~Zhanghao Chen] I am still not sure why you faced this issue. Bundling datagen was part of the original PR: [https://github.com/apache/flink/pull/23079/files#diff-c7da2a9d0258717dbd97328195d580aedc3fe51240aeb4fc0f292d946dcdb4af] Maybe there was some unlucky timing because Leonard might have merged the commits separately and maybe you pulled right before this one got integrated [https://github.com/apache/flink/commit/dfb9cb851dc1f0908ea6c3ce1230dd8ca2b48733] Anyhow, I believe we can close this issue. > Streaming examples failed to execute due to error in packaging > -- > > Key: FLINK-32821 > URL: https://issues.apache.org/jira/browse/FLINK-32821 > Project: Flink > Issue Type: Bug > Components: Examples >Affects Versions: 1.18.0 >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > Labels: pull-request-available > > 7 out of the 8 streaming examples failed to run: > * Iteration & SessionWindowing & SocketWindowWordCount & WindowJoin failed > to run due to java.lang.NoClassDefFoundError: > org/apache/flink/streaming/examples/utils/ParameterTool > * MatrixVectorMul & TopSpeedWindowing & StateMachineExample failed to run > due to: Caused by: java.lang.ClassNotFoundException: > org.apache.flink.connector.datagen.source.GeneratorFunction > The NoClassDefFoundError with ParameterTool is introduced by FLINK-32558 > Properly deprecate DataSet API - ASF JIRA (apache.org), and we'd better > resolve FLINK-32820 ParameterTool is mistakenly marked as deprecated - ASF > JIRA (apache.org) first before we come to a fix for this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] XComp commented on a diff in pull request #23229: [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException
XComp commented on code in PR #23229: URL: https://github.com/apache/flink/pull/23229#discussion_r1297168003 ## flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadExtension.java: ## @@ -119,6 +120,12 @@ public void before(ExtensionContext context) throws Exception { } file2 = TempDirUtils.newFile(tmpDirectory); Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET)); +file3 = TempDirUtils.newFile(tmpDirectory); +try (RandomAccessFile rw = new RandomAccessFile(file3, "rw")) { +// magic value that reliably reproduced EndOfDataDecoderException in hasNext() +rw.setLength(1410); Review Comment: The test doesn't fail on my machine if I revert the fix. That makes me believe that this magic value doesn't reproduce the scenario reliably. How did you come up with the test scenario? :thinking: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-32890) Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between
Nicolas Fraison created FLINK-32890: --- Summary: Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between Key: FLINK-32890 URL: https://issues.apache.org/jira/browse/FLINK-32890 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Nicolas Fraison
Here are all the details about the issue (based on an app/scenario reproducing it):
* Deployed a new release of a Flink app with a new operator
* The Flink Operator marked the app as stable
* After some time the app failed and stayed in a failed state (due to an issue with our Kafka clusters)
* Sadly, the team in charge of this Flink app decided to roll back the app, thinking the failure was linked to the new deployment
* The operator detected: {{Job is not running but HA metadata is available for last state restore, ready for upgrade, Deleting JobManager deployment while preserving HA metadata.}} -> it relied on last-state (as we do not disable fallback), so no savepoint was taken
* Flink started the JM and deployed the app. It correctly found the 3 checkpoints
* {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as Zookeeper namespace.}}
* {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
* {{Recovering checkpoints from ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
* {{Found 3 checkpoints in ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}}
* {{Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ 1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19.}}
* The job failed because of a missing operator for a state:
Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state f298e8715b4d85e6f965b60e1c848cbe
* {{Job 6b24a364c1905e924a69f3dbff0d26a6 has been registered for cleanup in the JobResultStore after reaching a terminal state.}}
* {{Clean up the high availability data for job 6b24a364c1905e924a69f3dbff0d26a6.}}
* {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}}
* The JobManager restarted and tried to resubmit the job, but the job had already been submitted, so it finished
* {{Received JobGraph submission 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}}
* {{Ignoring JobGraph submission 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.}}
* {{Application completed SUCCESSFULLY}}
* Finally the operator rolled back the deployment and still indicated that {{Job is not running but HA metadata is available for last state restore, ready for upgrade}}
* But the job metadata was no longer there (it had been cleaned up previously):
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints
Path /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints doesn't exist
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs
(CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico
jobgraphs
jobs
leader
* The rolled-back app finally took the last provided savepoint, as no metadata/checkpoints were available. But that savepoint is an old one, since during the upgrade the operator had decided to rely on last-state
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32890) Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between
[ https://issues.apache.org/jira/browse/FLINK-32890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicolas Fraison updated FLINK-32890: Description: Here are all details about the issue (based on an app/scenario reproducing the issue): * Deployed new release of a flink app with a new operator * Flink Operator set the app as stable * After some time the app failed and stay in failed state (due to some issue with our kafka clusters) * Sadly the team in charge of this flink app decided to rollback the app as they were thinking it was linked to this new deployment * Operator detect: {{Job is not running but HA metadata is available for last state restore, ready for upgrade, Deleting JobManager deployment while preserving HA metadata.}} -> rely on last-state (as we do not disable fallback), no savepoint taken * Flink start JM and deployment of the app. It well find the 3 checkpoints * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as Zookeeper namespace.}} * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}} * {{Recovering checkpoints from ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}} * {{Found 3 checkpoints in ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}} * {{{}Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ 1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at }}\{{{}s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19{}}}{{{}.{}}} * Job failed because of the missing operator {code:java} Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED. org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. 
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state f298e8715b4d85e6f965b60e1c848cbe * Job 6b24a364c1905e924a69f3dbff0d26a6 has been registered for cleanup in the JobResultStore after reaching a terminal state.{code} * {{Clean up the high availability data for job 6b24a364c1905e924a69f3dbff0d26a6.}} * {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}} * JobManager restart and try to resubmit the job but the job was already submitted so finished * {{Received JobGraph submission 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}} * {{Ignoring JobGraph submission 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.}} * {{Application completed SUCCESSFULLY}} * Finally the operator rollback the deployment and still indicate that {{Job is not running but HA metadata is available for last state restore, ready for upgrade}} * But the job metadata are not anymore there (clean previously) {code:java} (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints Path /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints doesn't exist (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico jobgraphs jobs leader {code} The rolled back app from flink operator finally take the last provided savepoint as no metadata/checkpoints are available. 
But this last savepoint is an old one as during the upgrade the operator decided to rely on last-state was: Here are all details about the issue (based on an app/scenario reproducing the issue): * Deployed new release of a flink app with a new operator * Flink Operator set the app as stable * After some time the app failed and stay in failed state (due to some issue with our kafka clusters) * Sadly the team in charge of this flink app decided to rollback the app as they were thinking it was linked to this new deployment * Operator detect: {{Job is not running but HA metadata is available for last state restore, ready for upgrade, Deleting JobManager deployment while preserving HA metadata.}} -> rely on last-state (as we do not disable fallback), no savepoint taken * Flink start JM and deployment of the app. It well find the 3 checkpoints * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as Zookeeper namespace.}} * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).
[jira] [Updated] (FLINK-32890) Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between
[ https://issues.apache.org/jira/browse/FLINK-32890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicolas Fraison updated FLINK-32890: Issue Type: Bug (was: Improvement) > Flink app rolled back with old savepoints (3 hours back in time) while some > checkpoints have been taken in between > -- > > Key: FLINK-32890 > URL: https://issues.apache.org/jira/browse/FLINK-32890 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Nicolas Fraison >Priority: Major > > Here are all details about the issue (based on an app/scenario reproducing > the issue): * Deployed new release of a flink app with a new operator > * Flink Operator set the app as stable > * After some time the app failed and stay in failed state (due to some issue > with our kafka clusters) > * Sadly the team in charge of this flink app decided to rollback the app as > they were thinking it was linked to this new deployment > * Operator detect: {{Job is not running but HA metadata is available for > last state restore, ready for upgrade, Deleting JobManager deployment while > preserving HA metadata.}} -> rely on last-state (as we do not disable > fallback), no savepoint taken > * Flink start JM and deployment of the app. 
It well find the 3 checkpoints > * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as > Zookeeper namespace.}} > * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}} > * {{Recovering checkpoints from > ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}} > * {{Found 3 checkpoints in > ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}} > * {{{}Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ > 1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at > }}\{{{}s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19{}}}{{{}.{}}} > * Job failed because of the missing operator > {code:java} > Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED. > org.apache.flink.runtime.client.JobInitializationException: Could not start > the JobMaster. > Caused by: java.util.concurrent.CompletionException: > java.lang.IllegalStateException: There is no operator for the state > f298e8715b4d85e6f965b60e1c848cbe * Job 6b24a364c1905e924a69f3dbff0d26a6 has > been registered for cleanup in the JobResultStore after reaching a terminal > state.{code} > * {{Clean up the high availability data for job > 6b24a364c1905e924a69f3dbff0d26a6.}} > * {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from > ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}} > * JobManager restart and try to resubmit the job but the job was already > submitted so finished > * {{Received JobGraph submission 'flink-kafka-job' > (6b24a364c1905e924a69f3dbff0d26a6).}} > * {{Ignoring JobGraph submission 'flink-kafka-job' > (6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a > globally-terminal state (i.e. 
FAILED, CANCELED, FINISHED) in a previous > execution.}} > * {{Application completed SUCCESSFULLY}} > * Finally the operator rollback the deployment and still indicate that {{Job > is not running but HA metadata is available for last state restore, ready for > upgrade}} > * But the job metadata are not anymore there (clean previously) > > {code:java} > (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls > /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints > Path > /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints > doesn't exist > (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls > /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs > (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls > /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico > jobgraphs > jobs > leader > {code} > > The rolled back app from flink operator finally take the last provided > savepoint as no metadata/checkpoints are available. But this last savepoint > is an old one as during the upgrade the operator decided to rely on last-state -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] 1996fanrui commented on a diff in pull request #23198: [FLINK-32856][JUnit5 Migration] Migrate the testutils, throughput and throwable packages of flink-runtime module to junit5
1996fanrui commented on code in PR #23198: URL: https://github.com/apache/flink/pull/23198#discussion_r1297192247 ## flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java: ## @@ -316,18 +317,16 @@ public static void waitForMarkerFile(File file, long timeoutMillis) Thread.sleep(10); } -if (!exists) { -fail("The marker file was not found within " + timeoutMillis + " msecs"); -} +assertThat(exists) +.as("The marker file was not found within " + timeoutMillis + " msecs") Review Comment: Thanks @X-czh 's review, updated! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
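The diff above converts an `if (!exists) fail(...)` into AssertJ's `assertThat(exists).as(...).isTrue()`. Since AssertJ is a third-party dependency, here is a stdlib-only sketch of the polling helper being migrated, so the boolean the assertion acts on is concrete (class and method names are illustrative, not the actual Flink test utility):

```java
import java.io.File;

public class MarkerFilePoll {
    // Polls for `file` to appear, for up to timeoutMillis; returns whether it
    // showed up. The caller then asserts on the boolean -- the PR changes that
    // assertion from JUnit4-style fail(...) to AssertJ's
    // assertThat(exists).as("The marker file was not found...").isTrue().
    static boolean waitForMarkerFile(File file, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        boolean exists = file.exists();
        while (!exists && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10); // same 10 ms poll interval as the test utility
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            exists = file.exists();
        }
        return exists;
    }
}
```

The AssertJ form is preferred in the JUnit5 migration because a failed assertion reports both the `.as(...)` description and the actual value.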
[jira] [Updated] (FLINK-32880) Redundant taskManagers should always be fulfilled in FineGrainedSlotManager
[ https://issues.apache.org/jira/browse/FLINK-32880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo updated FLINK-32880: --- Affects Version/s: 1.18.0 > Redundant taskManagers should always be fulfilled in FineGrainedSlotManager > --- > > Key: FLINK-32880 > URL: https://issues.apache.org/jira/browse/FLINK-32880 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: xiangyu feng >Assignee: xiangyu feng >Priority: Major > Labels: pull-request-available > > Currently, if you are using FineGrainedSlotManager and a redundant > taskmanager exits abnormally, no replacement taskmanager will be provisioned > during the periodic check in FineGrainedSlotManager. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32880) Redundant taskManagers should always be fulfilled in FineGrainedSlotManager
[ https://issues.apache.org/jira/browse/FLINK-32880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yangze Guo updated FLINK-32880: --- Priority: Critical (was: Major) > Redundant taskManagers should always be fulfilled in FineGrainedSlotManager > --- > > Key: FLINK-32880 > URL: https://issues.apache.org/jira/browse/FLINK-32880 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.18.0 >Reporter: xiangyu feng >Assignee: xiangyu feng >Priority: Critical > Labels: pull-request-available > > Currently, if you are using FineGrainedSlotManager and a redundant > taskmanager exits abnormally, no replacement taskmanager will be provisioned > during the periodic check in FineGrainedSlotManager. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
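For readers unfamiliar with the feature being fixed: redundant TaskManagers are spare workers kept around for fast failover, configured as sketched below. Verify the key against your Flink version's configuration reference; the bug above means FineGrainedSlotManager may not replenish these spares after one exits abnormally.

```yaml
# flink-conf.yaml excerpt (sketch): keep two spare TaskManagers beyond what
# the running jobs need, so a TaskManager loss can be absorbed immediately.
slotmanager.redundant-taskmanager-num: 2
```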
[jira] [Commented] (FLINK-32821) Streaming examples failed to execute due to error in packaging
[ https://issues.apache.org/jira/browse/FLINK-32821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17755548#comment-17755548 ] Weihua Hu commented on FLINK-32821: --- Hi, [~afedulov] Thanks for your attention. The change in [https://github.com/apache/flink/pull/23079/files#diff-c7da2a9d0258717dbd97328195d580aedc3fe51240aeb4fc0f292d946dcdb4af] only bundles the datagen connector into flink-example-streaming.jar, but that is not the final jar we use. You can check the final jar in target/TopSpeedWindowing.jar; there are no datagen-related classes in it. To reproduce this problem, we need a standalone cluster and to run the example with './bin/flink run examples/streaming/TopSpeedWindowing.jar'. This issue cannot be reproduced from the IDE, because the IDE adds the dependency to the classpath automatically. > Streaming examples failed to execute due to error in packaging > -- > > Key: FLINK-32821 > URL: https://issues.apache.org/jira/browse/FLINK-32821 > Project: Flink > Issue Type: Bug > Components: Examples >Affects Versions: 1.18.0 >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > Labels: pull-request-available > > 7 out of the 8 streaming examples failed to run: > * Iteration & SessionWindowing & SocketWindowWordCount & WindowJoin failed > to run due to java.lang.NoClassDefFoundError: > org/apache/flink/streaming/examples/utils/ParameterTool > * MatrixVectorMul & TopSpeedWindowing & StateMachineExample failed to run > due to: Caused by: java.lang.ClassNotFoundException: > org.apache.flink.connector.datagen.source.GeneratorFunction > The NoClassDefFoundError with ParameterTool is introduced by FLINK-32558 > Properly deprecate DataSet API - ASF JIRA (apache.org), and we'd better > resolve FLINK-32820 ParameterTool is mistakenly marked as deprecated - ASF > JIRA (apache.org) first before we come to a fix for this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
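The missing-class reports above can be checked mechanically: a jar is a zip archive, so listing its entries shows whether classes like ParameterTool or GeneratorFunction were bundled. A minimal stdlib sketch (the class and method names here are illustrative, not part of Flink):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class JarContentsCheck {
    // Returns true if the archive contains an entry whose name ends with `suffix`,
    // e.g. containsEntry(jar, "ParameterTool.class").
    static boolean containsEntry(File jar, String suffix) throws IOException {
        try (ZipInputStream in = new ZipInputStream(new FileInputStream(jar))) {
            for (ZipEntry e; (e = in.getNextEntry()) != null; ) {
                if (e.getName().endsWith(suffix)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

Pointing `containsEntry` at target/TopSpeedWindowing.jar with the suffix "GeneratorFunction.class" would confirm (or refute) the observation that the final example jar, unlike flink-example-streaming.jar, lacks the datagen classes.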
[jira] [Updated] (FLINK-32890) Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between
[ https://issues.apache.org/jira/browse/FLINK-32890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicolas Fraison updated FLINK-32890: Description: Here are all details about the issue: * Deployed new release of a flink app with a new operator * Flink Operator set the app as stable * After some time the app failed and stay in failed state (due to some issue with our kafka clusters) * Sadly the team in charge of this flink app decided to rollback the app as they were thinking it was linked to this new deployment * Operator detect: {{Job is not running but HA metadata is available for last state restore, ready for upgrade, Deleting JobManager deployment while preserving HA metadata.}} -> rely on last-state (as we do not disable fallback), no savepoint taken * Flink start JM and deployment of the app. It well find the 3 checkpoints * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as Zookeeper namespace.}} * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}} * {{Recovering checkpoints from ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}} * {{Found 3 checkpoints in ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}} * {{{}Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ 1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at }}\{{{}s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19{}}}{{{}.{}}} * Job failed because of the missing operator {code:java} Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED. org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. 
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state f298e8715b4d85e6f965b60e1c848cbe * Job 6b24a364c1905e924a69f3dbff0d26a6 has been registered for cleanup in the JobResultStore after reaching a terminal state.{code} * {{Clean up the high availability data for job 6b24a364c1905e924a69f3dbff0d26a6.}} * {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}} * JobManager restart and try to resubmit the job but the job was already submitted so finished * {{Received JobGraph submission 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}} * {{Ignoring JobGraph submission 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.}} * {{Application completed SUCCESSFULLY}} * Finally the operator rollback the deployment and still indicate that {{Job is not running but HA metadata is available for last state restore, ready for upgrade}} * But the job metadata are not anymore there (clean previously) {code:java} (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints Path /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints doesn't exist (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico jobgraphs jobs leader {code} The rolled back app from flink operator finally take the last provided savepoint as no metadata/checkpoints are available. 
But this last savepoint is an old one as during the upgrade the operator decided to rely on last-state was: Here are all details about the issue (based on an app/scenario reproducing the issue): * Deployed new release of a flink app with a new operator * Flink Operator set the app as stable * After some time the app failed and stay in failed state (due to some issue with our kafka clusters) * Sadly the team in charge of this flink app decided to rollback the app as they were thinking it was linked to this new deployment * Operator detect: {{Job is not running but HA metadata is available for last state restore, ready for upgrade, Deleting JobManager deployment while preserving HA metadata.}} -> rely on last-state (as we do not disable fallback), no savepoint taken * Flink start JM and deployment of the app. It well find the 3 checkpoints * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as Zookeeper namespace.}} * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}} * {{Recovering checkpoints from ZooKeeperSta
[jira] [Updated] (FLINK-32890) Flink app rolled back with old savepoints (3 hours back in time) while some checkpoints have been taken in between
[ https://issues.apache.org/jira/browse/FLINK-32890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicolas Fraison updated FLINK-32890: Description: Here are all details about the issue: * Deployed new release of a flink app with a new operator * Flink Operator set the app as stable * After some time the app failed and stay in failed state (due to some issue with our kafka clusters) * Finally decided to rollback the new release just in case it could be the root cause of the issue on kafka * Operator detect: {{Job is not running but HA metadata is available for last state restore, ready for upgrade, Deleting JobManager deployment while preserving HA metadata.}} -> rely on last-state (as we do not disable fallback), no savepoint taken * Flink start JM and deployment of the app. It well find the 3 checkpoints * {{Using '/flink-kafka-job-apache-nico/flink-kafka-job-apache-nico' as Zookeeper namespace.}} * {{Initializing job 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}} * {{Recovering checkpoints from ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}} * {{Found 3 checkpoints in ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints'}.}} * {{{}Restoring job 6b24a364c1905e924a69f3dbff0d26a6 from Checkpoint 19 @ 1692268003920 for 6b24a364c1905e924a69f3dbff0d26a6 located at }}\{{{}s3p://.../flink-kafka-job-apache-nico/checkpoints/6b24a364c1905e924a69f3dbff0d26a6/chk-19{}}}{{{}.{}}} * Job failed because of the missing operator {code:java} Job 6b24a364c1905e924a69f3dbff0d26a6 reached terminal state FAILED. org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. 
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state f298e8715b4d85e6f965b60e1c848cbe * Job 6b24a364c1905e924a69f3dbff0d26a6 has been registered for cleanup in the JobResultStore after reaching a terminal state.{code} * {{Clean up the high availability data for job 6b24a364c1905e924a69f3dbff0d26a6.}} * {{Removed job graph 6b24a364c1905e924a69f3dbff0d26a6 from ZooKeeperStateHandleStore\{namespace='flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobgraphs'}.}} * JobManager restart and try to resubmit the job but the job was already submitted so finished * {{Received JobGraph submission 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6).}} * {{Ignoring JobGraph submission 'flink-kafka-job' (6b24a364c1905e924a69f3dbff0d26a6) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.}} * {{Application completed SUCCESSFULLY}} * Finally the operator rollback the deployment and still indicate that {{Job is not running but HA metadata is available for last state restore, ready for upgrade}} * But the job metadata are not anymore there (clean previously) {code:java} (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints Path /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs/6b24a364c1905e924a69f3dbff0d26a6/checkpoints doesn't exist (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico/jobs (CONNECTED [zookeeper-data-eng-multi-cloud.zookeeper-flink.svc:2181]) /> ls /flink-kafka-job-apache-nico/flink-kafka-job-apache-nico jobgraphs jobs leader {code} The rolled back app from flink operator finally take the last provided savepoint as no metadata/checkpoints are available. 
But this last savepoint is an old one, since during the upgrade the operator decided to rely on last-state (the old savepoint it ended up restoring from was a scheduled one).
[jira] [Updated] (FLINK-13698) Rework threading model of CheckpointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-13698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-13698: --- Priority: Not a Priority (was: Minor) > Rework threading model of CheckpointCoordinator > --- > > Key: FLINK-13698 > URL: https://issues.apache.org/jira/browse/FLINK-13698 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Affects Versions: 1.10.0 >Reporter: Piotr Nowojski >Priority: Not a Priority > Labels: auto-deprioritized-critical, auto-deprioritized-major, > stale-minor > > Currently, {{CheckpointCoordinator}} and {{CheckpointFailureManager}} code is > executed by multiple different threads (mostly the {{ioExecutor}}, but not only). > This causes multiple concurrency issues, for example: > https://issues.apache.org/jira/browse/FLINK-13497 > The proper fix would be to rethink the threading model there. At first glance, > this code does not seem to need to be multi-threaded, except for the parts doing > the actual IO operations, so it should be possible to run everything in a single > ExecutionGraph thread and run the necessary IO operations asynchronously with > some feedback loop ("mailbox style"). > I would strongly recommend fixing this > issue before adding new features to the {{CheckpointCoordinator}} component. -- This message was sent by Atlassian Jira (v8.20.10#820010)
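The single-threaded "mailbox style" proposed above can be sketched as follows. This is a minimal, self-contained illustration with hypothetical names, not Flink's actual CheckpointCoordinator code: all coordinator state is touched only from one mailbox thread, while blocking IO runs on a separate executor and feeds its result back into the mailbox instead of mutating state directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
    private int completedCheckpoints = 0; // only ever touched from the mailbox thread

    CompletableFuture<Integer> triggerCheckpoint(long checkpointId) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        // enqueue the trigger as a "mail"; no locks are needed for coordinator state
        mailbox.execute(() ->
                CompletableFuture
                        // the actual (slow) snapshot IO happens off the mailbox thread
                        .supplyAsync(() -> checkpointId, ioExecutor)
                        // feedback loop: completion re-enters the mailbox to update state
                        .thenAcceptAsync(id -> result.complete(++completedCheckpoints), mailbox));
        return result;
    }

    static int runDemo() throws Exception {
        Main coordinator = new Main();
        int completed = coordinator.triggerCheckpoint(1L).get(10, TimeUnit.SECONDS);
        coordinator.mailbox.shutdown();
        coordinator.ioExecutor.shutdown();
        return completed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("completed checkpoints: " + runDemo());
    }
}
```

Because every state mutation is funneled through the single mailbox executor, races like the ones in FLINK-13497 cannot occur by construction; only the IO itself is concurrent.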
[GitHub] [flink] XComp commented on a diff in pull request #23229: [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException
XComp commented on code in PR #23229: URL: https://github.com/apache/flink/pull/23229#discussion_r1297229877

## flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java:

@@ -212,6 +212,16 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) {
 }
 }
+
+    private static boolean hasNext(HttpPostRequestDecoder decoder) {
+        try {
+            return decoder.hasNext();
+        } catch (HttpPostRequestDecoder.EndOfDataDecoderException e) {
+            // this can occur if the final chunk wasn't empty, but didn't contain any attribute data
+            // unfortunately the Netty APIs don't give us any way to check this

Review Comment: we could maintain the size of `currentHttpPostRequestDecoder.getBodyHttpDatas()` (that's essentially what the `hasNext()`/`next()` methods access and which grows with every `offer(..)` call): only if the size of the returned list increases do we know that there is more data to read. That's the only workaround I could come up with based on reading through the `HttpPostMultipartRequestDecoder` iteration code and what I found in https://github.com/netty/netty/issues/7869. Not sure whether that's a nicer approach, though. :thinking: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
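The defensive `hasNext` wrapper in the diff above follows a general pattern: treat a decoder-specific "end of data" exception as exhaustion rather than letting it escape. The sketch below is a stand-alone illustration of that pattern; it models Netty's `HttpPostRequestDecoder.EndOfDataDecoderException` with a hypothetical local exception class so the example has no Netty dependency.

```java
import java.util.Iterator;
import java.util.List;

public class Main {
    // Stand-in for Netty's HttpPostRequestDecoder.EndOfDataDecoderException
    // (assumption: modeled here as a plain RuntimeException subclass).
    static class EndOfDataException extends RuntimeException {}

    // The pattern from the diff above, generalized: interpret the
    // end-of-data exception as "no more elements" instead of propagating it.
    static <T> boolean safeHasNext(Iterator<T> it) {
        try {
            return it.hasNext();
        } catch (EndOfDataException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Iterator<String> abrupt = new Iterator<String>() {
            @Override public boolean hasNext() { throw new EndOfDataException(); }
            @Override public String next() { throw new EndOfDataException(); }
        };
        System.out.println(safeHasNext(List.of("a").iterator())); // true
        System.out.println(safeHasNext(abrupt));                  // false
    }
}
```

The trade-off XComp raises still applies: catching the exception cannot distinguish "cleanly exhausted" from "final chunk carried no attribute data", which is why tracking the size of `getBodyHttpDatas()` is floated as an alternative.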
[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #248: [FLINK-32704] Supports spilling to disk when feedback channel memory buffer is full
jiangxin369 commented on code in PR #248: URL: https://github.com/apache/flink-ml/pull/248#discussion_r1297235534

## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/typeinfo/IterationRecordSerializer.java:

@@ -106,7 +106,7 @@ public int getLength() {
 @Override
 public void serialize(IterationRecord record, DataOutputView target) throws IOException {
 target.writeByte((byte) record.getType().ordinal());
-serializerNumber(record.getEpoch(), target);
+target.writeInt(record.getEpoch());

Review Comment: The original `serializerNumber` cannot serialize `Integer.MAX_VALUE + 1`, which is the epoch of the feedback record in the last iteration. In the previous implementation, the feedback records were just passed in memory and did not need to be serialized/deserialized, so the bug did not surface. I also ran a benchmark serializing 1 billion integers with `serializerNumber` and `DataOutputSerializer#writeInt`; their performance is quite close, so I think we can remove the current `serializerNumber` implementation.

## flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/feedback/SpillableFeedbackQueue.java:

@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator.feedback;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.disk.SpillingBuffer;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.operators.sort.ListMemorySegmentPool;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A queue that can spill the items to disk automatically when the memory buffer is full.
+ *
+ * @param <T> The element type.
+ */
+@Internal
+final class SpillableFeedbackQueue<T> {
+    private final DataOutputSerializer output = new DataOutputSerializer(256);
+    private final TypeSerializer<T> serializer;
+    private final IOManager ioManager;
+    private final MemoryManager memoryManager;
+    private final int numPages;
+
+    private List<MemorySegment> segments;
+    private ListMemorySegmentPool segmentPool;
+
+    private SpillingBuffer target;
+    private long size = 0L;
+
+    SpillableFeedbackQueue(
+            IOManager ioManager,
+            MemoryManager memoryManager,
+            TypeSerializer<T> serializer,
+            long inMemoryBufferSize,
+            long pageSize)
+            throws MemoryAllocationException {
+        this.serializer = Objects.requireNonNull(serializer);
+        this.ioManager = Objects.requireNonNull(ioManager);
+        this.memoryManager = Objects.requireNonNull(memoryManager);
+
+        this.numPages = (int) (inMemoryBufferSize / pageSize);
+        resetSpillingBuffer();
+    }
+
+    void add(T item) {
+        try {
+            output.clear();
+            serializer.serialize(item, output);
+            target.write(output.getSharedBuffer(), 0, output.length());
+            size++;
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    MutableObjectIterator<T> iterate() {
+        try {
+            DataInputView input = target.flip();
+            return new InputViewIterator<>(input, this.serializer);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    long size() {
+        return size;
+    }
+
+    void reset() throws Exception {
+        size = 0;
+        close();
+        resetSpillingBuffer();
+    }
+
+    void close() throws IOException {
+        output.clear();
+        List<MemorySegment> toRelease = target.close();
+        toRelease.addAll(seg
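As an aside on the `writeInt` change discussed in the first review comment above: the sketch below is a minimal, self-contained illustration (not Flink's actual `serializerNumber`) of why a fixed 4-byte `writeInt` round-trips the wrapped epoch value, where an encoding that assumes non-negative values would fail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class Main {
    // Round-trip an int through the fixed 4-byte big-endian DataOutput encoding.
    static int roundTrip(int value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeInt(value);
        return new DataInputStream(new ByteArrayInputStream(bos.toByteArray())).readInt();
    }

    public static void main(String[] args) throws IOException {
        // "Integer.MAX_VALUE + 1" wraps to Integer.MIN_VALUE in int arithmetic;
        // writeInt handles the negative (wrapped) value without any special casing.
        int epoch = Integer.MAX_VALUE + 1; // == Integer.MIN_VALUE
        System.out.println(roundTrip(epoch) == Integer.MIN_VALUE); // true
    }
}
```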