Unsubscribe
Best Practice of Using HashSet State
Hi, I am new to Flink and state backends. I see that Flink provides ValueState, ListState, and MapState, but it does not provide a State object for HashSet. What is the best practice for storing a HashSet as state in Flink? Should we use ValueState and set the value to a HashSet? Or should we use ListState and implement a wrapper class for serializing and deserializing the HashSet to a List? Any help would be appreciated! Best, Jerome
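As an aside on the ListState-wrapper idea from the question: the round trip it would have to perform is just HashSet-to-List on write and List-to-HashSet on read. A minimal plain-Java sketch of that bridge (the class and method names here are hypothetical illustrations, not part of the Flink API; inside Flink, a MapState with dummy values is another commonly suggested substitute for a set because it supports per-element updates):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper illustrating the HashSet <-> List round trip that a
// ListState-based wrapper would perform. Not a Flink class.
public class SetListBridge {
    // Flatten the set into a list so it can be handed to a list-oriented store.
    public static <T> List<T> toList(Set<T> set) {
        return new ArrayList<>(set);
    }

    // Rebuild the set from the stored list; duplicates collapse automatically.
    public static <T> Set<T> toSet(List<T> list) {
        return new HashSet<>(list);
    }
}
```

The trade-off this sketch makes visible: with a list-backed set, every update rewrites the whole collection, whereas a map-backed set can add or remove a single element.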
Re: Kafka Consumer stop consuming data
Hi Aeden, Thanks for getting back. Do you mean that one of the partitions is idle, so no new watermark is generated from it, which then stalls all the downstream operators and stops consumption from Kafka? I didn't use watermarks in my application, though. I checked that all the Kafka partitions have data coming in consistently. Best wishes, Jerome

From: Aeden Jameson
Date: Tuesday, July 13, 2021 at 3:22 PM
To: Jerome Li
Cc: user@flink.apache.org
Subject: Re: Kafka Consumer stop consuming data

This can happen if you have an idle partition. Are all partitions receiving data consistently?

On Tue, Jul 13, 2021 at 2:59 PM Jerome Li wrote:
>
> Hi,
>
> I have a question about the Flink Kafka consumer. I am facing an issue where the
> Kafka consumer somehow stops consuming data from Kafka a few minutes or a few
> hours after starting. While it was stopped, I checked backpressure and CPU and
> memory consumption. It all looks like data is simply not being consumed, rather
> than the job being stuck in busy computation. And there are no notable logs in
> either the JM or TM.
>
> I want to know what scenarios could cause the Kafka consumer to stop consuming?
>
> Any help would be appreciated!
> Best,
> Jerome

--
Cheers,
Aeden
GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful
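The idle-partition scenario discussed in this thread can be checked for independently of Flink by tracking the last time each partition delivered a record. A small plain-Java sketch of such a staleness check (this is a hypothetical helper for illustration, not a Flink or Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical monitor that flags partitions which have not delivered a
// record within a staleness threshold; illustrates the idea only.
public class PartitionIdleMonitor {
    private final Map<Integer, Long> lastSeenMillis = new HashMap<>();
    private final long idleThresholdMillis;

    public PartitionIdleMonitor(long idleThresholdMillis) {
        this.idleThresholdMillis = idleThresholdMillis;
    }

    // Record that a partition delivered data at the given time.
    public void recordActivity(int partition, long nowMillis) {
        lastSeenMillis.put(partition, nowMillis);
    }

    // A partition is idle if it never reported or has been silent too long.
    public boolean isIdle(int partition, long nowMillis) {
        Long last = lastSeenMillis.get(partition);
        return last == null || nowMillis - last > idleThresholdMillis;
    }
}
```

If a watermark-driven job is involved, the usual in-framework remedy for idle partitions is to declare an idleness timeout on the watermark strategy so silent partitions stop holding back downstream watermarks.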
Kafka Consumer stop consuming data
Hi, I have a question about the Flink Kafka consumer. I am facing an issue where the Kafka consumer somehow stops consuming data from Kafka a few minutes or a few hours after starting. While it was stopped, I checked backpressure and CPU and memory consumption. It all looks like data is simply not being consumed, rather than the job being stuck in busy computation. And there are no notable logs in either the JM or TM. I want to know what scenarios could cause the Kafka consumer to stop consuming? Any help would be appreciated! Best, Jerome
Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node
flinkapp-clusterrolebinding
Labels:      app.kubernetes.io/managed-by=skaffold
             skaffold.dev/run-id=ce4ed24a-9c40-4877-8e77-8e8b5561c5d2
Annotations: meta.helm.sh/release-name: flinkapp
             meta.helm.sh/release-namespace: default
Role:
  Kind: ClusterRole
  Name: edit
Subjects:
  Kind            Name      Namespace
  ServiceAccount  flinkapp  default

The service account is

Name:                flinkapp
Namespace:           default
Labels:              app.kubernetes.io/managed-by=skaffold
                     skaffold.dev/run-id=ce4ed24a-9c40-4877-8e77-8e8b5561c5d2
Annotations:         meta.helm.sh/release-name: flinkapp
                     meta.helm.sh/release-namespace: default
Image pull secrets:
Mountable secrets:   flinkapp-token-mpbd5
Tokens:              flinkapp-token-mpbd5
Events:

Not sure if I am missing any configuration…. Any help would be appreciated! Best, Jerome

From: Yang Wang
Date: Wednesday, May 26, 2021 at 11:25 PM
To: Jerome Li
Cc: user@flink.apache.org
Subject: Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

I think the exception you attached has been fixed via FLINK-22597 [1]. Could you please try again with the latest version? Moreover, it is not the desired Flink behavior that a TaskManager cannot retrieve the new JobManager address and re-register successfully. I think you need to share the stale TaskManager logs so that we can move the debugging forward.

[1] https://issues.apache.org/jira/browse/FLINK-22597

Best, Yang

Jerome Li <l...@vmware.com> wrote on Thu, May 27, 2021 at 4:54 AM:

Hi Yang, Thanks for getting back to me.
By “restart master node”, I mean: run “kubectl get nodes” to find the nodes with the master role, “ssh” into one of the master nodes as the ubuntu user, then run “sudo /sbin/reboot -f” to restart it. It looks like the JobManager cancels the running job and logs this after that:

2021-05-26 18:28:37,997 [INFO] org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution 34eb9f5009dc7cf07117e720e7d393de.
2021-05-26 18:28:37,999 [INFO] org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore - Suspending
2021-05-26 18:28:37,999 [INFO] org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter - Shutting down.
2021-05-26 18:28:38,000 [INFO] org.apache.flink.runtime.executiongraph.ExecutionGraph - Job 74fc5c858e50f5efc91db9ee16c17a8c has been suspended.
2021-05-26 18:28:38,007 [INFO] org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
2021-05-26 18:28:38,007 [INFO] org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 5bac86fb0b5c984ef429225b8de82cc0: JobManager is no longer the leader..
2021-05-26 18:28:38,019 [INFO] org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl - JobManager runner for job hogger (74fc5c858e50f5efc91db9ee16c17a8c) was granted leadership with session id 14b9004a-3807-42e8-ac03-c0d77efe5611 at akka.tcp://flink@hoggerflink-jobmanager:6123/user/rpc/jobmanager_2.
2021-05-26 18:28:38,292 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,292 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,292 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage until processing is started.
2021-05-26 18:28:38,293 [INFO] org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.JobMaster has not bee
Re: Flink Resource Management
Thanks for the response! Let's assume some of the Sink instances have no incoming messages. Would an idle Sink hurt the performance of the other instances in the same TaskManager? I did look at CPU and memory usage for the entire pipeline: when no data was flowing in, overall CPU and memory usage went down, and when the data flow resumed, CPU and memory usage went up again. Best,

From: Chesnay Schepler
Date: Tuesday, June 22, 2021 at 12:57 PM
To: Jerome Li , user@flink.apache.org
Subject: Re: Flink Resource Management

No; Flink does not clean up idle operators.

On 6/22/2021 9:19 PM, Jerome Li wrote:
Hi and Dear Flink users, I am new to Flink. My project is using Flink v1.12.4+. I am curious how Flink manages CPU and memory when an instance of a ProcessFunction/Sink is idle. Will the Flink resource manager deallocate CPU and memory from it? I ask because I am trying to expand the parallelism of a sink across all taskmanagers to avoid large network traffic between taskmanagers. Any help would be appreciated! Thanks!
Flink Resource Management
Hi and Dear Flink users, I am new to Flink. My project is using Flink v1.12.4+. I am curious how Flink manages CPU and memory when an instance of a ProcessFunction/Sink is idle. Will the Flink resource manager deallocate CPU and memory from it? I ask because I am trying to expand the parallelism of a sink across all taskmanagers to avoid large network traffic between taskmanagers. Any help would be appreciated! Thanks!
Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node
-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.jobmaster.JobMaster.startJobMasterServices(JobMaster.java:891) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:864) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:381) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:419) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
... 21 more
2021-05-26 18:28:38,310 [INFO] org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:6124

Eventually it gets back to work, but sometimes it does not. Some of the taskmanagers cannot identify the jobmanager address, and I have to manually restart the stale taskmanagers. Is this the desired Flink behavior? Or is it a bug? Or am I missing something?

Best, Jerome

From: Yang Wang
Date: Tuesday, May 25, 2021 at 1:03 AM
To: Jerome Li
Cc: user@flink.apache.org
Subject: Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

By "restart master node", do you mean restarting the K8s master components (e.g. APIServer, ETCD, etc.)? Even though the master components are restarted, the Flink JobManager and TaskManager should eventually get back to work. Could you please share the JobManager logs so that we can debug why it crashed?

Best, Yang

Jerome Li <l...@vmware.com> wrote on Tue, May 25, 2021 at 3:43 AM:

Hi, I am running Flink v1.12.2 in standalone mode on Kubernetes, with native Kubernetes HA. The HA works well when either a jobmanager or taskmanager pod is lost or crashes. But when I restart a master node, the jobmanager pod always crashes and restarts.
This results in the entire Flink cluster restarting, and most of the taskmanager pods restart as well. I didn't see this issue when using ZooKeeper as HA. I am not sure whether this is a bug that should be handled or whether there is a workaround. Below are my Flink settings.

Job-Manager flink-conf.yaml:

jobmanager.rpc.address: streakerflink-jobmanager
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: /streaker
high-availability.jobmanager.port: 6123
high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
kubernetes.cluster-id: streaker
rest.address: streakerflink-jobmanager
rest.bind-port: 8081
rest.port: 8081
state.checkpoints.dir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink/streaker
blob.server.port: 6124
metrics.internal.query-service.port: 6125
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port:
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 5 s
jobmanager.memory.process.size: 1768m
parallelism.default: 1
task.cancellation.timeout: 2000
web.log.path: /opt/flink/log/output.log
jobmanager.web.log.path: /opt/flink/log/output.log
web.submit.enable: false

Task-Manager flink-conf.yaml:

jobmanager.rpc.address: streakerflink-jobmanager
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: /streaker
high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
kubernetes.cluster-id: streaker
taskmanager.network.bind-policy: ip
taskmanager.data.port: 6121
taskmanager.rpc.port: 6122
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 5 s
taskmanager.memory.task.heap.size: 9728m
taskmanager.memory.framework.off-heap.size: 512m
taskmanager.memory.managed.size: 512m
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 3g
taskmanager.memory.jvm-overhead.fraction: 0.035
taskmanager.memory.network.fraction: 0.03
taskmanager.memory.network.max: 3g
taskmanager.numberOfTaskSlots: 1
taskmanager.jvm-exit-on-oom: true
metrics.internal.query-service.port: 6125
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port:
web.log.path: /opt/flink/log/output.log
taskmanager.log.path: /opt/flink/log/output.log
task.cancellation.timeout: 2000

Any help will be appreciated! Thanks, Jerome
Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node
Hi, I am running Flink v1.12.2 in standalone mode on Kubernetes, with native Kubernetes HA. The HA works well when either a jobmanager or taskmanager pod is lost or crashes. But when I restart a master node, the jobmanager pod always crashes and restarts. This results in the entire Flink cluster restarting, and most of the taskmanager pods restart as well. I didn't see this issue when using ZooKeeper as HA. I am not sure whether this is a bug that should be handled or whether there is a workaround. Below are my Flink settings.

Job-Manager flink-conf.yaml:

jobmanager.rpc.address: streakerflink-jobmanager
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: /streaker
high-availability.jobmanager.port: 6123
high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
kubernetes.cluster-id: streaker
rest.address: streakerflink-jobmanager
rest.bind-port: 8081
rest.port: 8081
state.checkpoints.dir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink/streaker
blob.server.port: 6124
metrics.internal.query-service.port: 6125
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port:
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 5 s
jobmanager.memory.process.size: 1768m
parallelism.default: 1
task.cancellation.timeout: 2000
web.log.path: /opt/flink/log/output.log
jobmanager.web.log.path: /opt/flink/log/output.log
web.submit.enable: false

Task-Manager flink-conf.yaml:

jobmanager.rpc.address: streakerflink-jobmanager
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: /streaker
high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
kubernetes.cluster-id: streaker
taskmanager.network.bind-policy: ip
taskmanager.data.port: 6121
taskmanager.rpc.port: 6122
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 5 s
taskmanager.memory.task.heap.size: 9728m
taskmanager.memory.framework.off-heap.size: 512m
taskmanager.memory.managed.size: 512m
taskmanager.memory.jvm-metaspace.size: 256m
taskmanager.memory.jvm-overhead.max: 3g
taskmanager.memory.jvm-overhead.fraction: 0.035
taskmanager.memory.network.fraction: 0.03
taskmanager.memory.network.max: 3g
taskmanager.numberOfTaskSlots: 1
taskmanager.jvm-exit-on-oom: true
metrics.internal.query-service.port: 6125
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port:
web.log.path: /opt/flink/log/output.log
taskmanager.log.path: /opt/flink/log/output.log
task.cancellation.timeout: 2000

Any help will be appreciated! Thanks, Jerome
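For quick reference, the HA-relevant subset of the configuration above (everything else is metrics, ports, and memory tuning) is just these keys, with values taken verbatim from the poster's setup:

```yaml
# Kubernetes-native HA as configured in the reported setup (Flink 1.12.2).
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id: /streaker
high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
kubernetes.cluster-id: streaker
```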