FYI, I bumped the server to 0.8.2-beta and I no longer hit the basic failure
I mentioned above, which is great.

I haven't been able to find confirmation in the docs, but from a past
conversation
(http://mail-archives.apache.org/mod_mbox/kafka-users/201408.mbox/%3c20140829174552.ga30...@jkoshy-ld.linkedin.biz%3E),
it seems that the 0.8.2 producer should be fully compatible with a 0.8.1.1
broker.

From everything I see running in a single node config, the 0.8.2 Java
producer is effectively dead after a complete disconnect from the 0.8.1.1
broker.

Thanks,
Paul

On Sun, Dec 21, 2014 at 3:06 AM, Paul Pearcy <ppea...@gmail.com> wrote:

> Sounds good.
>
> Yes, I'd want a guarantee that every future I get will always return the
> recordmeta or an exception eventually.
>
> Running into a similar issue with futures never returning with a pretty
> straightforward case:
> - Healthy producer/server setup
> - Stop the server
> - Send a message
> - Call get on the future and it never returns. Doesn't matter if the
> server is started again or remains stopped
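
One way to keep a caller from hanging on a future like this is to bound the
wait. A minimal sketch, assuming a plain CompletableFuture as a stand-in for
the future returned by send(); the names BoundedGet and awaitOrTimeout are
made up for illustration and are not part of the Kafka client:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedGet {
    /** Waits at most timeoutMs for the result; returns "TIMEOUT" instead of hanging. */
    static String awaitOrTimeout(Future<String> future, long timeoutMs)
            throws InterruptedException, ExecutionException {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The future was neither completed nor failed within the bound.
            return "TIMEOUT";
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a send() future that nobody ever completes.
        Future<String> neverCompleted = new CompletableFuture<>();
        System.out.println(awaitOrTimeout(neverCompleted, 100));

        // Stand-in for a send() future that was acknowledged.
        Future<String> done = CompletableFuture.completedFuture("ack");
        System.out.println(awaitOrTimeout(done, 100));
    }
}
```

This only protects the caller; it does not make the underlying future complete.
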
>
> Stepping into the producer code, it appears that in Sender.run a response
> never comes back for the send while things are down and handleDisconnect is
> never called.
>
> These are basically the same tests I am running against 0.8.1.1 producer,
> but I could have some wires crossed, so would be curious if others see
> similar.
>
> Thanks,
> Paul
>
>
> On Fri, Dec 19, 2014 at 5:27 PM, Jay Kreps <j...@confluent.io> wrote:
>
>> Yeah, if you want to file a JIRA and post a patch for a new option, it's
>> possible others would want it. Maybe something like
>>   pre.initialize.topics=x,y,z
>>   pre.initialize.timeout=x
>>
>> The metadata fetch timeout is a bug...that behavior is inherited from
>> Object.wait which defines zero to mean infinite but I think that is not
>> very intuitive. If you file a ticket on that we could just fix it. I think
>> being able to set 0 is actually useful for this case you are trying for.
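
The Object.wait behaviour mentioned here is easy to trip over: wait(0) waits
until notified, with no time limit. A minimal sketch of a wait helper where 0
means "do not wait at all", assuming a simple ready flag; the class and method
names (ZeroTimeout, awaitReady, markReady) are hypothetical and this is not
the producer's actual code:

```java
import java.util.concurrent.TimeUnit;

public class ZeroTimeout {
    private final Object lock = new Object();
    private boolean ready = false;

    /** Marks the resource ready and wakes up any waiters. */
    void markReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }

    /** Returns true if ready within maxWaitMs; 0 means "check once, never block". */
    boolean awaitReady(long maxWaitMs) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
        synchronized (lock) {
            while (!ready) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) {
                    // Never hand 0 to Object.wait: it would wait with no time limit.
                    return false;
                }
                lock.wait(remainingMs);
            }
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        ZeroTimeout z = new ZeroTimeout();
        System.out.println(z.awaitReady(0)); // not ready yet, returns without blocking
        z.markReady();
        System.out.println(z.awaitReady(0)); // ready now
    }
}
```
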
>>
>> WRT stopping the producer, I think what you are saying is that you want
>> it to be the case that calling close() on the producer immediately fails
>> all outstanding requests with some exception, right?
>>
>> -Jay
>>
>> On Fri, Dec 19, 2014 at 1:55 PM, Paul Pearcy <ppea...@gmail.com> wrote:
>> >
>> > Hi Jay,
>> >   I have implemented a wrapper around the producer to behave like I want
>> > it to. Where it diverges from the current 0.8.2 producer is that it accepts
>> > three new inputs:
>> > - A list of expected topics
>> > - A timeout value to init meta for those topics during producer creation
>> > - An option to blow up if we fail to init topic meta within some amount of
>> >   time
>> >
>> > I also needed to set metadata.fetch.timeout.ms=1, as 0 means it will block
>> > forever, and to kick off a thread to do the topic metadata init in the
>> > background.
>> >
>> > On the send side, things do fail fast now. The only hiccup I am hitting at
>> > the moment (I'm not completely done re-working my tests, though) is that
>> > messages accepted by the producer after the server has been stopped never
>> > return a status if the producer is stopped. I think this is a bug.
>> >
>> > Are you sure you wouldn't want any of this behavior in the client by
>> > default? It would give users out-of-the-box choices about blocking
>> > behavior. Happy to share code or send a PR.
>> >
>> > Thanks,
>> > Paul
>> >
>> > On Fri, Dec 19, 2014 at 2:05 PM, Jay Kreps <j...@confluent.io> wrote:
>> >
>> > > Hey Paul,
>> > >
>> > > I agree we should document this better.
>> > >
>> > > We allow and encourage using partitions to semantically distribute data.
>> > > So unfortunately we can't just arbitrarily assign a partition (say 0), as
>> > > that would actually give incorrect answers for any consumer that made use
>> > > of the partitioning. It is true that the user can change the partitioning,
>> > > but we can't ignore the partitioning they have set.
>> > >
>> > > I get the use case you have--you basically want a hard guarantee that
>> > > send() will never block (so presumably you have set it to also drop data
>> > > if the buffer fills up). As I said, the blocking only occurs on the first
>> > > request for a given topic and you can avoid it by pre-initializing the
>> > > topic metadata.
>> > >
>> > > I think the option you describe is actually possible now. Basically you
>> > > can initialize the metadata for topics you care about using that
>> > > partitionsFor() call. If you set the property
>> > > metadata.fetch.timeout.ms=0 then any send calls prior to the completion
>> > > of metadata initialization will fail immediately rather than block.
>> > >
>> > > -Jay
>> > >
>> > >
>> > > On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy <ppea...@gmail.com> wrote:
>> > > >
>> > > > Hi Jay,
>> > > >   Many thanks for the info. All that makes sense, but from an API
>> > > > standpoint, when something is labelled async and returns a Future, this
>> > > > will be misconstrued and developers will place async sends in critical
>> > > > client-facing request/response pathways of code that should never
>> > > > block. If the app comes up with a bad config, it will hang all incoming
>> > > > connections.
>> > > >
>> > > > Obviously, there is a spectrum of use cases with regard to message loss
>> > > > and the defaults cannot cater to all use cases. I like that the
>> > > > defaults tend towards best effort guarantees, but I am not sure it
>> > > > justifies the inconsistency in the API.
>> > > >
>> > > > 1) It sounds like the client is already structured to handle changes in
>> > > > partitions on the fly. I am sure I am oversimplifying, but in the case
>> > > > where no meta is available, my naive approach would be to assume some
>> > > > number of partitions and then, when there is metadata, treat it as a
>> > > > partition change event. If there are more unknowns than just partition
>> > > > count, this probably won't work.
>> > > > 2) Pretty much makes sense, especially now that I see people on this
>> > > > discussion list wanting a million topics (good luck)
>> > > > 3) I agree client creation shouldn't fail, but any sends should probably
>> > > > fast fail, or the call should make explicit the choice you are making.
>> > > >
>> > > > I'm still thinking about how I am going to make the client behave as
>> > > > I'd like. I think I need a background process kicked off on startup to
>> > > > prime the topics I am interested in. Until that process completes, any
>> > > > sends through the producer will need to fast fail instead of hang. This
>> > > > would still leave the window for blocking if you send to a topic your
>> > > > app wasn't aware it would send to, but now we're getting into corner
>> > > > cases.
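
A rough sketch of the background-priming idea described here, under loud
assumptions: PrimedTopicGate and its methods are hypothetical names, and
metadataFetch is a stand-in for whatever blocking call loads a topic's
metadata (for example a lambda wrapping producer.partitionsFor(topic)). It
only shows the gating logic, not a real producer wrapper:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class PrimedTopicGate {
    private final Set<String> primed = ConcurrentHashMap.newKeySet();
    // Daemon thread so a stuck metadata fetch cannot keep the JVM alive.
    private final ExecutorService background = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "topic-primer");
        t.setDaemon(true);
        return t;
    });

    /** Primes each topic on the background thread; returns one Future per topic. */
    List<Future<?>> primeAsync(Iterable<String> topics, Consumer<String> metadataFetch) {
        List<Future<?>> pending = new ArrayList<>();
        for (String topic : topics) {
            pending.add(background.submit(() -> {
                metadataFetch.accept(topic); // may block or throw; callers are unaffected
                primed.add(topic);           // reached only if the fetch returned normally
            }));
        }
        return pending;
    }

    /** Fails fast instead of blocking when the topic has not been primed yet. */
    void checkReady(String topic) {
        if (!primed.contains(topic)) {
            throw new IllegalStateException("metadata not ready for topic " + topic);
        }
    }

    public static void main(String[] args) throws Exception {
        PrimedTopicGate gate = new PrimedTopicGate();
        try {
            gate.checkReady("events");
        } catch (IllegalStateException e) {
            System.out.println("fast fail: " + e.getMessage());
        }
        // No-op fetch stands in for a real metadata call.
        for (Future<?> f : gate.primeAsync(Arrays.asList("events"), topic -> { })) {
            f.get();
        }
        gate.checkReady("events");
        System.out.println("events is primed");
    }
}
```

With a single background thread, one fetch that blocks forever would also
hold up the topics queued behind it, so a real version would likely want a
per-fetch timeout.
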
>> > > >
>> > > > Would having something like that as a baked-in option be accepted into
>> > > > Kafka clients mainline?
>> > > >
>> > > > A quick win might be to clarify the documentation so that it is clear
>> > > > that this API will block in cases XYZ (maybe this is mentioned
>> > > > somewhere and I missed it).
>> > > >
>> > > > Thanks,
>> > > > Paul
>> > > >
>> > > >
>> > > > On Thu, Dec 18, 2014 at 1:17 PM, Jay Kreps <j...@confluent.io> wrote:
>> > > > >
>> > > > > Hey Paul,
>> > > > >
>> > > > > Here are the constraints:
>> > > > > 1. We wanted the storage of messages to be in their compact binary
>> > > > > form so we could bound memory usage. This implies partitioning prior
>> > > > > to enqueue. And as you note, partitioning requires having metadata
>> > > > > (even stale metadata) about topics.
>> > > > > 2. We wanted to avoid prefetching metadata for all topics since there
>> > > > > may be quite a lot of topics.
>> > > > > 3. We wanted to make metadata fetching lazy so that it would be
>> > > > > possible to create a client without having an active network
>> > > > > connection. This tends to be important when services are brought up
>> > > > > in development or test environments where it is annoying to have to
>> > > > > control the dependency graph when starting things.
>> > > > >
>> > > > > This blocking isn't too bad as it only occurs on the first request
>> > > > > for each topic. Our feeling was that many things tend to get set up
>> > > > > on a first request (DB connections are established, caches populated,
>> > > > > etc.) so this was not unreasonable.
>> > > > >
>> > > > > If you want to pre-initialize the metadata to avoid blocking on the
>> > > > > first request, you can do so by fetching the metadata using the
>> > > > > producer.partitionsFor(topic) API at start-up.
>> > > > >
>> > > > > -Jay
>> > > > >
>> > > > > On Thu, Dec 18, 2014 at 9:07 AM, Paul Pearcy <ppea...@gmail.com> wrote:
>> > > > > >
>> > > > > > Hello,
>> > > > > >
>> > > > > >   Playing around with the 0.8.2-beta producer client. One of my test
>> > > > > > cases is to ensure producers can deal with Kafka being down when
>> > > > > > the producer is created. My tests failed miserably because of the
>> > > > > > default blocking in the producer with regard to
>> > > > > > metadata.fetch.timeout.ms. The first line of the new producer is
>> > > > > > waitOnMetadata, which is blocking.
>> > > > > >
>> > > > > > I can handle this case by loading topic meta on init, setting
>> > > > > > metadata.fetch.timeout.ms to a very low value, and either throwing
>> > > > > > away messages or creating my own internal queue to buffer.
>> > > > > >
>> > > > > > I’m surprised the meta sync isn’t done async. If it fails, return
>> > > > > > that in the future/callback. This way the API could actually be
>> > > > > > considered safely async and the producer buffer could try to hold on
>> > > > > > to things until block.on.buffer.full kicks in to either drop messages
>> > > > > > or block. You’d probably need a partition callback since
>> > > > > > numPartitions wouldn’t be available.
>> > > > > >
>> > > > > > The implication is that people's apps will work fine if first
>> > > > > > messages are sent while the Kafka server is up. However, if Kafka is
>> > > > > > down and they restart their app, the new producer will block all
>> > > > > > sends and blow things up if they haven't written their app to be
>> > > > > > aware of this edge case.
>> > > > > >
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Paul
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
