Re: Jobmanager restart after it has been requested to stop

2024-02-04 Thread Liting Liu (litiliu) via user
Thank you, Yang:
  We have found the root cause.
  The Flink operator first calls Flink's REST API to stop the job and then calls the Kubernetes API to delete the Flink JobManager deployment. However, it took Kubernetes more than one minute to delete that deployment, so after the JM's main container had been shut down successfully via the REST API, it was restarted by the pod's restart policy, because the pod had not yet been deleted.
  That's why we observed `Jobmanager restart after it has been requested to stop`.
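
For context, a Kubernetes Deployment always runs its pods with restartPolicy: Always, so a JobManager container that exits cleanly keeps being restarted in place until the Deployment object itself is deleted. A minimal sketch of the relevant pod spec, with illustrative metadata (not the operator's actual manifest):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: basic-example            # hypothetical name, for illustration only
spec:
  replicas: 1
  selector:
    matchLabels:
      app: basic-example
  template:
    metadata:
      labels:
        app: basic-example
    spec:
      # Deployments only support restartPolicy: Always, so a container
      # that exits with code 0 is restarted as long as the pod exists.
      restartPolicy: Always
      containers:
        - name: flink-main-container
          image: flink:1.14.3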

From: Yang Wang
Sent: February 2, 2024 17:56
To: Liting Liu (litiliu)
Cc: user
Subject: Re: Jobmanager restart after it has been requested to stop

If you could find the "Deregistering Flink Kubernetes cluster, clusterId" in 
the JobManager log, then it is not the expected behavior.

Having the full logs of the JobManager Pod from before the restart would help a lot.



Best,
Yang

On Fri, Feb 2, 2024 at 1:26 PM Liting Liu (litiliu) via user
<user@flink.apache.org> wrote:
Hi, community:
  I'm running a Flink 1.14.3 job with flink-kubernetes-operator 1.6.0 on AWS. I found that my Flink JobManager container's main process restarted after this FlinkDeployment had been requested to stop. Here is the JobManager log:

2024-02-01 21:57:48,977 tn="flink-akka.actor.default-dispatcher-107478" INFO  
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
[] - Application CANCELED:
java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: 
Application Status: CANCELED
  at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$6(ApplicationDispatcherBootstrap.java:353)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) 
~[?:1.8.0_322]
2024-02-01 21:57:48,984 tn="flink-akka.actor.default-dispatcher-107484" INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.
2024-02-01 21:57:49,103 tn="flink-akka.actor.default-dispatcher-107478" INFO  
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
 [] - Closing components.
2024-02-01 21:57:49,105 tn="flink-akka.actor.default-dispatcher-107484" INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopped 
dispatcher akka.tcp://flink@
2024-02-01 21:57:49,112 
tn="AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1" INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopping Akka 
RPC service.
2024-02-01 21:57:49,286 tn="flink-metrics-15" INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting shut 
down.
2024-02-01 21:57:49,387 tn="main" INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Terminating 
cluster entrypoint process KubernetesApplicationClusterEntrypoint with exit 
code 0.
2024-02-01 21:57:53,828 tn="main" INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
2024-02-01 21:57:54,287 tn="main" INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting 
KubernetesApplicationClusterEntrypoint.


I found that the JM main container's containerId remained the same after the JM auto-restarted.
Why did this process start running again after it had been requested to stop?



Jobmanager restart after it has been requested to stop

2024-02-01 Thread Liting Liu (litiliu) via user
Hi, community:
  I'm running a Flink 1.14.3 job with flink-kubernetes-operator 1.6.0 on AWS. I found that my Flink JobManager container's main process restarted after this FlinkDeployment had been requested to stop. Here is the JobManager log:

2024-02-01 21:57:48,977 tn="flink-akka.actor.default-dispatcher-107478" INFO  
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
[] - Application CANCELED:
java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: 
Application Status: CANCELED
  at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$6(ApplicationDispatcherBootstrap.java:353)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) 
~[?:1.8.0_322]
2024-02-01 21:57:48,984 tn="flink-akka.actor.default-dispatcher-107484" INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.
2024-02-01 21:57:49,103 tn="flink-akka.actor.default-dispatcher-107478" INFO  
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
 [] - Closing components.
2024-02-01 21:57:49,105 tn="flink-akka.actor.default-dispatcher-107484" INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopped 
dispatcher akka.tcp://flink@
2024-02-01 21:57:49,112 
tn="AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1" INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopping Akka 
RPC service.
2024-02-01 21:57:49,286 tn="flink-metrics-15" INFO  
akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting shut 
down.
2024-02-01 21:57:49,387 tn="main" INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Terminating 
cluster entrypoint process KubernetesApplicationClusterEntrypoint with exit 
code 0.
2024-02-01 21:57:53,828 tn="main" INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
2024-02-01 21:57:54,287 tn="main" INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting 
KubernetesApplicationClusterEntrypoint.


I found that the JM main container's containerId remained the same after the JM auto-restarted.
Why did this process start running again after it had been requested to stop?



Encountered "library registration references a different set of library BLOBs" after jobManager restarted

2023-07-13 Thread Liting Liu (litiliu)
Hi, Community. An issue happened to one of our Flink streaming jobs running 1.14.3; that job did not have JobManager HA enabled. After the only JobManager pod's flink-main-container restarted, some of the TaskManager pods kept throwing the exception below:

INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Flink_Kafka_Source -> Filter -> Flat Map (9/16) (3bcddb02cc472fb56be015192a38cf22) switched from DEPLOYING to FAILED on test-taskmanager-1-7 @ 172.17.115.150 (dataPort=46850).
java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-a9f09d52fa47cb7e8707c6d5dbc48de396ae1ab4-54b6f15240547960e63b5d691a53c32f]
new:[p-a9f09d52fa47cb7e8707c6d5dbc48de396ae1ab4-882cb6782baa7adf74a1189c77ccb856]
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:416) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:356) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:232) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:199) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:333) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1047) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:637) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_322]

I can confirm the main jar is identical even after the JM pod's flink-main-container restarted.
Could anyone help explain why the job threw the above exception, and what can I do to avoid it?



Failed to run flink 1.17 job with flink-operator 1.5.0

2023-06-12 Thread Liting Liu (litiliu)
Hi, I was trying to submit a Flink 1.17 job with flink-kubernetes-operator v1.5.0, but encountered the exception below:


The FlinkDeployment "test-scale-z6t4cd" is invalid: spec.flinkVersion: 
Unsupported value: "v1_17": supported values: "v1_13", "v1_14", "v1_15", "v1_16"


I think the flink-operator should support Flink 1.17, because according to
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/custom-resource/autoscaler/
the autoscaler only works well with Flink 1.17.





Re: How to specify both the resource limit and resource request for JM/TM in flink-operator

2023-01-12 Thread Liting Liu (litiliu)
It seems I can achieve this by specifying "kubernetes.jobmanager.cpu.limit-factor" and "kubernetes.taskmanager.cpu.limit-factor" in the Flink properties. Those parameters have been supported since Flink 1.15.
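
For example, a minimal sketch of setting them via the FlinkDeployment's flinkConfiguration (assuming the operator passes these through; the factor values are illustrative, and the CPU limit becomes request * factor):

spec:
  flinkConfiguration:
    kubernetes.jobmanager.cpu.limit-factor: "2.0"    # JM CPU limit = 2x the requested CPU
    kubernetes.taskmanager.cpu.limit-factor: "2.0"   # TM CPU limit = 2x the requested CPU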
____________
From: Liting Liu (litiliu)
Sent: January 12, 2023 16:28
To: user
Subject: How to specify both the resource limit and resource request for JM/TM in flink-operator

  Hi, community. I wonder how I can specify both the resource request and the resource limit for the JM/TM in the podTemplate using flink-operator? We need to set the resource request and the resource limit to different values.

For example:
jobManager:
  limits:
    cpu: 500m
    memory: 500Mi
  requests:
    cpu: 1000m
    memory: 1024Mi
taskManager:
  limits:
    cpu: 500m
    memory: 1024Mi
  requests:
    cpu: 1000m
    memory: 2048Mi





How to specify both the resource limit and resource request for JM/TM in flink-operator

2023-01-12 Thread Liting Liu (litiliu)
  Hi, community. I wonder how I can specify both the resource request and the resource limit for the JM/TM in the podTemplate using flink-operator? We need to set the resource request and the resource limit to different values.

For example:
jobManager:
  limits:
    cpu: 500m
    memory: 500Mi
  requests:
    cpu: 1000m
    memory: 1024Mi
taskManager:
  limits:
    cpu: 500m
    memory: 1024Mi
  requests:
    cpu: 1000m
    memory: 2048Mi





configMap value error when using flink-operator?

2022-10-25 Thread Liting Liu (litiliu)
Hi:
   I'm trying to deploy a Flink job with flink-operator. The flink-operator version is 1.2.0, and the YAML I use is here:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless

   But I found that in the generated ConfigMap, a field named "taskmanager.numberOfTaskSlots" was set to 2, which is very weird, since that field was not defined by the user, and according to the Flink docs the default value of "taskmanager.numberOfTaskSlots" should be 1.


status not clear when deploying batch job with flink-k8s-operator

2022-10-25 Thread Liting Liu (litiliu)
  Hi, I'm deploying a Flink batch job with flink-k8s-operator. My flink-k8s-operator version is 1.2.0 and the Flink version is 1.14.6. I found that after the batch job finished executing, the jobManagerDeploymentStatus field became "MISSING" in the FlinkDeployment CRD, and the error field became "Missing JobManager deployment".
Would it be better to add more information (or a field) to indicate that this batch job has finished normally?

status:
  clusterInfo:
    flink-revision: a921a4d @ 2022-09-09T10:18:38+02:00
    flink-version: 1.14.6
  error: Missing JobManager deployment
  jobManagerDeploymentStatus: MISSING
  jobStatus:
    jobId: 3c5807b038300f46154d72c58f074715
    jobName: batch-job-lab-o8yln9
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
      triggerId: ''
      triggerTimestamp: 0
      triggerType: UNKNOWN
    startTime: '181370751'
    state: RECONCILING
    updateTime: '181379021'



Re: Does kubernetes operator support manually triggering savepoint with canceling the job?

2022-10-19 Thread Liting Liu (litiliu)
Hi, Geng:
   I successfully triggered a savepoint manually, but the job was still running after the savepoint finished. I expected this job to be deleted, because the savepoint had been taken.
jobStatus:
  jobId: 9de925e9d4a67e04ef6279925450907c
  jobName: sql-te-lab-s334c9
  savepointInfo:
    lastPeriodicSavepointTimestamp: 0
    lastSavepoint:
      location: >-
        hdfs://flink/sql-te/savepoint-9de925-b9ead1c58e7b
      timeStamp: 1666163606426
      triggerType: MANUAL
    savepointHistory:
      - location: >-
          hdfs://flink/sql-te/savepoint-9de925-b9ead1c58e7b
        timeStamp: 1666163606426
        triggerType: MANUAL
    triggerId: ''
    triggerTimestamp: 0
    triggerType: MANUAL
  startTime: '1666161791058'
  state: RUNNING
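
A possible workaround sketch, relying on the operator's suspend semantics (with upgradeMode: savepoint, setting the job state to suspended stops the job after taking a final savepoint):

spec:
  job:
    upgradeMode: savepoint   # take a savepoint when the job is suspended
    state: suspended         # stop the job once the savepoint completes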


From: Geng Biao
Sent: October 4, 2022 13:57
To: Liting Liu (litiliu); user
Subject: Re: Does kubernetes operator support manually triggering savepoint with canceling the job?

Hi liting,

Maybe you can check the code of deleteClusterDeployment. When the savepoint is finished, the operator will delete the job. Is the job not deleted as expected?

Best,
Biao Geng

________________
From: Liting Liu (litiliu)
Sent: Tuesday, October 4, 2022 12:53:45 PM
To: user
Subject: Does kubernetes operator support manually triggering savepoint with canceling the job?

Hello Flink community:
   I want to manually trigger a savepoint with the help of the Kubernetes operator, but it seems the operator hasn't provided an option for whether to cancel the job when triggering a savepoint, because the `cancelJob` parameter is hard-coded to false in the latest code: AbstractFlinkService.java#L299<https://github.com/apache/flink-kubernetes-operator/blob/1f6a75056acae90e9fab182fd076ee6755b35bbb/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L299>.
  Do I have to watch for the savepoint to finish myself and then cancel the job ASAP? And is there a plan to support this option?


fail to mount hadoop-config-volume when using flink-k8s-operator

2022-10-12 Thread Liting Liu (litiliu)
Hi, community:
  I'm using flink-k8s-operator v1.2.0 to deploy a Flink job. The "HADOOP_CONF_DIR" environment variable was set in the image that I built from flink:1.15. I found that the TaskManager pod was trying to mount a volume named "hadoop-config-volume" from a ConfigMap, but a ConfigMap with the name "hadoop-config-volume" wasn't created.

Do I need to remove the "HADOOP_CONF_DIR" environment variable in the Dockerfile?
If yes, what should I do to specify the Hadoop conf?
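
If the goal is just to point the pods at a Hadoop configuration you manage yourself, a minimal podTemplate sketch; the ConfigMap name and mount path here are hypothetical:

spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: HADOOP_CONF_DIR
              value: /opt/hadoop/conf      # hypothetical path
          volumeMounts:
            - name: my-hadoop-conf         # hypothetical volume name
              mountPath: /opt/hadoop/conf
      volumes:
        - name: my-hadoop-conf
          configMap:
            name: my-hadoop-conf           # a ConfigMap you create yourself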



Does kubernetes operator support manually triggering savepoint with canceling the job?

2022-10-03 Thread Liting Liu (litiliu)
Hello Flink community:
   I want to manually trigger a savepoint with the help of the Kubernetes operator, but it seems the operator hasn't provided an option for whether to cancel the job when triggering a savepoint, because the `cancelJob` parameter is hard-coded to false in the latest code: AbstractFlinkService.java#L299.
  Do I have to watch for the savepoint to finish myself and then cancel the job ASAP? And is there a plan to support this option?


Re: Re: Re: get NoSuchMethodError when using flink flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar

2022-09-01 Thread Liting Liu (litiliu)
Thanks for your suggestion, Xuyang.
I have solved this problem by packaging a new flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar with the conflicting classes shaded.




From: Xuyang
Sent: September 1, 2022 0:27
To: user@flink.apache.org
Cc: luoyu...@alumni.sjtu.edu.cn; Liting Liu (litiliu)
Subject: Re: Re: get NoSuchMethodError when using flink flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar

Hi, Liu.
It seems that you may be using other jars of your own that contain commons-lang3 in different versions, which may cause the version conflict.
My suggestion is that you shade this dependency in your own jars or in 'flink-table-planner'; the latter may require you to compile Flink manually.


--

Best!
Xuyang


On 2022-08-31 20:28:43, "yuxia" wrote:

How do you use `flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar`? Do you use the SQL client? Do you put it in FLINK_HOME/lib?
If it's for the SQL client, I think you can remove the jar from FLINK_HOME/lib, add it in the Flink SQL client using `ADD JAR 'flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar'`, and set 'org.apache.commons.' to parent-first [1].

But I think the better way is to relocate the class.
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-default
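
A minimal flink-conf sketch of that parent-first setting (the exact pattern value follows the suggestion above and is an assumption):

classloader.parent-first-patterns.additional: org.apache.commons.   # load commons-lang3 from the parent classloader first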

Best regards,
Yuxia

____________
发件人: "Liting Liu (litiliu)" 
收件人: "User" 
发送时间: 星期三, 2022年 8 月 31日 下午 5:14:35
主题: get NoSuchMethodError when using flink 
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar

Hi, I got a NoSuchMethodError when using flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar.
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoSuchMethodError: 
org.apache.commons.lang3.StringUtils.join([IC)Ljava/lang/String;
at 
org.apache.flink.table.planner.plan.utils.RankProcessStrategy$UpdateFastStrategy.toString(RankProcessStrategy.java:129)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.explain(RelDescriptionWriterImpl.java:67)
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.done(RelDescriptionWriterImpl.java:96)
at 
org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:246)
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription(FlinkRelNode.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription$(FlinkRelNode.scala:46)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank.getRelDetailedDescription(StreamPhysicalRank.scala:41)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:701)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1(FlinkChangelogModeInferenceProgram.scala:738)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1$adapted(FlinkChangelogModeInferenceProgram.scala:730)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitRankStrategies(FlinkChangelogModeInferenceProgram.scala:730)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:489)

It seems there is an embedded StringUtils in flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar, which conflicts with another copy of the class.

What should I do?
Do I have to manually exclude StringUtils.class from flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar?




get NoSuchMethodError when using flink flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar

2022-08-31 Thread Liting Liu (litiliu)
Hi, I got a NoSuchMethodError when using flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar.
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoSuchMethodError: 
org.apache.commons.lang3.StringUtils.join([IC)Ljava/lang/String;
at 
org.apache.flink.table.planner.plan.utils.RankProcessStrategy$UpdateFastStrategy.toString(RankProcessStrategy.java:129)
at java.lang.String.valueOf(String.java:2994)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.explain(RelDescriptionWriterImpl.java:67)
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.done(RelDescriptionWriterImpl.java:96)
at 
org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:246)
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription(FlinkRelNode.scala:50)
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription$(FlinkRelNode.scala:46)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank.getRelDetailedDescription(StreamPhysicalRank.scala:41)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:701)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1(FlinkChangelogModeInferenceProgram.scala:738)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1$adapted(FlinkChangelogModeInferenceProgram.scala:730)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitRankStrategies(FlinkChangelogModeInferenceProgram.scala:730)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:489)

It seems there is an embedded StringUtils in flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar, which conflicts with another copy of the class.

What should I do?
Do I have to manually exclude StringUtils.class from flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar?



Exception when calculating throughputEMA in 1.14.3

2022-08-22 Thread Liting Liu (litiliu)
Hi, we are using 1.14.3 but got "Time should be non negative" after the job had been running for days.
What should I do to get rid of this exception? Do I have to disable the buffer-debloating feature?
Is it caused by System.currentTimeMillis not always returning a value larger than before?
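
If disabling it is acceptable, a minimal flink-conf sketch, assuming the standard Flink 1.14 buffer-debloat switch:

taskmanager.network.memory.buffer-debloat.enabled: false   # turn off automatic buffer debloating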

java.lang.IllegalArgumentException: Time should be non negative
  at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.runtime.throughput.ThroughputEMA.calculateThroughput(ThroughputEMA.java:44)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.runtime.throughput.ThroughputCalculator.calculateThroughput(ThroughputCalculator.java:80)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.debloat(StreamTask.java:792)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$4(StreamTask.java:784)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 ~[flink-dist_2.11-1.14.3.jar:1.14.3]
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
~[flink-dist_2.11-1.14.3.jar:1.14.3]



Failed to restore from checkpoint because of KryoException

2022-05-05 Thread Liting Liu (litiliu)
Hi, we are using Flink 1.14.3, but when the job tries to restart from a checkpoint, the following exception occurs. What's wrong?

And how can I avoid it?


Caused by: TimerException{com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 99, Size: 9
Serialization trace:
webexSiteName (com.cisco.wx2.diagnostic_events.SparkIdentifiers)
identifiers (com.cisco.wx2.diagnostic_events.ServiceEvent)
event (com.cisco.wx2.diagnostic_events.Event)}
	... 14 more
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 99, Size: 9
Serialization trace:
webexSiteName (com.cisco.wx2.diagnostic_events.SparkIdentifiers)
identifiers (com.cisco.wx2.diagnostic_events.ServiceEvent)
event (com.cisco.wx2.diagnostic_events.Event)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
	at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:394)
	at org.apache.flink.contrib.streaming.state.RocksDBMapState.access$100(RocksDBMapState.java:65)
	at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getValue(RocksDBMapState.java:502)
	at org.apache.flink.contrib.streaming.state.RocksDBMapState$2.next(RocksDBMapState.java:217)
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
	at scala.collection.IterableLike$class.head(IterableLike.scala:107)
	at scala.collection.AbstractIterable.head(Iterable.scala:54)
	at com.cisco.wx2.flink.functions.UnifiedClientJoinAnalysisWindowFunction$$anonfun$buildAnalysisPipeline$1.apply(UnifiedClientJoinAnalysisWindowFunction.scala:170)
	at com.cisco.wx2.flink.functions.UnifiedClientJoinAnalysisWindowFunction$$anonfun$buildAnalysisPipeline$1.apply(UnifiedClientJoinAnalysisWindowFunction.scala:170)
	at scala.Option.foreach(Option.scala:257)