I have nothing like that in the config (flink-conf.yaml). I just downloaded the software and ran bin/start-cluster.sh.
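
For reference, the option asked about would be a single line in flink-conf.yaml. This is only a sketch of what an explicit entry would look like; no such line is present in the config described above, so the documented default (true) applies:

```yaml
# Hypothetical entry, NOT present in the flink-conf.yaml discussed here.
# true is Flink's documented default: class loading through a job's user
# classloader fails once the job has terminated.
classloader.check-leaked-classloader: true
```
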

On Mon, 7 Feb 2022 at 10:52, Chesnay Schepler <ches...@apache.org> wrote:

> I meant in the Flink config of the cluster you are submitting the jobs to.
> Specifically whether classloader.check-leaked-classloader was set to
> false.
>
> On 07/02/2022 10:28, HG wrote:
>
> Hi,
>
> Well, I have set:
>
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setMaxParallelism(5);
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.enableCheckpointing(500);
>
> On the other hand .setBounded(OffsetsInitializer.latest())
>
> Perhaps that is a bit of a conflict?
> The job should be unbounded anyway.
> When I cancel it (the unbounded) via the GUI and start it again I do not see
> the same issue.
>
> So perhaps not very important.
> Regards Hans
>
>
> On Mon, 7 Feb 2022 at 09:23, Chesnay Schepler <ches...@apache.org> wrote:
>
>> Have you set anything beyond the defaults in the Flink configuration?
>>
>> This could just be noise with some Kafka stuff running in the background
>> while Flink is shutting things down (and closing the classloader).
>>
>> On 04/02/2022 15:29, HG wrote:
>>
>> Hi,
>>
>> I am developing my flink application.
>> To start, I have built a class that reads events from Kafka and outputs
>> them with datastream.print()
>>
>> The job runs every time.
>> But starting with the 2nd time I see this in the standalone session log:
>>
>> 2022-02-04 15:16:30,801 WARN org.apache.kafka.common.utils.Utils [] - Failed to close KafkaClient with type org.apache.kafka.clients.NetworkClient
>> java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode
>>     at org.apache.kafka.common.network.Selector.close(Selector.java:806) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>     at org.apache.kafka.common.network.Selector.close(Selector.java:365) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>     at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>     at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>     at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>     at java.lang.Thread.run(Thread.java:829) [?:?]
>> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.Selector$CloseMode
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[?:?]
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
>>     ... 6 more
>> 2022-02-04 15:16:30,802 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka Source -> Sink: Print to Std. Out closed.
>>
>> Am I doing something wrong?
>>
>> This is basically the gist of the code:
>>
>> KafkaSource<String> source = KafkaSource
>>         .<String>builder()
>>         .setBootstrapServers(brokers)
>>         .setGroupId(groupId)
>>         .setTopics(kafkaInputTopic)
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         //.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class))
>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>         .setBounded(OffsetsInitializer.latest())
>>         .build();
>> //withIdleness.duration()
>> //env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
>> DataStream<String> ds = env.fromSource(source,
>>         WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
>>
>> ds.print();