Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-05-03 Thread Colin McCabe
On Wed, May 2, 2018, at 14:54, John Roesler wrote: > Thanks Jason, > > I did find some production use cases "on the internet" that use poll(0) > *just* to join the group initially and ignore the response. I suppose the > assumption is that it'll be empty on the very first call to poll with > timeo

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-05-03 Thread John Roesler
Hey Richard, I've updated the KIP with the changes discussed here. If you're happy with the state of it, I think that we're probably ready to call for a vote. However, I'm about to take a week off for vacation. Do you mind sending the [VOTE] message and managing the vote thread? Obviously, you c

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-05-02 Thread Jason Gustafson
Hey John, Yeah, I appreciate Becket's point. We do tend to abuse the initial intent of ApiException. It's just that it can be awkward to come up with another name when the ApiException already has a reasonable and appropriate name for the user API. `ClientTimeoutException` is a perfect example of

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-05-02 Thread Richard Yu
Hi John, I don't have any objections to this KIP change. Please go ahead. Thanks, Richard On Wed, May 2, 2018 at 2:54 PM, John Roesler wrote: > Thanks Jason, > > I did find some production use cases "on the internet" that use poll(0) > *just* to join the group initially and ignore the response

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-05-02 Thread John Roesler
Thanks Jason, I did find some production use cases "on the internet" that use poll(0) *just* to join the group initially and ignore the response. I suppose the assumption is that it'll be empty on the very first call to poll with timeout=0. In my opinion, this usage is unsafe, since there's a decl

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-05-02 Thread Jason Gustafson
I think John's proposal looks reasonable to me. My only doubt is about use cases for the new `awaitAssignmentMetadata` API. I think the basic idea is that we want a way to block until we have joined the consumer group, but we do not want to await fetched data. Maybe another way to accomplish this wo
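Jason's point about blocking until the group is joined, without awaiting fetched data, can be illustrated with a minimal, self-contained Java sketch. `AssignmentWaiter`, `onAssigned`, and `awaitAssignment` are hypothetical names, not the real KafkaConsumer API. The sketch also simplifies by letting another thread complete the join, whereas the real consumer drives the join from the caller's thread; it only models the contract: wait up to a timeout for an assignment, and never fetch or return records in the process.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical group-membership tracker; names are illustrative, not Kafka's API. */
final class AssignmentWaiter {
    private final CountDownLatch assigned = new CountDownLatch(1);
    private volatile Set<String> partitions = Collections.emptySet();

    /** Called when the group join completes (in Kafka, roughly a rebalance-listener callback). */
    void onAssigned(Set<String> newPartitions) {
        partitions = newPartitions;
        assigned.countDown();
    }

    /**
     * Blocks until an assignment exists or the timeout elapses. No records are
     * fetched or returned, so nothing can be silently dropped as with poll(0).
     */
    boolean awaitAssignment(Duration timeout) {
        try {
            return assigned.await(timeout.toNanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    Set<String> partitions() {
        return partitions;
    }
}
```

Unlike calling poll(0) and ignoring the result, a call shaped like this cannot discard records, because it never returns any.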

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-25 Thread Guozhang Wang
Previously there were some debates on whether we should add this non-blocking behavior via a config vs. via overloaded functions. To make progress on this discussion, we need to figure that part out first. I'm in favor of the current approach of overloaded functions over the config since if we are go

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-25 Thread John Roesler
Hi Richard, I've updated the KIP with my proposed replacement methods for the poll use cases. Since I'm no longer proposing to alter poll(long) in place, I don't think it's necessary to ever throw a ClientTimeoutException from poll(Duration). The new method, awaitAssignmentMetadata, however wou

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-25 Thread John Roesler
Re Ted's last comment, that style of async API requires some thread to actually drive the request/response cycle and invoke the callback when it's complete. Right now, this happens in the caller's thread as a side-effect of calling poll(). But that clearly won't work for poll() itself! In the futu
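To make John's point concrete, here is a minimal Java sketch of a single-threaded client in which async callbacks only fire when the caller drives the client. `CallerDrivenClient` and its methods are hypothetical stand-ins modeled loosely on the commitAsync(OffsetCommitCallback) pattern Ted mentions; this is not how KafkaConsumer is implemented internally.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical single-threaded client: async callbacks run only when the caller calls poll(). */
final class CallerDrivenClient {
    private final Queue<Runnable> completedCallbacks = new ArrayDeque<>();

    /** Like commitAsync(callback): the request is "sent", but the callback is deferred. */
    void commitAsync(Consumer<String> callback) {
        // Pretend the broker response has already arrived; it is still only
        // handled on the next poll(), because there is no background thread.
        completedCallbacks.add(() -> callback.accept("commit ok"));
    }

    /** Drives the request/response cycle on the caller's thread; returns callbacks fired. */
    int poll() {
        int fired = 0;
        Runnable r;
        while ((r = completedCallbacks.poll()) != null) {
            r.run();
            fired++;
        }
        return fired;
    }
}
```

Because callbacks only run inside poll(), a hypothetical asynchronous poll(PollCallback) would have no later poll() call to fire from unless a background thread were introduced.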

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-19 Thread John Roesler
Thanks for the tip, Ted! On Thu, Apr 19, 2018 at 12:12 PM, Ted Yu wrote: > John: > In case you want to pursue async poll, it seems (by looking at current API) > that introducing PollCallback follows existing pattern(s). > > e.g. KafkaConsumer#commitAsync(OffsetCommitCallback) > > FYI > > On Thu,

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-19 Thread Ted Yu
John: In case you want to pursue async poll, it seems (by looking at current API) that introducing PollCallback follows existing pattern(s). e.g. KafkaConsumer#commitAsync(OffsetCommitCallback) FYI On Thu, Apr 19, 2018 at 10:08 AM, John Roesler wrote: > Hi Richard, > > Thanks for the invitatio

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-19 Thread John Roesler
Hi Richard, Thanks for the invitation! I do think it would be safer to introduce a new poll method than to change the semantics of the old one. I've been mulling over whether the new one could still have (slightly different) async semantics with a timeout of 0. If possible, I'd like to avoid intr

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-17 Thread Richard Yu
Hi John, Do you have a preference for fixing the poll() method (e.g. using asyncPoll or just sticking with the current method but with an extra timeout parameter)? I think your current proposal for KIP-288 is better than what I have on my side. If you think there is something that you want to

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-17 Thread John Roesler
Ok, I'll close the discussion on KIP-288 and mark it discarded. We can solidify the design for poll in KIP-266, and once it's approved, I'll coordinate with Qiang Zhao on the PR for the poll part of the work. Once that is merged, you'll have a clean slate for the rest of the work. On Tue, Apr 17,

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-17 Thread Richard Yu
Hi John, I think that you could finish your PR that corresponds to KIP-288 and merge it. I can finish my side of the work afterwards. On another note, adding an asynchronous version of poll() would make sense, particularly since the current version of Kafka does not support it. Thanks Richar

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-17 Thread John Roesler
Cross-pollinating from some discussion we've had on KIP-288, I think there's a good reason that poll() takes a timeout when none of the other methods do, and it's relevant to this discussion. The timeout in poll() is effectively implementing a long-poll API (on the client side, so it's not really
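The long-poll behavior John describes (return as soon as data is available, otherwise wait up to the timeout and return an empty result) can be sketched in Java with `BlockingQueue.poll(long, TimeUnit)`. `LongPollBuffer` is a hypothetical stand-in for a consumer's fetch buffer, assuming fetched records are delivered into an in-memory queue; it is not KafkaConsumer's actual implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical fetch buffer illustrating client-side long-poll semantics. */
final class LongPollBuffer<T> {
    private final BlockingQueue<T> fetched = new LinkedBlockingQueue<>();

    /** Simulates a fetch response delivering a record into the buffer. */
    void deliver(T record) {
        fetched.add(record);
    }

    /**
     * Returns immediately if records are buffered; otherwise waits up to the
     * timeout for the first one, then drains whatever else is ready.
     * An empty list means the timeout elapsed with no data: not an error.
     */
    List<T> poll(Duration timeout) {
        List<T> batch = new ArrayList<>();
        try {
            T first = fetched.poll(timeout.toNanos(), TimeUnit.NANOSECONDS);
            if (first == null) {
                return batch; // timed out: empty result, no exception
            }
            batch.add(first);
            fetched.drainTo(batch); // take everything else already available
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }
}
```

Here the timeout bounds how long the caller waits for data, not how long some network operation may take, which is why it behaves differently from the timeouts proposed for position() and the other blocking methods.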

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-17 Thread John Roesler
Hey Richard, As you noticed, the newly introduced KIP-288 overlaps with this one. Sorry for stepping on your toes... How would you like to proceed? I'm happy to "close" KIP-288 in deference to this KIP. With respect to poll(), reading this discussion gave me a new idea for providing a non-breakin

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-17 Thread Richard Yu
Hi all, If possible, would a committer please review? Thanks On Sun, Apr 1, 2018 at 7:24 PM, Richard Yu wrote: > Hi Guozhang, > > I have clarified the KIP a bit to account for Becket's suggestion on > ClientTimeoutException. > About adding an extra config, you were right about my intentions. I

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-01 Thread Richard Yu
Hi Guozhang, I have clarified the KIP a bit to account for Becket's suggestion on ClientTimeoutException. About adding an extra config, you were right about my intentions. I am just wondering if the config should be included, since Ismael seems to favor an extra configuration, Thanks, Richard On

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-04-01 Thread Guozhang Wang
Hi Richard, Regarding the Streams-side changes, we plan to adopt the new APIs once the KIP is done; that involves only internal code changes and hence does not need to be included in the KIP. Could you update the KIP? It has become quite outdated relative to the topics discussed, and I'm a bit loo

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-30 Thread Richard Yu
On a side note, I have noticed that several other methods in classes such as StoreChangelogReader in Streams call position(), which causes tests to hang. It might be out of the scope of the KIP, but should I also change the methods which use position() as a callback to at the very least prevent

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-30 Thread Richard Yu
Thanks for the review Becket. About the methods beginningOffsets(), endOffsets(), ...: I took a look through the code of KafkaConsumer, but after looking through the offsetsByTimes() method and its callbacks in Fetcher, I think these methods already block for a set period of time. I know that ther

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-30 Thread Becket Qin
Glad to see the KIP, Richard. This has been a long-pending issue. The original argument from Jay for using a config, such as max.block.ms, instead of timeout parameters was that people will always hard-code the timeout, and a hard-coded timeout is rarely correct because it has to con

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-26 Thread Guozhang Wang
@Richard: TimeoutException inherits from RetriableException, which inherits from ApiException. So users should explicitly catch RetriableException in their code and handle it. @Ismael, Ewen: I'm trying to push this one forward; are we now on the same page for using f
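A minimal Java sketch of the handling Guozhang suggests: catch RetriableException and retry, while letting other ApiExceptions propagate. The three exception classes are local stand-ins mirroring the hierarchy he describes, not the real `org.apache.kafka.common.errors` classes, and `Retry.withRetries` is a hypothetical helper with an assumed fixed attempt limit.

```java
import java.util.function.Supplier;

// Local stand-ins mirroring the hierarchy described in the thread
// (TimeoutException -> RetriableException -> ApiException); not the real Kafka classes.
class ApiException extends RuntimeException {
    ApiException(String msg) { super(msg); }
}

class RetriableException extends ApiException {
    RetriableException(String msg) { super(msg); }
}

class TimeoutException extends RetriableException {
    TimeoutException(String msg) { super(msg); }
}

/** Hypothetical retry helper: retries retriable failures, propagates everything else. */
final class Retry {
    static <T> T withRetries(Supplier<T> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RetriableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RetriableException e) {
                last = e; // e.g. a timed-out position() call; safe to try again
            }
        }
        throw last; // attempts exhausted: surface the last retriable failure
    }
}
```

A non-retriable ApiException thrown by the call is not caught here, so it reaches the caller on the first attempt.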

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-23 Thread Ismael Juma
Hi Ewen, Yeah, I mentioned KAFKA-2391 where some of this was discussed. Jay was against having timeouts in the methods at the time. However, as Jason said offline, we did end up with a timeout parameter in `poll`. Ismael On Fri, Mar 23, 2018 at 4:26 PM, Ewen Cheslack-Postava wrote: > Regarding

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-23 Thread Ewen Cheslack-Postava
Regarding the flexibility question, has someone tried to dig up the discussion of the new consumer APIs when they were being written? I vaguely recall these exact questions about using APIs vs configs and flexibility vs bloating the API surface area having already been discussed. (Not that we shoul

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-22 Thread Richard Yu
I do have one question, though: in the current KIP, throwing TimeoutException to signal that the time limit has been exceeded applies to all new methods introduced in this proposal. However, how would users respond when a TimeoutException is thrown (since it is a RuntimeException)? Thanks, Richard On Mo

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-19 Thread Richard Yu
Hi Ismael, You have a great point. Since most of the methods in this KIP have similar callbacks (position() and committed() both use fetchCommittedOffsets(), and commitSync() is similar to position(), except just updating offsets), the amount of time they block should also be about equal. However

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-19 Thread Ismael Juma
Hi, An option that is not currently covered in the KIP is to have a separate config max.block.ms, which is similar to the producer config with the same name. This came up during the KAFKA-2391 discussion. I think it's clear that we can't rely on request.timeout.ms, so the decision is between addin
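The two styles under discussion (a `max.block.ms`-like config versus explicit timeout overloads) are not mutually exclusive. A hypothetical Java sketch, assuming the config name and the 60000 ms default are borrowed from the producer, shows an existing-style method falling back to the configured bound while an overload lets the caller override it per call. `BlockingClient` is illustrative only, not a Kafka class.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical client combining a config-driven default with per-call overloads. */
final class BlockingClient {
    static final String MAX_BLOCK_MS = "max.block.ms"; // name borrowed from the producer config
    private final Duration defaultTimeout;
    Duration lastTimeoutUsed; // recorded only so the delegation is observable in this sketch

    BlockingClient(Properties props) {
        // Assumed default of 60 s, matching the producer's max.block.ms default.
        this.defaultTimeout = Duration.ofMillis(
            Long.parseLong(props.getProperty(MAX_BLOCK_MS, "60000")));
    }

    /** Existing-style signature: bounded by the configured default. */
    long position(String partition) {
        return position(partition, defaultTimeout);
    }

    /** Overload: the caller chooses the bound for this one call. */
    long position(String partition, Duration timeout) {
        lastTimeoutUsed = timeout;
        // A real consumer would block here for at most `timeout` while it looks up
        // the committed offset or resets the position; this sketch returns a placeholder.
        return 0L;
    }
}
```

With this shape, users who never think about timeouts still get a bounded call, which addresses the hard-coded-timeout concern, while callers with a specific deadline are not forced to change global configuration.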

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-19 Thread Richard Yu
Hi Guozhang, I made some clarifications to KIP-266, namely: 1. Stated more specifically that commitSync will accept user input. 2. fetchCommittedOffsets(): Made its role in blocking clearer to the reader. 3. Sketched what would happen when the time limit is exceeded. These changes should make the

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-19 Thread Guozhang Wang
Hi Richard, I made another pass over the KIP; some more clarifications / comments: 1. The seek() call itself is not blocking; only the following poll() call may block, since that is when the actual metadata request happens. 2. I saw you did not include Consumer.partitionsFor(), Consumer.OffsetAndTimestamp() an
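Guozhang's first point, that seek() only records intent while the subsequent poll() does the potentially blocking work, can be modeled with a small Java sketch. `SeekState` and its counter of simulated round trips are hypothetical; the sketch performs no real metadata or offset requests.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of seek(): it only records intent; no network I/O happens. */
final class SeekState {
    private final Map<String, Long> pendingSeeks = new HashMap<>();
    private final Map<String, Long> fetchPositions = new HashMap<>();
    int blockingRequests = 0; // counts simulated network round trips

    /** Non-blocking: remembers where the next fetch for this partition should start. */
    void seek(String partition, long offset) {
        pendingSeeks.put(partition, offset);
    }

    /** The next poll() applies pending seeks; this is where a real client may block. */
    void poll() {
        if (!pendingSeeks.isEmpty()) {
            blockingRequests++; // stands in for the metadata request Guozhang mentions
            fetchPositions.putAll(pendingSeeks);
            pendingSeeks.clear();
        }
    }

    Long fetchPosition(String partition) {
        return fetchPositions.get(partition);
    }
}
```

Under this model, bounding seek() itself would be pointless; the timeout belongs on the call that actually waits, which is the poll() that follows.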

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-17 Thread Richard Yu
Actually, what I said above is inaccurate. In testSeekAndCommitWithBrokerFailures, TestUtils.waitUntilTrue blocks, not seek. My assumption is that seek did not update correctly. I will be digging further into this. On Sat, Mar 17, 2018 at 4:16 PM, Richard Yu wrote: > One more thing: when looki

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-17 Thread Richard Yu
One more thing: when looking through tests, I have realized that seek() methods can potentially block indefinitely. As you well know, seek() is called when pollOnce() or position() is active. Thus, if position() blocks indefinitely, then so would seek(). Should bounding seek() also be included in t

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-17 Thread Richard Yu
Thanks for the advice, Jason. I have modified KIP-266 to include the Javadoc for committed() and other blocking methods, and I also mentioned poll(), which will also be bounded. Let me know if there is anything else. :) Sincerely, Richard On Sat, Mar 17, 2018 at 12:00 PM, Jason Gustafson wro

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-17 Thread Jason Gustafson
Hi Richard, Thanks for the updates. I'm really glad you picked this up. A couple minor comments: 1. Can you list the full set of new APIs explicitly in the KIP? Currently I only see the javadoc for `position()`. 2. We should consider adding `TimeUnit` to the new methods to avoid unit confusion.
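Jason's unit-confusion concern can be shown with a tiny Java sketch: a bare `long` relies on documentation to say it means milliseconds, while a `(long, TimeUnit)` pair carries its unit and can be normalized to `java.time.Duration`, the type later messages in this thread use for poll(Duration). `TimeoutApi` is a hypothetical helper, not a Kafka class.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: every timeout style normalizes to a single Duration. */
final class TimeoutApi {
    /** Legacy style: is 5 milliseconds or seconds? Only the documentation says ms. */
    static Duration fromLegacyMillis(long timeoutMs) {
        return Duration.ofMillis(timeoutMs);
    }

    /** TimeUnit style: the unit travels with the value, so call sites are unambiguous. */
    static Duration from(long timeout, TimeUnit unit) {
        // TimeUnit.toNanos saturates at Long.MAX_VALUE instead of overflowing.
        return Duration.ofNanos(unit.toNanos(timeout));
    }
}
```

Normalizing at the API boundary means the blocking logic underneath handles only one representation, whichever public signatures are exposed.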

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-14 Thread Richard Yu
Note to all: I have included bounding commitSync() and committed() in this KIP. On Sun, Mar 11, 2018 at 5:05 PM, Richard Yu wrote: > Hi all, > > I updated the KIP where overloading position() is now the favored approach. > Bounding position() using requestTimeoutMs has been listed as rejected. >

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-11 Thread Richard Yu
Hi all, I updated the KIP where overloading position() is now the favored approach. Bounding position() using requestTimeoutMs has been listed as rejected. Any thoughts? On Tue, Mar 6, 2018 at 6:00 PM, Guozhang Wang wrote: > I agree that adding the overloads is most flexible. But going for tha

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-06 Thread Guozhang Wang
I agree that adding the overloads is most flexible. But going for that direction we'd do that for all the blocking calls that I've listed above, with this timeout value covering the end-to-end waiting time. Guozhang On Tue, Mar 6, 2018 at 10:02 AM, Ted Yu wrote: > bq. The most flexible option i
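Guozhang's "end-to-end" requirement means one timeout budget shared across every internal step of a blocking call, rather than a fresh bound per request. Here is a hypothetical Java sketch, assuming an injectable nanosecond clock for testability and a local `ClientTimeout` stand-in for Kafka's unchecked TimeoutException: compute the deadline once, then give each sub-step only the time that remains.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Local stand-in for Kafka's unchecked TimeoutException; not the real class. */
class ClientTimeout extends RuntimeException {
    ClientTimeout(String msg) { super(msg); }
}

/** Hypothetical deadline shared by all sub-steps of one blocking call. */
final class Deadline {
    private final LongSupplier clock; // nanosecond time source, e.g. System::nanoTime
    private final long deadlineNanos;

    Deadline(Duration timeout, LongSupplier clock) {
        this.clock = clock;
        this.deadlineNanos = clock.getAsLong() + timeout.toNanos();
    }

    /** Time left for the next sub-step; throws once the overall budget is spent. */
    Duration remaining(String step) {
        long left = deadlineNanos - clock.getAsLong();
        if (left <= 0) {
            throw new ClientTimeout("timed out before " + step);
        }
        return Duration.ofNanos(left);
    }
}
```

In production the clock would be `System::nanoTime`. The contrast with a per-request bound such as request.timeout.ms is that retries inside the call cannot extend the total wait beyond the caller's timeout.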

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-06 Thread Ted Yu
bq. The most flexible option is to add overloads to the consumer This option is flexible. Looking at the tail of SPARK-18057, Spark dev voiced the same choice. +1 for adding overload with timeout parameter. Cheers On Mon, Mar 5, 2018 at 2:42 PM, Jason Gustafson wrote: > @Guozhang I probably

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-05 Thread Jason Gustafson
@Guozhang I probably have suggested all options at some point or another, including most recently, the current KIP! I was thinking that practically speaking, the request timeout defines how long the user is willing to wait for a response. The consumer doesn't really have a complex send process like

Re: [DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-04 Thread Guozhang Wang
Hello Richard, Thanks for the proposed KIP. I have a couple of general comments: 1. I'm not sure if piggy-backing the timeout exception on the existing requestTimeoutMs configured in "request.timeout.ms" is a good idea since a) it is a general config that applies to all types of requests, and b)

[DISCUSSION] KIP-266: Add TimeoutException to KafkaConsumer#position()

2018-03-04 Thread Richard Yu
Hi all, I would like to discuss a potential change which would be made to KafkaConsumer: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886 Thanks, Richard Yu