Nicolas Colomer created SAMZA-1027:
--------------------------------------

             Summary: InstanceAlreadyExistsException while starting up
                 Key: SAMZA-1027
                 URL: https://issues.apache.org/jira/browse/SAMZA-1027
             Project: Samza
          Issue Type: Bug
          Components: kafka
    Affects Versions: 0.11
            Reporter: Nicolas Colomer


After upgrading Samza, I started to see the following WARN log while starting a Samza job:

{code}
2016-09-29 10:30:09 AppInfoParser [WARN] task[] ssp[] offset[] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=samza_producer-my_awesome_job-123
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:328)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
        at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
        at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
        at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:124)
        at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:113)
        at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.send(AbstractCoordinatorStreamManager.java:72)
        at org.apache.samza.container.LocalityManager.writeContainerToHostMapping(LocalityManager.java:134)
        at org.apache.samza.container.SamzaContainer.startLocalityManager(SamzaContainer.scala:739)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:651)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:116)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:90)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
{code}

More precisely, in my case this warning is logged twice during job startup.

I found that it follows the creation of two producers, {{samza_producer}} and {{samza_checkpoint_manager}}, whose client ids are, respectively, {{samza_producer-my_awesome_job-123}} and {{samza_checkpoint_manager-my_awesome_job-123}}.

This issue seems to be directly related to SAMZA-981, which removed the discriminating timestamp and unique counter value from the {{client.id}} string, so producers built with the same prefix for the same job now get identical ids.
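
To illustrate why dropping the discriminants matters, here is a minimal Java sketch. It assumes the id format visible in the log ({{prefix-jobName-jobId}}) and a hypothetical timestamp-plus-counter suffix; {{ClientIds}}, {{deterministic}} and {{discriminated}} are illustrative names, not Samza code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, NOT Samza's implementation: contrasts a client.id built
// only from prefix, job name and job id with one that also carries a timestamp
// and a per-JVM counter.
final class ClientIds {
    // Shared by all callers in this JVM, so each discriminated id is distinct.
    private static final AtomicInteger COUNTER = new AtomicInteger();

    private ClientIds() {}

    // Same inputs always give the same string, e.g. "samza_producer-my_awesome_job-123".
    static String deterministic(String prefix, String jobName, String jobId) {
        return prefix + "-" + jobName + "-" + jobId;
    }

    // Appends a timestamp and an incrementing counter; the counter alone
    // guarantees two calls in one JVM never return the same string.
    static String discriminated(String prefix, String jobName, String jobId) {
        return deterministic(prefix, jobName, jobId)
                + "-" + System.currentTimeMillis()
                + "-" + COUNTER.getAndIncrement();
    }
}
```

With the deterministic form, two producers created with the {{samza_producer}} prefix for the same job end up with the same {{client.id}}; with the discriminated form they do not.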

According to KAFKA-3992, this error occurs when multiple producers or consumers in the same JVM are created with the same {{client.id}} setting.
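
The collision itself can be reproduced with plain JMX. This is a sketch under stated assumptions, not Kafka's code: {{FakeAppInfo}} and {{ClientIdCollision}} are hypothetical names, the {{ObjectName}} pattern is copied from the log above, and a private server from {{MBeanServerFactory.newMBeanServer()}} is used instead of the JVM-wide platform server that, as far as I can tell, {{AppInfoParser}} registers with, so the example leaves global state untouched.

```java
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical stand-in for Kafka's app-info bean. A DynamicMBean avoids the
// naming and visibility rules that standard MBeans impose.
final class FakeAppInfo implements DynamicMBean {
    @Override public Object getAttribute(String name) throws AttributeNotFoundException {
        throw new AttributeNotFoundException(name);
    }
    @Override public void setAttribute(Attribute attribute) throws AttributeNotFoundException {
        throw new AttributeNotFoundException(attribute.getName());
    }
    @Override public AttributeList getAttributes(String[] names) { return new AttributeList(); }
    @Override public AttributeList setAttributes(AttributeList attributes) { return new AttributeList(); }
    @Override public Object invoke(String action, Object[] params, String[] signature) {
        throw new UnsupportedOperationException(action);
    }
    @Override public MBeanInfo getMBeanInfo() {
        // Null arrays are treated as "no attributes/constructors/operations/notifications".
        return new MBeanInfo(FakeAppInfo.class.getName(), "stand-in app-info bean",
                null, null, null, null);
    }
}

final class ClientIdCollision {
    private ClientIdCollision() {}

    // Registers one bean per client id under the name pattern from the log
    // and returns how many registrations were rejected as duplicates.
    static int countCollisions(MBeanServer server, String... clientIds) {
        int collisions = 0;
        for (String clientId : clientIds) {
            try {
                ObjectName name = new ObjectName("kafka.producer:type=app-info,id=" + clientId);
                server.registerMBean(new FakeAppInfo(), name);
            } catch (InstanceAlreadyExistsException e) {
                collisions++; // the exception behind "Error registering AppInfo mbean"
            } catch (JMException e) {
                throw new IllegalStateException(e);
            }
        }
        return collisions;
    }
}
```

Registering {{samza_producer-my_awesome_job-123}} twice yields one collision, while {{samza_producer-my_awesome_job-123}} and {{samza_checkpoint_manager-my_awesome_job-123}} do not collide with each other; only a second producer reusing one of those exact ids does.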

Looking at the source code, there is a [lock|https://github.com/ncolomer/samza/blob/17e65d1cbdda1ad436f47c15fa4c86332e229a93/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala#L119-L127] in {{KafkaSystemProducer}} that should prevent any race condition that could cause this. But is there any case where this class (or the {{getProducer}} lambda function) may be instantiated or reused multiple times in the same JVM?
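
One way to frame the question: a lock scoped to a single instance serializes lazy creation inside that instance, but it cannot stop a second instance that uses the same {{client.id}}. The Java model below is hypothetical: {{LazyProducerWrapper}} stands in for {{KafkaSystemProducer}}, and its JVM-wide {{REGISTERED_IDS}} set stands in for the MBean server. It assumes the lock is per instance, which is an assumption, not something confirmed from the linked code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model, NOT Samza's KafkaSystemProducer: each wrapper lazily
// "creates" its producer under its own lock and registers its client id in a
// JVM-wide set that plays the role of the MBean server.
final class LazyProducerWrapper {
    // One namespace for the whole JVM, like the platform MBean server.
    static final Set<String> REGISTERED_IDS = ConcurrentHashMap.newKeySet();

    private final Object lock = new Object(); // per instance, not per JVM
    private final String clientId;
    private boolean created;  // guarded by lock
    private boolean collided; // guarded by lock

    LazyProducerWrapper(String clientId) {
        this.clientId = clientId;
    }

    // Called on every send; only the first call on this instance creates the producer.
    void ensureProducer() {
        synchronized (lock) {
            if (!created) {
                // Set.add returns false if another instance already took this id,
                // mirroring InstanceAlreadyExistsException.
                collided = !REGISTERED_IDS.add(clientId);
                created = true;
            }
        }
    }

    boolean collided() {
        synchronized (lock) {
            return collided;
        }
    }
}
```

Repeated sends through one wrapper never collide, but a second wrapper built with the same id collides on its first send, no matter how the lock is used inside each instance.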



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
