There have been a couple of rounds of this. Basically, a bunch of complaints
people have about the producer boil down to there being no limit on how
long a request will block if the Kafka cluster goes hard down. Some of the
discussion was here, I think:
https://issues.apache.org/jira/browse/KAFKA-1788

But a lot was on previous producer-related tickets. E.g., close() blocking
forever if the Kafka cluster is down happens because the requests never
fail; they just queue indefinitely, waiting for Kafka to come back.

In any case, for the purpose of this KIP we don't need to pick a mechanism
or configuration for controlling the client request timeout. All we are saying
now is that we should add such a mechanism, and when we do it will address
any concerns about flush() blocking for an indeterminate amount of time (so
we don't need any kind of timeout on flush() itself now).
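
To make that concrete, here is a minimal sketch (plain Java, not the Kafka
producer API) of why a per-request timeout bounds flush() without flush()
needing a timeout of its own. The class name FlushTimeoutSketch, the helper
sendToDeadCluster(), and the 200 ms REQUEST_TIMEOUT_MS value are all made up
for illustration; the real mechanism and config name are deliberately left
open here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the Kafka producer API: each in-flight request is
// modelled as a CompletableFuture that nothing ever completes, as if the
// cluster were hard down.
class FlushTimeoutSketch {

    // Assumed per-request timeout; the value and name are illustrative only.
    static final long REQUEST_TIMEOUT_MS = 200;

    // Simulates send() to an unreachable cluster: the only way this future
    // completes is by failing with a TimeoutException once the timeout passes.
    static CompletableFuture<String> sendToDeadCluster() {
        return new CompletableFuture<String>()
                .orTimeout(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }

    // flush() with no timeout argument: blocks until every previously sent
    // request has completed, either successfully or with an error.
    static void flush(List<CompletableFuture<String>> inFlight) {
        CompletableFuture
                .allOf(inFlight.toArray(new CompletableFuture[0]))
                .handle((ok, err) -> null) // an error still counts as completion
                .join();
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> inFlight =
                List.of(sendToDeadCluster(), sendToDeadCluster());
        long start = System.nanoTime();
        flush(inFlight);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("flush returned after ~" + elapsedMs + " ms");
        // The futures are done now, so inspecting them no longer blocks.
        inFlight.forEach(f ->
                System.out.println("failed=" + f.isCompletedExceptionally()));
    }
}
```

Since every request either completes or fails within the request timeout,
flush() returns within roughly that bound, and calling get() on each future
afterwards produces the error instead of blocking.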

-Jay

On Wed, Feb 18, 2015 at 4:24 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> Actually, could you clarify this a bit (since I'm not sure which
> thread you are referring to) - specifically, how would this tie in
> with the current timeout we have for the producer (for example)?
>
> On Tue, Feb 17, 2015 at 02:55:44PM -0800, Jay Kreps wrote:
> > Yeah there was a separate thread on adding a client-side timeout to
> > requests. We should have this in the new java clients, it just isn't there
> > yet. When we do this the flush() call will implicitly have the same timeout
> > as the requests (since they will complete or fail by then). I think this
> > makes flush(timeout) and potentially close(timeout) both unnecessary.
> >
> > -Jay
> >
> > On Tue, Feb 17, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > In the scala clients we have the socket.timeout config, as we are using
> > > blocking IO; when that timeout is reached, a TimeoutException is
> > > thrown from the socket and the client can handle it accordingly. In the
> > > java clients we are switching to non-blocking IO, and hence we will not
> > > have the socket timeout any more.
> > >
> > > I agree that we could add this client request timeout back in the java
> > > clients, in addition to allowing the client / server's non-blocking
> > > selector to close idle sockets.
> > >
> > > Guozhang
> > >
> > > On Tue, Feb 17, 2015 at 1:55 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > >
> > > > I'm thinking the flush call timeout will naturally be the timeout for a
> > > > produce request, No?
> > > >
> > > > Currently it seems we don't have a timeout for client requests, should we
> > > > have one?
> > > >
> > > > -Jiangjie (Becket) Qin
> > > >
> > > > On 2/16/15, 8:19 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> > > >
> > > > >Yes, I think we all agree it would be good to add a client-side request
> > > > >timeout. That would effectively imply a flush timeout as well since any
> > > > >requests that couldn't complete in that time would be errors and hence
> > > > >completed in the definition we gave.
> > > > >
> > > > >-Jay
> > > > >
> > > > >On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > > >
> > > > >> Hi All,
> > > > >>
> > > > >> Thanks Jay and all for addressing the concern. I am fine with just having
> > > > >> the flush() method as long as it covers failure modes and resiliency. E.g., we
> > > > >> had a situation where all the Kafka cluster brokers were reachable, but upon
> > > > >> adding a new Kafka node the admin migrated leadership to the new broker, and
> > > > >> that new broker was NOT reachable from the producer's standpoint due to a
> > > > >> firewall, yet the metadata would continue to name the new broker as leader
> > > > >> for that partition.
> > > > >>
> > > > >> All I am asking is that you either give up sending to this broker
> > > > >> or do something in this scenario. As of the current 0.8.2 release code,
> > > > >> the caller thread of the flush() or close() method would be blocked forever,
> > > > >> so all I am asking for is:
> > > > >>
> > > > >> https://issues.apache.org/jira/browse/KAFKA-1659
> > > > >> https://issues.apache.org/jira/browse/KAFKA-1660
> > > > >>
> > > > >> Also, I recall that a timeout is also being added to batches to indicate
> > > > >> how long a message can be retained in memory before expiring.
> > > > >>
> > > > >> Given all this, should this API be consistent with the other upcoming
> > > > >> patches addressing similar problem(s)?
> > > > >>
> > > > >>
> > > > >> Otherwise, what we have done is spawn a thread just for calling close()
> > > > >> or flush(), with a timeout on the join at the caller's end.
> > > > >>
> > > > >> Anyway, I just wanted to point out the issues with the existing API, and if
> > > > >> you guys think this is fine then I am OK with this approach. It is just that
> > > > >> the caller will have to do a bit more work.
> > > > >>
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Bhavesh
> > > > >>
> > > > >> On Thursday, February 12, 2015, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > >>
> > > > >> > Yes that is a counter-example. I'm okay either way on whether we
> > > > >> > should have just flush() or have a timeout. Bhavesh, does Jay's
> > > > >> > explanation a few replies prior address your concern? If so, shall we
> > > > >> > consider this closed?
> > > > >> >
> > > > >> > On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
> > > > >> > > Yeah we could do that, I guess I just feel like it adds confusion because
> > > > >> > > then you have to think about which timeout you want, when likely you don't
> > > > >> > > want a timeout at all.
> > > > >> > >
> > > > >> > > I guess the pattern I was thinking of was fflush or the java equivalent,
> > > > >> > > which don't have timeouts:
> > > > >> > >
> > > > >> > > http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()
> > > > >> > >
> > > > >> > > -Jay
> > > > >> > >
> > > > >> > > On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > >> > >
> > > > >> > > > I think tryFlush with a timeout sounds good to me. This is really more
> > > > >> > > > for consistency than anything else. I cannot think of any standard
> > > > >> > > > blocking calls off the top of my head that don't have a timed variant.
> > > > >> > > > E.g., Thread.join, Object.wait, Future.get. Either that, or they
> > > > >> > > > provide an entirely non-blocking mode (e.g., socketChannel.connect
> > > > >> > > > followed by finishConnect).
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > >
> > > > >> > > > Joel
> > > > >> > > >
> > > > >> > > > On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
> > > > >> > > > > Jay,
> > > > >> > > > >
> > > > >> > > > > The .flush() call seems like it would be the best way if you wanted to do a
> > > > >> > > > > clean shutdown of the new producer?
> > > > >> > > > >
> > > > >> > > > > So, you could in your code "stop all incoming requests && producer.flush()
> > > > >> > > > > && system.exit(value)" and know pretty much you won't drop anything on the
> > > > >> > > > > floor.
> > > > >> > > > >
> > > > >> > > > > This can be done with the callbacks and futures (sure) but .flush() seems
> > > > >> > > > > to be the right time to block and a few lines of code, no?
> > > > >> > > > > ~ Joestein
> > > > >> > > > >
> > > > >> > > > > On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > >> > > > >
> > > > >> > > > > > Hey Bhavesh,
> > > > >> > > > > >
> > > > >> > > > > > If a broker is not available a new one should be elected to take over, so
> > > > >> > > > > > although the flush might take longer it should still be quick. Even if not
> > > > >> > > > > > this should result in an error not a hang.
> > > > >> > > > > >
> > > > >> > > > > > The cases you enumerated are all covered already--if the user wants to
> > > > >> > > > > > retry that is covered by the retry setting in the client, for all the
> > > > >> > > > > > errors that is considered completion of the request. The post condition of
> > > > >> > > > > > flush isn't that all sends complete successfully, just that they complete.
> > > > >> > > > > > So if you try to send a message that is too big, when flush returns calling
> > > > >> > > > > > .get() on the future should not block and should produce the error.
> > > > >> > > > > >
> > > > >> > > > > > Basically the argument I am making is that the only reason you want to call
> > > > >> > > > > > flush() is to guarantee all the sends complete so if it doesn't guarantee
> > > > >> > > > > > that it will be somewhat confusing. This does mean blocking, but if you
> > > > >> > > > > > don't want to block on the send then you wouldn't call flush().
> > > > >> > > > > >
> > > > >> > > > > > This has no impact on the block.on.buffer.full setting. That impacts what
> > > > >> > > > > > happens when send() can't append to the buffer because it is full. flush()
> > > > >> > > > > > means any message previously sent (i.e. for which send() call has returned)
> > > > >> > > > > > needs to have its request completed. Hope that makes sense.
> > > > >> > > > > >
> > > > >> > > > > > -Jay
> > > > >> > > > > >
> > > > >> > > > > > On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > > >> > > > > >
> > > > >> > > > > > > Hi Jay,
> > > > >> > > > > > >
> > > > >> > > > > > > Imagine you have a flaky network connection to the brokers, and flush()
> > > > >> > > > > > > blocks because one of the brokers is not available. Basically, how would
> > > > >> > > > > > > this address failure modes such as the io thread not being able to drain
> > > > >> > > > > > > records, or being busy due to pending requests? Is the flush() method meant
> > > > >> > > > > > > only to flush to the in-memory queue, or to flush to the broker over the
> > > > >> > > > > > > network?
> > > > >> > > > > > >
> > > > >> > > > > > > A timeout helps by pushing the caller to decide what to do, e.g.
> > > > >> > > > > > > re-enqueue records, drop the entire batch, or handle one of the messages
> > > > >> > > > > > > being too big and crossing the limit of max.message.size, etc.
> > > > >> > > > > > >
> > > > >> > > > > > > Also, according to the javadoc for the API, "The method will block until all
> > > > >> > > > > > > previously sent records have completed sending (either successfully or with
> > > > >> > > > > > > an error)". Does this bypass the rules set by block.on.buffer.full or
> > > > >> > > > > > > batch.size when under load?
> > > > >> > > > > > >
> > > > >> > > > > > > That was my intention, and I am sorry I mixed up the close() method here
> > > > >> > > > > > > without knowing that this is only for bulk send.
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks,
> > > > >> > > > > > >
> > > > >> > > > > > > Bhavesh
> > > > >> > > > > > >
> > > > >> > > > > > > On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > >> > > > > > >
> > > > >> > > > > > > > Yeah I second the problem Guozhang flags with giving flush a timeout. In
> > > > >> > > > > > > > general failover in Kafka is a bounded thing unless you have brought your
> > > > >> > > > > > > > Kafka cluster down entirely so I think depending on that bound implicitly
> > > > >> > > > > > > > is okay.
> > > > >> > > > > > > >
> > > > >> > > > > > > > It is possible to make flush() be instead
> > > > >> > > > > > > >   boolean tryFlush(long timeout, TimeUnit unit);
> > > > >> > > > > > > >
> > > > >> > > > > > > > But I am somewhat skeptical that people will use this correctly. I.e.
> > > > >> > > > > > > > consider the mirror maker code snippet I gave above, how would one actually
> > > > >> > > > > > > > recover in this case other than retrying (which the client already does
> > > > >> > > > > > > > automatically)? After all if you are okay losing data then you don't need
> > > > >> > > > > > > > to bother calling flush at all, you can just let the messages be sent
> > > > >> > > > > > > > asynchronously.
> > > > >> > > > > > > >
> > > > >> > > > > > > > I think close() is actually different because you may well want to shutdown
> > > > >> > > > > > > > immediately and just throw away unsent events.
> > > > >> > > > > > > >
> > > > >> > > > > > > > -Jay
> > > > >> > > > > > > >
> > > > >> > > > > > > > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > > > The proposal looks good to me, will need some time to review the
> > > > >> > > > > > > > > implementation RB later.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Bhavesh, I am wondering how you will use a flush() with a timeout since
> > > > >> > > > > > > > > such a call does not actually provide any flushing guarantees?
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > As for close(), there is a separate JIRA for this:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Guozhang
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > > Hi Jay,
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > How about adding a timeout to each method call: flush(timeout, TimeUnit)
> > > > >> > > > > > > > > > and close(timeout, TimeUnit)? We had a runaway io thread issue, and the
> > > > >> > > > > > > > > > caller thread should not be blocked forever by these methods.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Thanks,
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Bhavesh
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > Well actually in the case of linger.ms = 0 the send is still asynchronous
> > > > >> > > > > > > > > > > so calling flush() blocks until all the previously sent records have
> > > > >> > > > > > > > > > > completed. It doesn't speed anything up in that case, though, since they
> > > > >> > > > > > > > > > > are already available to send.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > -Jay
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > > Looks good to me.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > I like the idea of not blocking additional sends but not guaranteeing that
> > > > >> > > > > > > > > > > > flush() will deliver them.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > I assume that with linger.ms = 0, flush will just be a noop (since the
> > > > >> > > > > > > > > > > > queue will be empty). Is that correct?
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Gwen
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Following up on our previous thread on making batch send a little easier,
> > > > >> > > > > > > > > > > > > here is a concrete proposal to add a flush() method to the producer:
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > A proposed implementation is here:
> > > > >> > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > Thoughts?
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > > > -Jay
> > > > >> > > > > > > > > > > > >
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > --
> > > > >> > > > > > > > > -- Guozhang
> > > > >> > > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > >
> > > > >> > > >
> > > > >> > > >
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
>
>
