Sounds good. Yes, I'd want a guarantee that every future I get will always eventually return the RecordMetadata or an exception.
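In the meantime, a caller can bound the wait itself with the timed overload of Future.get rather than the blocking get(). A minimal stdlib sketch — the never-completing CompletableFuture below is only a stand-in for a send future that hangs, not the Kafka API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedGet {
    public static void main(String[] args) throws Exception {
        // Stand-in for the future returned by producer.send(); it never
        // completes, mimicking the hang described in this thread.
        CompletableFuture<String> sendResult = new CompletableFuture<>();
        try {
            // Bound the wait instead of calling the indefinitely blocking get().
            String meta = sendResult.get(2, TimeUnit.SECONDS);
            System.out.println("ack: " + meta);
        } catch (TimeoutException e) {
            System.out.println("no ack within timeout; treating send as failed");
        }
    }
}
```

This doesn't fix the underlying hang, but it keeps a stuck future from stalling the calling thread forever.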
Running into a similar issue with futures never returning, in a pretty straightforward case:

- Healthy producer/server setup
- Stop the server
- Send a message
- Call get on the future and it never returns, whether the server is started again or remains stopped

Stepping into the producer code, it appears that in Sender.run a response never comes back for the send while things are down, and handleDisconnect is never called.

These are basically the same tests I am running against the 0.8.1.1 producer, but I could have some wires crossed, so I'd be curious if others see similar.

Thanks,
Paul

On Fri, Dec 19, 2014 at 5:27 PM, Jay Kreps <j...@confluent.io> wrote:

Yeah, if you want to file a JIRA and post a patch for a new option, it's possible others would want it. Maybe something like:

pre.initialize.topics=x,y,z
pre.initialize.timeout=x

The metadata fetch timeout is a bug... that behavior is inherited from Object.wait, which defines zero to mean infinite, but I think that is not very intuitive. If you file a ticket on that we could just fix it. I think being able to set 0 is actually useful for the case you are trying for.

WRT stopping the producer, I think what you are saying is that you want calling close() on the producer to immediately fail all outstanding requests with some exception, right?

-Jay

On Fri, Dec 19, 2014 at 1:55 PM, Paul Pearcy <ppea...@gmail.com> wrote:

Hi Jay,
I have implemented a wrapper around the producer to behave like I want it to.
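A sketch of how such a wrapper's startup priming might look, assuming a generic `fetchMetadata` callable standing in for the blocking `producer.partitionsFor(topic)` call (all names here are illustrative, not the actual wrapper):

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MetadataPrimer {
    // Fetch metadata for each expected topic at startup; throw if the whole
    // pre-initialization doesn't finish within timeoutMs (the "blow up" option).
    static void prime(List<String> topics, Callable<Integer> fetchMetadata,
                      long timeoutMs) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> init = pool.submit(() -> {
                for (String topic : topics) {
                    try {
                        fetchMetadata.call(); // stands in for producer.partitionsFor(topic)
                    } catch (Exception e) {
                        throw new RuntimeException("metadata init failed for " + topic, e);
                    }
                }
            });
            init.get(timeoutMs, TimeUnit.MILLISECONDS); // TimeoutException -> fail startup
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Fake fetch that "finds" 4 partitions immediately.
        prime(List.of("expected-topic-a", "expected-topic-b"), () -> 4, 1000);
        System.out.println("metadata primed");
    }
}
```

The same shape works for the background (non-blow-up) mode: submit the priming task and simply skip the timed get.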
Where it diverges from the current 0.8.2 producer is that it accepts three new inputs:

- A list of expected topics
- A timeout value to init metadata for those topics during producer creation
- An option to blow up if we fail to init topic metadata within some amount of time

I also needed to set metadata.fetch.timeout.ms=1, since 0 means it will block forever, and kick off a thread to do the topic metadata init in the background.

On the send side, things do fail fast now. The only hiccup I am hitting (not completely done re-working my tests, though) is that messages accepted by the producer after the server has been stopped never return a status if the producer is stopped; I think this is a bug.

Are you sure you wouldn't want any of this behavior in the client by default, which would give out-of-the-box choices on blocking behavior? Happy to share code or send a PR.

Thanks,
Paul

On Fri, Dec 19, 2014 at 2:05 PM, Jay Kreps <j...@confluent.io> wrote:

Hey Paul,

I agree we should document this better.

We allow and encourage using partitions to semantically distribute data. So unfortunately we can't just arbitrarily assign a partition (say 0), as that would actually give incorrect answers for any consumer that made use of the partitioning. It is true that the user can change the partitioning, but we can't ignore the partitioning they have set.

I get the use case you have--you basically want a hard guarantee that send() will never block (so presumably you have also set it to drop data if the buffer fills up). As I said, the blocking only occurs on the first request for a given topic, and you can avoid it by pre-initializing the topic metadata.

I think the option you describe is actually possible now.
Basically you can initialize the metadata for topics you care about using that partitionsFor() call. If you set the property metadata.fetch.timeout.ms=0, then any send calls prior to the completion of metadata initialization will fail immediately rather than block.

-Jay

On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy <ppea...@gmail.com> wrote:

Hi Jay,
Many thanks for the info. All of that makes sense, but from an API standpoint, when something is labelled async and returns a Future, this will be misconstrued, and developers will place async sends in critical client-facing request/response pathways of code that should never block. If the app comes up with a bad config, it will hang all incoming connections.

Obviously, there is a spectrum of use cases with regard to message loss, and the defaults cannot cater to all of them. I like that the defaults tend towards best-effort guarantees, but I am not sure that justifies the inconsistency in the API.

1) It sounds like the client is already structured to handle changes in partitions on the fly. I am sure I am oversimplifying, but in the case where no metadata is available, my naive approach would be to assume some number of partitions and then, once there is metadata, treat it as a partition change event. If there are more unknowns than just partition count, that probably won't work.
2) Pretty much makes sense, especially now that I see people on this discussion list wanting a million topics (good luck).
3) I agree client creation shouldn't fail, but any sends should probably fast fail, or the choice being made should be explicit on the call.

I'm still thinking about how I am going to make the client behave as I'd like.
I think I need a background process kicked off on startup to prime the topics I am interested in. Until that process completes, any sends through the producer will need to fast fail instead of hang. This would still leave a window for blocking if you send to a topic your app wasn't aware it would send to, but now we're getting into corner cases.

Would having something like that as a baked-in option be accepted into the Kafka clients mainline?

A quick win might be to clarify the documentation so that it is clear that this API will block in cases XYZ (maybe this is mentioned somewhere and I missed it).

Thanks,
Paul

On Thu, Dec 18, 2014 at 1:17 PM, Jay Kreps <j...@confluent.io> wrote:

Hey Paul,

Here are the constraints:
1. We wanted the storage of messages to be in their compact binary form so we could bound memory usage. This implies partitioning prior to enqueue, and as you note, partitioning requires having metadata (even stale metadata) about topics.
2. We wanted to avoid prefetching metadata for all topics, since there may be quite a lot of topics.
3. We wanted to make metadata fetching lazy so that it would be possible to create a client without having an active network connection. This tends to be important when services are brought up in development or test environments, where it is annoying to have to control the dependency graph when starting things.

This blocking isn't too bad, as it only occurs on the first request for each topic.
Our feeling was that many things tend to get set up on a first request (DB connections are established, caches populated, etc.), so this was not unreasonable.

If you want to pre-initialize the metadata to avoid blocking on the first request, you can do so by fetching the metadata using the producer.partitionsFor(topic) API at start-up.

-Jay

On Thu, Dec 18, 2014 at 9:07 AM, Paul Pearcy <ppea...@gmail.com> wrote:

Hello,

Playing around with the 0.8.2-beta producer client. One of my test cases is to ensure producers can deal with Kafka being down when the producer is created. My tests failed miserably because of the default blocking in the producer with regard to metadata.fetch.timeout.ms. The first thing the new producer's send() does is call waitOnMetadata, which blocks.

I can handle this case by loading topic metadata on init, setting metadata.fetch.timeout.ms very low, and either throwing away messages or creating my own internal queue to buffer.

I'm surprised the metadata sync isn't done async. If it fails, return that in the future/callback. This way the API could actually be considered safely async, and the producer buffer could try to hold on to things until block.on.buffer.full kicks in to either drop messages or block. You'd probably need a partition callback, since numPartitions wouldn't be available.
The implication is that people's apps will work fine if the first messages are sent while the Kafka server is up; however, if Kafka is down and they restart their app, the new producer will block all sends and blow things up if the app wasn't written to be aware of this edge case.

Thanks,

Paul
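The fast-fail behavior discussed earlier in the thread — reject sends until metadata priming completes, rather than letting waitOnMetadata block — could be gated with a simple flag. A hypothetical sketch, not actual producer code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class FastFailProducer {
    private final AtomicBoolean metadataReady = new AtomicBoolean(false);

    // Called by the background priming task once topic metadata is loaded.
    void markMetadataReady() { metadataReady.set(true); }

    // Fail immediately instead of blocking while metadata is unavailable.
    void send(String topic, String value) {
        if (!metadataReady.get()) {
            throw new IllegalStateException(
                "metadata not initialized; refusing to block");
        }
        // ...delegate to the real producer.send(...) here...
        System.out.println("sent to " + topic);
    }

    public static void main(String[] args) {
        FastFailProducer p = new FastFailProducer();
        try {
            p.send("some-topic", "v1");
        } catch (IllegalStateException e) {
            System.out.println("fast fail: " + e.getMessage());
        }
        p.markMetadataReady();
        p.send("some-topic", "v2");
    }
}
```

The remaining gap Paul notes — a send to a topic the app never primed — would still hit the blocking path unless the gate also tracks which topics were primed.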