Hi Becket/Till, Thanks for the detailed explanation. Just to confirm: the issue in FLINK-8093 refers to multiple Kafka consumers within the same TM - thus the fix should be to make the consumer client.id unique across different tasks? And the issue here is internal to the Kafka consumer, where both the polling consumer thread and the JMX MBean reporter thread share the same client.id - thus we should fix this at the Kafka level?
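If that reading is right, the per-task uniqueness fix could look roughly like the sketch below. (Purely hypothetical: the helper name and the suffix scheme are mine, not the actual FLINK-8093 patch; the operator name and subtask index would come from the Flink runtime.)

```java
import java.util.Properties;

// Hypothetical sketch: derive a per-task client.id so that each KafkaConsumer
// in the same TM registers its JMX AppInfo MBean under a distinct name.
public class UniqueClientId {
    public static Properties withTaskSuffix(Properties base, String operatorName, int subtaskIndex) {
        Properties props = new Properties();
        props.putAll(base);
        // Fall back to a generic base id if the user did not set client.id.
        String baseId = props.getProperty("client.id", "flink-kafka-consumer");
        props.setProperty("client.id", baseId + "-" + operatorName + "-" + subtaskIndex);
        return props;
    }
}
```

Passing the derived properties to each parallel source instance would then give every consumer (and its MBean) a distinct id.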
If this is the correct understanding, I think we should separate them since they are in fact two different issues. -- Rong On Tue, Mar 17, 2020 at 3:36 AM Becket Qin <becket....@gmail.com> wrote: > Actually it might be better to create another ticket; FLINK-8093 was > mainly complaining about the JMX bean collision when there are multiple > tasks running in the same TM. > > Jiangjie (Becket) Qin > > On Tue, Mar 17, 2020 at 6:33 PM Becket Qin <becket....@gmail.com> wrote: > >> Hi Till, >> >> It looks like FLINK-8093 <https://issues.apache.org/jira/browse/FLINK-8093> >> reports >> the same issue, although the reported information is not exactly correct, >> as this should not cause the producer to fail. I'll take care of the ticket. >> >> Thanks, >> >> Jiangjie (Becket) Qin >> >> On Tue, Mar 17, 2020 at 6:00 PM Till Rohrmann <trohrm...@apache.org> >> wrote: >> >>> @Becket do we already have a JIRA ticket to track this effort? >>> >>> Cheers, >>> Till >>> >>> On Mon, Mar 16, 2020 at 4:07 AM Becket Qin <becket....@gmail.com> wrote: >>> >>>> Hi Sidney, >>>> >>>> The WARN logging you saw was from the AbstractPartitionDiscoverer which >>>> is created by FlinkKafkaConsumer itself. It has an internal consumer which >>>> shares the client.id of the actual consumer fetching data. This is a >>>> bug that we should fix. >>>> >>>> As Rong said, this won't affect the normal operation of the consumer. >>>> It is just an AppInfo MBean for reporting some information. There might be >>>> some slight impact on the accuracy of the consumer metrics, but it should be >>>> almost ignorable because the partition discoverer is quite inactive >>>> compared with the actual consumer. >>>> >>>> Thanks, >>>> >>>> Jiangjie (Becket) Qin >>>> >>>> On Mon, Mar 16, 2020 at 12:44 AM Rong Rong <walter...@gmail.com> wrote: >>>> >>>>> We have also seen this issue before, when running Flink apps in a shared >>>>> cluster environment. 
>>>>> >>>>> Basically, Kafka is trying to register a JMX MBean[1] for application >>>>> monitoring. >>>>> This is only a WARN suggesting that you are registering more than one >>>>> MBean with the same client id "consumer-1"; it should not affect your >>>>> normal application behavior. >>>>> >>>>> This is most likely occurring if you have more than one Kafka consumer >>>>> within the same JVM. Are you using a session cluster[2]? Can you share >>>>> more >>>>> on your application configuration, including parallelism and slot configs? >>>>> Also, based on the log, you are not configuring the "client.id" >>>>> correctly. Which config key are you using? Could you also share your full >>>>> Kafka properties map? >>>>> >>>>> >>>>> -- >>>>> Rong >>>>> >>>>> [1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html >>>>> [2] >>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session >>>>> >>>>> On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner < >>>>> sidney.fei...@startapp.com> wrote: >>>>> >>>>>> Hey, >>>>>> I've been using Flink for a while now without any problems when >>>>>> running apps with a FlinkKafkaConsumer. >>>>>> All my apps have the same overall logic (consume from Kafka -> >>>>>> transform event -> write to file), and the only way they differ from each >>>>>> other is the topic they read (the remaining Kafka config is identical) >>>>>> and >>>>>> the way they transform the event. 
>>>>>> But suddenly, I've started getting the following error: >>>>>> >>>>>> >>>>>> 2020-03-15 12:13:56,911 WARN >>>>>> org.apache.kafka.common.utils.AppInfoParser - Error >>>>>> registering AppInfo mbean >>>>>> javax.management.InstanceAlreadyExistsException: >>>>>> kafka.consumer:type=app-info,id=consumer-1 >>>>>> at >>>>>> com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) >>>>>> at >>>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) >>>>>> >>>>>> at >>>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) >>>>>> >>>>>> at >>>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) >>>>>> >>>>>> at >>>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) >>>>>> >>>>>> at >>>>>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) >>>>>> >>>>>> at >>>>>> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) >>>>>> >>>>>> at >>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805) >>>>>> >>>>>> at >>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659) >>>>>> >>>>>> at >>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639) >>>>>> >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58) >>>>>> >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) >>>>>> >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505) >>>>>> >>>>>> at >>>>>> 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) >>>>>> >>>>>> at >>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) >>>>>> >>>>>> at >>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552) >>>>>> >>>>>> at >>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416) >>>>>> >>>>>> at >>>>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) >>>>>> at >>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) >>>>>> at java.lang.Thread.run(Thread.java:748) >>>>>> >>>>>> >>>>>> I've tried setting the "client.id" on my consumer to a random UUID, >>>>>> making sure I don't have any duplicates, but that didn't help either. >>>>>> Any idea what could be causing this? >>>>>> >>>>>> Thanks 🙂 >>>>>> >>>>>> *Sidney Feiner* */* Data Platform Developer >>>>>> M: +972.528197720 */* Skype: sidney.feiner.startapp
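For anyone wanting to reproduce the warning in the stack trace outside of Flink, it boils down to registering two MBeans under the same ObjectName, which the platform MBeanServer rejects with InstanceAlreadyExistsException. (Illustrative sketch only; the `AppInfo` class below is a stand-in, not Kafka's internal one.)

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Minimal reproduction: the second registration under the same
// ObjectName fails with InstanceAlreadyExistsException.
public class MBeanCollisionDemo {
    // Standard MBean: the management interface must be named <Class>MBean.
    public interface AppInfoMBean { String getClientId(); }

    public static class AppInfo implements AppInfoMBean {
        private final String clientId;
        public AppInfo(String clientId) { this.clientId = clientId; }
        public String getClientId() { return clientId; }
    }

    // Returns true when the second registration under the same id collides.
    public static boolean collides() {
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=consumer-1");
            server.registerMBean(new AppInfo("consumer-1"), name);
            boolean collided;
            try {
                // Same name again, as two consumers with one client.id would do.
                server.registerMBean(new AppInfo("consumer-1"), name);
                collided = false;
            } catch (InstanceAlreadyExistsException expected) {
                collided = true;
            }
            server.unregisterMBean(name);
            return collided;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("collision: " + collides());
    }
}
```

Kafka's AppInfoParser only logs this as a WARN and continues, which matches the behavior described above: noisy, but not fatal to the consumer.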