I think you can safely ignore this warning. It shouldn't cause any harm,
but I will file a ticket nonetheless.
On 07/02/2022 12:52, HG wrote:
I have nothing like that in the config (flink-conf.yaml).
I just downloaded the software and ran bin/start-cluster.sh
On Mon, 7 Feb 2022 at 10:52, Chesnay Schepler <ches...@apache.org> wrote:
I meant in the Flink config of the cluster you are submitting the
jobs to.
Specifically whether classloader.check-leaked-classloader was set
to false.
On 07/02/2022 10:28, HG wrote:
Hi,
Well, I have set:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(5);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(500);
On the other hand, the source is built with
.setBounded(OffsetsInitializer.latest()).
Perhaps that conflicts with the streaming runtime mode? The job should
be unbounded anyway. When I cancel the unbounded job via the GUI and
start it again, I do not see the same issue.
So perhaps it is not very important.
Regards, Hans
On Mon, 7 Feb 2022 at 09:23, Chesnay Schepler
<ches...@apache.org> wrote:
Have you set anything beyond the defaults in the Flink
configuration?
This could just be noise with some Kafka stuff running in the
background while Flink is shutting things down (and closing
the classloader).
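To illustrate the kind of race described here, the following is a toy model only, not Flink's or Kafka's actual code. It assumes a hypothetical ClosableLoader that refuses every lookup once it has been closed, which mimics the symptom in the log quoted further down: a class that is first needed during shutdown (there, Selector$CloseMode) can no longer be resolved because the job's class loader is already gone. All class and method names in the sketch are made up for illustration.

```java
public class ClosedClassLoaderSketch {

    /** Toy stand-in for a per-job class loader; NOT Flink's implementation. */
    static final class ClosableLoader extends ClassLoader {
        private volatile boolean closed;

        ClosableLoader(ClassLoader parent) {
            super(parent);
        }

        /** Simulates the cluster releasing the job's class loader. */
        void close() {
            closed = true;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve)
                throws ClassNotFoundException {
            if (closed) {
                // Assumption for this sketch: a closed loader rejects all lookups.
                throw new ClassNotFoundException(name + " (loader already closed)");
            }
            return super.loadClass(name, resolve);
        }
    }

    /** Returns true if the loader can still resolve the given class name. */
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            loader.loadClass(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClosableLoader loader =
                new ClosableLoader(ClassLoader.getSystemClassLoader());

        // While the "job" is running, lookups succeed.
        System.out.println("before close: " + canLoad(loader, "java.util.ArrayList"));

        // The "job" finishes and its loader is closed.
        loader.close();

        // A lingering thread that now needs a class for the first time fails.
        System.out.println("after close: " + canLoad(loader, "java.util.ArrayList"));
    }
}
```

In the sketch the first lookup succeeds and the second fails. In the real log the failing lookup happens on the Kafka admin client thread, which is still closing its network client while the job's class loader is being released.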
On 04/02/2022 15:29, HG wrote:
Hi,
I am developing my Flink application.
To start, I have built a class that reads events from Kafka
and outputs them with datastream.print().
The job runs every time.
But starting with the 2nd run I see this in the standalone
session log:
2022-02-04 15:16:30,801 WARN  org.apache.kafka.common.utils.Utils [] - Failed to close KafkaClient with type org.apache.kafka.clients.NetworkClient
java.lang.NoClassDefFoundError: org/apache/kafka/common/network/Selector$CloseMode
    at org.apache.kafka.common.network.Selector.close(Selector.java:806) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
    at org.apache.kafka.common.network.Selector.close(Selector.java:365) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
    at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:639) ~[blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
    at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:834) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1219) [blob_p-d9d83f46e9376826081af75cb6693b745018f7ca-988b5e01be784cebb00104c3aa45094b:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.Selector$CloseMode
    at java.net.URLClassLoader.findClass(URLClassLoader.java:476) ~[?:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
    ... 6 more
2022-02-04 15:16:30,802 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka Source -> Sink: Print to Std. Out closed.
Am I doing something wrong?
This is basically the gist of the code:
KafkaSource<String> source = KafkaSource
.<String>builder()
.setBootstrapServers(brokers)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setValueOnlyDeserializer(new SimpleStringSchema())
//.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
.build();
//withIdleness.duration()
//env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
DataStream<String> ds = env.fromSource(source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
ds.print();