Hey Paul,

I agree we should document this better.

We allow and encourage using partitions to semantically distribute data. So
unfortunately we can't just arbitrarily assign a partition (say 0) as that
would actually give incorrect answers for any consumer that made use of the
partitioning. It is true that the user can change the partitioning, but we
can't ignore the partitioning they have set.

I get the use case you have--you basically want a hard guarantee that
send() will never block (so presumably you have set to also drop data if
the buffer fills up). As I said the blocking only occurs on the first
request for a given topic and you can avoid it by pre-initializing the
topic metadata.

I think the option you describe is actually possible now. Basically you can
initialize the metadata for topics you care about using that
partitionsFor() call. If you set the property metadata.fetch.timeout.ms=0
then any send calls prior to the completion of metadata initialization will
fail immediately rather than block.

-Jay


On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy <ppea...@gmail.com> wrote:
>
> Hi Jay,
>   Many thanks for the info. All that makes sense, but from an API
> standpoint when something is labelled async and returns a Future, this will
> be misconstrued and developers will place async sends in critical client
> facing request/response pathways of code that should never block. If the
> app comes up with a bad config, it will hang all incoming connections.
>
> Obviously, there is a spectrum of use cases with regard to message loss and
> the defaults cannot cater to all use cases. I like that the defaults tend
> towards best effort guarantees, but I am not sure it justifies the
> inconsistency in the API.
>
> 1) It sounds like the client is already structured to handle changes in
> partitions on the fly, I am sure I am over simplifying but in the case
> where no meta is available, but my naive approach would be assume some
> number of partitions and then when there is metadata treat it as a
> partition change event. If there are more unknown than just partition
> count, probably won't work.
> 2) Pretty much makes sense, especially now that I see people on this
> discussion list wanting a million topics (good luck)
> 3) I agree client creation shouldn't fail, but any sends should probably
> fast fail or have it explicit on the call the choice you are making.
>
> I'm still thinking about how I am going to make the client behave as I'd
> like. I think I need a background process kicked off on startup to prime
> the topics I am interested in. Until that process completes, any sends
> through the producer will need to fast fail instead of hang. This would
> still leave the window for blocking if you send to a topic your app wasn't
> aware it would send to, but now we're getting into corner corner cases.
>
> Would having something like that be a baked in option be accepted into
> Kafka clients mainline?
>
> A quick win might be to clarify the documentation so that it is clear that
> this API will block in cases XYZ (maybe this is mentioned somewhere and I
> missed it).
>
> Thanks,
> Paul
>
>
> On Thu, Dec 18, 2014 at 1:17 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > Hey Paul,
> >
> > Here are the constraints:
> > 1. We wanted the storage of messages to be in their compact binary form
> so
> > we could bound memory usage. This implies partitioning prior to enqueue.
> > And as you note partitioning requires having metadata (even stale
> metadata)
> > about topics.
> > 2. We wanted to avoid prefetching metadata for all topics since there may
> > be quite a lot of topics.
> > 3. We wanted to make metadata fetching lazy so that it would be possible
> to
> > create a client without having an active network connection. This tends
> to
> > be important when services are brought up in development or test
> > environments where it is annoying to have to control the dependency graph
> > when starting things.
> >
> > This blocking isn't too bad as it only occurs on the first request for
> each
> > topic. Our feeling was that many things tend to get setup on a first
> > request (DB connections are established, caches populated, etc) so this
> was
> > not unreasonable.
> >
> > If you want to pre-initialize the metadata to avoid blocking on the first
> > request you can do so by fetching the metadata using the
> > producer.partitionsFor(topic) api at start-up.
> >
> > -Jay
> >
> > On Thu, Dec 18, 2014 at 9:07 AM, Paul Pearcy <ppea...@gmail.com> wrote:
> > >
> > > Hello,
> > >
> > >   Playing around with the 0.8.2-beta producer client. One of my test
> > cases
> > > is to ensure producers can deal with Kafka being down when the producer
> > is
> > > created. My tests failed miserably because of the default blocking in
> the
> > > producer with regard to metadata.fetch.timeout.ms. The first line of
> new
> > > producer is waitOnMetadata which is blocking.
> > >
> > > I can handle this case by loading topic meta on init and setting the
> > > timeout value to very low metadata.fetch.timeout.ms and either
> throwing
> > > away messages or creating my own internal queue to buffer.
> > >
> > > I’m surprised the metasync isn’t done async. If it fails, return that
> in
> > > the future/callback. This way the API could actually be considered
> safely
> > > async and the producer buffer could try to hold on to things until
> > > block.on.buffer.full kicks in to either drop messages or block. You’d
> > > probably need a partition callback since numPartitions wouldn’t be
> > > available.
> > >
> > > The implication is that people's apps will work fine if first messages
> > are
> > > sent while kafka server is up, however, if kafka is down and they
> restart
> > > their app, the new producer will block all sends and blow things up if
> > you
> > > haven't written your app to be aware of this edge case.
> > >
> > >
> > > Thanks,
> > >
> > > Paul
> > >
> >
>

Reply via email to