Olivier Zembri created FLINK-19132:
--------------------------------------
Summary: Failed to start jobs for consuming Secure Kafka after
cluster restart
Key: FLINK-19132
URL: https://issues.apache.org/jira/browse/FLINK-19132
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.10.1, 1.9.1
Reporter: Olivier Zembri
We deploy Flink jobs, packaged as fat jar files compiled with Java 1.8, on a
Flink session cluster in Kubernetes.
After restarting the Kubernetes cluster, the jobs fail to start and the Task
Manager log shows several NoClassDefFoundError exceptions.
*Stack trace*
{code:java}
java.lang.NoClassDefFoundError:
org.apache.kafka.common.security.scram.ScramSaslClient
{
"class": "org.apache.kafka.common.security.scram.ScramSaslClient",
"method": "evaluateChallenge",
"file": "ScramSaslClient.java",
"line": 128,
},
{
"class":
"org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2",
"method": "run",
"file": "SaslClientAuthenticator.java",
"line": 280,
},
{
"class":
"org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2",
"method": "run",
"file": "SaslClientAuthenticator.java",
"line": 278,
},
{
"class": "java.security.AccessController",
"method": "doPrivileged",
"file": "AccessController.java",
"line": -2,
},
{
"class": "javax.security.auth.Subject",
"method": "doAs",
"file": "Subject.java",
"line": 422,
},
{
"class":
"org.apache.kafka.common.security.authenticator.SaslClientAuthenticator",
"method": "createSaslToken",
"file": "SaslClientAuthenticator.java",
"line": 278,
},
{
"class":
"org.apache.kafka.common.security.authenticator.SaslClientAuthenticator",
"method": "sendSaslToken",
"file": "SaslClientAuthenticator.java",
"line": 215,
},
{
"class":
"org.apache.kafka.common.security.authenticator.SaslClientAuthenticator",
"method": "authenticate",
"file": "SaslClientAuthenticator.java",
"line": 189,
},
{
"class": "org.apache.kafka.common.network.KafkaChannel",
"method": "prepare",
"file": "KafkaChannel.java",
"line": 76,
},
{
"class": "org.apache.kafka.common.network.Selector",
"method": "pollSelectionKeys",
"file": "Selector.java",
"line": 376,
},
{
"class": "org.apache.kafka.common.network.Selector",
"method": "poll",
"file": "Selector.java",
"line": 326,
},
{
"class": "org.apache.kafka.clients.NetworkClient",
"method": "poll",
"file": "NetworkClient.java",
"line": 433,
},
{
"class": "org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient",
"method": "poll",
"file": "ConsumerNetworkClient.java",
"line": 232,
},
{
"class": "org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient",
"method": "poll",
"file": "ConsumerNetworkClient.java",
"line": 208,
},
{
"class": "org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient",
"method": "poll",
"file": "ConsumerNetworkClient.java",
"line": 184,
},
{
"class": "org.apache.kafka.clients.consumer.internals.Fetcher",
"method": "getTopicMetadata",
"file": "Fetcher.java",
"line": 314,
},
{
"class": "org.apache.kafka.clients.consumer.KafkaConsumer",
"method": "partitionsFor",
"file": "KafkaConsumer.java",
"line": 1386,
},
{
"class":
"org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer",
"method": "getAllPartitionsForTopics",
"file": "Kafka09PartitionDiscoverer.java",
},
{
"class":
"org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer",
"method": "discoverPartitions",
"file": "AbstractPartitionDiscoverer.java",
"line": 131,
},
{
"class": "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase",
"method": "open",
"file": "FlinkKafkaConsumerBase.java",
"line": 508,
},
{
"class": "org.apache.flink.api.common.functions.util.FunctionUtils",
"method": "openFunction",
"file": "FunctionUtils.java",
"line": 36,
},
{
"class": "org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator",
"method": "open",
"file": "AbstractUdfStreamOperator.java",
"line": 102,
},
{
"class": "org.apache.flink.streaming.runtime.tasks.StreamTask",
"method": "openAllOperators",
"file": "StreamTask.java",
"line": 532,
},
{
"class": "org.apache.flink.streaming.runtime.tasks.StreamTask",
"method": "invoke",
"file": "StreamTask.java",
"line": 396,
},
{
"class": "org.apache.flink.runtime.taskmanager.Task",
"method": "doRun",
"file": "Task.java",
"line": 705,
},
{
"class": "org.apache.flink.runtime.taskmanager.Task",
"method": "run",
"file": "Task.java",
"line": 530,
},
{
"class": "java.lang.Thread",
"method": "run",
"file": "Thread.java",
"line": 748,
}{code}
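The failure surfaces during partition discovery in {{FlinkKafkaConsumerBase.open()}}, when the SASL/SCRAM handshake tries to load {{ScramSaslClient}}. As a rough illustration only (the actual job code is not part of this report; broker address, topic, group id, SCRAM variant and credentials below are placeholders), a consumer configured for SCRAM that exercises this path looks roughly like:
{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class SecureKafkaJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9093");       // placeholder broker address
        props.setProperty("group.id", "secure-consumer");           // placeholder group id
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");       // assumed SCRAM variant
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"user\" password=\"secret\";"); // placeholder credentials

        // FlinkKafkaConsumerBase.open() triggers partition discovery, which performs
        // the SASL/SCRAM handshake shown in the stack trace above.
        env.addSource(new FlinkKafkaConsumer011<>("input-topic", new SimpleStringSchema(), props))
                .print();

        env.execute("secure-kafka-job");
    }
}
{code}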
*Workaround:*
- Copy the jar file containing the missing classes into the /lib folder, e.g. /opt/flink/lib/kafka-clients-0.11.0.jar
- Update [flink-conf.yaml|https://github.ibm.com/dba/taiga-flink/blob/master/conf/flink-conf.yaml] with {{classloader.parent-first-patterns.additional: org.apache.kafka}} (see the snippet below)
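A minimal sketch of the corresponding {{flink-conf.yaml}} entry, assuming an otherwise default session-cluster configuration:
{code:yaml}
# flink-conf.yaml: resolve org.apache.kafka classes parent-first, i.e. from
# /opt/flink/lib, instead of from the job's fat-jar (user-code) classloader.
classloader.parent-first-patterns.additional: org.apache.kafka
{code}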
_Note:_ This issue is very similar to
https://issues.apache.org/jira/browse/FLINK-14012.