Hey Jeremiah,

It looks like the TopicExistsException should be handled by the system
admin and not rethrown:
https://github.com/apache/samza/blob/06702af8fda1d016ae55461c404b55b84b20ffd2/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala#L442

I have a theory what's happening here. I think the TopicExistsException was
moved from the org.apache.kafka.common package in kafka 0.8.2
http://apache.osuosl.org/kafka/0.8.2.0/scaladoc/kafka/common/TopicExistsException.html

to the org.apache.kafka.common.errors package in kafka 0.10
https://kafka.apache.org/0102/javadoc/org/apache/kafka/common/errors/TopicExistsException.html

And Samza 0.13 expects the latter.

Can you double check that your job is actually using kafka 0.10.1.1,
perhaps by inspecting the jars?

-Jake

On Mon, Aug 14, 2017 at 11:55 AM, Jeremiah Adams <jad...@helixeducation.com>
wrote:

> I am having an issue with topic creation after updating dependencies. I
> bumped samza dependencies from scala 2.10 v 0.10.1 to  scala 2.11 0.13.0
> and org.apache.kafka dependency from kafka_2.10 0.8.1 to kafka_2.11
> 0.10.1.1.
> I am seeing an error that the topic already exists and the job gets stuck
> in a loop with logs like below. The job will not move into 'accepted' state
> in yarn and never consumes the topics it should be consuming. The zk, yarn
> and kafka nodes are newly deployed. I'm at a loss, any ideas?
>
>
> [10.201.9.105] out: 17:18:49.347 [main] DEBUG
> org.apache.samza.system.kafka.KafkaSystemAdmin - Exception detail:
> [10.201.9.105] out: kafka.common.TopicExistsException: Topic
> "__samza_coordinator_inquiry-submission_1" already exists.
> [10.201.9.105] out: at kafka.admin.AdminUtils$.
> createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:420)
> [10.201.9.105] out: at kafka.admin.AdminUtils$.
> createTopic(AdminUtils.scala:404)
> [10.201.9.105] out: at org.apache.samza.system.kafka.
> KafkaSystemAdmin$$anonfun$createStream$1.apply(KafkaSystemAdmin.scala:425)
> [10.201.9.105] out: at org.apache.samza.system.kafka.
> KafkaSystemAdmin$$anonfun$createStream$1.apply(KafkaSystemAdmin.scala:422)
> [10.201.9.105] out: at org.apache.samza.util.ExponentialSleepStrategy.run(
> ExponentialSleepStrategy.scala:82)
> [10.201.9.105] out: at org.apache.samza.system.kafka.
> KafkaSystemAdmin.createStream(KafkaSystemAdmin.scala:421)
> [10.201.9.105] out: at org.apache.samza.system.kafka.KafkaSystemAdmin.
> createCoordinatorStream(KafkaSystemAdmin.scala:336)
> [10.201.9.105] out: at org.apache.samza.job.JobRunner.run(JobRunner.scala:
> 88)
> [10.201.9.105] out: at org.apache.samza.job.JobRunner$.doOperation(
> JobRunner.scala:52)
> [10.201.9.105] out: at org.apache.samza.job.JobRunner$.main(JobRunner.
> scala:47)
> [10.201.9.105] out: at org.apache.samza.job.JobRunner.main(JobRunner.
> scala)
> [10.201.9.105] out: 17:18:49.347 [main-SendThread(ip-10-201-9-
> 243.us-west-2.compute.internal:2181)] DEBUG org.apache.zookeeper.ClientCnxn
> - An exception was thrown while closing send thread for session
> 0x25de16b1f500013 : Unable to read additional data from server sessionid
> 0x25de16b1f500013, likely server has closed socket
> [10.201.9.105] out: 17:18:49.349 [main-EventThread] INFO
> org.apache.zookeeper.ClientCnxn - EventThread shut down?
>
>
>
> Jeremiah Adams
> Software Engineer
> www.helixeducation.com<http://www.helixeducation.com/>
> Blog<http://www.helixeducation.com/blog/> | Twitter<https://twitter.com/
> HelixEducation> | Facebook<https://www.facebook.com/HelixEducation> |
> LinkedIn<http://www.linkedin.com/company/3609946>
>

Reply via email to