Actually, could you clarify this a bit (since I'm not sure which
thread you are referring to) - specifically, how would this tie in
with the current timeout we have for the producer (for example)?

On Tue, Feb 17, 2015 at 02:55:44PM -0800, Jay Kreps wrote:
> Yeah, there was a separate thread on adding a client-side timeout to
> requests. We should have this in the new Java clients; it just isn't there
> yet. When we do this, the flush() call will implicitly have the same timeout
> as the requests (since they will complete or fail by then). I think this
> makes flush(timeout) and potentially close(timeout) both unnecessary.
> 
> -Jay
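Jay's argument that a per-request timeout implicitly bounds flush() can be made concrete with a small toy model. This is not the Kafka client's code. It assumes a hypothetical TimedRequestProducer that models each in-flight request as a CompletableFuture. A timer fails any request still pending after requestTimeoutMs with a TimeoutException. flush() simply waits until every tracked request is done. Because every request is guaranteed to finish, successfully or not, within the timeout, flush() inherits that bound without needing its own timeout parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy model of a client-side request timeout; not Kafka code.
final class TimedRequestProducer implements AutoCloseable {
    private final long requestTimeoutMs;
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();
    // Daemon timer thread so a forgotten close() cannot keep the JVM alive.
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "request-timeout");
            t.setDaemon(true);
            return t;
        });

    TimedRequestProducer(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // Tracks a request. If nothing has completed it after requestTimeoutMs,
    // the timer fails it with a TimeoutException (a no-op if already done).
    synchronized CompletableFuture<Void> send(CompletableFuture<Void> request) {
        timer.schedule(() -> {
            request.completeExceptionally(new TimeoutException("request timed out"));
        }, requestTimeoutMs, TimeUnit.MILLISECONDS);
        inFlight.add(request);
        return request;
    }

    // Blocks until every request sent so far is complete, successfully or
    // with an error. No timeout parameter: each request is already bounded.
    void flush() {
        CompletableFuture<?>[] snapshot;
        synchronized (this) {
            snapshot = inFlight.toArray(new CompletableFuture<?>[0]);
        }
        // allOf finishes only when all inputs are done; failures are swallowed
        // because flush() promises completion, not success.
        CompletableFuture.allOf(snapshot).handle((ignored, error) -> null).join();
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

In this sketch a request to an unreachable broker, which would otherwise never be answered, still completes with a TimeoutException. That is what keeps flush() from hanging.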
> 
> On Tue, Feb 17, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> > In the Scala clients we have the socket.timeout config because we are
> > using blocking I/O: when the timeout is reached, a timeout exception is
> > thrown from the socket and the client can handle it accordingly. In the
> > Java clients we are switching to non-blocking I/O, and hence we will no
> > longer have the socket timeout.
> >
> > I agree that we could add this client request timeout back in the Java
> > clients, in addition to allowing the client/server non-blocking selector
> > to close idle sockets.
> >
> > Guozhang
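For contrast with the non-blocking Java clients, the blocking-I/O behaviour Guozhang describes can be reproduced with plain JDK sockets. A read on a blocking java.net.Socket with SO_TIMEOUT set (via setSoTimeout) throws java.net.SocketTimeoutException once the timeout elapses, and the caller then decides what to do. This is a standalone sketch against a local server that never replies. It does not use the Scala client or its socket.timeout config, and the class name is illustrative.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

final class BlockingReadWithTimeout {
    // Returns true if a blocking read gave up with SocketTimeoutException.
    static boolean readTimesOut(int timeoutMs) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Local server: the OS completes the TCP handshake via the backlog,
        // but nothing ever calls accept() or writes a byte.
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort())) {
            client.setSoTimeout(timeoutMs); // bound every blocking read on this socket
            InputStream in = client.getInputStream();
            try {
                in.read(); // blocks, because the peer never sends anything
                return false;
            } catch (SocketTimeoutException e) {
                // With blocking I/O, this is where a client handles the timeout.
                return true;
            }
        }
    }
}
```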
> >
> > On Tue, Feb 17, 2015 at 1:55 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >
> > > I'm thinking the flush call timeout will naturally be the timeout for a
> > > produce request, no?
> > >
> > > Currently it seems we don't have a timeout for client requests; should we
> > > have one?
> > >
> > > -Jiangjie (Becket) Qin
> > >
> > > On 2/16/15, 8:19 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> > >
> > > >Yes, I think we all agree it would be good to add a client-side request
> > > >timeout. That would effectively imply a flush timeout as well since any
> > > >requests that couldn't complete in that time would be errors and hence
> > > >completed in the definition we gave.
> > > >
> > > >-Jay
> > > >
> > > >On Mon, Feb 16, 2015 at 7:57 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > >
> > > >> Hi All,
> > > >>
> > > >> Thanks Jay and all for addressing my concern. I am fine with just having a flush() method as long as it covers failure modes and resiliency. For example, we had a situation where all brokers in the Kafka cluster were reachable, but after a new Kafka node was added and the admin migrated the leaders to the new brokers, the new broker was NOT reachable from the producer's standpoint because of a firewall, yet the metadata would continue to name the new broker as the leader for that partition.
> > > >>
> > > >> All I am asking is that you either give up sending to this broker or do something else in this scenario. With the current 0.8.2 release code, the caller thread of the flush() or close() method would be blocked forever, so what I am asking for is:
> > > >>
> > > >> https://issues.apache.org/jira/browse/KAFKA-1659
> > > >> https://issues.apache.org/jira/browse/KAFKA-1660
> > > >>
> > > >> Also, I recall that a timeout is also being added to batches to indicate how long a message can remain in memory before expiring.
> > > >>
> > > >> Given all this, should this API be consistent with the other upcoming patches that address similar problems?
> > > >>
> > > >>
> > > >> Otherwise, what we have done is spawn a separate thread just to call close() or flush(), and join on that thread with a timeout from the caller's side.
> > > >>
> > > >> Anyway, I just wanted to point out the issues with the existing API; if you think this is fine, then I am OK with this approach. It just means the caller will have to do a bit more work.
> > > >>
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Bhavesh
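The workaround Bhavesh mentions can be sketched as follows: run the blocking close() or flush() on a separate thread and join on that thread with a timeout. callWithTimeout is a hypothetical helper, not part of any Kafka API. When the timeout fires, the blocked call keeps running in the background, so this only unblocks the caller and does not cancel the underlying operation.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; NOT a Kafka API.
final class TimedCall {
    // Runs blockingCall (e.g. a producer's flush() or close()) on a helper
    // thread and waits at most `timeout` for it. Returns true if the call
    // finished (normally or by throwing) in time, false if the caller gave up.
    static boolean callWithTimeout(Runnable blockingCall, long timeout, TimeUnit unit)
            throws InterruptedException {
        Thread worker = new Thread(blockingCall, "timed-blocking-call");
        worker.setDaemon(true); // a call that never returns must not keep the JVM alive
        worker.start();
        // Thread.join(0) would wait forever, so wait at least 1 ms.
        worker.join(Math.max(1L, unit.toMillis(timeout)));
        // If we gave up, the call is still running in the background:
        // the caller is unblocked, but the operation is not cancelled.
        return !worker.isAlive();
    }
}
```

This extra thread per call is the "bit more work" the caller takes on when the API has no timed variant.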
> > > >>
> > > >> On Thursday, February 12, 2015, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > >>
> > > >> > Yes, that is a counter-example. I'm okay either way on whether we should have just flush() or have a timeout. Bhavesh, does Jay's explanation a few replies prior address your concern? If so, shall we consider this closed?
> > > >> >
> > > >> > On Tue, Feb 10, 2015 at 01:36:23PM -0800, Jay Kreps wrote:
> > > >> > > Yeah, we could do that. I guess I just feel like it adds confusion, because then you have to think about which timeout you want, when you likely don't want a timeout at all.
> > > >> > >
> > > >> > > I guess the pattern I was thinking of was fflush or the Java equivalent, which don't have timeouts:
> > > >> > >
> > > >> > > http://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#flush()
> > > >> > >
> > > >> > > -Jay
> > > >> > >
> > > >> > > On Tue, Feb 10, 2015 at 10:41 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > >> > >
> > > >> > > > I think tryFlush with a timeout sounds good to me. This is really more for consistency than anything else. Off the top of my head, I cannot think of any standard blocking calls that don't have a timed variant (Thread.join, Object.wait and Future.get all have one). Either that, or they provide an entirely non-blocking mode (e.g., socketChannel.connect followed by finishConnect).
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > >
> > > >> > > > Joel
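Joel's second pattern, an entirely non-blocking mode, is how java.nio.channels.SocketChannel handles connects. On a non-blocking channel, connect() may return false immediately, and the caller then polls finishConnect() under whatever deadline it chooses. The sketch below connects to a local ServerSocketChannel. The class name and the sleep-polling loop are illustrative only; a real client would wait on a Selector for OP_CONNECT instead.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

final class NonBlockingConnect {
    // Starts a non-blocking connect and polls finishConnect() until it
    // succeeds or the caller-chosen deadline passes.
    static boolean connectWithin(InetSocketAddress address, long timeout, TimeUnit unit)
            throws IOException, InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try (SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            if (channel.connect(address)) {
                return true; // connected immediately (possible on loopback)
            }
            while (System.nanoTime() < deadline) {
                if (channel.finishConnect()) {
                    return true;
                }
                Thread.sleep(1); // a real client would block in Selector.select()
            }
            return false; // the caller, not the API, decided when to give up
        }
    }

    // Connects to a throwaway local server with a one-second deadline.
    static boolean demo() throws IOException, InterruptedException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            InetSocketAddress address = (InetSocketAddress) server.getLocalAddress();
            return connectWithin(address, 1, TimeUnit.SECONDS);
        }
    }
}
```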
> > > >> > > >
> > > >> > > > On Tue, Feb 10, 2015 at 11:30:47AM -0500, Joe Stein wrote:
> > > >> > > > > Jay,
> > > >> > > > >
> > > >> > > > > If you wanted to do a clean shutdown of the new producer, the .flush() call seems like it would be the best way to do it?
> > > >> > > > >
> > > >> > > > > So, in your code you could do "stop all incoming requests && producer.flush() && System.exit(value)" and know that you pretty much won't drop anything on the floor.
> > > >> > > > >
> > > >> > > > > This can be done with the callbacks and futures (sure), but .flush() seems to be the right time to block, and it takes only a few lines of code, no?
> > > >> > > > >
> > > >> > > > > ~ Joestein
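Joe's shutdown sequence (stop accepting new work, flush, then exit) can be sketched independently of Kafka by treating the producer as a java.io.Flushable. CleanShutdown, offer and shutdown are hypothetical names. The point is only the ordering: a read/write lock ensures that any send accepted before shutdown begins has been handed off before flush() runs.

```java
import java.io.Flushable;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

// Hypothetical wrapper illustrating the ordering only; not Kafka code.
final class CleanShutdown<T> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Consumer<T> send;   // e.g. wraps producer.send(...)
    private final Flushable producer; // e.g. wraps producer.flush()
    private boolean accepting = true; // guarded by lock

    CleanShutdown(Consumer<T> send, Flushable producer) {
        this.send = send;
        this.producer = producer;
    }

    // Hands the record to send() unless shutdown has begun; false if rejected.
    boolean offer(T record) {
        lock.readLock().lock(); // many offers may run concurrently
        try {
            if (!accepting) {
                return false;
            }
            send.accept(record);
            return true;
        } finally {
            lock.readLock().unlock();
        }
    }

    // 1) stop all incoming requests, 2) flush everything already sent.
    // Taking the write lock waits for in-progress offers, so none slip past
    // the flush. Afterwards the caller can exit, e.g. System.exit(status).
    void shutdown() throws IOException {
        lock.writeLock().lock();
        try {
            accepting = false;
        } finally {
            lock.writeLock().unlock();
        }
        producer.flush();
    }
}
```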
> > > >> > > > >
> > > >> > > > > On Tue, Feb 10, 2015 at 11:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >> > > > >
> > > >> > > > > > Hey Bhavesh,
> > > >> > > > > >
> > > >> > > > > > If a broker is not available, a new one should be elected to take over, so although the flush might take longer, it should still be quick. Even if not, this should result in an error, not a hang.
> > > >> > > > > >
> > > >> > > > > > The cases you enumerated are all covered already: if the user wants to retry, that is covered by the retry setting in the client, and any error is considered completion of the request. The postcondition of flush isn't that all sends complete successfully, just that they complete. So if you try to send a message that is too big, then when flush returns, calling .get() on the future should not block and should produce the error.
> > > >> > > > > >
> > > >> > > > > > Basically, the argument I am making is that the only reason you want to call flush() is to guarantee that all the sends complete, so if it doesn't guarantee that, it will be somewhat confusing. This does mean blocking, but if you don't want to block on the send, then you wouldn't call flush().
> > > >> > > > > >
> > > >> > > > > > This has no impact on the block.on.buffer.full setting. That affects what happens when send() can't append to the buffer because it is full. flush() means any message previously sent (i.e. for which the send() call has returned) needs to have its request completed. Hope that makes sense.
> > > >> > > > > >
> > > >> > > > > > -Jay
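The postcondition Jay describes can be illustrated with a small toy model: after flush() returns, every previously returned future is done, and get() reports any error instead of blocking. ToyProducer, its maxRecordSize check and its single background "I/O thread" are hypothetical stand-ins for the real client. Only the completion-not-success guarantee is the point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical toy model of the flush() semantics; not Kafka's implementation.
final class ToyProducer implements AutoCloseable {
    private final int maxRecordSize;
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final List<CompletableFuture<Long>> sent = new ArrayList<>();
    private long nextOffset = 0;

    ToyProducer(int maxRecordSize) {
        this.maxRecordSize = maxRecordSize;
    }

    // Asynchronous: returns at once; the "I/O thread" completes the future later.
    synchronized CompletableFuture<Long> send(String record) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        long offset = nextOffset++;
        ioThread.execute(() -> {
            if (record.length() > maxRecordSize) {
                result.completeExceptionally(new IllegalArgumentException("record too large"));
            } else {
                result.complete(offset);
            }
        });
        sent.add(result);
        return result;
    }

    // Blocks until every record whose send() has returned is complete,
    // successfully or with an error; it does not throw for failed sends.
    void flush() {
        CompletableFuture<?>[] snapshot;
        synchronized (this) {
            snapshot = sent.toArray(new CompletableFuture<?>[0]);
        }
        CompletableFuture.allOf(snapshot).handle((ignored, error) -> null).join();
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }
}
```

After flush() returns in this model, calling get() on the oversized record's future throws an ExecutionException wrapping the IllegalArgumentException immediately, rather than blocking.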
> > > >> > > > > >
> > > >> > > > > > On Mon, Feb 9, 2015 at 11:52 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi Jay,
> > > >> > > > > > >
> > > >> > > > > > > Imagine you have a flaky network connection to the brokers, and flush() blocks because one of the brokers is not available (basically, how would we address the failure mode where the I/O thread is not able to drain records, or is busy due to pending requests?). Is your flush() method only meant to flush to the in-memory queue, or to flush to the broker over the network?
> > > >> > > > > > >
> > > >> > > > > > > A timeout helps by pushing the caller to decide what to do, e.g. re-enqueue the records, drop the entire batch, or deal with one of the messages being too big and crossing the max.message.size limit, etc.
> > > >> > > > > > >
> > > >> > > > > > > Also, according to the Javadoc for the API, "The method will block until all previously sent records have completed sending (either successfully or with an error)". Does this bypass the rules set by block.on.buffer.full or batch.size when under load?
> > > >> > > > > > >
> > > >> > > > > > > That was my intention, and I am sorry I mixed up the close() method here without realizing that this is only for bulk send.
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > Thanks,
> > > >> > > > > > >
> > > >> > > > > > > Bhavesh
> > > >> > > > > > >
> > > >> > > > > > > On Mon, Feb 9, 2015 at 8:17 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >> > > > > > >
> > > >> > > > > > > > Yeah, I second the problem Guozhang flags with giving flush a timeout. In general, failover in Kafka is a bounded thing unless you have brought your Kafka cluster down entirely, so I think depending on that bound implicitly is okay.
> > > >> > > > > > > >
> > > >> > > > > > > > It is possible to make flush() instead be:
> > > >> > > > > > > >   boolean tryFlush(long timeout, TimeUnit unit);
> > > >> > > > > > > >
> > > >> > > > > > > > But I am somewhat skeptical that people will use this correctly. I.e., consider the mirror maker code snippet I gave above: how would one actually recover in this case other than by retrying (which the client already does automatically)? After all, if you are okay losing data, then you don't need to bother calling flush at all; you can just let the messages be sent asynchronously.
> > > >> > > > > > > >
> > > >> > > > > > > > I think close() is actually different, because you may well want to shut down immediately and just throw away unsent events.
> > > >> > > > > > > >
> > > >> > > > > > > > -Jay
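For comparison, the tryFlush(long timeout, TimeUnit unit) signature Jay floats could be layered over per-send futures. This is a hypothetical static helper over a collection of CompletableFutures, not an API from the patch under discussion. It also illustrates Jay's concern: a false return only says that some sends were still pending, and the caller must then choose between waiting again, dropping data, or retrying.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; not part of the Kafka producer API.
final class TryFlush {
    // Waits up to the timeout for all pending sends to complete (successfully
    // or not). Returns true if they all completed, false if time ran out.
    static boolean tryFlush(Collection<? extends CompletableFuture<?>> pending,
                            long timeout, TimeUnit unit) throws InterruptedException {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeout, unit);
            return true;
        } catch (ExecutionException e) {
            return true;  // all completed, at least one with an error: still "flushed"
        } catch (TimeoutException e) {
            return false; // some sends are still in flight; the caller must decide
        }
    }
}
```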
> > > >> > > > > > > >
> > > >> > > > > > > > On Mon, Feb 9, 2015 at 2:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > The proposal looks good to me; I will need some time to review the implementation RB later.
> > > >> > > > > > > > >
> > > >> > > > > > > > > Bhavesh, I am wondering how you would use a flush() with a timeout, since such a call does not actually provide any flushing guarantees?
> > > >> > > > > > > > >
> > > >> > > > > > > > > As for close(), there is a separate JIRA for this:
> > > >> > > > > > > > >
> > > >> > > > > > > > > KAFKA-1660 <https://issues.apache.org/jira/browse/KAFKA-1660>
> > > >> > > > > > > > >
> > > >> > > > > > > > > Guozhang
> > > >> > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > > > On Mon, Feb 9, 2015 at 2:29 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> > > >> > > > > > > > >
> > > >> > > > > > > > > > Hi Jay,
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > How about adding a timeout to each of these method calls, i.e. flush(timeout, TimeUnit) and close(timeout, TimeUnit)? We had a runaway I/O thread issue, and the caller thread should not be blocked forever by these methods.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Thanks,
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Bhavesh
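A close(timeout, TimeUnit) along the lines Bhavesh proposes (tracked separately as KAFKA-1660) has to decide what happens to sends still pending when the timeout expires. One possible policy is sketched below with hypothetical names (BoundedClose, track): wait up to the timeout, then fail every unfinished future. This matches Jay's point that close() may legitimately throw away unsent events. It is not the Kafka implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical close-with-timeout policy; not Kafka code.
final class BoundedClose {
    private final List<CompletableFuture<?>> pending = new ArrayList<>();
    private boolean closed = false;

    // Registers an in-flight send; rejected once close() has started.
    synchronized <T> CompletableFuture<T> track(CompletableFuture<T> send) {
        if (closed) {
            throw new IllegalStateException("closed");
        }
        pending.add(send);
        return send;
    }

    // Waits up to the timeout for pending sends, then aborts whatever is left.
    // Returns the number of sends that were discarded.
    int close(long timeout, TimeUnit unit) throws InterruptedException {
        CompletableFuture<?>[] snapshot;
        synchronized (this) {
            closed = true;
            snapshot = pending.toArray(new CompletableFuture<?>[0]);
        }
        try {
            CompletableFuture.allOf(snapshot).get(timeout, unit);
        } catch (ExecutionException | TimeoutException e) {
            // Either some send already failed, or time ran out; handled below.
        }
        int discarded = 0;
        for (CompletableFuture<?> f : snapshot) {
            // completeExceptionally returns true only for futures not yet done.
            if (f.completeExceptionally(new IllegalStateException("closed before send completed"))) {
                discarded++;
            }
        }
        return discarded;
    }
}
```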
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > On Sun, Feb 8, 2015 at 12:18 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > > Well, actually, in the case of linger.ms = 0 the send is still asynchronous, so calling flush() blocks until all the previously sent records have completed. It doesn't speed anything up in that case, though, since they are already available to send.
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > -Jay
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > > Looks good to me.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > I like the idea of not blocking additional sends but not guaranteeing that flush() will deliver them.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > I assume that with linger.ms = 0, flush will just be a no-op (since the queue will be empty). Is that correct?
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > Gwen
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > > Following up on our previous thread on making batch send a little easier, here is a concrete proposal to add a flush() method to the producer:
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > A proposed implementation is here:
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1865
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > Thoughts?
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > > > -Jay
> > > >> > > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > > > --
> > > >> > > > > > > > > -- Guozhang
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > >
> > > >> > > >
> > > >> >
> > > >> >
> > > >>
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
