[ https://issues.apache.org/jira/browse/SAMZA-1027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15534154#comment-15534154 ]

Xinyu Liu commented on SAMZA-1027:
----------------------------------

[~ncolomer]: could you please check whether you have producers for multiple 
Kafka systems? In SAMZA-981, we changed how the clientId for Kafka 
consumers/producers is generated so that it is based only on the job name and 
job instance, in order to support Kafka quotas 
(http://kafka.apache.org/documentation.html#design_quotas). However, this 
exposed a bug in Kafka: the clientId is also used to register the metrics 
mbeans. If you have Kafka producers for multiple systems, we create multiple 
producers that share the same clientId, which causes the mbean naming 
collision. According to the Kafka developers, the only side effect is that 
some of the Kafka metrics won't be reported through mbeans. Does that affect 
you? We are working with the Kafka community to get a fix into an upcoming 
Kafka version.
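
To illustrate the collision described above, here is a minimal, self-contained 
sketch using only the JDK's JMX API. It is not Kafka's or Samza's code: the 
class names and the {{sketch.producer}} domain are hypothetical, and the client 
ids are simply the ones quoted in the issue. It only shows that registering a 
second mbean under an ObjectName derived from the same clientId fails with 
InstanceAlreadyExistsException, while a distinct clientId registers normally.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

// Hypothetical illustration only; none of these names come from Kafka or Samza.
class ClientIdCollisionSketch {

    // Standard MBean convention: the interface is named <ClassName>MBean.
    public interface AppInfoMBean {
        String getClientId();
    }

    public static class AppInfo implements AppInfoMBean {
        private final String clientId;

        public AppInfo(String clientId) {
            this.clientId = clientId;
        }

        @Override
        public String getClientId() {
            return clientId;
        }
    }

    /**
     * Registers an app-info style mbean whose ObjectName embeds the clientId.
     * Returns false if an mbean with that name is already registered.
     */
    static boolean register(MBeanServer server, String clientId) {
        try {
            // "sketch.producer" is an assumed domain chosen for this example.
            ObjectName name =
                new ObjectName("sketch.producer:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(clientId), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same ObjectName registered twice: the collision from the issue.
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // A private MBeanServer keeps the example isolated from the platform one.
        MBeanServer server = MBeanServerFactory.newMBeanServer();

        // First producer with this clientId registers fine.
        System.out.println(register(server, "samza_producer-my_awesome_job-123"));
        // Second producer sharing the clientId collides.
        System.out.println(register(server, "samza_producer-my_awesome_job-123"));
        // A different clientId does not collide.
        System.out.println(
            register(server, "samza_checkpoint_manager-my_awesome_job-123"));
    }
}
```

In this sketch the failed registration is just reported as {{false}}; the first 
mbean stays registered, which matches the description that only some metrics 
go unreported rather than the producer failing.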

> InstanceAlreadyExistsException while starting up
> ------------------------------------------------
>
>                 Key: SAMZA-1027
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1027
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.11
>            Reporter: Nicolas Colomer
>
> After upgrading Samza, I started to see the following WARN log while 
> starting a Samza job:
> {code}
> 2016-09-29 10:30:09 AppInfoParser [WARN] task[] ssp[] offset[] Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=samza_producer-my_awesome_job-123
>         at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>         at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>         at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:328)
>         at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
>         at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
>         at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
>         at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:124)
>         at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:113)
>         at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.send(AbstractCoordinatorStreamManager.java:72)
>         at org.apache.samza.container.LocalityManager.writeContainerToHostMapping(LocalityManager.java:134)
>         at org.apache.samza.container.SamzaContainer.startLocalityManager(SamzaContainer.scala:739)
>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:651)
>         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:116)
>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:90)
>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> {code}
> More precisely, in my situation, this log occurs twice during startup of the 
> job.
> I figured out that it follows the creation of 2 producers: {{samza_producer}} 
> and {{samza_checkpoint_manager}}, with client ids respectively equal to 
> {{samza_producer-my_awesome_job-123}} and 
> {{samza_checkpoint_manager-my_awesome_job-123}}.
> This issue seems to be directly related to SAMZA-981, which removed the 
> discriminating timestamp and unique counter value from the {{client.id}} 
> string.
> According to KAFKA-3992, this error occurs when multiple producers / 
> consumers are created with the same {{client.id}} setting.
> Looking at the source code, there is a 
> [lock|https://github.com/ncolomer/samza/blob/17e65d1cbdda1ad436f47c15fa4c86332e229a93/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L119-L127]
>  in {{KafkaSystemProducer}} that should prevent any race condition where this 
> could happen. But is there any case where this class (or the {{getProducer}} 
> lambda function) may be instantiated/reused multiple times in the same JVM 
> host?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
