Hi, Liquan

I run the two workers inside Docker containers, with a connector that has 6
tasks reading from a topic with 6 partitions. Then I kill one of the two
containers with docker kill or docker restart. When the container is up again
a rebalance happens, and sometimes a few tasks don't consume messages anymore
even though the onPartitionsAssigned callback says they are handling a
partition of the topic. I use Kafka 0.9.0. Let me know if you need any other
information.
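
For what it's worth, below is roughly the logging I added to see what each
task believes it owns after a rebalance (a minimal sketch against the 0.9
SinkTask API; the class name, logger, and record handling are illustrative,
not my actual connector code):

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of a sink task that logs its partition assignment on every
// rebalance so it can be compared against the topic's 6 partitions.
public class LoggingSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(LoggingSinkTask.class);

    @Override
    public void start(Map<String, String> props) {
        log.info("Task starting with config: {}", props);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Fires after a rebalance; a task that logs an assignment here
        // but never sees records in put() again shows the symptom.
        log.info("Assigned partitions: {}", partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("Revoked partitions: {}", partitions);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        log.debug("Received {} records", records.size());
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // no-op in this sketch
    }

    @Override
    public void stop() {
        log.info("Task stopping");
    }

    @Override
    public String version() {
        return "0.1";
    }
}

After the container comes back, the stuck tasks still log an assignment in
onPartitionsAssigned, but put() stops being called for them.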

Thanks for the help,
Matteo

2016-05-11 22:57 GMT+02:00 Liquan Pei <liquan...@gmail.com>:

> Hi Matteo,
>
> I am not completely following the steps. Can you share the exact commands to
> reproduce the issue? What commands did you use to restart the
> connector? Which version of Kafka are you using?
>
> Thanks,
> Liquan
>
> On Wed, May 11, 2016 at 4:40 AM, Matteo Luzzi <matteo.lu...@gmail.com> wrote:
>
> > Hi again, I was able to reproduce the bug in the same scenario (two
> > workers on separate machines) just by deleting the connector via the
> > REST API and then restarting it again.
> > I also got this error on one of the workers:
> > [2016-05-11 11:29:47,034] INFO 172.17.42.1 - - [11/May/2016:11:29:45 +0000] "DELETE /connectors/kafka-sink-connector HTTP/1.1" 204 -  1171 (org.apache.kafka.connect.runtime.rest.RestServer:60)
> > [2016-05-11 11:29:52,034] INFO Forcing shutdown of thread WorkerSinkTask-kafka-sink-connector-1 (org.apache.kafka.connect.util.ShutdownableThread:141)
> > [2016-05-11 11:29:52,050] ERROR Graceful stop of task org.apache.kafka.connect.runtime.WorkerSinkTask@7761a73c failed. (org.apache.kafka.connect.runtime.Worker:312)
> > [2016-05-11 11:29:52,051] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
> > java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
> >   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
> >   at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
> >   at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
> >   at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
> >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
> >   at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:132)
> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
> >   at java.lang.Thread.run(Thread.java:745)
> >
> > On the subsequent restart, 2 out of 6 tasks were not receiving messages
> > anymore.
> >
> > 2016-05-11 11:41 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
> >
> > > Hi Liquan,
> > > thanks for the fast response.
> > > I'm able to reproduce the error by having two workers running on two
> > > different machines. If I restart one of the two workers, the failover
> > > logic correctly detects the failure and shuts down the tasks on the
> > > healthy worker for rebalancing. When the failed worker is up again, the
> > > tasks are distributed correctly among the two workers, but some tasks
> > > don't get new messages anymore. How can I check that all the input
> > > topic partitions have actually been correctly reassigned?
> > >
> > > Matteo
> > >
> > > 2016-05-11 10:44 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
> > >
> > >> Hi Matteo,
> > >>
> > >> Glad to hear that you are building a connector. To better understand
> > >> the issue, can you provide the exact steps to reproduce it? One thing
> > >> I am confused about is that when one worker is shut down, you don't
> > >> need to restart the connector through the REST API; the failover logic
> > >> should handle connector and task shutdown and startup.
> > >>
> > >> The offset storage topic is used for storing offsets for source
> > >> connectors. For sink connectors, the offset is simply the Kafka offset
> > >> and will be stored in the __consumer_offsets topic.
> > >>
> > >> Thanks,
> > >> Liquan
> > >>
> > >> On Wed, May 11, 2016 at 1:31 AM, Matteo Luzzi <matteo.lu...@gmail.com> wrote:
> > >>
> > >> > Hi,
> > >> > I'm working on a custom implementation of a sink connector for the
> > >> > Kafka Connect framework. I'm testing the connector for fault
> > >> > tolerance by killing the worker process and restarting the connector
> > >> > through the REST API, and occasionally I notice that some tasks no
> > >> > longer receive messages from the internal consumers. I don't get any
> > >> > errors in the log and the tasks seem to be initialised correctly,
> > >> > but some of them just don't process messages anymore. Normally when
> > >> > I restart the connector again, the tasks read all the messages that
> > >> > were skipped before. I'm running Kafka Connect in distributed mode.
> > >> >
> > >> > Could the cleanup function invoked when closing the connector be
> > >> > causing a leak of consumer connections to the broker? Any ideas?
> > >> >
> > >> > Also, the documentation says that the connector saves the offsets of
> > >> > the tasks in a special topic in Kafka (the one specified via
> > >> > offset.storage.topic), but it is empty even though the connector
> > >> > processes messages. Is that normal?
> > >> >
> > >> > Thanks,
> > >> > Matteo
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> Liquan Pei
> > >> Software Engineer, Confluent Inc
> > >>
> > >
> > >
> > >
> > > --
> > > Matteo Remo Luzzi
> > >
> >
> >
> >
> > --
> > Matteo Remo Luzzi
> >
>
>
>
> --
> Liquan Pei
> Software Engineer, Confluent Inc
>



-- 
Matteo Remo Luzzi
