[
https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14555396#comment-14555396
]
Guozhang Wang commented on KAFKA-2168:
--------------------------------------
Currently the NetworkClient's blocking calls, "poll" and "completeAll" (which
triggers "poll" as well), are used in several layers:
1. KafkaConsumer.poll() naturally triggers client.poll().
2. Coordinator.ensureCoordinatorReady() to find a node to ask for the consumer
coordinator.
3. Coordinator.sendAndReceive() for blocking requests such as sync commit
offsets, fetch offsets, join group, and fetch coordinator.
4. Fetcher.awaitMetadataUpdate() to block on a metadata update.
5. Fetcher.offsetBefore() to block until a list-offset response is successfully
received.
Except for 1), the other four uses of NetworkClient's blocking calls actually
break the API contract that consumer.poll(timeout) will block for no longer
than the timeout period (within 3), sync commit offsets is OK since it is by
definition a blocking call). In addition, 4) is a duplicate, as there is
already a KafkaConsumer.awaitMetadataUpdate.
So maybe we could consider combining this with the fix for KAFKA-1894 and see
if the following is possible:
1. Move all "poll" and "completeAll" calls from Coordinator and Fetcher into
KafkaConsumer to make sure none of the Coordinator / Fetcher functions are
blocking. For example, if the coordinator is not known, instead of blocking on
ensureCoordinatorReady we should just call
client.send(consumer-metadata-request) and return, and depend on the callback
to handle coordinator discovery. This is mainly for KAFKA-1894 and will of
course complicate the logic of Coordinator and Fetcher, since they now need to
maintain some more internal state.
2. Move Fetcher.resetOffset/OffsetBefore into subscription, and remove
Fetcher.awaitMetadataUpdate. After this, metadata becomes read-only to the fetcher.
3. After the first two steps the finer synchronization approach would be
simpler since now we only need to synchronize on subscriptions.
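To illustrate step 1, here is a rough sketch of a coordinator that never blocks. It is not the actual Coordinator or NetworkClient code: SketchClient, SketchCoordinator, maybeDiscoverCoordinator and the fixed "node-1" response are all made up for illustration. The only idea taken from the proposal is that the coordinator sends the request, returns immediately, and lets a callback driven by the caller's poll() finish the discovery.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for NetworkClient: send() only queues the request,
// and poll() is the single place where responses are delivered to callbacks.
class SketchClient {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    void send(String request, Consumer<String> callback) {
        // Assumption: pretend the broker answers with a fixed node id.
        pending.add(() -> callback.accept("node-1"));
    }

    void poll() {
        // Deliver only the responses that were pending when poll() started.
        int n = pending.size();
        for (int i = 0; i < n; i++) {
            pending.remove().run();
        }
    }
}

// Hypothetical coordinator that tracks discovery as internal state
// instead of blocking until the coordinator is known.
class SketchCoordinator {
    enum State { UNKNOWN, DISCOVERING, READY }

    private final SketchClient client;
    private State state = State.UNKNOWN;
    private String coordinatorNode;

    SketchCoordinator(SketchClient client) {
        this.client = client;
    }

    // Non-blocking: sends the request at most once and returns right away.
    // Returns true only once the callback has recorded the coordinator.
    boolean maybeDiscoverCoordinator() {
        if (state == State.UNKNOWN) {
            state = State.DISCOVERING;
            client.send("consumer-metadata-request", node -> {
                coordinatorNode = node;
                state = State.READY;
            });
        }
        return state == State.READY;
    }

    State state() { return state; }

    String coordinatorNode() { return coordinatorNode; }

    public static void main(String[] args) {
        SketchClient client = new SketchClient();
        SketchCoordinator coordinator = new SketchCoordinator(client);
        System.out.println("ready before poll: " + coordinator.maybeDiscoverCoordinator());
        client.poll(); // only the top-level caller drives the network
        System.out.println("ready after poll: " + coordinator.maybeDiscoverCoordinator());
    }
}
```

The extra State field is the kind of additional internal state mentioned in step 1: the coordinator has to remember that a request is already in flight so that it does not send another one on every call.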
A side effect, though, is that calls to consumer.poll(timeout) may now return
much more quickly with no data during partition re-assignment, sync offset
commit, etc., and people would need to call it multiple times before getting
data. But as long as we document the usage clearly I think this is fine.
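For the documentation, the calling pattern could look roughly like the sketch below. It does not use the real consumer API: the `poll` argument is a hypothetical stand-in for consumer.poll(timeout) that returns a list of records, and pollUntilData and maxWaitMs are names invented here. It only shows a caller retrying quick, empty polls until data arrives or its own deadline passes.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;

class PollLoopSketch {
    // Calls poll repeatedly until it returns data or maxWaitMs has elapsed.
    // `poll` takes the remaining timeout in milliseconds and may return an
    // empty list early (e.g. while a rebalance is still in progress).
    static List<String> pollUntilData(LongFunction<List<String>> poll, long maxWaitMs) {
        long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (true) {
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return Collections.emptyList(); // gave up: no data in time
            }
            List<String> records = poll.apply(remainingMs);
            if (!records.isEmpty()) {
                return records;
            }
            // Empty result: the call made some internal progress, try again.
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake poll: empty on the first two calls, data on the third.
        List<String> records = pollUntilData(
            t -> ++calls[0] < 3 ? Collections.<String>emptyList()
                                : Collections.singletonList("r1"),
            5_000);
        System.out.println(records + " after " + calls[0] + " calls");
    }
}
```

Note that if the stand-in poll returns immediately without doing any work, this loop spins until the deadline; the sketch assumes each call either waits or makes progress.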
> New consumer poll() can block other calls like position(), commit(), and
> close() indefinitely
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-2168
> URL: https://issues.apache.org/jira/browse/KAFKA-2168
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Ewen Cheslack-Postava
> Assignee: Jason Gustafson
>
> The new consumer is currently using very coarse-grained synchronization. For
> most methods this isn't a problem since they finish quickly once the lock is
> acquired, but poll() might run for a long time (and commonly will since
> polling with long timeouts is a normal use case). This means any operations
> invoked from another thread may block until the poll() call completes.
> Some example use cases where this can be a problem:
> * A shutdown hook is registered to trigger shutdown and invokes close(). It
> gets invoked from another thread and blocks indefinitely.
> * User wants to manage offset commit themselves in a background thread. If
> the commit policy is not purely time based, it's not currently possible to
> make sure the call to commit() will be processed promptly.
> Two possible solutions to this:
> 1. Make sure a lock is not held during the actual select call. Since we have
> multiple layers (KafkaConsumer -> NetworkClient -> Selector -> nio Selector)
> this is probably hard to make work cleanly since locking is currently only
> performed at the KafkaConsumer level and we'd want it unlocked around a
> single line of code in Selector.
> 2. Wake up the selector before synchronizing for certain operations. This
> would require some additional coordination to make sure the caller of
> wakeup() is able to acquire the lock promptly (instead of, e.g., the poll()
> thread being woken up and then promptly reacquiring the lock with a
> subsequent long poll() call).
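Option 2 in the description above could be sketched roughly as follows. This is not KafkaConsumer code: WakeupSketch and its methods are hypothetical, and it talks to java.nio's Selector directly rather than going through the NetworkClient and Selector layers. Using a fair ReentrantLock is an assumption made here as one possible way to keep the poll() thread from promptly reacquiring the lock ahead of a waiting caller.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class WakeupSketch {
    private final Selector selector;
    // Fair lock: a thread already waiting gets the lock before a thread
    // that released it and immediately asks for it again.
    private final ReentrantLock lock = new ReentrantLock(true);
    private final AtomicBoolean closing = new AtomicBoolean(false);

    WakeupSketch() throws IOException {
        selector = Selector.open();
    }

    // Long-running call: holds the lock while blocked in select().
    // timeoutMs must be positive (select(0) would block indefinitely).
    int poll(long timeoutMs) throws IOException {
        lock.lock();
        try {
            if (closing.get()) {
                return 0; // selector is closed or about to be closed
            }
            return selector.select(timeoutMs);
        } finally {
            lock.unlock();
        }
    }

    // Called from another thread: wake the selector first so the poll()
    // thread leaves select() and releases the lock, then take the lock.
    void close() throws IOException {
        if (!closing.compareAndSet(false, true)) {
            return; // another thread is already closing
        }
        selector.wakeup(); // also makes the next select() return at once
        lock.lock();
        try {
            selector.close();
        } finally {
            lock.unlock();
        }
    }
}
```

If wakeup() runs just before the other thread enters select(), that select() still returns immediately, so close() does not have to wait out the poll timeout either way.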