Super Thanks Op ma 7 feb. 2022 om 13:04 schreef Chesnay Schepler <ches...@apache.org>:
> I think you can safely ignore this warning. It shouldn't cause any harm, > but I will file a ticket nonetheless. > > On 07/02/2022 12:52, HG wrote: > > I have nothing like that in the config (flink-conf.yaml). > > Just downloaded the software and did bin/start-cluster.sh > > Op ma 7 feb. 2022 om 10:52 schreef Chesnay Schepler <ches...@apache.org>: > >> I meant in the Flink config of the cluster you are submitting the jobs to. >> Specifically whether classloader.check-leaked-classloader was set to >> false. >> >> On 07/02/2022 10:28, HG wrote: >> >> Hi, >> >> Well I have set : >> >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> env.setMaxParallelism(5); >> env.setRuntimeMode(RuntimeExecutionMode.*STREAMING*); >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); >> env.enableCheckpointing(500); >> >> On the other hand .setBounded(OffsetsInitializer.latest()) >> >> Perhaps that is a bit of a conflict? >> The job should be unbounded anyway. >> When I cancel it (the unbounded) via the GUI and start it again I do not >> see the same issue. >> >> So perhaps not very important. >> Regards Hans >> >> >> Op ma 7 feb. 2022 om 09:23 schreef Chesnay Schepler <ches...@apache.org>: >> >>> Have you set anything beyond the defaults in the Flink configuration? >>> >>> This could just be noise with some Kafka stuff running in the background >>> while Flink is shutting things down (and closing the classloader). >>> >>> On 04/02/2022 15:29, HG wrote: >>> >>> Hi, >>> >>> I am developing my flink application. >>> For start I have built a class that reads events from Kafka and outputs >>> them datastream.print() >>> >>> The job runs every time. >>> But starting with the 2nd time I see this in the standalone session log: >>> >>> 2022-02-04 15:16:30,801 WARN org.apache.kafka.common.utils.Utils >>> [] - Failed to close KafkaClient with type >>> org.apache.kafka.clients.NetworkClient >>> java.lang.NoClassDefFoundError: >>> org/apache/kafka/common/network/Selector$CloseMode >>> at >>> org.apache.kafka.common.network.Selector.close(Selector.java:806) >>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?] >>> at >>> org.apache.kafka.common.network.Selector.close(Selector.java:365) >>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?] >>> at >>> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) >>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?] >>> at >>> org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) >>> [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?] >>> at >>> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219) >>> [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?] >>> at java.lang.Thread.run(Thread.java:829) [?:?] >>> Caused by: java.lang.ClassNotFoundException: >>> org.apache.kafka.common.network.Selector$CloseMode >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:476) >>> ~[?:?] >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?] >>> at >>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) >>> ~[flink-dist_2.12-1.14.2.jar:1.14.2] >>> at >>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) >>> ~[flink-dist_2.12-1.14.2.jar:1.14.2] >>> at >>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) >>> ~[flink-dist_2.12-1.14.2.jar:1.14.2] >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?] >>> ... 6 more >>> 2022-02-04 15:16:30,802 INFO >>> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source >>> coordinator for source Source: Kafka Source -> Sink: Print to Std. Out >>> closed. >>> >>> Am I doing something wrong? >>> >>> This is basically the gist of the code: >>> >>> KafkaSource<String> source = KafkaSource >>> .<String>builder() >>> .setBootstrapServers(brokers) >>> .setGroupId(groupId) >>> .setTopics(kafkaInputTopic) >>> .setValueOnlyDeserializer(new >>> SimpleStringSchema())//.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class)).setStartingOffsets(OffsetsInitializer.earliest()) >>> .setBounded(OffsetsInitializer.latest()) >>> .build(); >>> //withIdleness.duration()//env.fromSource(source, >>> WatermarkStrategy.forMonotonousTimestamps(), "Kafka >>> Source");DataStream<String> ds = env.fromSource(source, >>> WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source"); >>> >>> ds.print(); >>> >>> >>> >> >