Hi Liquan,

I run the two workers inside Docker containers, with a connector having 6 tasks. They read from a topic with 6 partitions. Then I kill one of the two containers using docker kill or docker restart. When the container is up again, a rebalance happens, and sometimes a few tasks don't consume messages anymore even though the onPartitionsAssigned callback says that they are handling a partition of the topic. Let me know if you need other information. I use Kafka 0.9.0.
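In case it helps, this is roughly the shape of the logging behind that observation. It is a simplified sketch (hypothetical class name, not my actual connector code) against the 0.9 SinkTask API:

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical task class; only the rebalance hooks matter here.
    public class MySinkTask extends SinkTask {
        private static final Logger log = LoggerFactory.getLogger(MySinkTask.class);

        // Called by the framework after a rebalance, with the partitions this task now owns.
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.info("Assigned partitions: {}", partitions);
        }

        // Called before a rebalance, with the partitions this task is giving up.
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            log.info("Revoked partitions: {}", partitions);
        }

        @Override
        public String version() { return "0.1"; }

        @Override
        public void start(Map<String, String> props) { }

        @Override
        public void put(Collection<SinkRecord> records) {
            // Logs every delivery; this line goes silent on a stalled task
            // even while onPartitionsAssigned reported an assignment.
            log.info("Received {} records", records.size());
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { }

        @Override
        public void stop() { }
    }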
Thanks for the help,
Matteo

2016-05-11 22:57 GMT+02:00 Liquan Pei <liquan...@gmail.com>:

> Hi Matteo,
>
> I am not completely following the steps. Can you share the exact command to
> reproduce the issue? What kind of commands did you use to restart the
> connector? Which version of Kafka are you using?
>
> Thanks,
> Liquan
>
> On Wed, May 11, 2016 at 4:40 AM, Matteo Luzzi <matteo.lu...@gmail.com>
> wrote:
>
> > Hi again, I was able to reproduce the bug in the same scenario (two
> > workers on separate machines) just by deleting the connector through the
> > REST API and then restarting it.
> > I also got this error on one of the workers:
> >
> > [2016-05-11 11:29:47,034] INFO 172.17.42.1 - - [11/May/2016:11:29:45 +0000]
> > "DELETE /connectors/kafka-sink-connector HTTP/1.1" 204 - 1171
> > (org.apache.kafka.connect.runtime.rest.RestServer:60)
> > [2016-05-11 11:29:52,034] INFO Forcing shutdown of thread
> > WorkerSinkTask-kafka-sink-connector-1
> > (org.apache.kafka.connect.util.ShutdownableThread:141)
> > [2016-05-11 11:29:52,050] ERROR Graceful stop of task
> > org.apache.kafka.connect.runtime.WorkerSinkTask@7761a73c failed.
> > (org.apache.kafka.connect.runtime.Worker:312)
> > [2016-05-11 11:29:52,051] ERROR Uncaught exception in herder work thread, exiting:
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> > multi-threaded access
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
> >   at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
> >   at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
> >   at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
> >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
> >   at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:132)
> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
> >   at java.lang.Thread.run(Thread.java:745)
> >
> > On the subsequent restart, 2 out of 6 tasks were no longer receiving
> > messages.
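A side note on that exception, in case it helps with debugging: KafkaConsumer only allows wakeup() to be called from another thread; every other method checks the calling thread and throws when a second thread has a call in flight. A minimal standalone sketch (placeholder broker/topic/group names, nothing to do with my connector) that typically raises the same error:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerThreadSafetyDemo {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "demo-group");              // placeholder group
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic

            // This thread becomes the consumer's owner by polling in a loop.
            Thread poller = new Thread(new Runnable() {
                public void run() {
                    while (true)
                        consumer.poll(1000);
                }
            });
            poller.setDaemon(true);
            poller.start();
            Thread.sleep(2000); // let a poll() get in flight

            // Any call other than wakeup() from a second thread fails: this close()
            // races with poll() and typically throws
            // ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
            consumer.close();
        }
    }

The supported pattern is for the second thread to call only wakeup(), and for the polling thread to catch WakeupException and close the consumer itself.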
> >
> > 2016-05-11 11:41 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
> >
> > > Hi Liquan,
> > > thanks for the fast response.
> > > I'm able to reproduce the error by having two workers running on two
> > > different machines. If I restart one of the two workers, the failover
> > > logic correctly detects the failure and shuts down the tasks on the
> > > healthy worker for rebalancing. When the failed worker is up again, the
> > > tasks are distributed correctly among the two workers, but some tasks
> > > don't get new messages anymore. How can I check that all the input topic
> > > partitions are actually reassigned correctly?
> > >
> > > Matteo
> > >
> > > 2016-05-11 10:44 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
> > >
> > > > Hi Matteo,
> > > >
> > > > Glad to hear that you are building a connector. To better understand
> > > > the issue, can you provide the exact steps to reproduce it? One thing
> > > > I am confused about is that when one worker is shut down, you don't
> > > > need to restart the connector through the REST API; the failover logic
> > > > should handle the connector and task shutdown and startup.
> > > >
> > > > The offset storage topic is used for storing offsets for source
> > > > connectors. For sink connectors, the offsets are simply Kafka offsets
> > > > and will be stored in the __consumer_offsets topic.
> > > >
> > > > Thanks,
> > > > Liquan
> > > >
> > > > On Wed, May 11, 2016 at 1:31 AM, Matteo Luzzi <matteo.lu...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > I'm working on a custom implementation of a sink connector for the
> > > > > Kafka Connect framework. I'm testing the connector for fault
> > > > > tolerance by killing the worker process and restarting the connector
> > > > > through the REST API, and occasionally I notice that some tasks don't
> > > > > receive messages from the internal consumers anymore. I don't get any
> > > > > errors in the log and the tasks seem to be initialised correctly, but
> > > > > some of them just don't process messages anymore. Normally when I
> > > > > restart the connector again, the tasks read all the messages skipped
> > > > > before. I'm executing Kafka Connect in distributed mode.
> > > > >
> > > > > Could it be a problem with the cleanup function invoked when closing
> > > > > the connector, causing a leak in consumer connections to the broker?
> > > > > Any ideas?
> > > > >
> > > > > Also, from the documentation I read that the connector saves the
> > > > > offsets of the tasks in a special topic in Kafka (the one specified
> > > > > via offset.storage.topic), but it is empty even though the connector
> > > > > processes messages. Is that normal?
> > > > >
> > > > > Thanks,
> > > > > Matteo
> > > > >
> > > >
> > > > --
> > > > Liquan Pei
> > > > Software Engineer, Confluent Inc
> > > >
> > >
> > > --
> > > Matteo Remo Luzzi
> > >
> >
> > --
> > Matteo Remo Luzzi
> >
>
> --
> Liquan Pei
> Software Engineer, Confluent Inc
>

--
Matteo Remo Luzzi
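P.S. To double-check Liquan's point about sink offsets living in __consumer_offsets: if I read the Connect source correctly, sink tasks commit under the consumer group "connect-<connector name>", so the committed offsets can be read back with a plain consumer. A rough sketch with placeholder broker and topic names:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CheckCommittedOffsets {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
            // Assumption: sink tasks commit under "connect-<connector name>"
            props.put("group.id", "connect-kafka-sink-connector");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            try {
                for (int p = 0; p < 6; p++) {  // the input topic has 6 partitions
                    TopicPartition tp = new TopicPartition("input-topic", p); // placeholder topic
                    // Fetches the group's committed offset only; does not join the group
                    OffsetAndMetadata committed = consumer.committed(tp);
                    System.out.println(tp + " -> "
                            + (committed == null ? "no committed offset" : committed.offset()));
                }
            } finally {
                consumer.close();
            }
        }
    }

A partition whose committed offset stops advancing while producers keep writing should point at the stalled task.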