Any help is appreciated about the exception below.
Also my Kafkasource code is below. The parallelism is 16 for this task.

                KafkaSource<String> sourceStationsPeriodic = KafkaSource.<
String>builder()
                                .setBootstrapServers(parameter.get(
KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
                                .setTopics(parameter.get(
KAFKA_TOPIC_READ_WIFI))
                                .setGroupId(parameter.get(KAFKA_GROUP))
                                .setStartingOffsets(OffsetsInitializer.
latest())
                                .setValueOnlyDeserializer(new
SimpleStringSchema())

                                .build();
                // Our Kafka Source
                KafkaSource<String> sourceStationsWifiInterface =
KafkaSource.<String>builder()
                                .setBootstrapServers(parameter.get(
KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
                                .setTopics(parameter.get(
KAFKA_TOPIC_READ_WIFI_INTERFACE))
                                .setGroupId(parameter.get(KAFKA_GROUP))
                                .setStartingOffsets(OffsetsInitializer.
latest())
                                .setValueOnlyDeserializer(new
SimpleStringSchema())
                                .build();
                KafkaSource<String> sourceTwinMessage = KafkaSource.<String>
builder()
                                .setBootstrapServers(parameter.get(
KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
                                .setTopics(parameter.get(
KAFKA_TOPIC_READ_TWIN_MESSAGE))
                                .setGroupId(parameter.get(KAFKA_GROUP))
                                .setStartingOffsets(OffsetsInitializer.
latest())
                                .setValueOnlyDeserializer(new
SimpleStringSchema())
                                .build();

                KafkaSource<String> sourceStationsOnDemand = KafkaSource.<
String>builder()
                                .setBootstrapServers(parameter.get(
KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
                                .setTopics(parameter.get(
KAFKA_TOPIC_READ_STATIONS_ON_DEMAND))
                                .setGroupId(parameter.get(KAFKA_GROUP))
                                .setStartingOffsets(OffsetsInitializer.
latest())
                                .setValueOnlyDeserializer(new
SimpleStringSchema())
                                .build();

                KafkaSource<String> sourceDeviceInfo = KafkaSource.<String>
builder()
                                .setBootstrapServers(parameter.get(
KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
                                .setTopics(parameter.get(
KAFKA_TOPIC_READ_DEVICE_INFO))
                                .setGroupId(parameter.get(KAFKA_GROUP))
                                .setStartingOffsets(OffsetsInitializer.
latest())
                                .setValueOnlyDeserializer(new
SimpleStringSchema())
                                .build();



2023-07-23 07:06:24,927 WARN  org.apache.kafka.common.utils.AppInfoParser
               [] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException:
kafka.admin.client:type=app-info,id=wifialgogroup1-enumerator-admin-client
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
~[?:?]
        at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
~[?:?]
        at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
~[?:?]
        at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
~[?:?]
        at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
~[?:?]
        at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
~[?:?]
        at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
        at
org.apache.kafka.clients.admin.KafkaAdminClient.<init>(KafkaAdminClient.java:597)
~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
        at
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:539)
~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
        at
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:478)
~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:133)
~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
        at
org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
        at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410)
~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
        at
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151)
~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
        at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:225)
~[flink-dist-1.16.0.jar:1.16.0]
        at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:449)
~[flink-dist-1.16.0.jar:1.16.0]
        at
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
[flink-dist-1.16.0.jar:1.16.0]
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[?:?]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
[?:?]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]
        at java.lang.Thread.run(Thread.java:829) [?:?]

Reply via email to