Unsubscribe

2022-01-13 Thread Jerome Li
Unsubscribe



Best Practice of Using HashSet State

2021-08-05 Thread Jerome Li
Hi,

I am new to Flink and to state backends. I see that Flink provides ValueState, 
ListState, and MapState, but it does not provide a State object for HashSet. What 
is the best practice for storing HashSet state in Flink? Should we use 
ValueState and set the value to a HashSet? Or should we use ListState and 
implement a wrapper class for serializing and deserializing the HashSet to a List?

Any help would be appreciated!

Best,
Jerome
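
A common approach that comes up for this is to treat MapState as a set: the map 
keys are the set members and the values are ignored. That allows point lookups 
without deserializing the whole collection (notably with the RocksDB state 
backend, where each map entry is stored separately). Below is a minimal sketch, 
assuming a keyed stream of Strings that should be deduplicated; the class and 
state names are illustrative only.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits an element only the first time it is seen for the current key.
public class SetStateSketch extends KeyedProcessFunction<String, String, String> {

    // MapState used as a set: the map keys are the set members, the values are unused.
    private transient MapState<String, Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen", String.class, Boolean.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (!seen.contains(value)) {      // point lookup instead of reading a whole HashSet
            seen.put(value, Boolean.TRUE);
            out.collect(value);
        }
    }
}

ValueState<HashSet<T>> also works, but the whole set is serialized and 
deserialized on every access, which becomes expensive as the set grows.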


Re: Kafka Consumer stop consuming data

2021-07-13 Thread Jerome Li
Hi Aeden,

Thanks for getting back.

Do you mean that one of the partitions is idle, so no new watermark is generated 
from it, which then stalls all the downstream operators and stops consumption from 
Kafka? I didn't use watermarks in my application, though.

I checked that all the Kafka partitions have data coming in consistently.

Best wishes,
Jerome

From: Aeden Jameson 
Date: Tuesday, July 13, 2021 at 3:22 PM
To: Jerome Li 
Cc: user@flink.apache.org 
Subject: Re: Kafka Consumer stop consuming data
This can happen if you have an idle partition. Are all partitions
receiving data consistently?

On Tue, Jul 13, 2021 at 2:59 PM Jerome Li  wrote:
>
> Hi,
>
>
>
> I have a question about the Flink Kafka consumer. I am facing an issue where the
> Kafka consumer somehow stops consuming data from Kafka a few minutes or a few
> hours after starting. While it is stopped, I checked the backpressure and the
> CPU and memory consumption. It looks like no data is being consumed rather than
> the job being stuck in busy computation, and there are no notable logs in either the JM or the TM.
>
>
>
> I want to know what scenarios could cause the Kafka consumer to stop consuming.
>
>
>
> Any help would be appreciated!
>
>
>
> Best,
>
> Jerome



--
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful
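
For context on the idle-partition suggestion above: it only applies to jobs that 
rely on event-time watermarks, where a silent partition can hold back the combined 
watermark and stall downstream event-time operators. A minimal sketch of marking 
such partitions as idle with the Flink 1.12-era FlinkKafkaConsumer (the topic name, 
broker address, and group id are placeholders):

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class IdlePartitionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker address
        props.setProperty("group.id", "example-group");       // placeholder group id

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("example-topic", new SimpleStringSchema(), props);

        // Per-partition watermarks: a partition that is silent for 1 minute is
        // marked idle so it no longer holds back the combined watermark.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withIdleness(Duration.ofMinutes(1)));

        env.addSource(consumer).print();

        env.execute("idle-partition-sketch");
    }
}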


Kafka Consumer stop consuming data

2021-07-13 Thread Jerome Li
Hi,

I have a question about the Flink Kafka consumer. I am facing an issue where the 
Kafka consumer somehow stops consuming data from Kafka a few minutes or a few 
hours after starting. While it is stopped, I checked the backpressure and the CPU 
and memory consumption. It looks like no data is being consumed rather than the 
job being stuck in busy computation, and there are no notable logs in either the JM or the TM.

I want to know what scenarios could cause the Kafka consumer to stop consuming.

Any help would be appreciated!

Best,
Jerome


Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

2021-07-12 Thread Jerome Li
Name:         flinkapp-clusterrolebinding

Labels:   app.kubernetes.io/managed-by=skaffold

  skaffold.dev/run-id=ce4ed24a-9c40-4877-8e77-8e8b5561c5d2

Annotations:  meta.helm.sh/release-name: flinkapp

  meta.helm.sh/release-namespace: default

Role:

  Kind:  ClusterRole

  Name:  edit

Subjects:

  Kind            Name      Namespace

  ----            ----      ---------

  ServiceAccount  flinkapp  default

The service account is

Name:                flinkapp

Namespace:   default

Labels:  app.kubernetes.io/managed-by=skaffold

 skaffold.dev/run-id=ce4ed24a-9c40-4877-8e77-8e8b5561c5d2

Annotations: meta.helm.sh/release-name: flinkapp

 meta.helm.sh/release-namespace: default

Image pull secrets:  <none>

Mountable secrets:   flinkapp-token-mpbd5

Tokens:  flinkapp-token-mpbd5

Events:  <none>

Not sure if I am missing any configuration….
Any help would be appreciated!

Best,
Jerome

From: Yang Wang 
Date: Wednesday, May 26, 2021 at 11:25 PM
To: Jerome Li 
Cc: user@flink.apache.org 
Subject: Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes 
Master Node
I think the exception you attached has been fixed via FLINK-22597 [1]. Could you 
please try again with the latest version?

Moreover, it is not the desired Flink behavior that the TaskManager could not 
retrieve the new JobManager address and re-register successfully. I think you need 
to share the logs of the stalled TaskManagers so that we can move the debugging forward.


[1]. https://issues.apache.org/jira/browse/FLINK-22597

Best,
Yang

Jerome Li <l...@vmware.com> wrote on Thu, May 27, 2021 at 4:54 AM:
Hi Yang,

Thanks for getting back to me.

By “restart master node”, I mean running “kubectl get nodes” to find the nodes 
with the master role, SSHing into one of the master nodes as the ubuntu user, and 
then running “sudo /sbin/reboot -f” to restart that master node.

It looks like the JobManager cancels the running job and logs the following after that:

2021-05-26 18:28:37,997 [INFO] 
org.apache.flink.runtime.executiongraph.ExecutionGraph   - Discarding the 
results produced by task execution 34eb9f5009dc7cf07117e720e7d393de.

2021-05-26 18:28:37,999 [INFO] 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore - Suspending

2021-05-26 18:28:37,999 [INFO] 
org.apache.flink.kubernetes.highavailability.KubernetesCheckpointIDCounter - 
Shutting down.

2021-05-26 18:28:38,000 [INFO] 
org.apache.flink.runtime.executiongraph.ExecutionGraph   - Job 
74fc5c858e50f5efc91db9ee16c17a8c has been suspended.

2021-05-26 18:28:38,007 [INFO] 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending 
SlotPool.

2021-05-26 18:28:38,007 [INFO] org.apache.flink.runtime.jobmaster.JobMaster 
- Close ResourceManager connection 
5bac86fb0b5c984ef429225b8de82cc0: JobManager is no longer the leader..

2021-05-26 18:28:38,019 [INFO] 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl  - JobManager 
runner for job hogger (74fc5c858e50f5efc91db9ee16c17a8c) was granted leadership 
with session id 14b9004a-3807-42e8-ac03-c0d77efe5611 at 
akka.tcp://flink@hoggerflink-jobmanager:6123/user/rpc/jobmanager_2.

2021-05-26 18:28:38,292 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,292 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,292 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not been started yet. 
Discarding message org.apache.flink.runtime.rpc.messages.RemoteFencedMessage 
until processing is started.

2021-05-26 18:28:38,293 [INFO] 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor - The rpc endpoint 
org.apache.flink.runtime.jobmaster.JobMaster has not bee

Re: Flink Resource Management

2021-06-22 Thread Jerome Li
Thanks for the response! Let's assume some of the sink instances have no incoming 
messages. Would an idle sink hurt the performance of the other instances in the 
same TaskManager? I did explore the CPU and memory usage for the entire pipeline: 
when no data was flowing, the overall CPU and memory usage went down, and when the 
data flow resumed, CPU and memory usage went back up.

Best,

From: Chesnay Schepler 
Date: Tuesday, June 22, 2021 at 12:57 PM
To: Jerome Li , user@flink.apache.org 
Subject: Re: Flink Resource Management
No; Flink does not clean up idle operators.

On 6/22/2021 9:19 PM, Jerome Li wrote:
Hi and Dear Flink users,

I am new to Flink. My project is using Flink v1.12.4+.

I am curious about how Flink manages CPU and memory when an instance of a 
ProcessFunction/Sink is idle. Will the Flink resource manager deallocate CPU and 
memory from it? I ask because I am trying to expand the parallelism of a sink to 
all TaskManagers to avoid heavy network traffic between TaskManagers.

Any help would be appreciated!

Thanks!




Flink Resource Management

2021-06-22 Thread Jerome Li
Hi and Dear Flink users,

I am new to Flink. My project is using Flink v1.12.4+.

I am curious about how Flink manages CPU and memory when an instance of a 
ProcessFunction/Sink is idle. Will the Flink resource manager deallocate CPU and 
memory from it? I ask because I am trying to expand the parallelism of a sink to 
all TaskManagers to avoid heavy network traffic between TaskManagers.

Any help would be appreciated!

Thanks!
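
Regarding expanding the sink parallelism to avoid cross-TaskManager traffic: a 
minimal sketch of giving the sink the same parallelism as its upstream operator, 
so the two tasks are chained and records are handed over locally instead of being 
shuffled over the network (the operator and the parallelism value are illustrative, 
not taken from the original pipeline):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

public class SinkParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // illustrative: e.g. total task slots across all TaskManagers

        env.fromElements("a", "b", "c")
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           }).setParallelism(4)
           // Same parallelism as the map operator: Flink chains the two tasks,
           // so records stay within the same task slot instead of crossing the network.
           .addSink(new PrintSinkFunction<>()).setParallelism(4);

        env.execute("sink-parallelism-sketch");
    }
}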


Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

2021-05-26 Thread Jerome Li
-1.12.2.jar:1.12.2]

   at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobMasterServices(JobMaster.java:891)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]

   at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:864)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]

   at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:381) 
~[flink-dist_2.12-1.12.2.jar:1.12.2]

   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:419)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]

   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:210)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]

   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:100)
 ~[flink-dist_2.12-1.12.2.jar:1.12.2]

   ... 21 more

2021-05-26 18:28:38,310 [INFO] org.apache.flink.runtime.blob.BlobServer 
- Stopped BLOB server at 0.0.0.0:6124

Eventually it gets back to work, but sometimes it does not: some of the 
TaskManagers cannot identify the JobManager address, and I have to manually 
restart the stalled TaskManagers.

Is this the desired Flink behavior? Or is it a bug? Or am I missing something?

Best,
Jerome


From: Yang Wang 
Date: Tuesday, May 25, 2021 at 1:03 AM
To: Jerome Li 
Cc: user@flink.apache.org 
Subject: Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes 
Master Node
By "restart master node", do you mean restarting the K8s master components (e.g. 
the APIServer, etcd, etc.)?

Even if the master components are restarted, the Flink JobManager and TaskManager 
should eventually get back to work.
Could you please share the JobManager logs so that we can debug why it crashed?


Best,
Yang

Jerome Li <l...@vmware.com> wrote on Tue, May 25, 2021 at 3:43 AM:
Hi,

I am running Flink v1.12.2 in standalone mode on Kubernetes, with Kubernetes-native 
HA enabled.

The HA works well when either a JobManager or a TaskManager pod is lost or crashes.

But when I restart a master node, the JobManager pod always crashes and restarts. 
This results in the entire Flink cluster restarting, and most of the TaskManager 
pods restart as well.

I didn't see this issue when using ZooKeeper for HA. I am not sure whether this is 
a bug that should be handled or whether there is a workaround.


Below is my Flink setting
Job-Manager

flink-conf.yaml:



jobmanager.rpc.address: streakerflink-jobmanager



high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.cluster-id: /streaker

high-availability.jobmanager.port: 6123

high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink

kubernetes.cluster-id: streaker



rest.address: streakerflink-jobmanager

rest.bind-port: 8081

rest.port: 8081



state.checkpoints.dir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink/streaker



blob.server.port: 6124

metrics.internal.query-service.port: 6125

metrics.reporters: prom

metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter

metrics.reporter.prom.port: 



restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 2147483647

restart-strategy.fixed-delay.delay: 5 s



jobmanager.memory.process.size: 1768m



parallelism.default: 1



task.cancellation.timeout: 2000



web.log.path: /opt/flink/log/output.log

jobmanager.web.log.path: /opt/flink/log/output.log



web.submit.enable: false

Task-Manager

flink-conf.yaml:



jobmanager.rpc.address: streakerflink-jobmanager



high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.cluster-id: /streaker

high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink

kubernetes.cluster-id: streaker



taskmanager.network.bind-policy: ip



taskmanager.data.port: 6121

taskmanager.rpc.port: 6122



restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 2147483647

restart-strategy.fixed-delay.delay: 5 s



taskmanager.memory.task.heap.size: 9728m

taskmanager.memory.framework.off-heap.size: 512m

taskmanager.memory.managed.size: 512m

taskmanager.memory.jvm-metaspace.size: 256m

taskmanager.memory.jvm-overhead.max: 3g

taskmanager.memory.jvm-overhead.fraction: 0.035

taskmanager.memory.network.fraction: 0.03

taskmanager.memory.network.max: 3g

taskmanager.numberOfTaskSlots: 1



taskmanager.jvm-exit-on-oom: true



metrics.internal.query-service.port: 6125

metrics.reporters: prom

metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter

metrics.reporter.prom.port: 



web.log.path: /opt/flink/log/output.log

taskmanager.log.path: /opt/flink/log/output.log



task.cancellation.timeout: 2000

Any help will be appreciated!

Thanks,
Jerome


Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

2021-05-24 Thread Jerome Li
Hi,

I am running Flink v1.12.2 in standalone mode on Kubernetes, with Kubernetes-native 
HA enabled.

The HA works well when either a JobManager or a TaskManager pod is lost or crashes.

But when I restart a master node, the JobManager pod always crashes and restarts. 
This results in the entire Flink cluster restarting, and most of the TaskManager 
pods restart as well.

I didn't see this issue when using ZooKeeper for HA. I am not sure whether this is 
a bug that should be handled or whether there is a workaround.


Below is my Flink setting
Job-Manager

flink-conf.yaml:



jobmanager.rpc.address: streakerflink-jobmanager



high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.cluster-id: /streaker

high-availability.jobmanager.port: 6123

high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink

kubernetes.cluster-id: streaker



rest.address: streakerflink-jobmanager

rest.bind-port: 8081

rest.port: 8081



state.checkpoints.dir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink/streaker



blob.server.port: 6124

metrics.internal.query-service.port: 6125

metrics.reporters: prom

metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter

metrics.reporter.prom.port: 



restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 2147483647

restart-strategy.fixed-delay.delay: 5 s



jobmanager.memory.process.size: 1768m



parallelism.default: 1



task.cancellation.timeout: 2000



web.log.path: /opt/flink/log/output.log

jobmanager.web.log.path: /opt/flink/log/output.log



web.submit.enable: false

Task-Manager

flink-conf.yaml:



jobmanager.rpc.address: streakerflink-jobmanager



high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory

high-availability.cluster-id: /streaker

high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink

kubernetes.cluster-id: streaker



taskmanager.network.bind-policy: ip



taskmanager.data.port: 6121

taskmanager.rpc.port: 6122



restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 2147483647

restart-strategy.fixed-delay.delay: 5 s



taskmanager.memory.task.heap.size: 9728m

taskmanager.memory.framework.off-heap.size: 512m

taskmanager.memory.managed.size: 512m

taskmanager.memory.jvm-metaspace.size: 256m

taskmanager.memory.jvm-overhead.max: 3g

taskmanager.memory.jvm-overhead.fraction: 0.035

taskmanager.memory.network.fraction: 0.03

taskmanager.memory.network.max: 3g

taskmanager.numberOfTaskSlots: 1



taskmanager.jvm-exit-on-oom: true



metrics.internal.query-service.port: 6125

metrics.reporters: prom

metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter

metrics.reporter.prom.port: 



web.log.path: /opt/flink/log/output.log

taskmanager.log.path: /opt/flink/log/output.log



task.cancellation.timeout: 2000

Any help will be appreciated!

Thanks,
Jerome