[jira] [Updated] (KAFKA-2551) Unclean leader election docs outdated
[ https://issues.apache.org/jira/browse/KAFKA-2551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-2551: --- Assignee: Manikumar Reddy (was: jin xing) Fix Version/s: 0.10.0.0 GitHub user omkreddy opened a pull request: https://github.com/apache/kafka/pull/1054 > Unclean leader election docs outdated > - > > Key: KAFKA-2551 > URL: https://issues.apache.org/jira/browse/KAFKA-2551 > Project: Kafka > Issue Type: Bug > Components: website >Affects Versions: 0.8.2.2 >Reporter: Stevo Slavic >Assignee: Manikumar Reddy >Priority: Trivial > Labels: documentation, newbie > Fix For: 0.10.0.0 > > > Current unclean leader election docs state: > {quote} > In the future, we would like to make this configurable to better support use > cases where downtime is preferable to inconsistency. > {quote} > Since 0.8.2.0, unclean leader election strategy (whether to allow it or not) > is already configurable via {{unclean.leader.election.enable}} broker config > property. > That sentence is in both > https://svn.apache.org/repos/asf/kafka/site/083/design.html and > https://svn.apache.org/repos/asf/kafka/site/082/design.html near the end of > "Unclean leader election: What if they all die?" section. Next section, > "Availability and Durability Guarantees", mentions ability to disable unclean > leader election, so likely just this one reference needs to be updated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.
Jiangjie Qin created KAFKA-3388: --- Summary: Producer should only timeout a batch in the accumulator when metadata is missing. Key: KAFKA-3388 URL: https://issues.apache.org/jira/browse/KAFKA-3388 Project: Kafka Issue Type: Bug Affects Versions: 0.9.0.1 Reporter: Jiangjie Qin Priority: Blocker Fix For: 0.10.0.0 In KIP-19 we reuse request.timeout.ms to time out the batches in the accumulator. The intent was to avoid batches sitting in the accumulator forever when topic metadata is missing. Currently we do not check whether metadata is available when we time out the batches in the accumulator (although the comments say we will check the metadata). As a result, once a batch hits a request timeout and is retried, all subsequent batches fail with a timeout exception. We should only time out batches in the accumulator when the metadata of the partition is missing.
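The proposed rule can be sketched as a small predicate. This is only an illustration of the condition described in the ticket, not the producer's actual code: the class names PendingBatch and BatchExpiry, the field names, and the metadataAvailable flag are all hypothetical.

```java
/** Hypothetical stand-in for a batch waiting in the producer's accumulator. */
final class PendingBatch {
    final String topicPartition; // illustrative label, e.g. "test1-0"
    final long createdMs;        // time at which the batch was created

    PendingBatch(String topicPartition, long createdMs) {
        this.topicPartition = topicPartition;
        this.createdMs = createdMs;
    }
}

/** Hypothetical helper expressing the expiry rule proposed in KAFKA-3388. */
final class BatchExpiry {
    /**
     * A batch is expired only when BOTH conditions hold:
     * it has waited longer than requestTimeoutMs, and the metadata
     * for its partition is missing.
     */
    static boolean shouldExpire(PendingBatch batch, long nowMs,
                                long requestTimeoutMs, boolean metadataAvailable) {
        boolean waitedTooLong = nowMs - batch.createdMs > requestTimeoutMs;
        return waitedTooLong && !metadataAvailable;
    }
}
```

With this rule, a batch queued behind a retried request is kept as long as the partition metadata is known, so one request timeout no longer cascades to every later batch.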
[jira] [Commented] (KAFKA-2359) New consumer - partitions auto assigned only on poll
[ https://issues.apache.org/jira/browse/KAFKA-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192034#comment-15192034 ] Ken Gidley commented on KAFKA-2359: --- How about this use case: I want to reprocess all the data in my log stream on application startup, using the same consumer group.id. With 0.9.0.0, I call 'subscribe', then try to call 'seekToBeginning' and it fails with "Caused by: java.lang.IllegalStateException: No current assignment for partition test1". It seems weird to have to call poll(), which will read some records, and then throw them away to re-read from the beginning of the topic. > New consumer - partitions auto assigned only on poll > > > Key: KAFKA-2359 > URL: https://issues.apache.org/jira/browse/KAFKA-2359 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Stevo Slavic >Priority: Minor > > In the new consumer I encountered unexpected behavior. After constructing > {{KafkaConsumer}} instance with configured consumer rebalance callback > handler, and subscribing to a topic with "consumer.subscribe(topic)", > retrieving subscriptions would return empty set and callback handler would > not get called (no partitions ever assigned or revoked), no matter how long > instance was up. > Then I found by inspecting {{KafkaConsumer}} code that partition assignment > will only be triggered on first {{poll}}, since {{pollOnce}} has: > {noformat} > // ensure we have partitions assigned if we expect to > if (subscriptions.partitionsAutoAssigned()) > coordinator.ensurePartitionAssignment(); > {noformat} > I'm proposing to fix this by including same {{ensurePartitionAssignment}} > fragment in {{KafkaConsumer.subscriptions}} accessor as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
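The behavior discussed in this ticket can be modeled with a toy class. The sketch below is a hypothetical stand-in, not the real KafkaConsumer: the class LazyAssignmentConsumer, its String-based partitions, and the assignmentEnsured accessor are assumptions made purely to illustrate why seeking before the first poll fails, and what the proposed accessor change would do.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Toy model of lazy partition assignment; NOT the real KafkaConsumer API. */
final class LazyAssignmentConsumer {
    private final Set<String> subscribed = new HashSet<>();
    private final Set<String> assigned = new HashSet<>();

    /** Subscribing only records interest; nothing is assigned yet. */
    void subscribe(String topic) {
        subscribed.add(topic);
    }

    /** Models the coordinator call: assigns partition 0 of every subscribed topic. */
    private void ensurePartitionAssignment() {
        for (String topic : subscribed) {
            assigned.add(topic + "-0");
        }
    }

    /** In the reported behavior, assignment is triggered only here. */
    void poll() {
        ensurePartitionAssignment();
    }

    /** Reported behavior: returns whatever has been assigned so far. */
    Set<String> assignment() {
        return Collections.unmodifiableSet(new HashSet<>(assigned));
    }

    /** Proposed behavior: the accessor ensures assignment before answering. */
    Set<String> assignmentEnsured() {
        ensurePartitionAssignment();
        return assignment();
    }

    /** Seeking a partition that is not assigned fails, as in the comment above. */
    void seekToBeginning(String partition) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
    }
}
```

In this model, calling subscribe and then seekToBeginning throws until poll has run once, which mirrors the startup reprocessing problem described in the comment.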
KStreams Proposal
Hi All,

While working with KStream/KStreamImpl I discovered that there does not seem to be any way to connect the results of the KStream.process method with a sink node. I'd like to propose adding a "processTo" method to the API.

I've looked at and used the "transform", "reduceByKey" and "aggregateByKey" methods, but "processTo" would work like a more general-purpose collector, terminating the KStream and allowing results to be written out to an arbitrary topic (regardless of key type).

I've done a quick prototype and some initial testing locally on my fork. If you think this could be useful I can add unit tests and create a PR. I've included the proposed code changes and the test driver code below.

KStream.java additions

    void processTo(String topic, ProcessorSupplier processorSupplier, String... stateStoreNames);

    void processTo(String topic, ProcessorSupplier processorSupplier, Serializer keySerializer, Serializer valSerializer, String... stateStoreNames);

KStreamImpl.java additions

    @Override
    public void processTo(String topic, ProcessorSupplier processorSupplier, String... stateStoreNames) {
        processTo(topic, processorSupplier, null, null, stateStoreNames);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void processTo(String topic, ProcessorSupplier processorSupplier, Serializer keySerializer, Serializer valSerializer, String... stateStoreNames) {
        String processorName = topology.newName(PROCESSOR_NAME);
        String sinkName = topology.newName(SINK_NAME);
        StreamPartitioner streamPartitioner = null;

        if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
            WindowedSerializer windowedSerializer = (WindowedSerializer) keySerializer;
            streamPartitioner = (StreamPartitioner) new WindowedStreamPartitioner
[GitHub] kafka pull request: KAFKA-2551: Update Unclean leader election doc...
GitHub user omkreddy opened a pull request:

    https://github.com/apache/kafka/pull/1054

    KAFKA-2551: Update Unclean leader election docs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/omkreddy/kafka KAFKA-2551

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1054.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1054

commit 8e0829c6e4128bfa719146579743746fee0cc4a0
Author: Manikumar reddy O
Date: 2016-03-12T16:33:53Z

    KAFKA-2551; Update Unclean leader election docs
Re: [DISCUSS] KIP-35 - Retrieve protocol version
Good summary! Some comments inline:

2016-03-11 22:59 GMT+01:00 Ashish Singh:

> Sounds like we are mostly in agreement. Following are the key points.
>
>    1. Every time a protocol version changes, for any request/response,
>    broker version, ApiVersion, will be bumped up.
>

Maybe clarify that the protocol version (=broker version) bump is implicit
by a release (formal/final or interim).

>    2. Protocol documentation will be versioned with broker version. Every
>    time there is a broker version change, protocol documentation version needs
>    to be updated and linked to main documentation page.
>    3. Deprecation of protocol version will be done via marking the version
>    as deprecated on the protocol documentation.
>    4. On getting unknown protocol version, broker will send an empty
>    response, instead of simply closing client connection.
>    5. Metadata response will be enhanced to also contain broker version,
>    VersionInt and VersionString. VersionString will contain internal
>    version information.
>

Clarification: only needs to include version of the responding broker, not
for all returned brokers.

>    6. Metadata request with single null topic and size of -1 can be used to
>    fetch metadata response with only broker version information and no
>    topic/broker info.
>    7. On receiving a metadata request with single null topic with size of
>    -1, broker will respond with only broker version.
>

Clarification: It should just skip the topic enumeration, broker enumeration
should still be included since this is used to seed the broker list in the
client.

>
> Please correct/add, if I missed out something. If the aforementioned
> changes sound good, I can update the KIP-35 wiki, WIP PR and start a Vote
> thread.
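To make point 5 concrete, here is a rough sketch of how a client might hold the two proposed version fields and tell a formal release from an interim one. Everything in it is an assumption for illustration: the BrokerVersion class, the "-IV" marker check (modeled on strings such as 0.10.0-IV1 quoted in this thread), and the integer values in the usage note are placeholders, since the thread does not define an encoding for VersionInt.

```java
/** Hypothetical client-side holder for the proposed VersionInt/VersionString pair. */
final class BrokerVersion {
    final int versionInt;       // opaque, monotonically increasing number (encoding assumed)
    final String versionString; // e.g. "0.10.0" or "0.10.0-IV1" (format assumed)

    BrokerVersion(int versionInt, String versionString) {
        this.versionInt = versionInt;
        this.versionString = versionString;
    }

    /** Treats a version string containing "-IV" as an interim version. */
    boolean isInterim() {
        return versionString.contains("-IV");
    }

    /** True if the broker is at or beyond the version a client feature requires. */
    boolean atLeast(int requiredVersionInt) {
        return versionInt >= requiredVersionInt;
    }
}
```

For example, with placeholder numbers, new BrokerVersion(1000, "0.10.0-IV1").isInterim() is true, while new BrokerVersion(1000, "0.10.0").atLeast(900) lets a client gate a feature on the integer alone.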
>
> On Fri, Mar 11, 2016 at 12:48 PM, Magnus Edenhill wrote:
>
> > I'm not sure supporting specific interim versions between releases are
> > really that big of a concern,
> > for a start the protocol changes may be in flux and not settle until the
> > formal release, secondly
> > the 3rd party clients typically lag behind at least until the formal
> > release before they implement support (for the first stated reason..).
> > But this is still a good point and if we could use the version fields to
> > specify a point between
> > two formal releases then that would be useful to ease client development
> > during that period.
> > Grabbing 0.10.0 from versionInt and "IV" from versionString is an
> > acceptable solution as long
> > as there is some way for a client to distinguish the formal release.
> >
> > /Magnus
> >
> > 2016-03-11 20:27 GMT+01:00 Gwen Shapira :
> >
> > > Yeah, I'm not sure that 0.10.0-IV1 and 0.10.0-IV2 is what Magnus had
> > > in mind when he was advocating for release versions in the protocol.
> > >
> > > But - if we serialize both the string and the integer Id of ApiVersion
> > > into the Metadata object, I think both Magnus and Jay will be happy :)
> > >
> > > Gwen
> > >
> > > On Fri, Mar 11, 2016 at 11:22 AM, Ismael Juma wrote:
> > > > We introduced a way to bump the API version in between releases as part of
> > > > the KIP-31/KIP-32 by the way. Extending that could maybe work. Take a look
> > > > at the ApiVersion class and its documentation.
> > > >
> > > > Ismael
> > > > On 11 Mar 2016 19:06, "Gwen Shapira" wrote:
> > > >
> > > >> Magnus,
> > > >>
> > > >> If we go with release version as protocol version (which I agree is
> > > >> much more user-friendly) - what will be the release version on trunk?
> > > >> 0.10.0-SNAPSHOT?
> > > >> How will clients handle the fact that some 0.10.0-SNAPSHOT will have
> > > >> different protocol than others (because we modify the protocol
> > > >> multiple times between releases)?
> > > >>
> > > >> Gwen
> > > >>
> > > >> On Thu, Mar 10, 2016 at 1:52 PM, Magnus Edenhill <mag...@edenhill.se> wrote:
> > > >> > Hi all,
> > > >> >
> > > >> > sorry for joining late in the game, the Caribbean got in the way.
> > > >> >
> > > >> > My thoughts:
> > > >> >
> > > >> > There is no way around the chicken-and-egg problem, so the sooner we can
> > > >> > add protocol versioning functionality the better and we'll add heuristics
> > > >> > in clients to
> > > >> > handle the migration period (e.g., what Dana has done in kafka-python).
> > > >> > The focus at this point should be to mitigate the core issue (allow clients
> > > >> > to know what is supported)
> > > >> > in the least intrusive way. Hopefully we can redesign the protocol in the
> > > >> > future to add proper
> > > >> > response headers, etc.
> > > >> >
> > > >> > I'm with Dana that reusing the broker version as a protocol version will
> > > >> > work just fine and
> > > >> > saves us from administrating another version.
> > > >> > From a client's perspective an explicit