Hi Jay,
  I have implemented a wrapper around the producer to make it behave the way
I want. Where it diverges from the current 0.8.2 producer is that it accepts
three new inputs:
- A list of expected topics
- A timeout value for initializing metadata for those topics during producer
creation
- An option to blow up if we fail to initialize topic metadata within some
amount of time

I also needed to set metadata.fetch.timeout.ms=1, since 0 means it will block
forever, and to kick off a thread that does the topic metadata init in the
background.
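
A minimal sketch of the wrapper idea described above, using only the JDK so
it stands alone. TopicPrimer, MetadataFetcher, isReady and requireReady are
hypothetical names, not part of the Kafka client. MetadataFetcher stands in
for a blocking metadata call such as producer.partitionsFor(topic), and the
50 ms retry delay is an arbitrary assumption.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Sketch: primes topic metadata in the background so sends can fail fast. */
final class TopicPrimer {

    /** Hypothetical stand-in for a call such as producer.partitionsFor(topic). */
    interface MetadataFetcher {
        void fetch(String topic) throws Exception;
    }

    private static final long RETRY_DELAY_MS = 50; // assumed value

    // Topics whose metadata has been fetched successfully at least once.
    private final Set<String> ready = ConcurrentHashMap.newKeySet();

    TopicPrimer(List<String> expectedTopics, MetadataFetcher fetcher,
                long initTimeoutMs, boolean failOnTimeout) {
        CountDownLatch primed = new CountDownLatch(expectedTopics.size());
        Thread worker = new Thread(() -> {
            for (String topic : expectedTopics) {
                boolean done = false;
                while (!done) {
                    try {
                        fetcher.fetch(topic);
                        ready.add(topic);
                        primed.countDown();
                        done = true;
                    } catch (Exception e) {
                        // Fetch failed (e.g. broker down): wait, then retry.
                        try {
                            Thread.sleep(RETRY_DELAY_MS);
                        } catch (InterruptedException ie) {
                            return; // asked to stop
                        }
                    }
                }
            }
        }, "topic-metadata-primer");
        worker.setDaemon(true);
        worker.start();

        // Wait up to initTimeoutMs for every expected topic to be primed.
        boolean allPrimed;
        try {
            allPrimed = primed.await(initTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            allPrimed = false;
        }
        if (!allPrimed && failOnTimeout) {
            worker.interrupt();
            throw new IllegalStateException(
                "topic metadata not initialized within " + initTimeoutMs + " ms");
        }
    }

    /** True once metadata for the topic has been fetched. */
    boolean isReady(String topic) {
        return ready.contains(topic);
    }

    /** Call before send(): throws instead of letting the send block. */
    void requireReady(String topic) {
        if (!isReady(topic)) {
            throw new IllegalStateException("metadata not ready for topic " + topic);
        }
    }
}
```

A caller would check requireReady(topic) before handing a record to the real
producer. When the blow-up option is off, the background thread keeps
retrying, so a topic becomes sendable once the broker comes back.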

On the send side, things do fail fast now. The only hiccup I am hitting (I am
not completely done reworking my tests, though) is that messages accepted by
the producer after the server has been stopped never return a status if the
producer is then stopped. I think this is a bug.

Are you sure you wouldn't want any of this behavior in the client by default?
It would give users out-of-the-box choices about blocking behavior. Happy to
share code or send a PR.

Thanks,
Paul

On Fri, Dec 19, 2014 at 2:05 PM, Jay Kreps <j...@confluent.io> wrote:

> Hey Paul,
>
> I agree we should document this better.
>
> We allow and encourage using partitions to semantically distribute data. So
> unfortunately we can't just arbitrarily assign a partition (say 0) as that
> would actually give incorrect answers for any consumer that made use of the
> partitioning. It is true that the user can change the partitioning, but we
> can't ignore the partitioning they have set.
>
> I get the use case you have--you basically want a hard guarantee that
> send() will never block (so presumably you have set to also drop data if
> the buffer fills up). As I said the blocking only occurs on the first
> request for a given topic and you can avoid it by pre-initializing the
> topic metadata.
>
> I think the option you describe is actually possible now. Basically you can
> initialize the metadata for topics you care about using that
> partitionsFor() call. If you set the property metadata.fetch.timeout.ms=0
> then any send calls prior to the completion of metadata initialization will
> fail immediately rather than block.
>
> -Jay
>
>
> On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy <ppea...@gmail.com> wrote:
> >
> > Hi Jay,
> >   Many thanks for the info. All that makes sense, but from an API
> > standpoint when something is labelled async and returns a Future, this
> > will be misconstrued and developers will place async sends in critical
> > client facing request/response pathways of code that should never block.
> > If the app comes up with a bad config, it will hang all incoming
> > connections.
> >
> > Obviously, there is a spectrum of use cases with regard to message loss
> > and the defaults cannot cater to all use cases. I like that the defaults
> > tend towards best effort guarantees, but I am not sure it justifies the
> > inconsistency in the API.
> >
> > 1) It sounds like the client is already structured to handle changes in
> > partitions on the fly. I am sure I am oversimplifying, but in the case
> > where no metadata is available, my naive approach would be to assume some
> > number of partitions and then, when metadata arrives, treat it as a
> > partition change event. If there are more unknowns than just partition
> > count, this probably won't work.
> > 2) Pretty much makes sense, especially now that I see people on this
> > discussion list wanting a million topics (good luck)
> > 3) I agree client creation shouldn't fail, but any sends should probably
> > fast fail or have it explicit on the call the choice you are making.
> >
> > I'm still thinking about how I am going to make the client behave as I'd
> > like. I think I need a background process kicked off on startup to prime
> > the topics I am interested in. Until that process completes, any sends
> > through the producer will need to fast fail instead of hang. This would
> > still leave the window for blocking if you send to a topic your app
> > wasn't aware it would send to, but now we're getting into corner corner
> > cases.
> >
> > Would having something like that as a baked-in option be accepted into
> > Kafka clients mainline?
> >
> > A quick win might be to clarify the documentation so that it is clear
> > that this API will block in cases XYZ (maybe this is mentioned somewhere
> > and I missed it).
> >
> > Thanks,
> > Paul
> >
> >
> > On Thu, Dec 18, 2014 at 1:17 PM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > Hey Paul,
> > >
> > > Here are the constraints:
> > > > 1. We wanted the storage of messages to be in their compact binary form
> > > > so we could bound memory usage. This implies partitioning prior to
> > > > enqueue. And as you note partitioning requires having metadata (even
> > > > stale metadata) about topics.
> > > > 2. We wanted to avoid prefetching metadata for all topics since there
> > > > may be quite a lot of topics.
> > > > 3. We wanted to make metadata fetching lazy so that it would be
> > > > possible to create a client without having an active network
> > > > connection. This tends to be important when services are brought up in
> > > > development or test environments where it is annoying to have to
> > > > control the dependency graph when starting things.
> > > >
> > > > This blocking isn't too bad as it only occurs on the first request for
> > > > each topic. Our feeling was that many things tend to get setup on a
> > > > first request (DB connections are established, caches populated, etc)
> > > > so this was not unreasonable.
> > > >
> > > > If you want to pre-initialize the metadata to avoid blocking on the
> > > > first request you can do so by fetching the metadata using the
> > > > producer.partitionsFor(topic) api at start-up.
> > >
> > > -Jay
> > >
> > > > On Thu, Dec 18, 2014 at 9:07 AM, Paul Pearcy <ppea...@gmail.com> wrote:
> > > >
> > > > Hello,
> > > >
> > > > >   Playing around with the 0.8.2-beta producer client. One of my test
> > > > > cases is to ensure producers can deal with Kafka being down when the
> > > > > producer is created. My tests failed miserably because of the default
> > > > > blocking in the producer with regard to metadata.fetch.timeout.ms.
> > > > > The first line of new producer is waitOnMetadata which is blocking.
> > > >
> > > > > I can handle this case by loading topic meta on init, setting
> > > > > metadata.fetch.timeout.ms to a very low value, and either throwing
> > > > > away messages or creating my own internal queue to buffer.
> > > >
> > > > > I’m surprised the metasync isn’t done async. If it fails, return that
> > > > > in the future/callback. This way the API could actually be considered
> > > > > safely async and the producer buffer could try to hold on to things
> > > > > until block.on.buffer.full kicks in to either drop messages or block.
> > > > > You’d probably need a partition callback since numPartitions wouldn’t
> > > > > be available.
> > > >
> > > > > The implication is that people's apps will work fine if first
> > > > > messages are sent while kafka server is up, however, if kafka is down
> > > > > and they restart their app, the new producer will block all sends and
> > > > > blow things up if you haven't written your app to be aware of this
> > > > > edge case.
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Paul
> > > >
> > >
> >
>
