Hi, Xiaolong

As Shammon says, I think you should check the exception info of the Flink
cluster first to confirm the root cause.
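
For example, on Kubernetes you can usually pull the exception info straight
from the JobManager log. A minimal sketch (the namespace, pod name and label
selector are placeholders/assumptions and may differ in your setup):

```
# List the session cluster pods (namespace and label selector are assumptions)
kubectl -n <namespace> get pods -l app=flink-session-cluster-example

# Scan the JobManager log for errors and exceptions
kubectl -n <namespace> logs <jobmanager-pod-name> | grep -E -A 20 "ERROR|Exception"

# If the container was restarted, also check the previous container's log
kubectl -n <namespace> logs <jobmanager-pod-name> --previous
```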

Best,
Ron

Shammon FY <zjur...@gmail.com> 于2023年7月4日周二 16:44写道:

> Hi Xiaolong,
>
> I think you may need to check the error log in the flink cluster to find
> out the root cause.
>
> Best,
> Shammon FY
>
> On Tue, Jul 4, 2023 at 3:38 PM Xiaolong Wang <xiaolong.w...@smartnews.com>
> wrote:
>
>> The flink web ui is fine until I run the Hive query. After that the flink
>> deployment is down and the web UI is not accessible.
>>
>> On Tue, Jul 4, 2023 at 9:13 AM Shammon FY <zjur...@gmail.com> wrote:
>>
>>> Hi Xiaolong,
>>>
>>> From the exception, it seems that the flink session cluster is not
>>> running properly. Can you visit the flink web UI and check whether
>>> everything is OK?
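>>>
>>> If the UI is not reachable from your machine, a quick check could look
>>> like this (just a sketch; the rest service name is taken from the log
>>> further down in this thread, and the namespace is a placeholder):
>>>
>>> ```
>>> # Forward the session cluster's REST/web UI port to localhost
>>> kubectl -n <namespace> port-forward svc/flink-session-cluster-example-rest 8081:8081
>>>
>>> # A healthy JobManager should answer with a JSON cluster overview
>>> curl http://localhost:8081/overview
>>> ```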
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Mon, Jul 3, 2023 at 2:43 PM Xiaolong Wang <
>>> xiaolong.w...@smartnews.com> wrote:
>>>
>>>> Hi,
>>>> I've tested the Flink SQL gateway by running some simple Hive queries but
>>>> ran into some exceptions.
>>>>
>>>>
>>>> Environment Description:
>>>> Run on: Kubernetes
>>>> Deployment Mode: Session Mode (created by a flink-kubernetes-operator)
>>>> Steps to run:
>>>> 1. Apply a `flinkdeployment` of a flink session cluster to the flink operator:
>>>> ```
>>>> apiVersion: flink.apache.org/v1beta1
>>>> kind: FlinkDeployment
>>>> metadata:
>>>>   name: flink-session-cluster-example
>>>>   namespace: xxx
>>>> spec:
>>>>   image: xxx/flink:1.17-sql-gateway-dev
>>>>   flinkVersion: v1_17
>>>>   flinkConfiguration:
>>>>     taskmanager.numberOfTaskSlots: "2"
>>>>     pipeline.max-parallelism: "1000"
>>>>     state.backend.type: rocksdb
>>>>     state.backend.incremental: "true"
>>>>     state.checkpoints.dir: xxx
>>>>     execution.checkpointing.interval: 1m
>>>>     execution.checkpointing.timeout: 30m
>>>>     high-availability:
>>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>>     high-availability.storageDir: xxx
>>>>     akka.framesize: 20971520b
>>>>     execution.checkpointing.externalized-checkpoint-retention:
>>>> RETAIN_ON_CANCELLATION
>>>>     taskmanager.memory.managed.fraction: "0.2"
>>>>     kubernetes.hadoop.conf.config-map.name: xxx
>>>>   serviceAccount: default
>>>>   podTemplate:
>>>>     apiVersion: v1
>>>>     kind: Pod
>>>>     metadata:
>>>>       name: pod-template
>>>>     spec:
>>>>       serviceAccount: default
>>>>   jobManager:
>>>>     resource:
>>>>       memory: "2048m"
>>>>       cpu: 1
>>>>   taskManager:
>>>>     resource:
>>>>       memory: "4096m"
>>>>       cpu: 1
>>>> ```
>>>> This image has been built with a `hadoop dependency` and references an
>>>> existing `hadoop configmap`.
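>>>>
>>>> For completeness, applying and checking the deployment could look roughly
>>>> like this (a sketch; the manifest file name is made up and the namespace
>>>> is a placeholder):
>>>>
>>>> ```
>>>> # Apply the FlinkDeployment manifest so the operator creates the session cluster
>>>> kubectl -n <namespace> apply -f flink-session-cluster-example.yaml
>>>>
>>>> # Check that the operator reports the deployment as running/stable
>>>> kubectl -n <namespace> get flinkdeployment flink-session-cluster-example
>>>> ```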
>>>>
>>>> 2. Log in to the job-manager pod and run the following:
>>>> `./bin/sql-gateway.sh start-foreground
>>>> -Dsql-gateway.endpoint.type=hiveserver2
>>>> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/opt/flink/hive-conf`
>>>>
>>>> 3. Start beeline, connect to the SQL gateway, and then run a simple
>>>> Hive query:
>>>> `select count(1) from simple_demo_output where dt = '2021-08-14';`
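>>>>
>>>> (For reference, a minimal beeline connection to the gateway's HiveServer2
>>>> endpoint might look like this, assuming the default endpoint port 10000
>>>> and no authentication:)
>>>>
>>>> ```
>>>> # Connect beeline to the SQL gateway's HiveServer2 endpoint
>>>> ./beeline -u "jdbc:hive2://localhost:10000/default;auth=noSasl"
>>>> ```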
>>>>
>>>> 4. The SQL gateway fails with the following logs:
>>>> ```
>>>>
>>>> 2023-07-03 06:27:11,078 INFO  
>>>> org.apache.flink.client.program.rest.RestClusterClient
>>>>       [] - Submitting job 'collect' (4c99c40392cb935d3df94891655d2ce5).
>>>>
>>>> 2023-07-03 06:27:15,092 INFO  
>>>> org.apache.flink.client.program.rest.RestClusterClient
>>>>       [] - Successfully submitted job 'collect'
>>>> (4c99c40392cb935d3df94891655d2ce5) to '
>>>> http://flink-session-cluster-example-rest.realtime-streaming:8081'.
>>>>
>>>> 2023-07-03 06:27:15,879 ERROR
>>>> org.apache.flink.table.gateway.service.operation.OperationManager [] -
>>>> Failed to execute the operation 7613f663-8641-428c-b3d2-ec77a12fa6ee.
>>>>
>>>> org.apache.flink.table.api.TableException: Failed to execute sql
>>>>
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
>>>> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
>>>> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
>>>> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
>>>> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
>>>> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
>>>> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
>>>> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>>>>
>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>>> [?:?]
>>>>
>>>> at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
>>>>
>>>> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>>>> [?:?]
>>>>
>>>> at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
>>>>
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>> [?:?]
>>>>
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>> [?:?]
>>>>
>>>> at java.lang.Thread.run(Unknown Source) [?:?]
>>>>
>>>> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
>>>> 'collect'.
>>>>
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212)
>>>> ~[flink-dist-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
>>>> ~[flink-table-planner_2.12-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:955)
>>>> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>>>>
>>>> ... 13 more
>>>>
>>>> Caused by: java.lang.RuntimeException: Error while waiting for job to
>>>> be initialized
>>>>
>>>> at
>>>> org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:151)
>>>> ~[flink-dist-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:87)
>>>> ~[flink-dist-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
>>>> ~[flink-dist-1.17.1.jar:1.17.1]
>>>>
>>>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown
>>>> Source) ~[?:?]
>>>>
>>>> at java.util.concurrent.CompletableFuture$Completion.run(Unknown
>>>> Source) ~[?:?]
>>>>
>>>> ... 1 more
>>>>
>>>> Caused by: java.util.concurrent.ExecutionException:
>>>> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
>>>> error., <Exception on server side:
>>>>
>>>> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
>>>> Could not send message
>>>> [LocalFencedMessage(863401565fbee9bb5587a17d5279473a,
>>>> LocalRpcInvocation(JobMasterGateway.requestJob(Time)))] from sender
>>>> [Actor[akka://flink/temp/jobmanager_2$4d]] to recipient
>>>> [Actor[akka://flink/user/rpc/jobmanager_2#-1135810317]], because the
>>>> recipient is unreachable. This can either mean that the recipient has been
>>>> terminated or that the remote RpcService is currently not reachable.
>>>>
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
>>>>
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>>>>
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>>>>
>>>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>>>
>>>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>>>
>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>>>>
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>>>
>>>> at akka.actor.Actor.aroundReceive(Actor.scala:537)
>>>>
>>>> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>>>>
>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>>>>
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
>>>>
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:547)
>>>>
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>>>>
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>>>>
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>>>>
>>>> at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>>>>
>>>> at
>>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
>>>> Source)
>>>>
>>>> at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>>>>
>>>> at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>>>>
>>>> at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown
>>>> Source)
>>>>
>>>>
>>>> End of exception on server side>]
>>>>
>>>> at java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
>>>> ~[?:?]
>>>>
>>>> at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
>>>>
>>>> at
>>>> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$null$0(AbstractSessionClusterExecutor.java:88)
>>>> ~[flink-dist-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:135)
>>>> ~[flink-dist-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:87)
>>>> ~[flink-dist-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
>>>> ~[flink-dist-1.17.1.jar:1.17.1]
>>>>
>>>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown
>>>> Source) ~[?:?]
>>>>
>>>> at java.util.concurrent.CompletableFuture$Completion.run(Unknown
>>>> Source) ~[?:?]
>>>>
>>>> ... 1 more
>>>>
>>>> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
>>>> [Internal server error., <Exception on server side:
>>>>
>>>> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
>>>> Could not send message
>>>> [LocalFencedMessage(863401565fbee9bb5587a17d5279473a,
>>>> LocalRpcInvocation(JobMasterGateway.requestJob(Time)))] from sender
>>>> [Actor[akka://flink/temp/jobmanager_2$4d]] to recipient
>>>> [Actor[akka://flink/user/rpc/jobmanager_2#-1135810317]], because the
>>>> recipient is unreachable. This can either mean that the recipient has been
>>>> terminated or that the remote RpcService is currently not reachable.
>>>>
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
>>>>
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>>>>
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>>>>
>>>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>>>>
>>>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>>>>
>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>>>>
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>>>>
>>>> at akka.actor.Actor.aroundReceive(Actor.scala:537)
>>>>
>>>> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>>>>
>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>>>>
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
>>>>
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:547)
>>>>
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>>>>
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>>>>
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>>>>
>>>> at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>>>>
>>>> at
>>>> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
>>>> Source)
>>>>
>>>> at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>>>>
>>>> at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>>>>
>>>> at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown
>>>> Source)
>>>>
>>>>
>>>> End of exception on server side>]
>>>>
>>>> at
>>>> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
>>>> ~[flink-dist-1.17.1.jar:1.17.1]
>>>>
>>>> at
>>>> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
>>>> ~[flink-dist-1.17.1.jar:1.17.1]
>>>>
>>>> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown
>>>> Source) ~[?:?]
>>>>
>>>> at java.util.concurrent.CompletableFuture$Completion.run(Unknown
>>>> Source) ~[?:?]
>>>>
>>>> ... 3 more
>>>>
>>>> command terminated with exit code 137
>>>>
>>>> ```
>>>>
>>>>
>>>> After that, the `flinkdeployment` goes into a bad state and cannot recover by itself.
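>>>>
>>>> (Side note: exit code 137 is SIGKILL, which on Kubernetes often means the
>>>> container was OOM-killed. A rough way to check what happened to the
>>>> JobManager pod, with placeholder names:)
>>>>
>>>> ```
>>>> # Look at the last state / termination reason of the JobManager pod
>>>> kubectl -n <namespace> describe pod <jobmanager-pod-name>
>>>>
>>>> # Recent events for the session cluster, newest last
>>>> kubectl -n <namespace> get events --sort-by=.lastTimestamp | grep flink-session-cluster-example
>>>> ```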
