Hi Matteo,

There was a bug in the 0.9.1 release where task.close() could be invoked
from both the Worker thread and the Herder thread. This creates a race
condition in which consumer.close() is called from multiple threads at the
same time. Since the consumer is designed for single-threaded use, a
ConcurrentModificationException is thrown when consumer.close() is invoked
by multiple threads.
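To illustrate (this is a simplified sketch of my own, not the actual KafkaConsumer code): the consumer guards every public method with a single-thread ownership check, recording the calling thread and throwing if another thread is already inside. Two threads closing concurrently trip that check deterministically:

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

// Simplified sketch of a KafkaConsumer-style single-thread guard:
// acquire() records the calling thread's id and throws if a different
// thread is already inside a guarded method.
public class GuardedConsumer {
    private static final long NO_OWNER = -1L;
    private final AtomicLong ownerThreadId = new AtomicLong(NO_OWNER);
    private final CountDownLatch entered = new CountDownLatch(1);

    private void acquire() {
        long me = Thread.currentThread().getId();
        if (!ownerThreadId.compareAndSet(NO_OWNER, me)
                && ownerThreadId.get() != me) {
            throw new ConcurrentModificationException(
                "Consumer is not safe for multi-threaded access");
        }
    }

    private void release() {
        ownerThreadId.set(NO_OWNER);
    }

    public void close() throws InterruptedException {
        acquire();
        try {
            entered.countDown();   // signal that this thread holds the consumer
            Thread.sleep(200);     // simulate a slow close
        } finally {
            release();
        }
    }

    public static void main(String[] args) throws Exception {
        GuardedConsumer consumer = new GuardedConsumer();
        Thread workerThread = new Thread(() -> {
            try { consumer.close(); } catch (Exception ignored) { }
        }, "WorkerThread");
        workerThread.start();
        consumer.entered.await();  // wait until the worker is inside close()
        try {
            consumer.close();      // concurrent close() from the "herder" thread
        } catch (ConcurrentModificationException e) {
            System.out.println(e.getMessage());
        }
        workerThread.join();
    }
}
```

The latch guarantees the second close() always overlaps the first, so the exception is reproduced every run, which is exactly the failure mode in the stack trace below.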

This bug is fixed in the upcoming 0.10.0 release, where consumer.close() is
only invoked from the SinkTaskThread.
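The shape of that fix (again my own illustration, not the actual 0.10.0 code) is that only the thread owning the consumer ever calls close(); other threads merely request shutdown. For the real KafkaConsumer, wakeup() is the one thread-safe method and plays the role of the flag here:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the close-on-owner-thread pattern: the herder thread sets a
// flag (or calls consumer.wakeup() with the real client); the sink task
// thread notices it, exits its poll loop, and performs the close itself.
public class SinkTaskLoop implements Runnable {
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    public void stop() {               // safe to call from the herder thread
        stopping.set(true);
    }

    @Override
    public void run() {
        while (!stopping.get()) {
            // poll() and deliver records to the sink in the real task
        }
        // consumer.close() would run here, on this thread only, so the
        // single-thread guard is never violated
        System.out.println("consumer closed on SinkTaskThread");
    }

    public static void main(String[] args) throws Exception {
        SinkTaskLoop task = new SinkTaskLoop();
        Thread taskThread = new Thread(task, "SinkTaskThread");
        taskThread.start();
        task.stop();                   // herder requests shutdown...
        taskThread.join();             // ...and waits; it never calls close()
    }
}
```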

Thanks,
Liquan

On Sun, May 15, 2016 at 11:57 PM, Matteo Luzzi <matteo.lu...@gmail.com>
wrote:

> Any other thoughts on this?
> Thanks,
> Matteo
>
> 2016-05-12 13:09 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
>
> > I also found this suspicious log snippet that might be relevant. The task
> > executed by thread 134 is the one that won't receive messages:
> >
> > INFO Attempt to heart beat failed since the group is rebalancing, try to re-join group. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:633)
> > [2016-05-12 10:27:09,623] INFO [kafka-connect-topic-0, kafka-connect-topic-1, kafka-connect-topic-2, kafka-connect-topic-3, kafka-connect-topic-4, kafka-connect-topic-5] topic-partitions are revoked from this thread task 134 (com.connect.elasticsearch_kafka_connector.task.ElasticsearchTask:99)
> > [2016-05-12 10:27:09,623] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@5c29784c Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
> > [2016-05-12 10:27:09,634] INFO [kafka-connect-topic-0] topic-partitions are assigned from this thread task 134 (com.connect.elasticsearch_kafka_connector.task.ElasticsearchTask:92)
> >
> > 2016-05-12 8:58 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
> >
> >> Hi, Liquan
> >>
> >> I run two workers inside Docker containers and a connector with 6
> >> tasks, reading from a topic with 6 partitions. Then I kill one of the
> >> two containers using docker kill or docker restart. When the container
> >> is up again, a rebalance happens and sometimes a few tasks don't
> >> consume messages anymore, even though the onPartitionsAssigned callback
> >> says that they are handling a partition of the topic. Let me know if
> >> you need other information.
> >> I'm using Kafka 0.9.0.
> >>
> >> Thanks for the help,
> >> Matteo
> >>
> >> 2016-05-11 22:57 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
> >>
> >>> Hi Matteo,
> >>>
> >>> I don't completely follow the steps. Can you share the exact commands
> >>> to reproduce the issue? What commands did you use to restart the
> >>> connector? Which version of Kafka are you using?
> >>>
> >>> Thanks,
> >>> Liquan
> >>>
> >>> On Wed, May 11, 2016 at 4:40 AM, Matteo Luzzi <matteo.lu...@gmail.com>
> >>> wrote:
> >>>
> >>> > Hi again, I was able to reproduce the bug in the same scenario (two
> >>> > workers on separate machines) just by deleting the connector via the
> >>> > REST API and then restarting it.
> >>> > I also got this error on one of the workers:
> >>> > [2016-05-11 11:29:47,034] INFO 172.17.42.1 - - [11/May/2016:11:29:45
> >>> +0000]
> >>> > "DELETE /connectors/kafka-sink-connector HTTP/1.1" 204 -  1171
> >>> > (org.apache.kafka.connect.runtime.rest.RestServer:60)
> >>> > [2016-05-11 11:29:52,034] INFO Forcing shutdown of thread
> >>> > WorkerSinkTask-kafka-sink-connector-1
> >>> > (org.apache.kafka.connect.util.ShutdownableThread:141)
> >>> > [2016-05-11 11:29:52,050] ERROR Graceful stop of task
> >>> > org.apache.kafka.connect.runtime.WorkerSinkTask@7761a73c failed.
> >>> > (org.apache.kafka.connect.runtime.Worker:312)
> >>> > [2016-05-11 11:29:52,051] ERROR Uncaught exception in herder work
> >>> thread,
> >>> > exiting:
> >>> >  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
> >>> > java.util.ConcurrentModificationException: KafkaConsumer is not safe
> >>> for
> >>> > multi-threaded access
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
> >>> > at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:132)
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
> >>> > at
> >>> >
> >>> >
> >>>
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
> >>> > at java.lang.Thread.run(Thread.java:745)
> >>> >
> >>> > On the subsequent restart, 2 out of 6 tasks were no longer receiving
> >>> > messages.
> >>> >
> >>> > 2016-05-11 11:41 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
> >>> >
> >>> > > Hi Liquan,
> >>> > > thanks for the fast response.
> >>> > > I'm able to reproduce the error by having two workers running on
> >>> > > two different machines. If I restart one of the two workers, the
> >>> > > failover logic correctly detects the failure and shuts down the
> >>> > > tasks on the healthy worker for rebalancing. When the failed worker
> >>> > > is up again, the tasks are distributed correctly among the two
> >>> > > workers, but some tasks don't get new messages anymore. How can I
> >>> > > check that all the input topic partitions are actually correctly
> >>> > > reassigned?
> >>> > >
> >>> > > Matteo
> >>> > >
> >>> > > 2016-05-11 10:44 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
> >>> > >
> >>> > >> Hi Matteo,
> >>> > >>
> >>> > >> Glad to hear that you are building a connector. To better
> >>> > >> understand the issue, can you provide the exact steps to reproduce
> >>> > >> it? One thing I'm confused about is that when one worker is shut
> >>> > >> down, you don't need to restart the connector through the REST
> >>> > >> API; the failover logic should handle the connector and task
> >>> > >> shutdown and startup.
> >>> > >>
> >>> > >> The offset storage topic is used for storing offsets for source
> >>> > >> connectors. For sink connectors, the offset is simply the Kafka
> >>> > >> offset and will be stored in the __consumer_offsets topic.
> >>> > >>
> >>> > >> Thanks,
> >>> > >> Liquan
> >>> > >>
> >>> > >> On Wed, May 11, 2016 at 1:31 AM, Matteo Luzzi <
> >>> matteo.lu...@gmail.com>
> >>> > >> wrote:
> >>> > >>
> >>> > >> > Hi,
> >>> > >> > I'm working on a custom implementation of a sink connector for
> >>> > >> > the Kafka Connect framework. I'm testing the connector for fault
> >>> > >> > tolerance by killing the worker process and restarting the
> >>> > >> > connector through the REST API, and occasionally I notice that
> >>> > >> > some tasks no longer receive messages from the internal
> >>> > >> > consumers. I don't get any errors in the log and the tasks seem
> >>> > >> > to be initialized correctly, but some of them just don't process
> >>> > >> > messages anymore. Normally when I restart the connector again,
> >>> > >> > the tasks read all the messages skipped before. I'm running
> >>> > >> > Kafka Connect in distributed mode.
> >>> > >> >
> >>> > >> > Could it be a problem with the cleanup function invoked when
> >>> > >> > closing the connector, causing a leak in consumer connections to
> >>> > >> > the broker? Any ideas?
> >>> > >> >
> >>> > >> > Also, from the documentation I read that the connector saves the
> >>> > >> > offsets of the tasks in a special topic in Kafka (the one
> >>> > >> > specified via offset.storage.topic), but it is empty even though
> >>> > >> > the connector processes messages. Is that normal?
> >>> > >> >
> >>> > >> > Thanks,
> >>> > >> > Matteo
> >>> > >> >
> >>> > >>
> >>> > >>
> >>> > >>
> >>> > >> --
> >>> > >> Liquan Pei
> >>> > >> Software Engineer, Confluent Inc
> >>> > >>
> >>> > >
> >>> > >
> >>> > >
> >>> > > --
> >>> > > Matteo Remo Luzzi
> >>> > >
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > Matteo Remo Luzzi
> >>> >
> >>>
> >>>
> >>>
> >>> --
> >>> Liquan Pei
> >>> Software Engineer, Confluent Inc
> >>>
> >>
> >>
> >>
> >> --
> >> Matteo Remo Luzzi
> >>
> >
> >
> >
> > --
> > Matteo Remo Luzzi
> >
>
>
>
> --
> Matteo Remo Luzzi
>



-- 
Liquan Pei
Software Engineer, Confluent Inc
