@Becket do we already have a JIRA ticket to track this effort? Cheers, Till
On Mon, Mar 16, 2020 at 4:07 AM Becket Qin <becket....@gmail.com> wrote: > Hi Sidney, > > The WARN logging you saw was from the AbstractPartitionDiscoverer which is > created by FlinkKafkaConsumer itself. It has an internal consumer which > shares the client.id of the actual consumer fetching data. This is a bug > that we should fix. > > As Rong said, this won't affect the normal operation of the consumer. It > is just an AppInfo MBean for reporting some information. There might be > some slight impact on the accuracy of the consumer metrics, but should be > almost ignorable because the partition discoverer is quite inactive > compared with the actual consumer. > > Thanks, > > Jiangjie (Becket) Qin > > On Mon, Mar 16, 2020 at 12:44 AM Rong Rong <walter...@gmail.com> wrote: > >> We also had seen this issue before running Flink apps in a shared cluster >> environment. >> >> Basically, Kafka is trying to register a JMX MBean[1] for application >> monitoring. >> This is only a WARN suggesting that you are registering more than one >> MBean with the same client id "consumer-1", it should not affect your >> normal application behavior. >> >> This is most likely occurring if you have more than one Kafka consumer >> within the same JVM, are you using a session cluster[2]? can you share more >> on your application configuration including parallelism and slot configs? >> Also based on the log, you are not configuring the "client.id" >> correctly. which config key are you using? could you also share your fill >> Kafka properties map? >> >> >> -- >> Rong >> >> [1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html >> [2] >> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session >> >> On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner <sidney.fei...@startapp.com> >> wrote: >> >>> Hey, >>> I've been using Flink for a while now without any problems when running >>> apps with a FlinkKafkaConsumer. >>> All my apps have the same overall logic (consume from kafka -> transform >>> event -> write to file) and the only way they differ from each other is the >>> topic they read (remaining kafka config remains identical) and the way they >>> transform the event. >>> But suddenly, I've been starting to get the following error: >>> >>> >>> 2020-03-15 12:13:56,911 WARN >>> org.apache.kafka.common.utils.AppInfoParser - Error >>> registering AppInfo mbean >>> javax.management.InstanceAlreadyExistsException: >>> kafka.consumer:type=app-info,id=consumer-1 >>> at >>> com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) >>> at >>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) >>> >>> at >>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) >>> >>> at >>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) >>> >>> at >>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) >>> >>> at >>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) >>> >>> at >>> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) >>> >>> at >>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805) >>> >>> at >>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659) >>> >>> at >>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639) >>> >>> at >>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58) >>> >>> at >>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) >>> >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505) >>> >>> at >>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) >>> >>> at >>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552) >>> >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416) >>> >>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> >>> I've tried setting the "client.id" on my consumer to a random UUID, >>> making sure I don't have any duplicates but that didn't help either. >>> Any idea what could be causing this? >>> >>> Thanks 🙂 >>> >>> *Sidney Feiner* */* Data Platform Developer >>> M: +972.528197720 */* Skype: sidney.feiner.startapp >>> >>> [image: emailsignature] >>> >>>