Sounds good.

Yes, I'd want a guarantee that every future I get will always return the
recordmeta or an exception eventually.

Running into a similar issue with futures never returning with a pretty
straightforward case:
- Healthy producer/server setup
- Stop the server
- Send a message
- Call get on the future and it never returns. Doesn't matter if the server
is started again or remains stopped

Stepping into the producer code, it appears that in Sender.run a response
never comes back for the send while things are down and handleDisconnect is
never called.

These are basically the same tests I am running against 0.8.1.1 producer,
but I could have some wires crossed, so would be curious if others see
similar.

Thanks,
Paul


On Fri, Dec 19, 2014 at 5:27 PM, Jay Kreps <j...@confluent.io> wrote:

> Yeah if you want to file and JIRA and post a patch for a new option its
> possible others would want it. Maybe something like
>   pre.initialize.topics=x,y,z
>   pre.initialize.timeout=x
>
> The metadata fetch timeout is a bug...that behavior is inherited from
> Object.wait which defines zero to mean infinite but I think that is not
> very intuitive. If you file a ticket on that we could just fix it. I think
> being able to set 0 is actually useful for this case you are trying for.
>
> WRT to stopping the producer I think what you are saying is that you want
> it to be the case that calling close() on the producer immediately fails
> all outstanding requests with some exception, right?
>
> -Jay
>
> On Fri, Dec 19, 2014 at 1:55 PM, Paul Pearcy <ppea...@gmail.com> wrote:
> >
> > Hi Jay,
> >   I have implemented a wrapper around the producer to behave like I want
> it
> > to. Where it diverges from current 0.8.2 producer is that it accepts
> three
> > new inputs:
> > - A list of expected topics
> > - A timeout value to init meta for those topics during producer creationg
> > - An option to blow up if we fail to init topic meta within some amount
> of
> > time
> >
> > I also needed to set  metadata.fetch.timeout.ms=1, as 0 means it will
> > block
> > forever and kick off a thread to do the topic meta data init in the
> > background.
> >
> > On the send side, things do fail fast, now. Only current hiccup(not
> > completely done re-working my tests, though) I am hitting now is that
> > messages accepted by the producer after the server have been stopped
> never
> > return a status if the producer is stopped, think this is a bug.
> >
> > Are you sure you wouldn't want any of this behavior in client by default
> > which would give out of the box choices to be made on blocking behavior?
> > Happy to share code or send a PR.
> >
> > Thanks,
> > Paul
> >
> > On Fri, Dec 19, 2014 at 2:05 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Hey Paul,
> > >
> > > I agree we should document this better.
> > >
> > > We allow and encourage using partitions to semantically distribute
> data.
> > So
> > > unfortunately we can't just arbitrarily assign a partition (say 0) as
> > that
> > > would actually give incorrect answers for any consumer that made use of
> > the
> > > partitioning. It is true that the user can change the partitioning, but
> > we
> > > can't ignore the partitioning they have set.
> > >
> > > I get the use case you have--you basically want a hard guarantee that
> > > send() will never block (so presumably you have set to also drop data
> if
> > > the buffer fills up). As I said the blocking only occurs on the first
> > > request for a given topic and you can avoid it by pre-initializing the
> > > topic metadata.
> > >
> > > I think the option you describe is actually possible now. Basically you
> > can
> > > initialize the metadata for topics you care about using that
> > > partitionsFor() call. If you set the property
> metadata.fetch.timeout.ms
> > =0
> > > then any send calls prior to the completion of metadata initialization
> > will
> > > fail immediately rather than block.
> > >
> > > -Jay
> > >
> > >
> > > On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy <ppea...@gmail.com>
> wrote:
> > > >
> > > > Hi Jay,
> > > >   Many thanks for the info. All that makes sense, but from an API
> > > > standpoint when something is labelled async and returns a Future,
> this
> > > will
> > > > be misconstrued and developers will place async sends in critical
> > client
> > > > facing request/response pathways of code that should never block. If
> > the
> > > > app comes up with a bad config, it will hang all incoming
> connections.
> > > >
> > > > Obviously, there is a spectrum of use cases with regard to message
> loss
> > > and
> > > > the defaults cannot cater to all use cases. I like that the defaults
> > tend
> > > > towards best effort guarantees, but I am not sure it justifies the
> > > > inconsistency in the API.
> > > >
> > > > 1) It sounds like the client is already structured to handle changes
> in
> > > > partitions on the fly, I am sure I am over simplifying but in the
> case
> > > > where no meta is available, but my naive approach would be assume
> some
> > > > number of partitions and then when there is metadata treat it as a
> > > > partition change event. If there are more unknown than just partition
> > > > count, probably won't work.
> > > > 2) Pretty much makes sense, especially now that I see people on this
> > > > discussion list wanting a million topics (good luck)
> > > > 3) I agree client creation shouldn't fail, but any sends should
> > probably
> > > > fast fail or have it explicit on the call the choice you are making.
> > > >
> > > > I'm still thinking about how I am going to make the client behave as
> > I'd
> > > > like. I think I need a background process kicked off on startup to
> > prime
> > > > the topics I am interested in. Until that process completes, any
> sends
> > > > through the producer will need to fast fail instead of hang. This
> would
> > > > still leave the window for blocking if you send to a topic your app
> > > wasn't
> > > > aware it would send to, but now we're getting into corner corner
> cases.
> > > >
> > > > Would having something like that be a baked in option be accepted
> into
> > > > Kafka clients mainline?
> > > >
> > > > A quick win might be to clarify the documentation so that it is clear
> > > that
> > > > this API will block in cases XYZ (maybe this is mentioned somewhere
> > and I
> > > > missed it).
> > > >
> > > > Thanks,
> > > > Paul
> > > >
> > > >
> > > > On Thu, Dec 18, 2014 at 1:17 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > >
> > > > > Hey Paul,
> > > > >
> > > > > Here are the constraints:
> > > > > 1. We wanted the storage of messages to be in their compact binary
> > form
> > > > so
> > > > > we could bound memory usage. This implies partitioning prior to
> > > enqueue.
> > > > > And as you note partitioning requires having metadata (even stale
> > > > metadata)
> > > > > about topics.
> > > > > 2. We wanted to avoid prefetching metadata for all topics since
> there
> > > may
> > > > > be quite a lot of topics.
> > > > > 3. We wanted to make metadata fetching lazy so that it would be
> > > possible
> > > > to
> > > > > create a client without having an active network connection. This
> > tends
> > > > to
> > > > > be important when services are brought up in development or test
> > > > > environments where it is annoying to have to control the dependency
> > > graph
> > > > > when starting things.
> > > > >
> > > > > This blocking isn't too bad as it only occurs on the first request
> > for
> > > > each
> > > > > topic. Our feeling was that many things tend to get setup on a
> first
> > > > > request (DB connections are established, caches populated, etc) so
> > this
> > > > was
> > > > > not unreasonable.
> > > > >
> > > > > If you want to pre-initialize the metadata to avoid blocking on the
> > > first
> > > > > request you can do so by fetching the metadata using the
> > > > > producer.partitionsFor(topic) api at start-up.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Thu, Dec 18, 2014 at 9:07 AM, Paul Pearcy <ppea...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > Hello,
> > > > > >
> > > > > >   Playing around with the 0.8.2-beta producer client. One of my
> > test
> > > > > cases
> > > > > > is to ensure producers can deal with Kafka being down when the
> > > producer
> > > > > is
> > > > > > created. My tests failed miserably because of the default
> blocking
> > in
> > > > the
> > > > > > producer with regard to metadata.fetch.timeout.ms. The first
> line
> > of
> > > > new
> > > > > > producer is waitOnMetadata which is blocking.
> > > > > >
> > > > > > I can handle this case by loading topic meta on init and setting
> > the
> > > > > > timeout value to very low metadata.fetch.timeout.ms and either
> > > > throwing
> > > > > > away messages or creating my own internal queue to buffer.
> > > > > >
> > > > > > I’m surprised the metasync isn’t done async. If it fails, return
> > that
> > > > in
> > > > > > the future/callback. This way the API could actually be
> considered
> > > > safely
> > > > > > async and the producer buffer could try to hold on to things
> until
> > > > > > block.on.buffer.full kicks in to either drop messages or block.
> > You’d
> > > > > > probably need a partition callback since numPartitions wouldn’t
> be
> > > > > > available.
> > > > > >
> > > > > > The implication is that people's apps will work fine if first
> > > messages
> > > > > are
> > > > > > sent while kafka server is up, however, if kafka is down and they
> > > > restart
> > > > > > their app, the new producer will block all sends and blow things
> up
> > > if
> > > > > you
> > > > > > haven't written your app to be aware of this edge case.
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Paul
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to