Liquan,

We're constantly hitting this problem in our prod cluster. Is there a JIRA
issue that tracks this, and when will the fix be backported to the 0.9.x
branch? We're not planning to upgrade to 0.10 for a while, since our
assumption was that the 0.9.x line would be more stable.
Thanks,

Marcos Juarez

On Mon, May 16, 2016 at 12:28 PM, Liquan Pei <liquan...@gmail.com> wrote:

> Hi Matteo,
>
> There was a bug in 0.9.1 such that task.close() could be invoked from both
> the Worker thread and the Herder thread. This creates a race condition in
> which consumer.close() is invoked from multiple threads at the same time.
> Since the consumer is designed to be used from a single thread, it throws
> a ConcurrentModificationException when close() is called concurrently.
>
> This bug is fixed in the upcoming 0.10.0 release, where consumer.close()
> is only ever invoked from the SinkTaskThread.
>
> Thanks,
> Liquan
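A minimal sketch of the hand-off Liquan describes, assuming a placeholder
topic and byte-array types rather than the actual WorkerSinkTask code: the
polling thread is the only thread that ever calls methods on the consumer,
and other threads request shutdown through wakeup(), the one KafkaConsumer
method documented as safe to call from another thread.

    import java.util.Collections;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    // Sketch of a single-owner poll loop; names are illustrative.
    public class SingleOwnerPollLoop implements Runnable {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final AtomicBoolean stopping = new AtomicBoolean(false);

        public SingleOwnerPollLoop(KafkaConsumer<byte[], byte[]> consumer) {
            this.consumer = consumer;
        }

        // Safe to call from the herder (or any other) thread: it only
        // touches the flag and wakeup(), never close() or poll().
        public void stop() {
            stopping.set(true);
            consumer.wakeup(); // interrupts a blocking poll()
        }

        @Override
        public void run() {
            try {
                consumer.subscribe(Collections.singletonList("kafka-connect-topic"));
                while (!stopping.get()) {
                    consumer.poll(1000); // hand records to the task here
                }
            } catch (WakeupException e) {
                if (!stopping.get()) {
                    throw e; // woken for a reason other than shutdown
                }
            } finally {
                consumer.close(); // reached only on the owning thread
            }
        }
    }

With this shape, close() can only ever run on the polling thread, which is
the invariant the 0.10.0 fix restores.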
> On Sun, May 15, 2016 at 11:57 PM, Matteo Luzzi <matteo.lu...@gmail.com>
> wrote:
>
> > Any other thoughts on this?
> >
> > Thanks,
> > Matteo
> >
> > 2016-05-12 13:09 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
> >
> > > I also found this suspicious log snippet that might be relevant. The
> > > task executed by thread 134 is the one that won't receive messages:
> > >
> > > INFO Attempt to heart beat failed since the group is rebalancing, try
> > > to re-join group.
> > > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:633)
> > > [2016-05-12 10:27:09,623] INFO [kafka-connect-topic-0,
> > > kafka-connect-topic-1, kafka-connect-topic-2, kafka-connect-topic-3,
> > > kafka-connect-topic-4, kafka-connect-topic-5] topic-partitions are
> > > revoked from this thread task 134
> > > (com.connect.elasticsearch_kafka_connector.task.ElasticsearchTask:99)
> > > [2016-05-12 10:27:09,623] INFO
> > > org.apache.kafka.connect.runtime.WorkerSinkTask@5c29784c Committing
> > > offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
> > > [2016-05-12 10:27:09,634] INFO [kafka-connect-topic-0]
> > > topic-partitions are assigned from this thread task 134
> > > (com.connect.elasticsearch_kafka_connector.task.ElasticsearchTask:92)
> > >
> > > 2016-05-12 8:58 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
> > >
> > > > Hi Liquan,
> > > >
> > > > I run two workers inside Docker containers, with one connector that
> > > > has 6 tasks reading from a topic with 6 partitions. I then kill one
> > > > of the two containers with docker kill or docker restart. When the
> > > > container is up again a rebalance happens, and sometimes a few tasks
> > > > don't consume messages anymore, even though the onPartitionsAssigned
> > > > callback says they are handling a partition of the topic. I'm using
> > > > Kafka 0.9.0. Let me know if you need any other information.
> > > >
> > > > Thanks for the help,
> > > > Matteo
> > > >
> > > > 2016-05-11 22:57 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
> > > >
> > > > > Hi Matteo,
> > > > >
> > > > > I don't completely follow the steps. Can you share the exact
> > > > > commands to reproduce the issue? What commands did you use to
> > > > > restart the connector? Which version of Kafka are you using?
> > > > >
> > > > > Thanks,
> > > > > Liquan
> > > > >
> > > > > On Wed, May 11, 2016 at 4:40 AM, Matteo Luzzi
> > > > > <matteo.lu...@gmail.com> wrote:
> > > > >
> > > > > > Hi again, I was able to reproduce the bug in the same scenario
> > > > > > (two workers on separate machines) just by deleting the
> > > > > > connector through the REST API and then restarting it.
> > > > > > I also got this error on one of the workers:
> > > > > >
> > > > > > [2016-05-11 11:29:47,034] INFO 172.17.42.1 - -
> > > > > > [11/May/2016:11:29:45 +0000] "DELETE
> > > > > > /connectors/kafka-sink-connector HTTP/1.1" 204 - 1171
> > > > > > (org.apache.kafka.connect.runtime.rest.RestServer:60)
> > > > > > [2016-05-11 11:29:52,034] INFO Forcing shutdown of thread
> > > > > > WorkerSinkTask-kafka-sink-connector-1
> > > > > > (org.apache.kafka.connect.util.ShutdownableThread:141)
> > > > > > [2016-05-11 11:29:52,050] ERROR Graceful stop of task
> > > > > > org.apache.kafka.connect.runtime.WorkerSinkTask@7761a73c failed.
> > > > > > (org.apache.kafka.connect.runtime.Worker:312)
> > > > > > [2016-05-11 11:29:52,051] ERROR Uncaught exception in herder
> > > > > > work thread, exiting:
> > > > > > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
> > > > > > java.util.ConcurrentModificationException: KafkaConsumer is not
> > > > > > safe for multi-threaded access
> > > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
> > > > > > at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
> > > > > > at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
> > > > > > at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
> > > > > > at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
> > > > > > at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
> > > > > > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
> > > > > > at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:132)
> > > > > > at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
> > > > > > at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
> > > > > > at java.lang.Thread.run(Thread.java:745)
> > > > > >
> > > > > > On the subsequent restart, 2 out of 6 tasks were no longer
> > > > > > receiving messages.
> > > > > >
> > > > > > 2016-05-11 11:41 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
> > > > > >
> > > > > > > Hi Liquan,
> > > > > > > Thanks for the fast response.
> > > > > > > I'm able to reproduce the error with two workers running on
> > > > > > > two different machines. If I restart one of the two workers,
> > > > > > > the failover logic correctly detects the failure and shuts
> > > > > > > down the tasks on the healthy worker for rebalancing. When the
> > > > > > > failed worker is up again, the tasks are distributed correctly
> > > > > > > between the two workers, but some tasks don't get new messages
> > > > > > > anymore. How can I check that all the input topic partitions
> > > > > > > were actually reassigned?
> > > > > > >
> > > > > > > Matteo
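One way to answer Matteo's question is to instrument the task itself. Below
is a hypothetical task, loosely modeled on the ElasticsearchTask log lines
quoted above and written against the 0.9-era SinkTask callbacks (later
releases replaced them with open() and close()); it logs every assignment
change together with the thread that received it.

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    // Illustrative sink task; only the two rebalance callbacks matter here.
    public class LoggingSinkTask extends SinkTask {
        @Override
        public String version() {
            return "0.1";
        }

        @Override
        public void start(Map<String, String> props) {
            // read task configuration here
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            // write records to the downstream system here
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // make pending writes durable before offsets are committed
        }

        @Override
        public void stop() {
            // release connections to the downstream system
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.printf("[%s] partitions assigned: %s%n",
                Thread.currentThread().getName(), partitions);
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.printf("[%s] partitions revoked: %s%n",
                Thread.currentThread().getName(), partitions);
        }
    }

After a rebalance, the union of the partitions logged by all tasks can be
checked against the topic's partition list: a missing partition, or one
assigned to a task that never logs further activity, points at the stall.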
> > > > > > > 2016-05-11 10:44 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
> > > > > > >
> > > > > > > > Hi Matteo,
> > > > > > > >
> > > > > > > > Glad to hear that you are building a connector. To better
> > > > > > > > understand the issue, can you provide the exact steps to
> > > > > > > > reproduce it? One thing I'm confused about is that when one
> > > > > > > > worker is shut down, you don't need to restart the connector
> > > > > > > > through the REST API; the failover logic should handle the
> > > > > > > > shutdown and startup of the connector and its tasks.
> > > > > > > >
> > > > > > > > The offset storage topic is used for storing offsets for
> > > > > > > > source connectors. For a sink connector, the offset is
> > > > > > > > simply the Kafka offset and is stored in the
> > > > > > > > __consumer_offsets topic.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Liquan
> > > > > > > >
> > > > > > > > On Wed, May 11, 2016 at 1:31 AM, Matteo Luzzi
> > > > > > > > <matteo.lu...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > > I'm working on a custom sink connector for the Kafka
> > > > > > > > > Connect framework. I'm testing the connector for fault
> > > > > > > > > tolerance by killing the worker process and restarting
> > > > > > > > > the connector through the REST API, and occasionally I
> > > > > > > > > notice that some tasks no longer receive messages from
> > > > > > > > > the internal consumers. I don't get any errors in the
> > > > > > > > > log, and the tasks seem to be initialized correctly, but
> > > > > > > > > some of them just don't process messages anymore.
> > > > > > > > > Normally, when I restart the connector again, the tasks
> > > > > > > > > read all the messages skipped before. I'm running Kafka
> > > > > > > > > Connect in distributed mode.
> > > > > > > > >
> > > > > > > > > Could it be a problem with the cleanup function invoked
> > > > > > > > > when closing the connector, causing a leak of consumer
> > > > > > > > > connections to the broker? Any ideas?
> > > > > > > > >
> > > > > > > > > Also, the documentation says that the connector saves the
> > > > > > > > > tasks' offsets in a special Kafka topic (the one specified
> > > > > > > > > via offset.storage.topic), but that topic is empty even
> > > > > > > > > though the connector processes messages. Is that normal?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Matteo
>
> --
> Liquan Pei
> Software Engineer, Confluent Inc
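Liquan's point about __consumer_offsets suggests a second diagnostic: read
the offsets the worker's consumer group has committed for each partition and
watch whether they advance while producers keep writing. A minimal probe
sketch; the bootstrap server, the six-partition topic, and the
connect-<connector-name> group id are assumptions matching this thread's
setup, not guaranteed values.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Reads the committed offset for each partition of the sink's input
    // topic. The probe only fetches offsets; it never subscribes, so it
    // does not trigger a rebalance of the running Connect group.
    public class CommittedOffsetProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "connect-kafka-sink-connector");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                for (int p = 0; p < 6; p++) { // the topic here has 6 partitions
                    TopicPartition tp = new TopicPartition("kafka-connect-topic", p);
                    OffsetAndMetadata committed = consumer.committed(tp); // null if none
                    System.out.printf("%s committed=%s%n", tp,
                        committed == null ? "none" : committed.offset());
                }
            }
        }
    }

A partition whose committed offset never advances marks the stalled task,
and the fact that these offsets live in __consumer_offsets rather than
offset.storage.topic also explains why Matteo's offset topic stays empty
for a sink connector.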