[ https://issues.apache.org/jira/browse/FLINK-32423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761162#comment-17761162 ]
Guozhen Yang commented on FLINK-32423:
--------------------------------------

[~gyfora] The flaw is not actually introduced by the flink-sql-runner-example application. It is introduced by the Flink Kubernetes Operator.

The setJobIdIfNecessary call in the operator's code assigns a job id regardless of whether the Flink application is submitted with HA on or off. So when a Flink application consisting of multiple jobs is submitted with HA off, which is allowed according to Flink's documentation, the operator assigns a fixed job id to the first job. Once the first job has been submitted with a fixed job id, the Flink cluster refuses to run the next job in the same application, which conflicts with Flink's documentation.

I think this is not a minor issue with the flink-sql-runner-example. It is actually a flaw in the Flink Kubernetes Operator.

> Flink-sql-runner-example application fails if multiple execute() called in one sql file
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-32423
>                 URL: https://issues.apache.org/jira/browse/FLINK-32423
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kubernetes Operator
>            Reporter: Guozhen Yang
>            Priority: Not a Priority
>
> h2. Summary:
> flink-sql-runner-example application fails if multiple execute() called in one sql file
> h2. Background:
> We have a series of batch jobs running on a table partitioned by date. The jobs need to run sequentially in chronological order, which means that batch job #2 on the 2023-06-02 partition starts only after batch job #1 has finished the 2023-06-01 partition. So we loop through the dates and submit multiple jobs in a single application, and the Flink application is deployed in application mode with HA turned off.
> According to the [Flink documentation|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/overview/#application-mode], Application Mode allows the submission of applications consisting of multiple jobs, but High-Availability is not supported in these cases.
> h2. The problem:
> An application consisting of multiple jobs fails when the second job is executed.
> The stack trace is shown below:
> {noformat}
> 2023-06-21 03:21:44,720 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.16.2.jar:1.16.2]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
>     at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
>     at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
> Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
>     ... 14 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
>     ... 13 more
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
>     ... 13 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
>     at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:217) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:205) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
>     at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
>     at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
>     ... 13 more
> {noformat}
> h2. How to reproduce:
> 1. Start a minikube cluster.
> 2. Add a new script file _two-selects.sql_ to the [examples/flink-sql-runner-example/sql-scripts folder|https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example]. The contents of _two-selects.sql_ are shown below.
> {noformat}
> select 1;
> select 1;
> {noformat}
> 3. Follow the [instructions|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/README.md] to build the flink-sql-runner-example image.
> 4. Use the minikube image load command to load the image.
> 5. Modify the [flinkdep yaml file|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-example.yaml]: change spec.job.args to args: ["/opt/flink/usrlib/sql-scripts/two-selects.sql"]. Then apply the flinkdep yaml file.
> 6. The application fails.
> h2. Possible reason:
> According to the [flink-kubernetes-operator documentation|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/controller-flow/#application-reconciler], Flink by default generates deterministic job ids based on the clusterId.
> {quote}Flink by default generates deterministic jobids based on the clusterId (which is the CR name in our case). This causes checkpoint path conflicts if the job is ever restarted from an empty state (stateless upgrade). We therefore generate a random jobid to avoid this.
> {quote}
> I found that flink-kubernetes-operator always sets the job id when submitting an application. [Corresponding code of setJobIdIfNecessary is here.|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L191C18-L191C37]
> But according to [Flink's code|https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L213], there are two situations.
> 1. HA is not activated and the job id is not set when submitting the application (lines 213 to 217). runApplicationAsync is called with enforceSingleJobExecution=false, so multi-job execution is viable.
> 2. Otherwise (lines 218 to 233): if the job id is not set when submitting the application, it is set based on the cluster id. After the job id is fixed, runApplicationAsync is called with enforceSingleJobExecution=true, so multi-job execution is not viable.
> If flink-kubernetes-operator always sets the job id when submitting an application, the condition of situation #1 never matches. So an application submitted through flink-kubernetes-operator cannot execute multiple jobs, even if it is deployed in application mode with HA turned off.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
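
The two situations described under "Possible reason" can be sketched as a small standalone model. This is only an illustration under stated assumptions, not the Flink source: the class JobIdDecisionSketch, the method decide, and the way a job id is derived from the cluster id here are all hypothetical placeholders. Only the branch structure (HA on or off, job id configured or not, resulting enforceSingleJobExecution flag) follows the description in the issue.

```java
import java.util.Optional;

/**
 * Simplified model of the job-id branch described in the issue.
 * This is NOT the Flink source; class and method names are hypothetical.
 */
public class JobIdDecisionSketch {

    /** Result of the branch: the job id in effect (if any) and the single-job flag. */
    public static final class Decision {
        public final Optional<String> fixedJobId;
        public final boolean enforceSingleJobExecution;

        Decision(Optional<String> fixedJobId, boolean enforceSingleJobExecution) {
            this.fixedJobId = fixedJobId;
            this.enforceSingleJobExecution = enforceSingleJobExecution;
        }
    }

    public static Decision decide(
            boolean haEnabled, Optional<String> configuredJobId, String clusterId) {
        // Situation 1: HA off and no job id configured, so multiple jobs may run.
        if (!haEnabled && !configuredJobId.isPresent()) {
            return new Decision(Optional.empty(), false);
        }
        // Situation 2: a job id is already configured, or one is derived from the
        // cluster id. The derivation below is a placeholder, not Flink's formula.
        String jobId =
                configuredJobId.orElseGet(() -> Integer.toHexString(clusterId.hashCode()));
        return new Decision(Optional.of(jobId), true);
    }

    public static void main(String[] args) {
        // Plain Flink, HA off, no job id: multi-job execution stays possible.
        System.out.println(
                decide(false, Optional.empty(), "sql-example").enforceSingleJobExecution);
        // Operator-style submission: a job id is always set, even with HA off.
        System.out.println(
                decide(false, Optional.of("operator-generated-id"), "sql-example")
                        .enforceSingleJobExecution);
    }
}
```

Running main prints false and then true: as soon as a job id is present at submission time, the first branch can no longer match, which is the behaviour reported for applications submitted through the operator.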