Hi Matteo,

I don't completely follow the steps. Can you share the exact commands to
reproduce the issue? Which commands did you use to restart the connector?
Which version of Kafka are you using?
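
For example, when you restarted the connector, did you use curl or the
equivalent of the following against the worker's REST interface? (Just a rough
sketch, assuming the default REST port 8083; the connector class, topic and
tasks.max in the config are placeholders.)

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ConnectorRestCalls {
    public static void main(String[] args) throws IOException {
        String base = "http://localhost:8083";   // assumed default Connect REST port
        String name = "kafka-sink-connector";    // connector name taken from your log

        // Delete the running connector.
        HttpURLConnection del =
                (HttpURLConnection) new URL(base + "/connectors/" + name).openConnection();
        del.setRequestMethod("DELETE");
        System.out.println("DELETE /connectors/" + name + " -> " + del.getResponseCode());
        del.disconnect();

        // Re-create it by POSTing the connector config again (placeholder values).
        String body = "{\"name\":\"" + name + "\",\"config\":{"
                + "\"connector.class\":\"com.example.MySinkConnector\","
                + "\"tasks.max\":\"6\",\"topics\":\"my-topic\"}}";
        HttpURLConnection post =
                (HttpURLConnection) new URL(base + "/connectors").openConnection();
        post.setRequestMethod("POST");
        post.setRequestProperty("Content-Type", "application/json");
        post.setDoOutput(true);
        try (OutputStream os = post.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("POST /connectors -> " + post.getResponseCode());
        post.disconnect();
    }
}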

Thanks,
Liquan

On Wed, May 11, 2016 at 4:40 AM, Matteo Luzzi <matteo.lu...@gmail.com>
wrote:

> Hi again, I was able to reproduce the bug in the same scenario (two workers
> on separate machines) just by deleting the connector through the REST API and
> then restarting it.
> I also got this error on one of the workers:
> [2016-05-11 11:29:47,034] INFO 172.17.42.1 - - [11/May/2016:11:29:45 +0000] "DELETE /connectors/kafka-sink-connector HTTP/1.1" 204 -  1171 (org.apache.kafka.connect.runtime.rest.RestServer:60)
> [2016-05-11 11:29:52,034] INFO Forcing shutdown of thread WorkerSinkTask-kafka-sink-connector-1 (org.apache.kafka.connect.util.ShutdownableThread:141)
> [2016-05-11 11:29:52,050] ERROR Graceful stop of task org.apache.kafka.connect.runtime.WorkerSinkTask@7761a73c failed. (org.apache.kafka.connect.runtime.Worker:312)
> [2016-05-11 11:29:52,051] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
>     at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
>     at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
>     at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:132)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
>     at java.lang.Thread.run(Thread.java:745)
>
> On the subsequent restart, 2 of the 6 tasks were no longer receiving
> messages.
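> 
> For reference, the KafkaConsumer javadoc only allows wakeup() to be called
> from a different thread, and the shutdown pattern it documents is roughly the
> sketch below (placeholder topic, externally supplied properties). The trace
> above looks like close() being called from the herder thread while the task
> thread still owns the consumer.
> 
> import java.util.Arrays;
> import java.util.Properties;
> import java.util.concurrent.atomic.AtomicBoolean;
> 
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.errors.WakeupException;
> 
> public class SafeConsumerShutdown implements Runnable {
>     private final AtomicBoolean closed = new AtomicBoolean(false);
>     private final KafkaConsumer<byte[], byte[]> consumer;
> 
>     public SafeConsumerShutdown(Properties consumerProps) {
>         this.consumer = new KafkaConsumer<>(consumerProps);
>     }
> 
>     @Override
>     public void run() {
>         try {
>             consumer.subscribe(Arrays.asList("my-topic"));  // placeholder topic
>             while (!closed.get()) {
>                 ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
>                 // process records ...
>             }
>         } catch (WakeupException e) {
>             if (!closed.get()) throw e;  // only swallow the wakeup when shutting down
>         } finally {
>             consumer.close();            // close() must run on the polling thread itself
>         }
>     }
> 
>     // The only consumer method another thread may call is wakeup().
>     public void shutdown() {
>         closed.set(true);
>         consumer.wakeup();
>     }
> }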
>
> 2016-05-11 11:41 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
>
> > Hi Liquan,
> > thanks for the fast response.
> > I'm able to reproduce the error by having two workers running on two
> > different machines. If I restart one of the two workers, the failover logic
> > correctly detects the failure and shuts down the tasks on the healthy worker
> > for rebalancing. When the failed worker is up again, the tasks are
> > distributed correctly between the two workers, but some tasks don't get new
> > messages anymore. How can I check that all the input topic partitions have
> > actually been reassigned?
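> >
> > Would logging the assignment from the task itself be a reliable way to
> > check? Roughly something like this sketch (on newer Connect versions the
> > hook is called open() rather than onPartitionsAssigned()):
> >
> > import java.util.Collection;
> > import java.util.Map;
> >
> > import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> > import org.apache.kafka.common.TopicPartition;
> > import org.apache.kafka.connect.sink.SinkRecord;
> > import org.apache.kafka.connect.sink.SinkTask;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > public class AssignmentLoggingSinkTask extends SinkTask {
> >     private static final Logger log = LoggerFactory.getLogger(AssignmentLoggingSinkTask.class);
> >
> >     @Override
> >     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >         // Log what this task owns after every rebalance; if the union of these
> >         // lines across all tasks misses some partitions, the reassignment is incomplete.
> >         log.info("Partitions assigned to this task: {}", partitions);
> >     }
> >
> >     @Override
> >     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> >         log.info("Partitions revoked from this task: {}", partitions);
> >     }
> >
> >     @Override public String version() { return "0.1-sketch"; }
> >     @Override public void start(Map<String, String> props) { /* connector-specific setup */ }
> >     @Override public void put(Collection<SinkRecord> records) { /* write records to the sink */ }
> >     @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { /* nothing to flush */ }
> >     @Override public void stop() { /* release resources */ }
> > }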
> >
> > Matteo
> >
> > 2016-05-11 10:44 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
> >
> >> Hi Matteo,
> >>
> >> Glad to hear that you are building a connector. To better understand the
> >> issue, can you provide the exact steps to reproduce it? One thing I am
> >> confused about is that when one worker is shut down, you don't need to
> >> restart the connector through the REST API; the failover logic should
> >> handle the connector and task shutdown and startup.
> >>
> >> The offset storage topic is used for storing offsets for source connectors.
> >> For sink connectors, the offsets are simply Kafka offsets and will be stored
> >> in the __consumer_offsets topic.
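> >>
> >> If you want to double-check where the sink got to: the sink tasks consume
> >> under the consumer group "connect-<connector name>" (so
> >> "connect-kafka-sink-connector" in your case), and you can read its committed
> >> offsets with a plain consumer, roughly like this sketch (placeholder broker
> >> address, topic and partition count):
> >>
> >> import java.util.Properties;
> >>
> >> import org.apache.kafka.clients.consumer.KafkaConsumer;
> >> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> >> import org.apache.kafka.common.TopicPartition;
> >>
> >> public class CheckSinkOffsets {
> >>     public static void main(String[] args) {
> >>         Properties props = new Properties();
> >>         props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
> >>         props.put("group.id", "connect-kafka-sink-connector");
> >>         props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> >>         props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> >>
> >>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
> >>             for (int p = 0; p < 6; p++) {  // placeholder partition count
> >>                 OffsetAndMetadata committed = consumer.committed(new TopicPartition("my-topic", p));
> >>                 System.out.println("partition " + p + ": "
> >>                         + (committed == null ? "no committed offset" : committed.offset()));
> >>             }
> >>         }
> >>     }
> >> }
> >>
> >> Keep in mind the worker only commits these offsets periodically
> >> (offset.flush.interval.ms), so they can lag a bit behind what the tasks
> >> have actually processed.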
> >>
> >> Thanks,
> >> Liquan
> >>
> >> On Wed, May 11, 2016 at 1:31 AM, Matteo Luzzi <matteo.lu...@gmail.com>
> >> wrote:
> >>
> >> > Hi,
> >> > I'm working on a custom implementation of a sink connector for the Kafka
> >> > Connect framework. I'm testing the connector for fault tolerance by killing
> >> > the worker process and restarting the connector through the REST API, and
> >> > occasionally I notice that some tasks don't receive messages from the
> >> > internal consumers anymore. I don't get any errors in the log and the tasks
> >> > seem to be initialised correctly, but some of them just don't process
> >> > messages anymore. Normally, when I restart the connector again, the tasks
> >> > read all the messages that were skipped before. I'm running Kafka Connect
> >> > in distributed mode.
> >> >
> >> > Could the problem be in the cleanup function invoked when the connector is
> >> > closed, causing a leak of consumer connections to the broker? Any ideas?
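> >> >
> >> > My understanding is that the task itself never touches the consumer: the
> >> > KafkaConsumer feeding a sink task is created and closed by the worker, so
> >> > stop() should only release the task's own client, roughly like this
> >> > simplified sketch (with a stand-in client class):
> >> >
> >> > import java.util.Collection;
> >> > import java.util.Map;
> >> >
> >> > import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> >> > import org.apache.kafka.common.TopicPartition;
> >> > import org.apache.kafka.connect.sink.SinkRecord;
> >> > import org.apache.kafka.connect.sink.SinkTask;
> >> >
> >> > public class CleanupSketchSinkTask extends SinkTask {
> >> >
> >> >     // Stand-in for whatever client the task opens towards the target system.
> >> >     static class TargetClient implements AutoCloseable {
> >> >         void write(Collection<SinkRecord> records) { /* ... */ }
> >> >         @Override public void close() { /* ... */ }
> >> >     }
> >> >
> >> >     private TargetClient client;
> >> >
> >> >     @Override public void start(Map<String, String> props) { client = new TargetClient(); }
> >> >
> >> >     @Override public void put(Collection<SinkRecord> records) { client.write(records); }
> >> >
> >> >     @Override public void stop() {
> >> >         // Release only what the task itself opened; the worker closes its own consumer.
> >> >         if (client != null) {
> >> >             client.close();
> >> >             client = null;
> >> >         }
> >> >     }
> >> >
> >> >     @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { }
> >> >     @Override public String version() { return "0.1-sketch"; }
> >> > }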
> >> >
> >> > Also, from the documentation I read that the connector saves the offsets
> >> > of the tasks in a special topic in Kafka (the one specified via
> >> > offset.storage.topic), but that topic is empty even though the connector
> >> > processes messages. Is that normal?
> >> >
> >> > Thanks,
> >> > Matteo
> >> >
> >>
> >>
> >>
> >> --
> >> Liquan Pei
> >> Software Engineer, Confluent Inc
> >>
> >
> >
> >
> > --
> > Matteo Remo Luzzi
> >
>
>
>
> --
> Matteo Remo Luzzi
>



-- 
Liquan Pei
Software Engineer, Confluent Inc
