I have nothing like that in the config (flink-conf.yaml).

Just downloaded the software and did bin/start-cluster.sh

Op ma 7 feb. 2022 om 10:52 schreef Chesnay Schepler <ches...@apache.org>:

> I meant in the Flink config of the cluster you are submitting the jobs to.
> Specifically whether classloader.check-leaked-classloader was set to
> false.
>
> On 07/02/2022 10:28, HG wrote:
>
> Hi,
>
> Well I have set :
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setMaxParallelism(5);
> env.setRuntimeMode(RuntimeExecutionMode.*STREAMING*);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.enableCheckpointing(500);
>
> On the other hand .setBounded(OffsetsInitializer.latest())
>
> Perhaps that is a bit of a conflict?
> The job should be unbounded anyway.
> When I cancel it  (the unbounded) via the GUI and start it again I do not see 
> the same issue.
>
> So perhaps not very important.
> Regards Hans
>
>
> Op ma 7 feb. 2022 om 09:23 schreef Chesnay Schepler <ches...@apache.org>:
>
>> Have you set anything beyond the defaults in the Flink configuration?
>>
>> This could just be noise with some Kafka stuff running in the background
>> while Flink is shutting things down (and closing the classloader).
>>
>> On 04/02/2022 15:29, HG wrote:
>>
>> Hi,
>>
>> I am developing my flink application.
>> For start I have built a class that reads events from Kafka and outputs
>> them datastream.print()
>>
>> The job runs every time.
>> But starting with the 2nd time I see this in the standalone session log:
>>
>> 2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils
>>                  [] - Failed to close KafkaClient with type
>> org.apache.kafka.clients.NetworkClient
>> java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/network/Selector$CloseMode
>>         at
>> org.apache.kafka.common.network.Selector.close(Selector.java:806)
>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>         at
>> org.apache.kafka.common.network.Selector.close(Selector.java:365)
>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>         at
>> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639)
>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>         at
>> org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834)
>> [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>         at
>> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219)
>> [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>         at java.lang.Thread.run(Thread.java:829) [?:?]
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.kafka.common.network.Selector$CloseMode
>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>> ~[?:?]
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
>>         at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>         at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>         at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
>>         ... 6 more
>> 2022-02-04 15:16:30,802 INFO
>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source
>> coordinator for source Source: Kafka Source -> Sink: Print to Std. Out
>> closed.
>>
>> Am I doing something wrong?
>>
>> This is basically the gist of the code:
>>
>> KafkaSource<String> source = KafkaSource
>>         .<String>builder()
>>         .setBootstrapServers(brokers)
>> .setGroupId(groupId)
>> .setTopics(kafkaInputTopic)
>>         .setValueOnlyDeserializer(new 
>> SimpleStringSchema())//.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class)).setStartingOffsets(OffsetsInitializer.earliest())
>>         .setBounded(OffsetsInitializer.latest())
>>         .build();
>> //withIdleness.duration()//env.fromSource(source, 
>> WatermarkStrategy.forMonotonousTimestamps(), "Kafka 
>> Source");DataStream<String> ds = env.fromSource(source, 
>> WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
>>
>> ds.print();
>>
>>
>>
>

Reply via email to