Task Managers having trouble registering after restart

2021-08-17 Thread Kevin Lam
Hi all,

I'm sometimes observing an issue, which has been hard to reproduce, where
task managers are not able to register with the Flink cluster. We provision
only the number of task managers required to run a given application, so
the absence of any of the task managers causes the job to enter a crash
loop where it fails to get the required task slots.

The failure occurs after a job has been running for a while and when there
have been job and task manager restarts. We run on Kubernetes, so pod
disruptions occur from time to time; however, we're running with the high
availability setup [0].

Has anyone encountered this before? Any suggestions?

Below are some error messages pulled from the task managers failing to
re-register.

```
] - Starting DefaultLeaderRetrievalService with
KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
2021-08-16 13:15:10,112 INFO
 org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Starting DefaultLeaderElectionService with
KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-restserver-leader.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-resourcemanager-leader.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-dispatcher-leader.
2021-08-16 13:15:10,211 INFO
 org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
- Starting DefaultLeaderRetrievalService with
KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
2021-08-16 13:16:26,103 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
- Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
2021-08-16 13:16:30,978 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
- Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
```

```
2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
   [] - Uncaught exception in thread
'kafka-producer-network-thread |
trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
at
org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
~[?:?]
at
org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
~[?:?]
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
~[?:?]
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
~[?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.clients.NetworkClient$1
at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
... 6 more
```

```
connection to [null] failed with java.net.ConnectException: Connection
refused: flink-jobmanager/10.28.65.100:6123
2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system
[akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated
for [50] ms. Reason: [Association failed with
[akka.tcp://flink@flink-jobmanager:6123]] Caused by:
[java.net.ConnectException: Connection refused: flink-jobmanager/
10.28.65.100:6123]
2021-08-16 13:14:59,669 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
not resolve ResourceManager address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
retrying in 1 ms: Could not connect to rpc endpoint under address
akka.tcp:
```

Re: Task Managers having trouble registering after restart

2021-08-23 Thread Arvid Heise
Hi Kevin,

"java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
have been loaded. [1]
If you only see that after a while, it's indicating that there is a
classloader leak. I suspect that this is because of Kafka metrics. There
have been some reports in the past.
You can try to see what happens when you disable the forwarding of the
Kafka metrics with register.consumer.metrics: false [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
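
For illustration, here is a minimal sketch of setting that property on the newer `KafkaSource` builder (assuming Flink 1.13); the broker address, topic, and group id below are placeholders, not values from this thread:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceWithoutConsumerMetrics {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")            // placeholder broker
                .setTopics("sales-facts")                     // placeholder topic
                .setGroupId("sales-model-staging")            // placeholder group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // stop forwarding the underlying KafkaConsumer's own metrics
                // into Flink's metric system
                .setProperty("register.consumer.metrics", "false")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           .print();
        env.execute("kafka-source-without-consumer-metrics");
    }
}
```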

Re: Task Managers having trouble registering after restart

2021-08-23 Thread Kevin Lam
Thanks Arvid! I will give this a try and report back.

Re: Task Managers having trouble registering after restart

2021-08-23 Thread Kevin Lam
Actually, we are using the `FlinkKafkaConsumer` [0] rather than
`KafkaSource`. Is there a way to disable the consumer metrics using
`FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue?


[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html

Re: Task Managers having trouble registering after restart

2021-08-24 Thread Arvid Heise
Hi Kevin,

The metrics are exposed in the same way, so I expect the same issue, since
they come from Kafka's Consumer API itself.

I'll pull in @Chesnay Schepler, who AFAIK debugged the leak a while ago.
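
For the legacy `FlinkKafkaConsumer`, a rough sketch of the closest equivalent is below. It assumes the consumer still honors the `flink.disable-metrics` property from `FlinkKafkaConsumerBase` (worth verifying against your Flink version); broker, group id, and topic are placeholders:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class LegacyConsumerWithoutKafkaMetrics {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder broker
        props.setProperty("group.id", "sales-model-staging");  // placeholder group id
        // FlinkKafkaConsumerBase reads this key to skip registering Kafka's own metrics
        props.setProperty("flink.disable-metrics", "true");

        env.addSource(new FlinkKafkaConsumer<>("sales-facts", new SimpleStringSchema(), props))
           .print();
        env.execute("legacy-consumer-without-kafka-metrics");
    }
}
```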

Re: Task Managers having trouble registering after restart

2021-08-24 Thread Kevin Lam
Thank you for pulling in Chesnay.

I haven't yet been able to confirm that the issue no longer happens, as I've
found it difficult to reproduce. I did have some follow-up questions:

1/ If Kafka metrics are indeed the cause of the leak, is there a
workaround? We'd be interested in having these metrics available for
monitoring and alerting purposes.

2/ Do you have any tips on identifying/confirming where the leak is coming
from?

Re: Task Managers having trouble registering after restart

2021-08-24 Thread Chesnay Schepler
There's a super rough guide in the wiki:
https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

The gist of it is that you first want to verify that a ChildFirstClassLoader
is being leaked (i.e., run a few jobs, cancel them, trigger garbage
collection, get a heap dump, check that ChildFirstClassLoaders are still
there), then investigate which GC roots (aka threads) are referencing that
classloader, then figure out where those threads came from.
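
As a rough illustration of the heap-dump step, this sketch triggers a live-objects-only dump from inside the JVM via the HotSpot diagnostic MXBean; the file path and class name are made up for the example, and running `jmap -dump:live,...` against the task manager process achieves the same thing:

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

public class HeapDumpHelper {

    /** Writes a heap dump containing only live (still-reachable) objects. */
    public static void dumpLiveHeap(String path) throws Exception {
        HotSpotDiagnosticMXBean diagnostics =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        // live = true forces a collection first, so ChildFirstClassLoaders that were
        // merely not-yet-collected disappear and anything that remains is actually leaked.
        diagnostics.dumpHeap(path, true);
    }

    public static void main(String[] args) throws Exception {
        // The target file must not exist yet; the path is illustrative.
        dumpLiveHeap("/tmp/taskmanager-after-cancel.hprof");
    }
}
```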

