Hi Xiaolong,

From the exception it seems that the Flink session cluster is not
running properly. Could you visit the Flink web UI and check whether everything is OK?
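
If the rest service is not reachable from your machine, a quick way to open
the UI is a port-forward, for example (the service name and namespace below
are taken from your logs, please adjust if they differ):

```
kubectl -n realtime-streaming port-forward \
  svc/flink-session-cluster-example-rest 8081:8081
# then open http://localhost:8081 in a browser
```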

Best,
Shammon FY

On Mon, Jul 3, 2023 at 2:43 PM Xiaolong Wang <xiaolong.w...@smartnews.com>
wrote:

> Hi,
> I've tested the Flink SQL gateway by running some simple Hive queries, but ran
> into some exceptions.
>
>
> Environment Description:
> Run on: Kubernetes
> Deployment Mode: Session Mode (created by a flink-kubernetes-operator)
> Steps to run:
> 1. Apply a `flinkdeployment` for a Flink session cluster via the Flink operator:
> ```
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-session-cluster-example
>   namespace: xxx
> spec:
>   image: xxx/flink:1.17-sql-gateway-dev
>   flinkVersion: v1_17
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "2"
>     pipeline.max-parallelism: "1000"
>     state.backend.type: rocksdb
>     state.backend.incremental: "true"
>     state.checkpoints.dir: xxx
>     execution.checkpointing.interval: 1m
>     execution.checkpointing.timeout: 30m
>     high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     high-availability.storageDir: xxx
>     akka.framesize: 20971520b
>     execution.checkpointing.externalized-checkpoint-retention:
> RETAIN_ON_CANCELLATION
>     taskmanager.memory.managed.fraction: "0.2"
>     kubernetes.hadoop.conf.config-map.name: xxx
>   serviceAccount: default
>   podTemplate:
>     apiVersion: v1
>     kind: Pod
>     metadata:
>       name: pod-template
>     spec:
>       serviceAccount: default
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   taskManager:
>     resource:
>       memory: "4096m"
>       cpu: 1
> ```
> This image has been built with the Hadoop dependency bundled and uses an
> existing Hadoop ConfigMap.
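>
> For reference, the manifest was applied with something like this (the file
> name is just an example):
> ```
> kubectl apply -n xxx -f flink-session-cluster-example.yaml
> ```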
>
> 2. Log in to the job-manager pod and run the following:
> `./bin/sql-gateway.sh start-foreground
> -Dsql-gateway.endpoint.type=hiveserver2
> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/opt/flink/hive-conf`
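>
> (The HiveServer2 endpoint should listen on its default Thrift port, 10000,
> unless it is overridden; to pin it explicitly, an option like
> `-Dsql-gateway.endpoint.hiveserver2.thrift.port=10000` could be appended.)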
>
> 3. Start Beeline, connect to the SQL gateway, and run a simple Hive
> query:
> `select count(1) from simple_demo_output where dt = '2021-08-14';`
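>
> (The Beeline connection was made roughly like this, assuming the gateway
> listens on the default HiveServer2 port 10000 on localhost:)
> ```
> !connect jdbc:hive2://localhost:10000/default;auth=noSasl
> ```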
>
> 4. The SQL gateway fails with the following logs:
> ```
>
> 2023-07-03 06:27:11,078 INFO  
> org.apache.flink.client.program.rest.RestClusterClient
>       [] - Submitting job 'collect' (4c99c40392cb935d3df94891655d2ce5).
>
> 2023-07-03 06:27:15,092 INFO  
> org.apache.flink.client.program.rest.RestClusterClient
>       [] - Successfully submitted job 'collect'
> (4c99c40392cb935d3df94891655d2ce5) to '
> http://flink-session-cluster-example-rest.realtime-streaming:8081'.
>
> 2023-07-03 06:27:15,879 ERROR
> org.apache.flink.table.gateway.service.operation.OperationManager [] -
> Failed to execute the operation 7613f663-8641-428c-b3d2-ec77a12fa6ee.
>
> org.apache.flink.table.api.TableException: Failed to execute sql
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> [?:?]
>
> at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> [?:?]
>
> at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
>
> at java.lang.Thread.run(Unknown Source) [?:?]
>
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> 'collect'.
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
> ~[flink-table-planner_2.12-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:955)
> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>
> ... 13 more
>
> Caused by: java.lang.RuntimeException: Error while waiting for job to be
> initialized
>
> at
> org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:151)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:87)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
> ~[?:?]
>
> at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
> ~[?:?]
>
> ... 1 more
>
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error., <Exception on server side:
>
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
> Could not send message
> [LocalFencedMessage(863401565fbee9bb5587a17d5279473a,
> LocalRpcInvocation(JobMasterGateway.requestJob(Time)))] from sender
> [Actor[akka://flink/temp/jobmanager_2$4d]] to recipient
> [Actor[akka://flink/user/rpc/jobmanager_2#-1135810317]], because the
> recipient is unreachable. This can either mean that the recipient has been
> terminated or that the remote RpcService is currently not reachable.
>
> at
> org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
>
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:547)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>
> at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
> Source)
>
> at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>
> at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>
> at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
>
>
> End of exception on server side>]
>
> at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
>
> at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
>
> at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$null$0(AbstractSessionClusterExecutor.java:88)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:135)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:87)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
> ~[?:?]
>
> at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
> ~[?:?]
>
> ... 1 more
>
> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> [Internal server error., <Exception on server side:
>
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
> Could not send message
> [LocalFencedMessage(863401565fbee9bb5587a17d5279473a,
> LocalRpcInvocation(JobMasterGateway.requestJob(Time)))] from sender
> [Actor[akka://flink/temp/jobmanager_2$4d]] to recipient
> [Actor[akka://flink/user/rpc/jobmanager_2#-1135810317]], because the
> recipient is unreachable. This can either mean that the recipient has been
> terminated or that the remote RpcService is currently not reachable.
>
> at
> org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
>
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:547)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>
> at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
> Source)
>
> at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>
> at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>
> at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
>
>
> End of exception on server side>]
>
> at
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown
> Source) ~[?:?]
>
> at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
> ~[?:?]
>
> ... 3 more
>
> command terminated with exit code 137
>
> ```
>
>
> After that, the `flinkdeployment` enters a failed state and cannot recover by itself.
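>
> (Its state can be inspected with something like the following; the resource
> name and namespace are from the manifest above:)
> ```
> kubectl -n xxx get flinkdeployment flink-session-cluster-example
> kubectl -n xxx describe flinkdeployment flink-session-cluster-example
> ```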
>
>
