[ https://issues.apache.org/jira/browse/FLINK-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17199296#comment-17199296 ]

Andrew.D.lin commented on FLINK-14012:
--------------------------------------

[~aljoscha] [~Daebeom] , Hi, may I ask which PR or commit fixed this? Can I 
cherry-pick it to a lower version?

> Failed to start job for consuming Secure Kafka after the job cancel
> -------------------------------------------------------------------
>
>                 Key: FLINK-14012
>                 URL: https://issues.apache.org/jira/browse/FLINK-14012
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>         Environment: * Kubernetes 1.13.2
>  * Flink 1.9.0
>  * Kafka client library 2.2.0
>            Reporter: Daebeom Lee
>            Priority: Minor
>
> Hello, this is Daebeom Lee.
> h2. Background
> I installed Flink 1.9.0 on our Kubernetes cluster.
> We use a Flink session cluster - we build a fat JAR, upload it via the UI, and 
> run several jobs.
> At first, our jobs start fine.
> But after we cancel some jobs, restarting them fails.
> This is the error:
> {code:java}
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/security/scram/internals/ScramSaslClient
>     at 
> org.apache.kafka.common.security.scram.internals.ScramSaslClient$ScramSaslClientFactory.createSaslClient(ScramSaslClient.java:235)
>     at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
>     at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
>     at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
>     at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
>     at 
> org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
>     at 
> org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
>     at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
>     at 
> org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
>     at 
> org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
>     at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
>     at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
>     at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
>     at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
>     at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
>     at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> h2. Our workaround
>  * I think this is a Flink JVM classloader issue.
>  * The classloader is unloaded when a job is canceled, and the Kafka client 
> library is included in the fat JAR.
>  * So I placed the Kafka client library in /opt/flink/lib
>  ** /opt/flink/lib/kafka-clients-2.2.0.jar
>  * That solved the issue.
>  * But there are some weird points:
>  ** Flink 1.8.1 had no problem two weeks ago.
>  ** One week ago I rolled back from 1.9.0 to 1.8.1, and the same errors occurred.
>  ** Maybe the Docker image was changed in the Docker repository 
> ([https://github.com/docker-flink/docker-flink])
>  
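> As a minimal sketch of an alternative to copying the jar workaround above, assuming the goal is to keep the Kafka classes out of the per-job user-code classloader: Flink's {{classloader.parent-first-patterns.additional}} option in flink-conf.yaml can force parent-first resolution for these classes (the pattern below is an assumption for this setup, and it still requires the Kafka jar on the system classpath, e.g. in /opt/flink/lib):
> {code:yaml}
> # flink-conf.yaml
> # Resolve Kafka classes from the parent (system) classloader so they are
> # not unloaded together with the job's user-code classloader on cancel.
> classloader.parent-first-patterns.additional: org.apache.kafka
> {code}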
> h2. Suggestion
>  * I'd like to know the exact reason why this error occurs after upgrading to 
> 1.9.0.
>  * Does anybody know a better solution for this case?
>  
> Thank you in advance.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
