[ https://issues.apache.org/jira/browse/FLINK-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17199296#comment-17199296 ]
Andrew.D.lin commented on FLINK-14012:
--------------------------------------

[~aljoscha] [~Daebeom], hi, may I ask which PR or commit fixed this? Can I cherry-pick it to a lower version?

> Failed to start job for consuming Secure Kafka after the job cancel
> -------------------------------------------------------------------
>
>                 Key: FLINK-14012
>                 URL: https://issues.apache.org/jira/browse/FLINK-14012
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>         Environment: * Kubernetes 1.13.2
> * Flink 1.9.0
> * Kafka client library 2.2.0
>            Reporter: Daebeom Lee
>            Priority: Minor
>
> Hello, this is Daebeom Lee.
> h2. Background
> I installed Flink 1.9.0 on our Kubernetes cluster.
> We use a Flink session cluster: we build a fat JAR, upload it through the UI, and run several jobs.
> At first, our jobs start fine.
> But when we cancel some jobs and resubmit them, the new jobs fail with this error:
> {code:java}
> java.lang.NoClassDefFoundError: org/apache/kafka/common/security/scram/internals/ScramSaslClient
> 	at org.apache.kafka.common.security.scram.internals.ScramSaslClient$ScramSaslClientFactory.createSaslClient(ScramSaslClient.java:235)
> 	at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
> 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:422)
> 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
> 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
> 	at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
> 	at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
> 	at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
> 	at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
> 	at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
> 	at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
> 	at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
> 	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
> 	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
> 	at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> h2. Our workaround
> * I think this is a Flink JVM classloader issue.
> * The user-code classloader is unloaded when a job is cancelled, and the Kafka client library was included in the fat JAR.
> * So I moved the Kafka client library to /opt/flink/lib:
> ** /opt/flink/lib/kafka-clients-2.2.0.jar
> * That solved the whole issue.
> * But there are some odd points:
> ** Flink 1.8.1 had no problem two weeks ago.
> ** A week ago I rolled back from 1.9.0 to 1.8.1, and the same errors occurred.
> ** Maybe the Docker image was changed in the Docker repository ([https://github.com/docker-flink/docker-flink])
>
> h2. Suggestion
> * I'd like to know the exact reason why this error occurs after upgrading to 1.9.0.
> * Does anybody know a better solution for this case?
>
> Thank you in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
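For readers hitting the same NoClassDefFoundError, the jar-relocation workaround described in the report can be sketched as a deployment fragment. This is a sketch only: the source path of the jar and the build-tool details are assumptions, not taken from this report.

```shell
# Sketch of the reported workaround: ship kafka-clients in Flink's lib/
# directory, so the SCRAM classes are loaded by Flink's parent classloader
# and are not unloaded together with the user-code classloader when a job
# is cancelled.
# The source path of the jar is an assumption; adjust it to your build output.
cp /path/to/kafka-clients-2.2.0.jar /opt/flink/lib/kafka-clients-2.2.0.jar

# Then make sure kafka-clients is no longer bundled inside the job fat JAR
# (e.g. mark the dependency as "provided" in the job's build), so that only
# the copy in /opt/flink/lib is on the classpath.
```

In the Kubernetes setup described above, the `cp` would typically live in the Dockerfile that builds the Flink image, so every TaskManager container gets the jar in its lib/ directory.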