Super
Thanks

Op ma 7 feb. 2022 om 13:04 schreef Chesnay Schepler <ches...@apache.org>:

> I think you can safely ignore this warning. It shouldn't cause any harm,
> but I will file a ticket nonetheless.
>
> On 07/02/2022 12:52, HG wrote:
>
> I have nothing like that in the config (flink-conf.yaml).
>
> Just downloaded the software and did bin/start-cluster.sh
>
> Op ma 7 feb. 2022 om 10:52 schreef Chesnay Schepler <ches...@apache.org>:
>
>> I meant in the Flink config of the cluster you are submitting the jobs to.
>> Specifically whether classloader.check-leaked-classloader was set to
>> false.
>>
>> On 07/02/2022 10:28, HG wrote:
>>
>> Hi,
>>
>> Well I have set :
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setMaxParallelism(5);
>> env.setRuntimeMode(RuntimeExecutionMode.*STREAMING*);
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> env.enableCheckpointing(500);
>>
>> On the other hand .setBounded(OffsetsInitializer.latest())
>>
>> Perhaps that is a bit of a conflict?
>> The job should be unbounded anyway.
>> When I cancel it  (the unbounded) via the GUI and start it again I do not 
>> see the same issue.
>>
>> So perhaps not very important.
>> Regards Hans
>>
>>
>> Op ma 7 feb. 2022 om 09:23 schreef Chesnay Schepler <ches...@apache.org>:
>>
>>> Have you set anything beyond the defaults in the Flink configuration?
>>>
>>> This could just be noise with some Kafka stuff running in the background
>>> while Flink is shutting things down (and closing the classloader).
>>>
>>> On 04/02/2022 15:29, HG wrote:
>>>
>>> Hi,
>>>
>>> I am developing my flink application.
>>> For start I have built a class that reads events from Kafka and outputs
>>> them datastream.print()
>>>
>>> The job runs every time.
>>> But starting with the 2nd time I see this in the standalone session log:
>>>
>>> 2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils
>>>                    [] - Failed to close KafkaClient with type
>>> org.apache.kafka.clients.NetworkClient
>>> java.lang.NoClassDefFoundError:
>>> org/apache/kafka/common/network/Selector$CloseMode
>>>         at
>>> org.apache.kafka.common.network.Selector.close(Selector.java:806)
>>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>>         at
>>> org.apache.kafka.common.network.Selector.close(Selector.java:365)
>>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>>         at
>>> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639)
>>> ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>>         at
>>> org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834)
>>> [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>>         at
>>> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219)
>>> [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
>>>         at java.lang.Thread.run(Thread.java:829) [?:?]
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.kafka.common.network.Selector$CloseMode
>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>>> ~[?:?]
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
>>>         at
>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>>         at
>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>>         at
>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
>>>         ... 6 more
>>> 2022-02-04 15:16:30,802 INFO
>>>  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source
>>> coordinator for source Source: Kafka Source -> Sink: Print to Std. Out
>>> closed.
>>>
>>> Am I doing something wrong?
>>>
>>> This is basically the gist of the code:
>>>
>>> KafkaSource<String> source = KafkaSource
>>>         .<String>builder()
>>>         .setBootstrapServers(brokers)
>>> .setGroupId(groupId)
>>> .setTopics(kafkaInputTopic)
>>>         .setValueOnlyDeserializer(new 
>>> SimpleStringSchema())//.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeSerializer.class)).setStartingOffsets(OffsetsInitializer.earliest())
>>>         .setBounded(OffsetsInitializer.latest())
>>>         .build();
>>> //withIdleness.duration()//env.fromSource(source, 
>>> WatermarkStrategy.forMonotonousTimestamps(), "Kafka 
>>> Source");DataStream<String> ds = env.fromSource(source, 
>>> WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
>>>
>>> ds.print();
>>>
>>>
>>>
>>
>

Reply via email to