I think you can safely ignore this warning. It shouldn't cause any harm, but I will file a ticket nonetheless.

On 07/02/2022 12:52, HG wrote:
I have nothing like that in the config (flink-conf.yaml).

Just downloaded the software and did bin/start-cluster.sh

Op ma 7 feb. 2022 om 10:52 schreef Chesnay Schepler <ches...@apache.org>:

    I meant in the Flink config of the cluster you are submitting the
    jobs to.
    Specifically whether classloader.check-leaked-classloader was set
    to false.

    On 07/02/2022 10:28, HG wrote:
    Hi,

    Well I have set :
    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    env.setMaxParallelism(5);
    env.setRuntimeMode(RuntimeExecutionMode.*STREAMING*);
    
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.enableCheckpointing(500);
    On the other hand .setBounded(OffsetsInitializer.latest())
    Perhaps that is a bit of a conflict? The job should be unbounded
    anyway. When I cancel it (the unbounded) via the GUI and start it
    again I do not see the same issue.
    So perhaps not very important. Regards Hans


    Op ma 7 feb. 2022 om 09:23 schreef Chesnay Schepler
    <ches...@apache.org>:

        Have you set anything beyond the defaults in the Flink
        configuration?

        This could just be noise with some Kafka stuff running in the
        background while Flink is shutting things down (and closing
        the classloader).

        On 04/02/2022 15:29, HG wrote:
        Hi,

        I am developing my flink application.
        For start I have built a class that reads events from Kafka
        and outputs them datastream.print()

        The job runs every time.
        But starting with the 2nd time I see this in the standalone
        session log:

        2022-02-04 15:16:30,801 WARN
         org.apache.kafka.common.utils.Utils                  [] -
        Failed to close KafkaClient with type
        org.apache.kafka.clients.NetworkClient
        java.lang.NoClassDefFoundError:
        org/apache/kafka/common/network/Selector$CloseMode
                at
        org.apache.kafka.common.network.Selector.close(Selector.java:806)
        
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
                at
        org.apache.kafka.common.network.Selector.close(Selector.java:365)
        
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
                at
        org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639)
        
~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
                at
        org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834)
        
[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
                at
        
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219)
        
[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
                at java.lang.Thread.run(Thread.java:829) [?:?]
        Caused by: java.lang.ClassNotFoundException:
        org.apache.kafka.common.network.Selector$CloseMode
                at
        java.net.URLClassLoader.findClass(URLClassLoader.java:476)
        ~[?:?]
                at
        java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
                at
        
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
        ~[flink-dist_2.12-1.14.2.jar:1.14.2]
                at
        
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
        ~[flink-dist_2.12-1.14.2.jar:1.14.2]
                at
        
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
        ~[flink-dist_2.12-1.14.2.jar:1.14.2]
                at
        java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
                ... 6 more
        2022-02-04 15:16:30,802 INFO
         org.apache.flink.runtime.source.coordinator.SourceCoordinator
        [] - Source coordinator for source Source: Kafka Source ->
        Sink: Print to Std. Out closed.

        Am I doing something wrong?

        This is basically the gist of the code:

        KafkaSource<String> source = KafkaSource
                 .<String>builder()
                 .setBootstrapServers(brokers)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
                 .setValueOnlyDeserializer(new SimpleStringSchema())
        
//.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class))
        .setStartingOffsets(OffsetsInitializer.earliest())
                 .setBounded(OffsetsInitializer.latest())
                 .build();

        //withIdleness.duration() //env.fromSource(source,
        WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source"); DataStream<String> 
ds = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(),"Kafka Source");

        ds.print();



Reply via email to