[jira] [Updated] (KAFKA-2551) Unclean leader election docs outdated

2016-03-12 Thread Manikumar Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar Reddy updated KAFKA-2551:
---
 Assignee: Manikumar Reddy  (was: jin xing)
Fix Version/s: 0.10.0.0

> Unclean leader election docs outdated
> -------------------------------------
>
> Key: KAFKA-2551
> URL: https://issues.apache.org/jira/browse/KAFKA-2551
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 0.8.2.2
>Reporter: Stevo Slavic
>Assignee: Manikumar Reddy
>Priority: Trivial
>  Labels: documentation, newbie
> Fix For: 0.10.0.0
>
>
> Current unclean leader election docs state:
> {quote}
> In the future, we would like to make this configurable to better support use 
> cases where downtime is preferable to inconsistency.
> {quote}
> Since 0.8.2.0, the unclean leader election strategy (whether to allow it or 
> not) has been configurable via the {{unclean.leader.election.enable}} broker 
> config property.
> That sentence is in both 
> https://svn.apache.org/repos/asf/kafka/site/083/design.html and 
> https://svn.apache.org/repos/asf/kafka/site/082/design.html near the end of 
> the "Unclean leader election: What if they all die?" section. The next 
> section, "Availability and Durability Guarantees", mentions the ability to 
> disable unclean leader election, so likely just this one reference needs to 
> be updated.
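
For reference, a minimal broker-config sketch of the knob the description
refers to (set in server.properties; the default in these releases allows
unclean election):

# Prefer downtime to potential inconsistency:
unclean.leader.election.enable=false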



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3388) Producer should only timeout a batch in the accumulator when metadata is missing.

2016-03-12 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created KAFKA-3388:
---

 Summary: Producer should only timeout a batch in the accumulator 
when metadata is missing.
 Key: KAFKA-3388
 URL: https://issues.apache.org/jira/browse/KAFKA-3388
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.1
Reporter: Jiangjie Qin
Priority: Blocker
 Fix For: 0.10.0.0


In KIP-19 we are reusing request.timeout.ms to time out the batches in the 
accumulator. The intent was to avoid batches sitting in the accumulator 
forever when topic metadata is missing.

Currently we are not checking whether metadata is available when we time out 
the batches in the accumulator (although the comments say we will check the 
metadata). This causes a problem: once a previous batch hits a request 
timeout and gets retried, all the subsequent batches will fail with a timeout 
exception. We should only time out the batches in the accumulator when the 
metadata of the partition is missing.
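
A minimal sketch of the proposed condition (illustrative names only, not the
actual producer internals; Cluster.leaderFor() returning null is used here as
a stand-in for "metadata missing"):

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

// Illustrative helper, not the real RecordAccumulator code: a batch
// should expire only when it is overdue AND its partition has no
// known leader (i.e. metadata is missing).
final class BatchExpiryCheck {
    static boolean shouldExpire(TopicPartition tp, long createdMs,
                                Cluster cluster, long nowMs,
                                long requestTimeoutMs) {
        boolean leaderUnknown = cluster.leaderFor(tp) == null;
        boolean overdue = nowMs - createdMs > requestTimeoutMs;
        return leaderUnknown && overdue;
    }
}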



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2359) New consumer - partitions auto assigned only on poll

2016-03-12 Thread Ken Gidley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192034#comment-15192034
 ] 

Ken Gidley commented on KAFKA-2359:
---

How about this use case:

I want to reprocess all the data in my log stream on application startup, using 
the same consumer group.id. With 0.9.0.0, I call 'subscribe', then try to call 
'seekToBeginning', and it fails with "Caused by: 
java.lang.IllegalStateException: No current assignment for partition test1".

It seems weird to have to call poll(), which will read some records, only to 
throw them away and re-read from the beginning of the topic.
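
One workaround that avoids discarding records is to do the seek from a
rebalance listener, so it runs as soon as partitions are actually assigned.
A sketch against the 0.9.0 consumer API (topic, group, and servers are
placeholders):

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReprocessFromStart {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test1"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Invoked inside poll() once the assignment exists, before
                // records are fetched, so nothing has to be thrown away.
                consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }
        });
        consumer.poll(0); // triggers the rebalance, which triggers the seek
    }
}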

> New consumer - partitions auto assigned only on poll
> ----------------------------------------------------
>
> Key: KAFKA-2359
> URL: https://issues.apache.org/jira/browse/KAFKA-2359
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Stevo Slavic
>Priority: Minor
>
> In the new consumer I encountered unexpected behavior. After constructing 
> {{KafkaConsumer}} instance with configured consumer rebalance callback 
> handler, and subscribing to a topic with "consumer.subscribe(topic)", 
> retrieving subscriptions would return an empty set and the callback handler 
> would not get called (no partitions ever assigned or revoked), no matter how 
> long the instance was up.
> Then I found by inspecting the {{KafkaConsumer}} code that partition assignment 
> will only be triggered on the first {{poll}}, since {{pollOnce}} has:
> {noformat}
> // ensure we have partitions assigned if we expect to
> if (subscriptions.partitionsAutoAssigned())
> coordinator.ensurePartitionAssignment();
> {noformat}
> I'm proposing to fix this by including the same {{ensurePartitionAssignment}} 
> fragment in the {{KafkaConsumer.subscriptions}} accessor as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


KStreams Proposal

2016-03-12 Thread Bill Bejeck
Hi All,

While working with KStream/KStreamImpl I discovered that there does not seem
to be any way to connect the results of the KStream.process method with a
sink node.

I'd like to propose an addition to the API: a "processTo" method.

I've looked at and used the "transform", "reduceByKey" and "aggregateByKey"
methods, but "processTo" would work like a more general-purpose collector,
terminating the KStream and allowing results to be written to an arbitrary
topic (regardless of key type).

I've done a quick prototype and some initial testing locally on my fork.
If you think this could be useful I can add unit tests and create a PR.
I've included the proposed code changes and the test driver code below.


KStream.java additions

void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
        String... stateStoreNames);

void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
        Serializer<K> keySerializer, Serializer<V> valSerializer,
        String... stateStoreNames);


KStreamImpl.java additions

    @Override
    public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
                          String... stateStoreNames) {
        processTo(topic, processorSupplier, null, null, stateStoreNames);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
                          Serializer<K> keySerializer, Serializer<V> valSerializer,
                          String... stateStoreNames) {
        String processorName = topology.newName(PROCESSOR_NAME);
        String sinkName = topology.newName(SINK_NAME);
        StreamPartitioner<K, V> streamPartitioner = null;

        // Windowed keys need a partitioner that hashes on the underlying key.
        if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
            WindowedSerializer<Object> windowedSerializer =
                    (WindowedSerializer<Object>) keySerializer;
            streamPartitioner = (StreamPartitioner<K, V>)
                    new WindowedStreamPartitioner<Object, V>(windowedSerializer);
        }

        topology.addProcessor(processorName, processorSupplier, this.name);
        topology.addSink(sinkName, topic, keySerializer, valSerializer,
                streamPartitioner, processorName);
        topology.connectProcessorAndStateStores(processorName, stateStoreNames);
    }


Test Driver

public class TestDriver {

    public static void main(String[] args) {
        StreamsConfig config = new StreamsConfig(getProperties());
        KStreamBuilder kStreamBuilder = new KStreamBuilder();

        KStream<String, String> transactionKStream = kStreamBuilder.stream("input");

        transactionKStream.processTo("output", UpperCaseProcessor::new);

        System.out.println("Starting process-to Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started process-to Example");
    }

    private static class UpperCaseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, value.toUpperCase());
            context().commit();
        }
    }

    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "Process-to-test");
        props.put("group.id", "test-streams");
        props.put(StreamsConfig.JOB_ID_CONFIG, "processor_to_test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                WallclockTimestampExtractor.class);
        return props;
    }
}


[GitHub] kafka pull request: KAFKA-2551: Update Unclean leader election doc...

2016-03-12 Thread omkreddy
GitHub user omkreddy opened a pull request:

https://github.com/apache/kafka/pull/1054

KAFKA-2551: Update Unclean leader election docs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/omkreddy/kafka KAFKA-2551

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1054.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1054


commit 8e0829c6e4128bfa719146579743746fee0cc4a0
Author: Manikumar reddy O 
Date:   2016-03-12T16:33:53Z

KAFKA-2551; Update Unclean leader election docs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-35 - Retrieve protocol version

2016-03-12 Thread Magnus Edenhill
Good summary!

Some comments inline:


2016-03-11 22:59 GMT+01:00 Ashish Singh :

> Sounds like we are mostly in agreement. Following are the key points.
>
>1. Every time a protocol version changes, for any request/response, the
>broker version (ApiVersion) will be bumped up.
>

Maybe clarify that the protocol version (= broker version) bump is implied
by a release (formal/final or interim).



>2. Protocol documentation will be versioned with the broker version. Every
>time there is a broker version change, the protocol documentation version
>needs to be updated and linked to the main documentation page.
>3. Deprecation of a protocol version will be done by marking the version
>as deprecated in the protocol documentation.
>4. On getting an unknown protocol version, the broker will send an empty
>response, instead of simply closing the client connection.
>5. The metadata response will be enhanced to also contain the broker
>version, VersionInt and VersionString. VersionString will contain internal
>version information.
>

Clarification: this only needs to include the version of the responding
broker, not the versions of all returned brokers.


>6. A metadata request with a single null topic and a size of -1 can be used
>to fetch a metadata response with only broker version information and no
>topic/broker info.
>7. On receiving a metadata request with a single null topic with a size of
>-1, the broker will respond with only the broker version.
>

Clarification: it should just skip the topic enumeration; broker enumeration
should still be included, since this is used to seed the broker list in the
client.
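
To make the versionInt/versionString split concrete, here is an illustrative
client-side sketch; the "0.10.0-IV1" format is an assumption taken from this
discussion, not a settled spec:

// Illustrative only: splitting a hypothetical versionString such as
// "0.10.0-IV1" into a formal-release part and an interim marker, so a
// client can distinguish formal releases from interim builds.
public final class ProtocolVersionString {
    final String release; // e.g. "0.10.0"
    final String interim; // e.g. "IV1"; empty for a formal release

    ProtocolVersionString(String versionString) {
        int dash = versionString.indexOf('-');
        release = dash < 0 ? versionString : versionString.substring(0, dash);
        interim = dash < 0 ? "" : versionString.substring(dash + 1);
    }

    boolean isFormalRelease() {
        return interim.isEmpty();
    }

    public static void main(String[] args) {
        ProtocolVersionString v = new ProtocolVersionString("0.10.0-IV1");
        System.out.println(v.release + ", formal=" + v.isFormalRelease());
        // prints: 0.10.0, formal=false
    }
}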


>
> Please correct/add if I missed something. If the aforementioned changes
> sound good, I can update the KIP-35 wiki and WIP PR, and start a vote
> thread.
>
> On Fri, Mar 11, 2016 at 12:48 PM, Magnus Edenhill 
> wrote:
>
> > I'm not sure supporting specific interim versions between releases is
> > really that big of a concern. For a start, the protocol changes may be
> > in flux and not settle until the formal release; secondly, the 3rd-party
> > clients typically lag behind at least until the formal release before
> > they implement support (for the first stated reason). But this is still
> > a good point, and if we could use the version fields to specify a point
> > between two formal releases then that would be useful to ease client
> > development during that period.
> > Grabbing 0.10.0 from versionInt and "IV" from versionString is an
> > acceptable solution as long as there is some way for a client to
> > distinguish the formal release.
> >
> >
> > /Magnus
> >
> >
> >
> >
> > 2016-03-11 20:27 GMT+01:00 Gwen Shapira :
> >
> > > Yeah, I'm not sure that 0.10.0-IV1 and 0.10.0-IV2 are what Magnus had
> > > in mind when he was advocating for release versions in the protocol.
> > >
> > > But - if we serialize both the string and the integer Id of ApiVersion
> > > into the Metadata object, I think both Magnus and Jay will be happy :)
> > >
> > > Gwen
> > >
> > > On Fri, Mar 11, 2016 at 11:22 AM, Ismael Juma 
> wrote:
> > > > We introduced a way to bump the API version in between releases as
> > > > part of KIP-31/KIP-32, by the way. Extending that could maybe work.
> > > > Take a look at the ApiVersion class and its documentation.
> > > >
> > > > Ismael
> > > > On 11 Mar 2016 19:06, "Gwen Shapira"  wrote:
> > > >
> > > >> Magnus,
> > > >>
> > > >> If we go with release version as protocol version (which I agree is
> > > >> much more user-friendly) - what will be the release version on
> > > >> trunk? 0.10.0-SNAPSHOT?
> > > >> How will clients handle the fact that some 0.10.0-SNAPSHOT builds
> > > >> will have a different protocol than others (because we modify the
> > > >> protocol multiple times between releases)?
> > > >>
> > > >> Gwen
> > > >>
> > > >> On Thu, Mar 10, 2016 at 1:52 PM, Magnus Edenhill <
> mag...@edenhill.se>
> > > >> wrote:
> > > >> > Hi all,
> > > >> >
> > > >> > sorry for joining late in the game; the Caribbean got in the way.
> > > >> >
> > > >> > My thoughts:
> > > >> >
> > > >> > There is no way around the chicken-and-egg problem, so the sooner
> > > >> > we can add protocol versioning functionality the better, and we'll
> > > >> > add heuristics in clients to handle the migration period (e.g.,
> > > >> > what Dana has done in kafka-python).
> > > >> > The focus at this point should be to mitigate the core issue
> > > >> > (allowing clients to know what is supported) in the least intrusive
> > > >> > way. Hopefully we can redesign the protocol in the future to add
> > > >> > proper response headers, etc.
> > > >> >
> > > >> > I'm with Dana that reusing the broker version as a protocol version
> > > >> > will work just fine and saves us from administering another version.
> > > >> > From a client's perspective an explicit