Hi Becket/Till,

Thanks for the detailed explanation. Just to confirm:
the issue in FLINK-8093 refers to multiple Kafka consumers within the same
TM, so the fix should be to make the consumer client.id unique across
different tasks?
and the issue here is internal to the Kafka consumer itself, where both
the polling consumer thread and the MBean JMX reporter thread share the
same client.id, so we should fix this at the Kafka level?

If this is the correct understanding, I think we should separate them,
since they are in fact two different issues.
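For the first issue, a rough sketch of what a per-task client.id could look like, so that each consumer's JMX AppInfo MBean gets its own name. This is only an illustration, not the actual fix; the helper, the app name, and the bootstrap address are all made up for the example:

```java
import java.util.Properties;

public class ClientIdExample {
    // Hypothetical sketch (not the actual Flink patch): build Kafka consumer
    // properties whose client.id is unique per subtask, so that two consumers
    // in the same JVM never register the same app-info MBean name.
    static Properties consumerProps(String appName, int subtaskIndex) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", appName);
        // Unique per task: avoids two consumers registering the MBean
        // kafka.consumer:type=app-info with the same id.
        props.setProperty("client.id", appName + "-consumer-" + subtaskIndex);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("my-app", 0).getProperty("client.id"));
        System.out.println(consumerProps("my-app", 1).getProperty("client.id"));
    }
}
```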

--
Rong

On Tue, Mar 17, 2020 at 3:36 AM Becket Qin <becket....@gmail.com> wrote:

> Actually it might be better to create another ticket, FLINK-8093 was
> mainly complaining about the JMX bean collision when there are multiple
> tasks running in the same TM.
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 17, 2020 at 6:33 PM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Till,
>>
>> It looks like FLINK-8093 <https://issues.apache.org/jira/browse/FLINK-8093>
>> reports the same issue, although the reported information is not exactly
>> correct, as this should not cause the producer to fail. I'll take care of
>> the ticket.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, Mar 17, 2020 at 6:00 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> @Becket do we already have a JIRA ticket to track this effort?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Mar 16, 2020 at 4:07 AM Becket Qin <becket....@gmail.com> wrote:
>>>
>>>> Hi Sidney,
>>>>
>>>> The WARN logging you saw was from the AbstractPartitionDiscoverer,
>>>> which is created by the FlinkKafkaConsumer itself. It has an internal
>>>> consumer which shares the client.id of the actual consumer fetching
>>>> data. This is a bug that we should fix.
>>>>
>>>> As Rong said, this won't affect the normal operation of the consumer.
>>>> It is just an AppInfo MBean for reporting some information. There might
>>>> be some slight impact on the accuracy of the consumer metrics, but it
>>>> should be almost negligible because the partition discoverer is quite
>>>> inactive compared with the actual consumer.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
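A minimal illustration of the fix idea Becket describes above (this is a sketch, not the actual patch): the partition discoverer's internal consumer could derive a distinct client.id by suffixing the user-supplied one, so its AppInfo MBean no longer collides with the fetching consumer's. The helper name and suffix are hypothetical:

```java
public class DiscovererClientId {
    // Hypothetical helper: give the partition discoverer's internal consumer
    // its own client.id, derived from the fetching consumer's id, so the two
    // kafka.consumer:type=app-info MBean names never clash.
    static String discovererClientId(String baseClientId) {
        return baseClientId + "-partition-discoverer";
    }

    public static void main(String[] args) {
        // The fetcher keeps "my-consumer"; the discoverer gets a suffixed id.
        System.out.println(discovererClientId("my-consumer"));
    }
}
```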
>>>> On Mon, Mar 16, 2020 at 12:44 AM Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> We also had seen this issue before running Flink apps in a shared
>>>>> cluster environment.
>>>>>
>>>>> Basically, Kafka is trying to register a JMX MBean[1] for application
>>>>> monitoring.
>>>>> This is only a WARN suggesting that you are registering more than one
>>>>> MBean with the same client id "consumer-1"; it should not affect your
>>>>> normal application behavior.
>>>>>
>>>>> This is most likely occurring because you have more than one Kafka
>>>>> consumer within the same JVM. Are you using a session cluster[2]? Can
>>>>> you share more about your application configuration, including
>>>>> parallelism and slot configs?
>>>>> Also, based on the log, you are not configuring the "client.id"
>>>>> correctly. Which config key are you using? Could you also share your
>>>>> full Kafka properties map?
>>>>>
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> [1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session
>>>>>
>>>>> On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner <
>>>>> sidney.fei...@startapp.com> wrote:
>>>>>
>>>>>> Hey,
>>>>>> I've been using Flink for a while now without any problems when
>>>>>> running apps with a FlinkKafkaConsumer.
>>>>>> All my apps have the same overall logic (consume from Kafka ->
>>>>>> transform event -> write to file); the only way they differ from each
>>>>>> other is the topic they read (the remaining Kafka config is identical)
>>>>>> and the way they transform the event.
>>>>>> But suddenly I've started getting the following error:
>>>>>>
>>>>>>
2020-03-15 12:13:56,911 WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
>>>>>> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
>>>>>>        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>>>>>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>>>>>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>>>>>>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>>>>>>        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>>>>>>        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>>>>>>        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>>>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
>>>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
>>>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
>>>>>>        at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>>>>>>        at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>>>>>        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>>>>>>        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>>>        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>>>        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
>>>>>>        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
>>>>>>        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>>>        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>>>        at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>>
>>>>>> I've tried setting the "client.id" on my consumer to a random UUID,
>>>>>> making sure I don't have any duplicates, but that didn't help either.
>>>>>> Any idea what could be causing this?
>>>>>>
>>>>>> Thanks 🙂
>>>>>>
>>>>>> *Sidney Feiner* */* Data Platform Developer
>>>>>> M: +972.528197720 */* Skype: sidney.feiner.startapp
>>>>>>
