[jira] [Issue Comment Deleted] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled

2016-11-08 Thread hangzhao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hangzhao updated KAFKA-3450:

Comment: was deleted

(was: 44)

> Producer blocks on send to topic that doesn't exist if auto create is disabled
> --
>
> Key: KAFKA-3450
> URL: https://issues.apache.org/jira/browse/KAFKA-3450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1
>Reporter: Michal Turek
>Assignee: Jun Rao
>Priority: Critical
>
> {{producer.send()}} blocks for {{max.block.ms}} (default 60 seconds) if the
> destination topic doesn't exist and its automatic creation is disabled. A
> warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is logged
> every 100 ms in a loop until the 60-second timeout expires, but the operation
> is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * Test of sending to a topic that does not exist while automatic creation of
>  * topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
>     private static final Logger LOGGER = LoggerFactory.getLogger(NoSuchTopicTest.class);
>
>     public static void main(String[] args) {
>         Properties properties = new Properties();
>         properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, NoSuchTopicTest.class.getSimpleName());
>         properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); // Default is 60 seconds
>
>         try (Producer<String, String> producer =
>                  new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "key", "value"),
>                     (metadata, exception) -> {
>                         if (exception != null) {
>                             LOGGER.error("Send failed: {}", exception.toString());
>                         } else {
>                             LOGGER.info("Send successful: {}-{}/{}",
>                                     metadata.topic(), metadata.partition(), metadata.offset());
>                         }
>                     });
>
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", "key", "value"),
>                     (metadata, exception) -> {
>                         if (exception != null) {
>                             LOGGER.error("Send failed: {}", exception.toString());
>                         } else {
>                             LOGGER.info("Send successful: {}-{}/{}",
>                                     metadata.topic(), metadata.partition(), metadata.offset());
>                         }
>                     });
>         }
>     }
> }
> {noformat}
> Related output
> {noformat}
> 2016-03-23 12:44:37.725 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
> Sending message (NoSuchTopicTest.java:26)
> 2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 0 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 1 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 2 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 3 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.231 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 4 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.332 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 5 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.433 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | 
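(For reference, not part of the original report: a minimal sketch of how a caller might bound the blocking and handle the resulting failure. Depending on the client version, the metadata timeout may surface as an exception from send() itself or through the returned future/callback.)

{noformat}
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class BoundedBlockSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Bound the time send() may block while waiting for metadata (default is 60 s).
        props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");

        try (Producer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            try {
                Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "key", "value"));
                // Wait a bounded amount of time for the broker's answer.
                RecordMetadata metadata = future.get(2, TimeUnit.SECONDS);
                System.out.println("Sent to " + metadata.topic() + "-" + metadata.partition());
            } catch (Exception e) {
                // The missing-topic failure (UNKNOWN_TOPIC_OR_PARTITION / timeout) lands here.
                System.err.println("Send failed: " + e);
            }
        }
    }
}
{noformat}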

[jira] [Commented] (KAFKA-3450) Producer blocks on send to topic that doesn't exist if auto create is disabled

2016-11-08 Thread hangzhao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15650041#comment-15650041
 ] 

hangzhao commented on KAFKA-3450:
-

44

> Producer blocks on send to topic that doesn't exist if auto create is disabled
> --
>
> Key: KAFKA-3450
> URL: https://issues.apache.org/jira/browse/KAFKA-3450
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.9.0.1
>Reporter: Michal Turek
>Assignee: Jun Rao
>Priority: Critical
>
> {{producer.send()}} blocks for {{max.block.ms}} (default 60 seconds) if the
> destination topic doesn't exist and its automatic creation is disabled. A
> warning from NetworkClient containing UNKNOWN_TOPIC_OR_PARTITION is logged
> every 100 ms in a loop until the 60-second timeout expires, but the operation
> is not recoverable.
> Preconditions
> - Kafka 0.9.0.1 with default configuration and auto.create.topics.enable=false
> - Kafka 0.9.0.1 clients.
> Example minimalist code
> https://github.com/avast/kafka-tests/blob/master/src/main/java/com/avast/kafkatests/othertests/nosuchtopic/NoSuchTopicTest.java
> {noformat}
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * Test of sending to a topic that does not exist while automatic creation of
>  * topics is disabled in Kafka (auto.create.topics.enable=false).
>  */
> public class NoSuchTopicTest {
>     private static final Logger LOGGER = LoggerFactory.getLogger(NoSuchTopicTest.class);
>
>     public static void main(String[] args) {
>         Properties properties = new Properties();
>         properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, NoSuchTopicTest.class.getSimpleName());
>         properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); // Default is 60 seconds
>
>         try (Producer<String, String> producer =
>                  new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer())) {
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExist", "key", "value"),
>                     (metadata, exception) -> {
>                         if (exception != null) {
>                             LOGGER.error("Send failed: {}", exception.toString());
>                         } else {
>                             LOGGER.info("Send successful: {}-{}/{}",
>                                     metadata.topic(), metadata.partition(), metadata.offset());
>                         }
>                     });
>
>             LOGGER.info("Sending message");
>             producer.send(new ProducerRecord<>("ThisTopicDoesNotExistToo", "key", "value"),
>                     (metadata, exception) -> {
>                         if (exception != null) {
>                             LOGGER.error("Send failed: {}", exception.toString());
>                         } else {
>                             LOGGER.info("Send successful: {}-{}/{}",
>                                     metadata.topic(), metadata.partition(), metadata.offset());
>                         }
>                     });
>         }
>     }
> }
> {noformat}
> Related output
> {noformat}
> 2016-03-23 12:44:37.725 INFO  c.a.k.o.nosuchtopic.NoSuchTopicTest [main]: 
> Sending message (NoSuchTopicTest.java:26)
> 2016-03-23 12:44:37.830 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 0 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:37.928 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 1 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.028 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 2 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.130 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 3 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.231 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 4 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.332 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | NoSuchTopicTest]: Error while fetching 
> metadata with correlation id 5 : 
> {ThisTopicDoesNotExist=UNKNOWN_TOPIC_OR_PARTITION} (NetworkClient.java:582)
> 2016-03-23 12:44:38.433 WARN  o.a.kafka.clients.NetworkClient 
> [kafka-producer-network-thread | 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Gwen Shapira
While you can do whatever you want with a namespace and your code,
what I'd expect is for each app to make its namespace configurable...

So if I accidentally used 666 for my HR department, and still want to
run RadaiApp, I can configure "namespace=42" for RadaiApp and everything
will look normal.

This means you only need to sync usage inside your own organization.
Still hard, but somewhat easier than syncing with the entire world.

On Tue, Nov 8, 2016 at 10:07 PM, radai  wrote:
> and we can start with {namespace, id} and no re-mapping support and always
> add it later on if/when collisions actually happen (i dont think they'd be
> a problem).
>
> every interested party (so orgs or individuals) could then register a
> prefix (0 = reserved, 1 = confluent ... 666 = me :-) ) and do whatever with
> the 2nd ID - so once linkedin registers, say 3, then linkedin devs are free
> to use {3, *} with a reasonable expectation not to collide with anything
> else. further partitioning of that * becomes linkedin's problem, but the
> "upstream registration" of a namespace only has to happen once.
>
> On Tue, Nov 8, 2016 at 9:03 PM, James Cheng  wrote:
>
>>
>>
>>
>> > On Nov 8, 2016, at 5:54 PM, Gwen Shapira  wrote:
>> >
>> > Thank you so much for this clear and fair summary of the arguments.
>> >
>> > I'm in favor of ints. Not a deal-breaker, but in favor.
>> >
>> > Even more in favor of Magnus's decentralized suggestion with Roger's
>> > tweak: add a namespace for headers. This will allow each app to just
>> > use whatever IDs it wants internally, and then let the admin deploying
>> > the app figure out an available namespace ID for the app to live in.
>> > So io.confluent.schema-registry can be namespace 0x01 on my deployment
>> > and 0x57 on yours, and the poor guys developing the app don't need to
>> > worry about that.
>> >
>>
>> Gwen, if I understand your example right, an application deployer might
>> decide to use 0x01 in one deployment, and that means that once the message
>> is written into the broker, it will be saved on the broker with that
>> specific namespace (0x01).
>>
>> If you were to mirror that message into another cluster, the 0x01 would
>> accompany the message, right? What if the deployers of the same app in the
>> other cluster uses 0x57? They won't understand each other?
>>
>> I'm not sure that's an avoidable problem. I think it simply means that in
>> order to share data, you have to also have a shared (agreed upon)
>> understanding of what the namespaces mean. Which I think makes sense,
>> because the alternate (sharing *nothing* at all) would mean that there
>> would be no way to understand each other.
>>
>> -James
>>
>> > Gwen
>> >
>> > On Tue, Nov 8, 2016 at 4:23 PM, radai 
>> wrote:
>> >> +1 for sean's document. it covers pretty much all the trade-offs and
>> >> provides concrete figures to argue about :-)
>> >> (nit-picking - used the same xkcd twice, also trove has been superceded
>> for
>> >> purposes of high performance collections: look at
>> >> https://github.com/leventov/Koloboke)
>> >>
>> >> so to sum up the string vs int debate:
>> >>
>> >> performance - you can do 140k ops/sec _per thread_ with string headers.
>> you
>> >> could do x2-3 better with ints. there's no arguing the relative diff
>> >> between the two, there's only the question of whether or not _the rest
>> of
>> >> kafka_ operates fast enough to care. if we want to make choices solely
>> >> based on performance we need ints. if we are willing to
>> settle/compromise
>> >> for a nicer (to some) API than strings are good enough for the current
>> >> state of affairs.
>> >>
>> >> message size - with batching and compression it comes down to a ~5%
>> >> difference (internal testing, not in the doc. maybe would help adding if
>> >> this becomes a point of contention?). this means it wont really affect
>> >> kafka in "throughput mode" (large, compressed batches). in "low latency"
>> >> mode (meaning less/no batching and compression) the difference can be
>> >> extreme (it'll easily be an order of magnitude with small payloads like
>> >> stock ticks and header keys of the form
>> >> "com.acme.infraTeam.kafka.hiMom.auditPlugin"). we have a few such
>> topics at
>> >> linkedin where actual payloads are ~2 ints and are eclipsed by our
>> in-house
>> >> audit "header" which is why we liked ints to begin with.
>> >>
>> >> "ease of use" - strings would probably still require _some_ degree of
>> >> partitioning by convention (imagine if everyone used the key "infra"...)
>> >> but its very intuitive for java devs to do anyway (reverse-domain is
>> >> ingrained into java developers at a young age :-) ). also most java devs
>> >> find Map more intuitive than Map -
>> >> probably because of other text-based protocols like http. ints would
>> >> require a number registry. if you think number registries are hard just

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread radai
and we can start with {namespace, id} and no re-mapping support and always
add it later on if/when collisions actually happen (i dont think they'd be
a problem).

every interested party (so orgs or individuals) could then register a
prefix (0 = reserved, 1 = confluent ... 666 = me :-) ) and do whatever with
the 2nd ID - so once linkedin registers, say 3, then linkedin devs are free
to use {3, *} with a reasonable expectation not to collide with anything
else. further partitioning of that * becomes linkedin's problem, but the
"upstream registration" of a namespace only has to happen once.

On Tue, Nov 8, 2016 at 9:03 PM, James Cheng  wrote:

>
>
>
> > On Nov 8, 2016, at 5:54 PM, Gwen Shapira  wrote:
> >
> > Thank you so much for this clear and fair summary of the arguments.
> >
> > I'm in favor of ints. Not a deal-breaker, but in favor.
> >
> > Even more in favor of Magnus's decentralized suggestion with Roger's
> > tweak: add a namespace for headers. This will allow each app to just
> > use whatever IDs it wants internally, and then let the admin deploying
> > the app figure out an available namespace ID for the app to live in.
> > So io.confluent.schema-registry can be namespace 0x01 on my deployment
> > and 0x57 on yours, and the poor guys developing the app don't need to
> > worry about that.
> >
>
> Gwen, if I understand your example right, an application deployer might
> decide to use 0x01 in one deployment, and that means that once the message
> is written into the broker, it will be saved on the broker with that
> specific namespace (0x01).
>
> If you were to mirror that message into another cluster, the 0x01 would
> accompany the message, right? What if the deployers of the same app in the
> other cluster uses 0x57? They won't understand each other?
>
> I'm not sure that's an avoidable problem. I think it simply means that in
> order to share data, you have to also have a shared (agreed upon)
> understanding of what the namespaces mean. Which I think makes sense,
> because the alternate (sharing *nothing* at all) would mean that there
> would be no way to understand each other.
>
> -James
>
> > Gwen
> >
> > On Tue, Nov 8, 2016 at 4:23 PM, radai 
> wrote:
> >> +1 for sean's document. it covers pretty much all the trade-offs and
> >> provides concrete figures to argue about :-)
> >> (nit-picking - used the same xkcd twice, also trove has been superceded
> for
> >> purposes of high performance collections: look at
> >> https://github.com/leventov/Koloboke)
> >>
> >> so to sum up the string vs int debate:
> >>
> >> performance - you can do 140k ops/sec _per thread_ with string headers.
> you
> >> could do x2-3 better with ints. there's no arguing the relative diff
> >> between the two, there's only the question of whether or not _the rest
> of
> >> kafka_ operates fast enough to care. if we want to make choices solely
> >> based on performance we need ints. if we are willing to
> settle/compromise
> >> for a nicer (to some) API than strings are good enough for the current
> >> state of affairs.
> >>
> >> message size - with batching and compression it comes down to a ~5%
> >> difference (internal testing, not in the doc. maybe would help adding if
> >> this becomes a point of contention?). this means it wont really affect
> >> kafka in "throughput mode" (large, compressed batches). in "low latency"
> >> mode (meaning less/no batching and compression) the difference can be
> >> extreme (it'll easily be an order of magnitude with small payloads like
> >> stock ticks and header keys of the form
> >> "com.acme.infraTeam.kafka.hiMom.auditPlugin"). we have a few such
> topics at
> >> linkedin where actual payloads are ~2 ints and are eclipsed by our
> in-house
> >> audit "header" which is why we liked ints to begin with.
> >>
> >> "ease of use" - strings would probably still require _some_ degree of
> >> partitioning by convention (imagine if everyone used the key "infra"...)
> >> but its very intuitive for java devs to do anyway (reverse-domain is
> >> ingrained into java developers at a young age :-) ). also most java devs
> >> find Map more intuitive than Map -
> >> probably because of other text-based protocols like http. ints would
> >> require a number registry. if you think number registries are hard just
> >> look at the wiki page for KIPs (specifically the number for next
> available
> >> KIP) and think again - we are probably talking about the same volume of
> >> requests. also this would only be "required" (good citizenship, more
> like)
> >> if you want to publish your plugin for others to use. within your org do
> >> whatever you want - just know that if you use [some "reserved" range]
> and a
> >> future kafka update breaks it its your problem. RTFM.
> >>
> >> personally im in favor of ints.
> >>
> >> having said that (and like nacho) I will settle if int vs string remains
> >> the only 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Gwen Shapira
Yes, you got it right.

With MirrorMaker's event handlers (which we'll need to update to
support headers) you can make sure the header namespaces change when
you replicate between clusters, so 0x01 becomes 0x57 when mirrored.
I'm not a huge fan, but it is definitely an option.

If you choose not to go that route then the namespace definitions must
be shared between all clusters in the organization.
I believe it is a simpler problem than coordinating all clusters in
the world, and more useful than not coordinating anything at all.
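
(Illustration only, not from the thread: assuming headers were carried as a Map<Integer, byte[]> with the namespace in the high 16 bits of the key, the mirror-side remapping described above could look roughly like this.)

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: rewrite the namespace half of each header key while
// mirroring, e.g. 0x0001 -> 0x0057. The header map representation is assumed.
public class NamespaceRemapper {
    private final Map<Short, Short> namespaceMap;

    public NamespaceRemapper(Map<Short, Short> namespaceMap) {
        this.namespaceMap = namespaceMap;
    }

    public Map<Integer, byte[]> remap(Map<Integer, byte[]> headers) {
        Map<Integer, byte[]> out = new HashMap<>();
        for (Map.Entry<Integer, byte[]> e : headers.entrySet()) {
            short ns = (short) (e.getKey() >>> 16);
            short id = (short) (e.getKey() & 0xFFFF);
            Short mapped = namespaceMap.get(ns);
            int newKey = (((mapped != null ? mapped : ns) & 0xFFFF) << 16) | (id & 0xFFFF);
            out.put(newKey, e.getValue());
        }
        return out;
    }
}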

On Tue, Nov 8, 2016 at 9:03 PM, James Cheng  wrote:
>
>
>
>> On Nov 8, 2016, at 5:54 PM, Gwen Shapira  wrote:
>>
>> Thank you so much for this clear and fair summary of the arguments.
>>
>> I'm in favor of ints. Not a deal-breaker, but in favor.
>>
>> Even more in favor of Magnus's decentralized suggestion with Roger's
>> tweak: add a namespace for headers. This will allow each app to just
>> use whatever IDs it wants internally, and then let the admin deploying
>> the app figure out an available namespace ID for the app to live in.
>> So io.confluent.schema-registry can be namespace 0x01 on my deployment
>> and 0x57 on yours, and the poor guys developing the app don't need to
>> worry about that.
>>
>
> Gwen, if I understand your example right, an application deployer might 
> decide to use 0x01 in one deployment, and that means that once the message is 
> written into the broker, it will be saved on the broker with that specific 
> namespace (0x01).
>
> If you were to mirror that message into another cluster, the 0x01 would 
> accompany the message, right? What if the deployers of the same app in the 
> other cluster uses 0x57? They won't understand each other?
>
> I'm not sure that's an avoidable problem. I think it simply means that in 
> order to share data, you have to also have a shared (agreed upon) 
> understanding of what the namespaces mean. Which I think makes sense, because 
> the alternate (sharing *nothing* at all) would mean that there would be no 
> way to understand each other.
>
> -James
>
>> Gwen
>>
>> On Tue, Nov 8, 2016 at 4:23 PM, radai  wrote:
>>> +1 for sean's document. it covers pretty much all the trade-offs and
>>> provides concrete figures to argue about :-)
>>> (nit-picking - used the same xkcd twice, also trove has been superceded for
>>> purposes of high performance collections: look at
>>> https://github.com/leventov/Koloboke)
>>>
>>> so to sum up the string vs int debate:
>>>
>>> performance - you can do 140k ops/sec _per thread_ with string headers. you
>>> could do x2-3 better with ints. there's no arguing the relative diff
>>> between the two, there's only the question of whether or not _the rest of
>>> kafka_ operates fast enough to care. if we want to make choices solely
>>> based on performance we need ints. if we are willing to settle/compromise
>>> for a nicer (to some) API than strings are good enough for the current
>>> state of affairs.
>>>
>>> message size - with batching and compression it comes down to a ~5%
>>> difference (internal testing, not in the doc. maybe would help adding if
>>> this becomes a point of contention?). this means it wont really affect
>>> kafka in "throughput mode" (large, compressed batches). in "low latency"
>>> mode (meaning less/no batching and compression) the difference can be
>>> extreme (it'll easily be an order of magnitude with small payloads like
>>> stock ticks and header keys of the form
>>> "com.acme.infraTeam.kafka.hiMom.auditPlugin"). we have a few such topics at
>>> linkedin where actual payloads are ~2 ints and are eclipsed by our in-house
>>> audit "header" which is why we liked ints to begin with.
>>>
>>> "ease of use" - strings would probably still require _some_ degree of
>>> partitioning by convention (imagine if everyone used the key "infra"...)
>>> but its very intuitive for java devs to do anyway (reverse-domain is
>>> ingrained into java developers at a young age :-) ). also most java devs
>>> find Map more intuitive than Map -
>>> probably because of other text-based protocols like http. ints would
>>> require a number registry. if you think number registries are hard just
>>> look at the wiki page for KIPs (specifically the number for next available
>>> KIP) and think again - we are probably talking about the same volume of
>>> requests. also this would only be "required" (good citizenship, more like)
>>> if you want to publish your plugin for others to use. within your org do
>>> whatever you want - just know that if you use [some "reserved" range] and a
>>> future kafka update breaks it its your problem. RTFM.
>>>
>>> personally im in favor of ints.
>>>
>>> having said that (and like nacho) I will settle if int vs string remains
>>> the only obstacle to this.
>>>
>>> On Tue, Nov 8, 2016 at 3:53 PM, Nacho Solis 
>>> wrote:
>>>
 I think it's 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread James Cheng



> On Nov 8, 2016, at 5:54 PM, Gwen Shapira  wrote:
> 
> Thank you so much for this clear and fair summary of the arguments.
> 
> I'm in favor of ints. Not a deal-breaker, but in favor.
> 
> Even more in favor of Magnus's decentralized suggestion with Roger's
> tweak: add a namespace for headers. This will allow each app to just
> use whatever IDs it wants internally, and then let the admin deploying
> the app figure out an available namespace ID for the app to live in.
> So io.confluent.schema-registry can be namespace 0x01 on my deployment
> and 0x57 on yours, and the poor guys developing the app don't need to
> worry about that.
> 

Gwen, if I understand your example right, an application deployer might decide 
to use 0x01 in one deployment, and that means that once the message is written 
into the broker, it will be saved on the broker with that specific namespace 
(0x01). 

If you were to mirror that message into another cluster, the 0x01 would 
accompany the message, right? What if the deployers of the same app in the 
other cluster use 0x57? They won't understand each other?

I'm not sure that's an avoidable problem. I think it simply means that in order 
to share data, you have to also have a shared (agreed upon) understanding of 
what the namespaces mean. Which I think makes sense, because the alternative 
(sharing *nothing* at all) would mean that there would be no way to understand 
each other.

-James

> Gwen
> 
> On Tue, Nov 8, 2016 at 4:23 PM, radai  wrote:
>> +1 for sean's document. it covers pretty much all the trade-offs and
>> provides concrete figures to argue about :-)
>> (nit-picking - used the same xkcd twice, also trove has been superceded for
>> purposes of high performance collections: look at
>> https://github.com/leventov/Koloboke)
>> 
>> so to sum up the string vs int debate:
>> 
>> performance - you can do 140k ops/sec _per thread_ with string headers. you
>> could do x2-3 better with ints. there's no arguing the relative diff
>> between the two, there's only the question of whether or not _the rest of
>> kafka_ operates fast enough to care. if we want to make choices solely
>> based on performance we need ints. if we are willing to settle/compromise
>> for a nicer (to some) API than strings are good enough for the current
>> state of affairs.
>> 
>> message size - with batching and compression it comes down to a ~5%
>> difference (internal testing, not in the doc. maybe would help adding if
>> this becomes a point of contention?). this means it wont really affect
>> kafka in "throughput mode" (large, compressed batches). in "low latency"
>> mode (meaning less/no batching and compression) the difference can be
>> extreme (it'll easily be an order of magnitude with small payloads like
>> stock ticks and header keys of the form
>> "com.acme.infraTeam.kafka.hiMom.auditPlugin"). we have a few such topics at
>> linkedin where actual payloads are ~2 ints and are eclipsed by our in-house
>> audit "header" which is why we liked ints to begin with.
>> 
>> "ease of use" - strings would probably still require _some_ degree of
>> partitioning by convention (imagine if everyone used the key "infra"...)
>> but its very intuitive for java devs to do anyway (reverse-domain is
>> ingrained into java developers at a young age :-) ). also most java devs
>> find Map more intuitive than Map -
>> probably because of other text-based protocols like http. ints would
>> require a number registry. if you think number registries are hard just
>> look at the wiki page for KIPs (specifically the number for next available
>> KIP) and think again - we are probably talking about the same volume of
>> requests. also this would only be "required" (good citizenship, more like)
>> if you want to publish your plugin for others to use. within your org do
>> whatever you want - just know that if you use [some "reserved" range] and a
>> future kafka update breaks it its your problem. RTFM.
>> 
>> personally im in favor of ints.
>> 
>> having said that (and like nacho) I will settle if int vs string remains
>> the only obstacle to this.
>> 
>> On Tue, Nov 8, 2016 at 3:53 PM, Nacho Solis 
>> wrote:
>> 
>>> I think it's well known I've been pushing for ints (and I could switch to
>>> 16 bit shorts if pressed).
>>> 
>>> - efficient (space)
>>> - efficient (processing)
>>> - easily partitionable
>>> 
>>> 
>>> However, if the only thing that is keeping us from adopting headers is the
>>> use of strings vs ints as keys, then I would cave in and accept strings. If
>>> we do so, I would like to limit string keys to 128 bytes in length.  This
>>> way 1) I could use a 3 letter string if I wanted (effectively using 4 total
>>> bytes), 2) limit overall impact of possible keys (don't really want people
>>> to send a 16K header string key).
>>> 
>>> Nacho
>>> 
>>> 
>>> On Tue, Nov 8, 2016 at 3:35 PM, Gwen Shapira 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Sean McCauliff
A local namespace mapping from namespace ids to ints would definitely solve
the problem of having a global namespace and would make the int header keys
potentially more readable for logging and debugging purposes.  But this
means another (potentially very large) set of configuration parameters that
need to be present on each component that wants to inspect the headers. I'm
sure it will be a fun day to track down that class of misconfiguration.

If the brokers are inspecting the headers then the brokers need this
config.  If the config changes then the brokers need to be restarted which
seems pretty expensive.  Otherwise there now needs to be a new way to
update the broker with this information.

Java itself does not often have namespace collisions, and there is no
central registration of Java namespaces. The set of Kafka infrastructure
engineers is much smaller than the set of Java developers, so having
reasonable names should allow every header user to coexist peacefully.

--
Sean McCauliff
Staff Software Engineer
Kafka

smccaul...@linkedin.com
linkedin.com/in/sean-mccauliff-b563192

On Mon, Nov 7, 2016 at 2:10 PM, Magnus Edenhill  wrote:

> Hi,
>
> I'm +1 for adding generic message headers, but I do share the concerns
> previously aired on this thread and during the KIP meeting.
>
> So let me propose a slimmer alternative that does not require any sort of
> global header registry, does not affect broker performance or operations,
> and adds as little overhead as possible.
>
>
> Message
> 
> The protocol Message type is extended with a Headers array consisting of
> Tags, where a Tag is defined as:
>int16 Id
>int16 Len  // binary_data length
>binary_data[Len]  // opaque binary data
>
>
> Ids
> ---
> The Id space is not centrally managed, so whenever an application needs to
> add headers, or use an eco-system plugin that does, its Id allocation will
> need to be manually configured.
> This moves the allocation concern from the global space down to
> organization level and avoids the risk for id conflicts.
> Example pseudo-config for some app:
> sometrackerplugin.tag.sourcev3.id=1000
> dbthing.tag.tablename.id=1001
> myschemareg.tag.schemaname.id=1002
> myschemareg.tag.schemaversion.id=1003
>
>
> Each header-writing or header-reading plugin must provide means (typically
> through configuration) to specify the tag for each header it uses. Defaults
> should be avoided.
> A consumer silently ignores tags it does not have a mapping for (since the
> binary_data can't be parsed without knowing what it is).
>
> Id range 0..999 is reserved for future use by the broker and must not be
> used by plugins.
>
>
>
> Broker
> -
> The broker does not process the tags (other than the standard protocol
> syntax verification), it simply stores and forwards them as opaque data.
>
> Standard message translation (removal of Headers) kicks in for older
> clients.
>
>
> Why not string ids?
> -
> String ids might seem like a good idea, but:
>  * does not really solve uniqueness
>  * consumes a lot of space (2 byte string length + string, per header) to
> be meaningful
>  * doesn't really say anything how to parse the tag's data, so it is in
> effect useless on its own.
>
>
> Regards,
> Magnus
>
>
>
>
> 2016-11-07 18:32 GMT+01:00 Michael Pearce :
>
> > Hi Roger,
> >
> > Thanks for the support.
> >
> > I think the key thing is to have a common key space to make an ecosystem,
> > there does have to be some level of contract for people to play nicely.
> >
> > Having map or as per current proposed in kip of having a
> > numerical key space of  map is a level of the contract that
> > most people would expect.
> >
> > I think the example in a previous comment someone else made linking to
> AWS
> > blog and also implemented api where originally they didn’t have a header
> > space but not they do, where keys are uniform but the value can be
> string,
> > int, anything is a good example.
> >
> > Having a custom MetadataSerializer is something we had played with, but
> > discounted the idea, as if you wanted everyone to work the same way in
> the
> > ecosystem, having to have this also customizable makes it a bit harder.
> > Think about making the whole message record custom serializable, this
> would
> > make it fairly tricky (though it would not be impossible) to have made
> work
> > nicely. Having the value customizable we thought is a reasonable tradeoff
> > here of flexibility over contract of interaction between different
> parties.
> >
> > Is there a particular case or benefit of having serialization
> customizable
> > that you have in mind?
> >
> > Saying this it is obviously something that could be implemented, if there
> > is a need. If we did go this avenue I think a defaulted serializer
> > implementation should exist so for the 80:20 rule, people can just have
> the
> > broker and clients get 
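
(Illustration only, not an actual Kafka API: a rough sketch of writing and reading one Tag in the layout Magnus describes above, i.e. int16 Id, int16 Len, followed by Len bytes of opaque data.)

import java.nio.ByteBuffer;

// Sketch of the proposed Tag wire layout: 2-byte id, 2-byte length, raw bytes.
public final class TagCodec {
    private TagCodec() {}

    public static ByteBuffer encode(short id, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + value.length);
        buf.putShort(id);
        buf.putShort((short) value.length);
        buf.put(value);
        buf.flip();
        return buf;
    }

    public static short readId(ByteBuffer buf) {
        return buf.getShort();
    }

    public static byte[] readValue(ByteBuffer buf) {
        int len = buf.getShort() & 0xFFFF;
        byte[] value = new byte[len];
        buf.get(value);
        return value;
    }
}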

[GitHub] kafka pull request #2118: MINOR: improve exception message for incompatible ...

2016-11-08 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2118

MINOR: improve exception message for incompatible Serdes to actual 
key/value data types



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka 
hotfixImproveSerdeTypeMissmatchError

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2118.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2118


commit db9e29e6b4720736997788cedf7fcea022b7d565
Author: Matthias J. Sax 
Date:   2016-11-09T03:03:43Z

MINOR: improve exception message for incompatible Serdes to actual 
key/value data types




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread radai
"...whether the broker would ever want/need to act on non-standard
headers..."

core broker code? no. never (which is why i suggested lazily parsing the
headers on 1st call from a bytebuffer slice() somewhere earlier in the
discussion).

broker-side plugins however ...

so here's something off the top of my head: as a kafka infra provider, i
dont want _anyone_ producing into one of "my" brokers without audit data.
it messes with my reporting/billing systems.

i cant go chasing after (literally) hundreds of internal developers, each
copy-pasting bits and pieces of code from stack overflow, and they only
come to me when said copy-pasted code doesnt work :-)

i would like to write a plugin that just drops messages without an audit
header on the floor (think of this as the equivalent of a
content-inspecting firewall).
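
(Illustration only: no broker-side plugin API for this exists in Kafka today, so the header map and AUDIT_HEADER_ID below are purely hypothetical; the sketch just shows the kind of check such a "firewall" plugin would perform.)

import java.util.Map;

// Hypothetical check: accept only records whose headers carry the audit entry.
public class AuditHeaderFirewall {
    private static final int AUDIT_HEADER_ID = 42; // hypothetical registered id

    public boolean accept(Map<Integer, byte[]> headers) {
        return headers != null && headers.containsKey(AUDIT_HEADER_ID);
    }
}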

On Tue, Nov 8, 2016 at 6:41 PM, radai  wrote:

> +1 for {namespace, plugin}, would definitely cut down on registration
> overhead.
>
> On Tue, Nov 8, 2016 at 5:54 PM, Gwen Shapira  wrote:
>
>> Thank you so much for this clear and fair summary of the arguments.
>>
>> I'm in favor of ints. Not a deal-breaker, but in favor.
>>
>> Even more in favor of Magnus's decentralized suggestion with Roger's
>> tweak: add a namespace for headers. This will allow each app to just
>> use whatever IDs it wants internally, and then let the admin deploying
>> the app figure out an available namespace ID for the app to live in.
>> So io.confluent.schema-registry can be namespace 0x01 on my deployment
>> and 0x57 on yours, and the poor guys developing the app don't need to
>> worry about that.
>>
>> Gwen
>>
>> On Tue, Nov 8, 2016 at 4:23 PM, radai  wrote:
>> > +1 for sean's document. it covers pretty much all the trade-offs and
>> > provides concrete figures to argue about :-)
>> > (nit-picking - used the same xkcd twice, also trove has been superceded
>> for
>> > purposes of high performance collections: look at
>> > https://github.com/leventov/Koloboke)
>> >
>> > so to sum up the string vs int debate:
>> >
>> > performance - you can do 140k ops/sec _per thread_ with string headers.
>> you
>> > could do x2-3 better with ints. there's no arguing the relative diff
>> > between the two, there's only the question of whether or not _the rest
>> of
>> > kafka_ operates fast enough to care. if we want to make choices solely
>> > based on performance we need ints. if we are willing to
>> settle/compromise
>> > for a nicer (to some) API than strings are good enough for the current
>> > state of affairs.
>> >
>> > message size - with batching and compression it comes down to a ~5%
>> > difference (internal testing, not in the doc. maybe would help adding if
>> > this becomes a point of contention?). this means it wont really affect
>> > kafka in "throughput mode" (large, compressed batches). in "low latency"
>> > mode (meaning less/no batching and compression) the difference can be
>> > extreme (it'll easily be an order of magnitude with small payloads like
>> > stock ticks and header keys of the form
>> > "com.acme.infraTeam.kafka.hiMom.auditPlugin"). we have a few such
>> topics at
>> > linkedin where actual payloads are ~2 ints and are eclipsed by our
>> in-house
>> > audit "header" which is why we liked ints to begin with.
>> >
>> > "ease of use" - strings would probably still require _some_ degree of
>> > partitioning by convention (imagine if everyone used the key "infra"...)
>> > but its very intuitive for java devs to do anyway (reverse-domain is
>> > ingrained into java developers at a young age :-) ). also most java devs
>> > find Map more intuitive than Map -
>> > probably because of other text-based protocols like http. ints would
>> > require a number registry. if you think number registries are hard just
>> > look at the wiki page for KIPs (specifically the number for next
>> available
>> > KIP) and think again - we are probably talking about the same volume of
>> > requests. also this would only be "required" (good citizenship, more
>> like)
>> > if you want to publish your plugin for others to use. within your org do
>> > whatever you want - just know that if you use [some "reserved" range]
>> and a
>> > future kafka update breaks it its your problem. RTFM.
>> >
>> > personally im in favor of ints.
>> >
>> > having said that (and like nacho) I will settle if int vs string remains
>> > the only obstacle to this.
>> >
>> > On Tue, Nov 8, 2016 at 3:53 PM, Nacho Solis > >
>> > wrote:
>> >
>> >> I think it's well known I've been pushing for ints (and I could switch
>> to
>> >> 16 bit shorts if pressed).
>> >>
>> >> - efficient (space)
>> >> - efficient (processing)
>> >> - easily partitionable
>> >>
>> >>
>> >> However, if the only thing that is keeping us from adopting headers is
>> the
>> >> use of strings vs ints as keys, then I would cave in and accept
>> strings. If
>> 

[jira] [Updated] (KAFKA-4393) Improve invalid/negative TS handling

2016-11-08 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4393:
---
Description: 
Currently, Kafka Streams does not handle invalid/negative timestamps returned 
from the {{TimestampExtractor}} gracefully, but fails with an exception, 
because negative timestamps cannot be handled in a meaningful way by any 
time-based (i.e., windowed) operators such as window aggregates and joins.

We want to change Streams to an auto-drop behavior for records with negative 
timestamps (without any further user notification about dropped records) to 
enable users to "step over" those records and keep going (instead of hitting an 
exception). To guard users from silently dropping messages by default (and to 
keep the current fail-fast behavior), we change the default extractor 
{{ConsumerRecordTimestampExtractor}} to check the extracted record metadata 
timestamp and raise an exception if it is negative. Furthermore, we add a 
"drop-and-log" extractor, as this seems to be a common behavior users might 
want. For any other behavior, users can still provide a custom 
TimestampExtractor implementation.

  was:
Currently, Kafka Streams does not handle invalid/negative timestamps returned 
from the {{TimestampExtractor}} gracefully, but fails with an exception, 
because negative timestamps cannot get handled in a meaningful way for any time 
based (ie, window) operators like window aggregates and joins.

We want to change Stream to a auto-drop behavior for negative timestamps for 
those records (without any further user notification about dropped record) 
instead of an exception. To guard the user from silently dropping messages (and 
kept current fail-fast behavior), we change the default extractor 
{{ConsumerRecordTimestampExtractor}} to check the extracted meta-data record 
timestamp and raise an exception if it is negative. Furthermore, we add a 
"drop-and-log" extractor, as this seems to be a common behavior user might want 
to have. For any other behavior, users can still provide a custom TS-Extractor 
implementation.


> Improve invalid/negative TS handling
> 
>
> Key: KAFKA-4393
> URL: https://issues.apache.org/jira/browse/KAFKA-4393
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.10.2.0
>
>
> Currently, Kafka Streams does not handle invalid/negative timestamps returned 
> from the {{TimestampExtractor}} gracefully, but fails with an exception, 
> because negative timestamps cannot be handled in a meaningful way by any 
> time-based (i.e., windowed) operators such as window aggregates and joins.
> We want to change Streams to an auto-drop behavior for records with negative 
> timestamps (without any further user notification about dropped records) to 
> enable users to "step over" those records and keep going (instead of hitting 
> an exception). To guard users from silently dropping messages by default (and 
> to keep the current fail-fast behavior), we change the default extractor 
> {{ConsumerRecordTimestampExtractor}} to check the extracted record metadata 
> timestamp and raise an exception if it is negative. Furthermore, we add a 
> "drop-and-log" extractor, as this seems to be a common behavior users might 
> want. For any other behavior, users can still provide a custom 
> TimestampExtractor implementation.
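
(For illustration, not part of the ticket: a minimal sketch of the "drop-and-log" extractor described above, written against the one-argument TimestampExtractor interface of the 0.10.x clients; the exact interface signature and the "negative means drop" semantics are assumptions here and changed in later releases.)

{noformat}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DropAndLogTimestampExtractor implements TimestampExtractor {
    private static final Logger LOG = LoggerFactory.getLogger(DropAndLogTimestampExtractor.class);

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        long timestamp = record.timestamp();
        if (timestamp < 0) {
            // Log the record that is about to be skipped instead of failing the application.
            LOG.warn("Dropping record from {}-{} at offset {} with invalid timestamp {}",
                     record.topic(), record.partition(), record.offset(), timestamp);
        }
        // A negative return value is assumed to signal "drop this record" under the proposed change.
        return timestamp;
    }
}
{noformat}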



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread radai
+1 for {namespace, plugin}, would definitely cut down on registration
overhead.

On Tue, Nov 8, 2016 at 5:54 PM, Gwen Shapira  wrote:

> Thank you so much for this clear and fair summary of the arguments.
>
> I'm in favor of ints. Not a deal-breaker, but in favor.
>
> Even more in favor of Magnus's decentralized suggestion with Roger's
> tweak: add a namespace for headers. This will allow each app to just
> use whatever IDs it wants internally, and then let the admin deploying
> the app figure out an available namespace ID for the app to live in.
> So io.confluent.schema-registry can be namespace 0x01 on my deployment
> and 0x57 on yours, and the poor guys developing the app don't need to
> worry about that.
>
> Gwen
>
> On Tue, Nov 8, 2016 at 4:23 PM, radai  wrote:
> > +1 for sean's document. it covers pretty much all the trade-offs and
> > provides concrete figures to argue about :-)
> > (nit-picking - used the same xkcd twice, also trove has been superceded
> for
> > purposes of high performance collections: look at
> > https://github.com/leventov/Koloboke)
> >
> > so to sum up the string vs int debate:
> >
> > performance - you can do 140k ops/sec _per thread_ with string headers.
> you
> > could do x2-3 better with ints. there's no arguing the relative diff
> > between the two, there's only the question of whether or not _the rest of
> > kafka_ operates fast enough to care. if we want to make choices solely
> > based on performance we need ints. if we are willing to settle/compromise
> > for a nicer (to some) API than strings are good enough for the current
> > state of affairs.
> >
> > message size - with batching and compression it comes down to a ~5%
> > difference (internal testing, not in the doc. maybe would help adding if
> > this becomes a point of contention?). this means it wont really affect
> > kafka in "throughput mode" (large, compressed batches). in "low latency"
> > mode (meaning less/no batching and compression) the difference can be
> > extreme (it'll easily be an order of magnitude with small payloads like
> > stock ticks and header keys of the form
> > "com.acme.infraTeam.kafka.hiMom.auditPlugin"). we have a few such
> topics at
> > linkedin where actual payloads are ~2 ints and are eclipsed by our
> in-house
> > audit "header" which is why we liked ints to begin with.
> >
> > "ease of use" - strings would probably still require _some_ degree of
> > partitioning by convention (imagine if everyone used the key "infra"...)
> > but its very intuitive for java devs to do anyway (reverse-domain is
> > ingrained into java developers at a young age :-) ). also most java devs
> > find Map more intuitive than Map -
> > probably because of other text-based protocols like http. ints would
> > require a number registry. if you think number registries are hard just
> > look at the wiki page for KIPs (specifically the number for next
> available
> > KIP) and think again - we are probably talking about the same volume of
> > requests. also this would only be "required" (good citizenship, more
> like)
> > if you want to publish your plugin for others to use. within your org do
> > whatever you want - just know that if you use [some "reserved" range]
> and a
> > future kafka update breaks it its your problem. RTFM.
> >
> > personally im in favor of ints.
> >
> > having said that (and like nacho) I will settle if int vs string remains
> > the only obstacle to this.
> >
> > On Tue, Nov 8, 2016 at 3:53 PM, Nacho Solis  >
> > wrote:
> >
> >> I think it's well known I've been pushing for ints (and I could switch
> to
> >> 16 bit shorts if pressed).
> >>
> >> - efficient (space)
> >> - efficient (processing)
> >> - easily partitionable
> >>
> >>
> >> However, if the only thing that is keeping us from adopting headers is
> the
> >> use of strings vs ints as keys, then I would cave in and accept
> strings. If
> >> we do so, I would like to limit string keys to 128 bytes in length.
> This
> >> way 1) I could use a 3 letter string if I wanted (effectively using 4
> total
> >> bytes), 2) limit overall impact of possible keys (don't really want
> people
> >> to send a 16K header string key).
> >>
> >> Nacho
> >>
> >>
> >> On Tue, Nov 8, 2016 at 3:35 PM, Gwen Shapira  wrote:
> >>
> >> > Forgot to mention: Thank you for quantifying the trade-off - it is
> >> > helpful and important regardless of what we end up deciding.
> >> >
> >> > On Tue, Nov 8, 2016 at 3:12 PM, Sean McCauliff
> >> >  wrote:
> >> > > On Tue, Nov 8, 2016 at 2:15 PM, Gwen Shapira 
> >> wrote:
> >> > >
> >> > >> Since Kafka specifically targets high-throughput, low-latency
> >> > >> use-cases, I don't think we should trade them off that easily.
> >> > >>
> >> > >
> >> > > I find these kind of design goals not to be really helpful unless
> it's
> >> > > quantified 

[jira] [Commented] (KAFKA-4393) Improve invalid/negative TS handling

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649605#comment-15649605
 ] 

ASF GitHub Bot commented on KAFKA-4393:
---

GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2117

KAFKA-4393: Improve invalid/negative TS handling



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka 
kafka-4393-improveInvalidTsHandling

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2117.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2117


commit 23e08d18585febdf34917e1d33f4c142ae36e748
Author: Matthias J. Sax 
Date:   2016-11-09T02:01:56Z

KAFKA-4393: Improve invalid/negative TS handling




> Improve invalid/negative TS handling
> 
>
> Key: KAFKA-4393
> URL: https://issues.apache.org/jira/browse/KAFKA-4393
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.10.2.0
>
>
> Currently, Kafka Streams does not handle invalid/negative timestamps returned 
> from the {{TimestampExtractor}} gracefully, but fails with an exception, 
> because negative timestamps cannot get handled in a meaningful way for any 
> time based (ie, window) operators like window aggregates and joins.
> We want to change Stream to a auto-drop behavior for negative timestamps for 
> those records (without any further user notification about dropped record) 
> instead of an exception. To guard the user from silently dropping messages 
> (and kept current fail-fast behavior), we change the default extractor 
> {{ConsumerRecordTimestampExtractor}} to check the extracted meta-data record 
> timestamp and raise an exception if it is negative. Furthermore, we add a 
> "drop-and-log" extractor, as this seems to be a common behavior user might 
> want to have. For any other behavior, users can still provide a custom 
> TS-Extractor implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2117: KAFKA-4393: Improve invalid/negative TS handling

2016-11-08 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2117

KAFKA-4393: Improve invalid/negative TS handling



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka 
kafka-4393-improveInvalidTsHandling

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2117.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2117


commit 23e08d18585febdf34917e1d33f4c142ae36e748
Author: Matthias J. Sax 
Date:   2016-11-09T02:01:56Z

KAFKA-4393: Improve invalid/negative TS handling




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4393) Improve invalid/negative TS handling

2016-11-08 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4393:
---
Description: 
Currently, Kafka Streams does not handle invalid/negative timestamps returned 
from the {{TimestampExtractor}} gracefully, but fails with an exception, 
because negative timestamps cannot get handled in a meaningful way for any time 
based (ie, window) operators like window aggregates and joins.

We want to change Stream to a auto-drop behavior for negative timestamps for 
those records (without any further user notification about dropped record) 
instead of an exception. To guard the user from silently dropping messages (and 
kept current fail-fast behavior), we change the default extractor 
{{ConsumerRecordTimestampExtractor}} to check the extracted meta-data record 
timestamp and raise an exception if it is negative. Furthermore, we add a 
"drop-and-log" extractor, as this seems to be a common behavior user might want 
to have. For any other behavior, users can still provide a custom TS-Extractor 
implementation.

  was:
Currently, Kafka Streams does not handle invalid/negative timestamps returned 
from the {{TimestampExtractor}} gracefully, but fails with an exception. Input 
record timestamps determine output record timestamps and {{KafkaProducer}} does 
not allow negative timestamps to get written.

Besides the exception issues describe above, negative timestamp can also not 
get handled in a meaningful way for any time based (ie, window) operators like 
window aggregates and joins.

Thus, we want to change Stream to a auto-drop behavior for negative timestamps 
and not process those records at all (without any further user notification 
about dropped record). To guard the user from silently dropping messages, we 
change the default extractor {{ConsumerRecordTimestampExtractor}} to check the 
extracted meta-data record timestamp and raise an exception if it is negative. 
Furthermore, we add a "drop-and-log" extractor, as this seems to be a common 
behavior user might want to have. For any other behavior, users can still 
provide a custom TS-Extractor implementation.


> Improve invalid/negative TS handling
> 
>
> Key: KAFKA-4393
> URL: https://issues.apache.org/jira/browse/KAFKA-4393
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.10.2.0
>
>
> Currently, Kafka Streams does not handle invalid/negative timestamps returned 
> from the {{TimestampExtractor}} gracefully, but fails with an exception, 
> because negative timestamps cannot get handled in a meaningful way for any 
> time based (ie, window) operators like window aggregates and joins.
> We want to change Stream to a auto-drop behavior for negative timestamps for 
> those records (without any further user notification about dropped record) 
> instead of an exception. To guard the user from silently dropping messages 
> (and kept current fail-fast behavior), we change the default extractor 
> {{ConsumerRecordTimestampExtractor}} to check the extracted meta-data record 
> timestamp and raise an exception if it is negative. Furthermore, we add a 
> "drop-and-log" extractor, as this seems to be a common behavior user might 
> want to have. For any other behavior, users can still provide a custom 
> TS-Extractor implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Gwen Shapira
Thank you so much for this clear and fair summary of the arguments.

I'm in favor of ints. Not a deal-breaker, but in favor.

Even more in favor of Magnus's decentralized suggestion with Roger's
tweak: add a namespace for headers. This will allow each app to just
use whatever IDs it wants internally, and then let the admin deploying
the app figure out an available namespace ID for the app to live in.
So io.confluent.schema-registry can be namespace 0x01 on my deployment
and 0x57 on yours, and the poor guys developing the app don't need to
worry about that.

Gwen

On Tue, Nov 8, 2016 at 4:23 PM, radai  wrote:
> +1 for sean's document. it covers pretty much all the trade-offs and
> provides concrete figures to argue about :-)
> (nit-picking - used the same xkcd twice, also trove has been superceded for
> purposes of high performance collections: look at
> https://github.com/leventov/Koloboke)
>
> so to sum up the string vs int debate:
>
> performance - you can do 140k ops/sec _per thread_ with string headers. you
> could do x2-3 better with ints. there's no arguing the relative diff
> between the two, there's only the question of whether or not _the rest of
> kafka_ operates fast enough to care. if we want to make choices solely
> based on performance we need ints. if we are willing to settle/compromise
> for a nicer (to some) API than strings are good enough for the current
> state of affairs.
>
> message size - with batching and compression it comes down to a ~5%
> difference (internal testing, not in the doc. maybe would help adding if
> this becomes a point of contention?). this means it won't really affect
> kafka in "throughput mode" (large, compressed batches). in "low latency"
> mode (meaning less/no batching and compression) the difference can be
> extreme (it'll easily be an order of magnitude with small payloads like
> stock ticks and header keys of the form
> "com.acme.infraTeam.kafka.hiMom.auditPlugin"). we have a few such topics at
> linkedin where actual payloads are ~2 ints and are eclipsed by our in-house
> audit "header" which is why we liked ints to begin with.
>
> "ease of use" - strings would probably still require _some_ degree of
> partitioning by convention (imagine if everyone used the key "infra"...)
> but its very intuitive for java devs to do anyway (reverse-domain is
> ingrained into java developers at a young age :-) ). also most java devs
> find Map more intuitive than Map -
> probably because of other text-based protocols like http. ints would
> require a number registry. if you think number registries are hard just
> look at the wiki page for KIPs (specifically the number for next available
> KIP) and think again - we are probably talking about the same volume of
> requests. also this would only be "required" (good citizenship, more like)
> if you want to publish your plugin for others to use. within your org do
> whatever you want - just know that if you use [some "reserved" range] and a
> future kafka update breaks it its your problem. RTFM.
>
> personally im in favor of ints.
>
> having said that (and like nacho) I will settle if int vs string remains
> the only obstacle to this.
>
> On Tue, Nov 8, 2016 at 3:53 PM, Nacho Solis 
> wrote:
>
>> I think it's well known I've been pushing for ints (and I could switch to
>> 16 bit shorts if pressed).
>>
>> - efficient (space)
>> - efficient (processing)
>> - easily partitionable
>>
>>
>> However, if the only thing that is keeping us from adopting headers is the
>> use of strings vs ints as keys, then I would cave in and accept strings. If
>> we do so, I would like to limit string keys to 128 bytes in length.  This
>> way 1) I could use a 3 letter string if I wanted (effectively using 4 total
>> bytes), 2) limit overall impact of possible keys (don't really want people
>> to send a 16K header string key).
>>
>> Nacho
>>
>>
>> On Tue, Nov 8, 2016 at 3:35 PM, Gwen Shapira  wrote:
>>
>> > Forgot to mention: Thank you for quantifying the trade-off - it is
>> > helpful and important regardless of what we end up deciding.
>> >
>> > On Tue, Nov 8, 2016 at 3:12 PM, Sean McCauliff
>> >  wrote:
>> > > On Tue, Nov 8, 2016 at 2:15 PM, Gwen Shapira 
>> wrote:
>> > >
>> > >> Since Kafka specifically targets high-throughput, low-latency
>> > >> use-cases, I don't think we should trade them off that easily.
>> > >>
>> > >
>> > > I find these kind of design goals not to be really helpful unless it's
>> > > quantified in someway.  Because it's always possible to argue against
>> > > something as either being not performant or just an implementation
>> > detail.
>> > >
>> > > This is a single-threaded benchmark, so all the measurements are per
>> > > thread.
>> > >
>> > > For 1M messages/s/thread  if header keys are int and you had even a
>> > single
>> > > header key, value pair then 

[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649493#comment-15649493
 ] 

ASF GitHub Bot commented on KAFKA-4362:
---

GitHub user MayureshGharat opened a pull request:

https://github.com/apache/kafka/pull/2116

KAFKA-4362 : Consumer can fail after reassignment of the offsets topic 
partition



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MayureshGharat/kafka KAFKA-4362

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2116.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2116


commit ffc998ec831250fcda6b988398ed486f3ea9a99d
Author: MayureshGharat 
Date:   2016-11-09T01:43:46Z

Fix : Consumer can fail after reassignment of the offsets topic partition




> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2116: KAFKA-4362 : Consumer can fail after reassignment ...

2016-11-08 Thread MayureshGharat
GitHub user MayureshGharat opened a pull request:

https://github.com/apache/kafka/pull/2116

KAFKA-4362 : Consumer can fail after reassignment of the offsets topic 
partition



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MayureshGharat/kafka KAFKA-4362

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2116.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2116


commit ffc998ec831250fcda6b988398ed486f3ea9a99d
Author: MayureshGharat 
Date:   2016-11-09T01:43:46Z

Fix : Consumer can fail after reassignment of the offsets topic partition




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Roger Hoover
Sorry I didn't read the KIP carefully enough and thought there was more
difference between 5a and 5c.  I now see that in 5a, the headers section is
already defined as a sub-protocol that (I assume) does not need to be
parsed at the broker.

The main difference, as pointed out, is whether the broker would ever
want/need to act on non-standard headers.  In any case, if there's a good
default serializer, there should be minimal need for custom serializers.
It's a fairly minor point.

The main decision on headers, it seems, is how to handle namespacing them.
With strings, each header can carry its own namespace via a prefix
(e.g. grpc-* for gRPC on top of HTTP/2).

For ints, the KIP suggests a global registry for headers and Magnus
proposed a decentralized pluggable mapping.  In either case, I'd suggest a
tweak to the model.  I think it would help to identify a header by
(namespaceId, fieldId) similar to the Protobuf model.  If there's appetite
for a global registry, organizations would only have to register a single
namespace and from then on add headers at will.  For the pluggable mapping,
users would only have to provide a mapping for each namespace they use,
rather than a mapping for each specific header.  It's up to the
apps/plugins within a namespace to maintain their own list of field ids (as
with a protobuf definition).
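
A rough sketch of what such a (namespaceId, fieldId) key could look like (purely
illustrative; the names and int16 sizes below are assumptions, not part of the KIP):

{code:java}
import java.util.Objects;

/**
 * Illustrative only: a header identified by (namespaceId, fieldId), similar to
 * the Protobuf model. An organization (or a deployment-local mapping) owns a
 * namespaceId; the apps/plugins within that namespace manage their own fieldIds.
 */
public final class HeaderKey {
    private final short namespaceId; // assigned per org/deployment
    private final short fieldId;     // managed inside the namespace

    public HeaderKey(final short namespaceId, final short fieldId) {
        this.namespaceId = namespaceId;
        this.fieldId = fieldId;
    }

    public short namespaceId() { return namespaceId; }
    public short fieldId() { return fieldId; }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof HeaderKey)) return false;
        final HeaderKey other = (HeaderKey) o;
        return namespaceId == other.namespaceId && fieldId == other.fieldId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(namespaceId, fieldId);
    }
}
{code}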

My main concern (similar to Nacho) is to see a common client API that
allows per message headers.  I'll let others debate the performance
tradeoffs of strings vs. ints.

Roger

On Tue, Nov 8, 2016 at 10:54 AM, Nacho Solis 
wrote:

> From Roger's description:
>
> 5a- separate metadata field, built in serialization
> 5c- separate metadata field, custom serialization
> 5b- custom serialization (inside V)
> 5d- built in serialization (inside V)
>
> I added 5d for completeness.
>
> From this perspective I would choose
>
> 5a > 5c > 5d > 5b
>
> In all of these cases, I would like to make sure that the broker
> (eventually) has the ability to deal with the headers.
>
> - Custom serialization
> I'm not in favor of custom serialization for the whole metadata/header
> block. This is because I think that we will have multiple headers or
> metadata blobs by different teams (either internal or external), with
> different goals and different requirements. They will work independently of
> each other. They will not try to coordinate a common format. The audit team
> and the security team and the performance team and the monitoring team and
> the application might not work of the same needs.  The one need that they
> share is sending data along with the message.   To put all of their data
> together, we will need a common header system.
>
> Obviously the kafka team (the team in charge of running the kafka system,
> say, the linkedin-kafka-team) can write a wrapper or a custom serializer
> that somehow provides a set of functions for all these teams to work
> together.  So technically we're not limited. However, if we want to share
> our plugins with some other team, the acme-kafka-team then we would have to
> have compatible serializers.  This is doable but not an easy task.
>
> From my point of view we have 2 options:
> A- We use a built in serializer for the headers. Each plugin/module can
> then serialize their internal data however they want, but the set format
> itself is common.  This would allow us to work together. Plugins are shared
> and evolve from collective efforts.
> B1- We use a custom serializer for headers.  We have balkanization of
> headers and no cooperation
> B2- We use a custom serializer for headers.  One such serializer becomes
> popular, effectively providing a wrapper to the open source clients that
> provides header support. Various companies/entities start using this and
> form a community around this. Plugins are shared and evolve from collective
> efforts.
>
> I believe that given B2 offers collective power, it will overtake B1.
> Effectively, we would reach the same situation as A, but will take a little
> more time and will make the code more difficult to manage.
>
>
> Isn't this the same reason Connect is inside Apache Kafka?  And now there
> are a set of Kafka Connectors (https://www.confluent.io/prod
> uct/connectors/)
> that take advantage of the fact that Connect defines a common framework.
>
>
> To be clear, I think my main goal would be for Apache Kafka to offer a
> Client API to add and remove headers per message.  If we can offer this as
> a standard (in other words, part of Apache Kafka open source), then we have
> achieved 80% of the work. The community will benefit as a whole.   If this
> is done via a container inside V, if it's done natively in the protocol, if
> we offer a way to override the serializer, if the broker can understand the
> headers, etc. are secondary (though I obviously have opinions about those;
> no, yes, no, yes).
>
> If we don't want to include this into Apache open source, then the people
> that 

[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-08 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649485#comment-15649485
 ] 

Mayuresh Gharat commented on KAFKA-4362:


I did some more testing while reproducing these error scenarios.
There are 2 bugs:
1) OFFSET COMMITS:
When we check the MessageFormatVersion, we actually first check whether the replica 
for the __consumer_offsets topic is local, and if it's not, we return an 
IllegalArgumentException. This results in an unknown exception on the client 
side and the commitOffset operation fails. 
Also, as a side effect of this, if you start another consumer in the same group 
consuming from the same topic after the rebalance is done and start producing to 
the topic, you will see that both consumers are consuming the same data. This is 
because the second consumer that you have started is talking to the right 
coordinator while the first consumer is completely unaware of the presence of 
the second consumer.

2) CONSUMER REBALANCE:
While doing topic reassignment, for example moving replicas from (1,2,3 [Leader: 1]) 
to (4,2,3 [Leader: 4]) for the __consumer_offsets topic, the controller sends a 
stopReplicaRequest to broker 1 for the __consumer_offsets topic. While handling 
this request on the server side, we never remove the particular partition of the 
__consumer_offsets topic from the coordinator's (broker 1) cache. When a 
handleJoinGroupRequest comes in during rebalance, the coordinator (broker 1) 
actually checks whether the group is local. But since we have not removed the 
group from its cache on the earlier stopReplicaRequest from the controller, it 
does not return NotCoordinatorForGroupException but proceeds with success. So 
the consumer thinks that it's talking to the right coordinator (which is not the 
case, since we moved the coordinator from broker 1 to broker 4). On the consumer 
side, in the handleJoinGroupResponseHandler callback, we send a SyncGroupRequest 
to broker 1, which in turn calls the code for checking the MessageFormatVersion 
on the server. At this point it throws an IllegalArgumentException for the same 
reason explained in point 1) above. This causes the SyncGroupRequest to fail 
with an unknown exception in the SyncGroupResponseHandler callback.

The exact steps for reproducing these scenarios are as follows :
For 1)
a) Start 4 kafka brokers and create a topic testtopicA with 1 partition.
b) Start a producer producing to a topic testtopicA. 
c) Start a console consumer with a groupId = testGroupA consuming from 
testtopicA.
d) Produce and consume some data.
e) Find the __consumer_offsets partition that stores the offsets for testGroupA.
f) Reassign the partitions for the partition from e) such that you remove the 
leader from the replica list.
g) You should see frequent exceptions on the consumer side, something like this 
:

[2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition 
testtopicA-0 at offset 14: The server experienced an unexpected error when 
processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

h) If you still produce to the topic, you should be able to see data in the 
console of this consumer, but its not able to commit offsets.
i) Now if you start another console consumer with same groupId = testGroupA 
consuming from the same topic testtopicA and if you produce more data, you 
should be able to see the data in both the consumer consoles.
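
For reference, a minimal sketch of how the same failing commits surface from a plain 
Java consumer via the async commit callback (illustrative only; the topic and group 
names simply mirror the steps above):

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommitFailureProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroupA");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer =
                 new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer())) {
            consumer.subscribe(Collections.singletonList("testtopicA"));
            while (true) {
                // Records keep arriving even while commits fail (step h above).
                consumer.poll(1000);
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            // After the reassignment in step f, the "unexpected error"
                            // shows up here on every commit attempt.
                            System.err.println("Commit failed for " + offsets + ": " + exception);
                        }
                    }
                });
            }
        }
    }
}
{code}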


For 2)
a) Start 4 kafka brokers and create a topic testtopicA with 1 partition.
b) Start a producer producing to a topic testtopicA. 
c) Start a console consumer with a groupId = testGroupA consuming from 
testtopicA.
d) Start another console consumer with a groupId = testGroupA consuming from 
testtopicA.
e) Produce and consume some data. Exactly one of them should be consuming the 
data.
f) Find the __consumer_offsets partition that stores the offsets for testGroupA.
g) Reassign the partitions for the partition from f) such that you remove the 
leader from the replica list.
h) You should see frequent exceptions on the consumer actually fetching data, 
something like this :

[2016-11-08 10:04:03,192] ERROR Group testGroupA failed to commit partition 
testtopicA-0 at offset 14: The server experienced an unexpected error when 
processing the request 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

i) If you kill this consumer, you should immediately see an exception on the 
other consumer console, something like this : 

[2016-11-08 10:04:20,705] ERROR Error processing message, terminating consumer 
process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The 
server experienced an unexpected error when processing the request
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:518)
at 

Re: [ANNOUNCE] New committer: Jiangjie (Becket) Qin

2016-11-08 Thread radai
congratulations!

 )  (  )  (
(^)(^)(^)(^)
_i__i__i__i_
   ()
   ||>o<|###|
   ()



On Mon, Nov 7, 2016 at 6:03 AM, Michael Noll  wrote:

> Congratulations, Becket!
>
> Best wishes,
> Michael
>
> On Thu, Nov 3, 2016 at 5:13 PM, Efe Gencer  wrote:
>
> > Congratulations, Becket!
> >
> > Best,
> > Efe
> >
> > 2016-11-03 11:22 GMT-04:00 Neha Narkhede :
> >
> > > Congratulations, Becket! Well done.
> > >
> > > On Wed, Nov 2, 2016 at 1:34 PM Eno Thereska 
> > > wrote:
> > >
> > > > Congrats!
> > > > Eno
> > > >
> > > > > On 1 Nov 2016, at 05:57, Harsha Chintalapani 
> > wrote:
> > > > >
> > > > > Congrats Becket!
> > > > > -Harsha
> > > > >
> > > > > On Mon, Oct 31, 2016 at 2:13 PM Rajini Sivaram <
> > > > rajinisiva...@googlemail.com>
> > > > > wrote:
> > > > >
> > > > >> Congratulations, Becket!
> > > > >>
> > > > >> On Mon, Oct 31, 2016 at 8:38 PM, Matthias J. Sax <
> > > matth...@confluent.io
> > > > >
> > > > >> wrote:
> > > > >>
> > > > >>> -BEGIN PGP SIGNED MESSAGE-
> > > > >>> Hash: SHA512
> > > > >>>
> > > > >>> Congrats!
> > > > >>>
> > > > >>> On 10/31/16 11:01 AM, Renu Tewari wrote:
> > > >  Congratulations Becket!! Absolutely thrilled to hear this. Well
> > > >  deserved!
> > > > 
> > > >  regards renu
> > > > 
> > > > 
> > > >  On Mon, Oct 31, 2016 at 10:35 AM, Joel Koshy <
> jjkosh...@gmail.com
> > >
> > > >  wrote:
> > > > 
> > > > > The PMC for Apache Kafka has invited Jiangjie (Becket) Qin to
> > > > > join as a committer and we are pleased to announce that he has
> > > > > accepted!
> > > > >
> > > > > Becket has made significant contributions to Kafka over the
> last
> > > > > two years. He has been deeply involved in a broad range of KIP
> > > > > discussions and has contributed several major features to the
> > > > > project. He recently completed the implementation of a series
> of
> > > > > improvements (KIP-31, KIP-32, KIP-33) to Kafka’s message format
> > > > > that address a number of long-standing issues such as avoiding
> > > > > server-side re-compression, better accuracy for time-based log
> > > > > retention, log roll and time-based indexing of messages.
> > > > >
> > > > > Congratulations Becket! Thank you for your many contributions.
> We
> > > > > are excited to have you on board as a committer and look
> forward
> > > > > to your continued participation!
> > > > >
> > > > > Joel
> > > > >
> > > > 
> > > > >>> -BEGIN PGP SIGNATURE-
> > > > >>> Comment: GPGTools - https://gpgtools.org
> > > > >>>
> > > > >>> iQIcBAEBCgAGBQJYF6uzAAoJECnhiMLycopPBuwP/1N2MtwWw7ms5gAfT/jvVCGi
> > > > >>> mdNvdJprSwJHe3qwsc+glsvAqwS6OZfaVzK2qQcaxMX5KjQtwkkOKyErOl9hG7jD
> > > > >>> Vw0aDcCbPuV2oEZ4m9K2J4Q3mZIfFrevicVb7oPGf4Yjt1sh9wxP08o7KHP2l5pN
> > > > >>> 3mpIBEDp4rZ2pg/jXldyh57dW1btg3gZi1gNczWvXEAKf1ypXRPwPeDbvXADXDv3
> > > > >>> 0NgmcXn242geoggnIbL30WgjH0bwHpVjLBr++YQ33FzRoHzASfAYHR/jSDKAytQe
> > > > >>> a7Bkc69Bb1NSzkfhiJa+VW9V2DweO8kD+Xfz4dM02GQF0iJkAqare7a6zWedk/+U
> > > > >>> hJRPz+tGlDSLePCYdyNj1ivJrFOmIQtyFOI3SBANfaneOmGJhPKtlNQQlNFKDbWS
> > > > >>> CD1pBsc1iHNq6rXy21evc/aFk0Rrfs5d4rU9eG6jD8jc1mCbSwtzJI0vweX0r9Y/
> > > > >>> 6Ao8cnsmDejYfap5lUMWeQfZOTkNRNpbkL7eoiVpe6wZw1nGL3T7GkrrWGRS3EQO
> > > > >>> qp4Jjp+7yY4gIqsLfYouaHTEzAX7yN78QNUNCB4OqUiEL9+a8wTQ7dlTgXinEd8r
> > > > >>> Kh9vTfpW7fb4c58aSpzntPUU4YFD3MHMam0iu5UrV9d5DrVTFDMJ83k15Z5DyTMt
> > > > >>> 45nPYdjvJgFGWLYFnPwr
> > > > >>> =VbpG
> > > > >>> -END PGP SIGNATURE-
> > > > >>>
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Regards,
> > > > >>
> > > > >> Rajini
> > > > >>
> > > >
> > > > --
> > > Thanks,
> > > Neha
> > >
> >
>


Jenkins build is back to normal : kafka-trunk-jdk7 #1678

2016-11-08 Thread Apache Jenkins Server
See 



Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-08 Thread Gwen Shapira
+1 (binding)

On Tue, Nov 8, 2016 at 10:26 AM, radai  wrote:
> I've updated the KIP page to specify the new config would co-exist with
> "queued.max.request" to minimize the impact on compatibility.
>
> On Tue, Nov 8, 2016 at 7:02 AM, radai  wrote:
>
>> My personal opinion on this is that control of memory was always the
>> intent behind queued.max.requests and so this KIP could completely obsolete
>> it.
>> For now its probably safest to leave it as-is (making memory-bound
>> "opt-in") and revisit this at a later date
>>
>> On Mon, Nov 7, 2016 at 2:32 PM, Gwen Shapira  wrote:
>>
>>> Hey Radai,
>>>
>>> Looking at the proposal, it looks like a major question is still
>>> unresolved?
>>> "This configuration parameter can either replace queued.max.requests
>>> completely, or co-exist with it (by way of either-or or respecting
>>> both bounds and not picking up new requests when either is hit)."
>>>
>>> On Mon, Nov 7, 2016 at 1:08 PM, radai  wrote:
>>> > Hi,
>>> >
>>> > I would like to initiate a vote on KIP-72:
>>> >
>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+
>>> Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
>>> >
>>> > The kip allows specifying a limit on the amount of memory allocated for
>>> > reading incoming requests into. This is useful for "sizing" a broker and
>>> > avoiding OOMEs under heavy load (as actually happens occasionally at
>>> > linkedin).
>>> >
>>> > I believe I've addressed most (all?) concerns brought up during the
>>> > discussion.
>>> >
>>> > To the best of my understanding this vote is about the goal and
>>> > public-facing changes related to the new proposed behavior, but as for
>>> > implementation, i have the code up here:
>>> >
>>> > https://github.com/radai-rosenblatt/kafka/tree/broker-memory
>>> -pool-with-muting
>>> >
>>> > and I've stress-tested it to work properly (meaning it chugs along and
>>> > throttles under loads that would DOS 10.0.1.0 code).
>>> >
>>> > I also believe that the primitives and "pattern"s introduced in this KIP
>>> > (namely the notion of a buffer pool and retrieving from / releasing to
>>> said
>>> > pool instead of allocating memory) are generally useful beyond the
>>> scope of
>>> > this KIP for both performance issues (allocating lots of short-lived
>>> large
>>> > buffers is a performance bottleneck) and other areas where memory limits
>>> > are a problem (KIP-81)
>>> >
>>> > Thank you,
>>> >
>>> > Radai.
>>>
>>>
>>>
>>> --
>>> Gwen Shapira
>>> Product Manager | Confluent
>>> 650.450.2760 | @gwenshap
>>> Follow us: Twitter | blog
>>>
>>
>>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog
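
A minimal sketch of the buffer-pool primitive described in the quoted KIP text above, 
assuming a simple semaphore-based bound (the actual implementation in the linked 
branch may differ in its details):

{code:java}
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;

/**
 * Simplified illustration of the "buffer pool" idea: callers must acquire
 * capacity before reading a request into memory and release it once the
 * request has been processed, so total outstanding request memory is bounded.
 * This is NOT the KIP-72 implementation, just the shape of the primitive.
 */
public class BoundedBufferPool {
    private final Semaphore available;

    public BoundedBufferPool(final int maxBytes) {
        this.available = new Semaphore(maxBytes);
    }

    /** Blocks until 'size' bytes of budget are available, then allocates. */
    public ByteBuffer acquire(final int size) throws InterruptedException {
        available.acquire(size);
        return ByteBuffer.allocate(size);
    }

    /** Returns the buffer's budget to the pool once the request is done. */
    public void release(final ByteBuffer buffer) {
        available.release(buffer.capacity());
    }
}
{code}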


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread radai
+1 for sean's document. it covers pretty much all the trade-offs and
provides concrete figures to argue about :-)
(nit-picking - used the same xkcd twice, also trove has been superseded for
purposes of high performance collections: look at
https://github.com/leventov/Koloboke)

so to sum up the string vs int debate:

performance - you can do 140k ops/sec _per thread_ with string headers. you
could do x2-3 better with ints. there's no arguing the relative diff
between the two, there's only the question of whether or not _the rest of
kafka_ operates fast enough to care. if we want to make choices solely
based on performance we need ints. if we are willing to settle/compromise
for a nicer (to some) API then strings are good enough for the current
state of affairs.

message size - with batching and compression it comes down to a ~5%
difference (internal testing, not in the doc. maybe would help adding if
this becomes a point of contention?). this means it won't really affect
kafka in "throughput mode" (large, compressed batches). in "low latency"
mode (meaning less/no batching and compression) the difference can be
extreme (it'll easily be an order of magnitude with small payloads like
stock ticks and header keys of the form
"com.acme.infraTeam.kafka.hiMom.auditPlugin"). we have a few such topics at
linkedin where actual payloads are ~2 ints and are eclipsed by our in-house
audit "header" which is why we liked ints to begin with.

"ease of use" - strings would probably still require _some_ degree of
partitioning by convention (imagine if everyone used the key "infra"...)
but its very intuitive for java devs to do anyway (reverse-domain is
ingrained into java developers at a young age :-) ). also most java devs
find Map more intuitive than Map -
probably because of other text-based protocols like http. ints would
require a number registry. if you think number registries are hard just
look at the wiki page for KIPs (specifically the number for next available
KIP) and think again - we are probably talking about the same volume of
requests. also this would only be "required" (good citizenship, more like)
if you want to publish your plugin for others to use. within your org do
whatever you want - just know that if you use [some "reserved" range] and a
future kafka update breaks it its your problem. RTFM.
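
to make the two shapes concrete (purely illustrative, nothing below is part of the KIP):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Purely illustrative: the two header-key shapes being debated.
public class HeaderKeyShapes {
    public static void main(String[] args) {
        // String keys: namespaced by convention (reverse-domain prefix),
        // readable, no registry needed, but larger on the wire.
        Map<String, byte[]> stringHeaders = new HashMap<>();
        stringHeaders.put("com.acme.infra.audit", new byte[]{1, 2, 3});

        // Int keys: compact and fast to parse, but the meaning of each id
        // has to come from some registry or per-deployment mapping.
        Map<Integer, byte[]> intHeaders = new HashMap<>();
        intHeaders.put(0x0101, new byte[]{1, 2, 3});

        System.out.println(stringHeaders.size() + " / " + intHeaders.size());
    }
}
{code}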

personally im in favor of ints.

having said that (and like nacho) I will settle if int vs string remains
the only obstacle to this.

On Tue, Nov 8, 2016 at 3:53 PM, Nacho Solis 
wrote:

> I think it's well known I've been pushing for ints (and I could switch to
> 16 bit shorts if pressed).
>
> - efficient (space)
> - efficient (processing)
> - easily partitionable
>
>
> However, if the only thing that is keeping us from adopting headers is the
> use of strings vs ints as keys, then I would cave in and accept strings. If
> we do so, I would like to limit string keys to 128 bytes in length.  This
> way 1) I could use a 3 letter string if I wanted (effectively using 4 total
> bytes), 2) limit overall impact of possible keys (don't really want people
> to send a 16K header string key).
>
> Nacho
>
>
> On Tue, Nov 8, 2016 at 3:35 PM, Gwen Shapira  wrote:
>
> > Forgot to mention: Thank you for quantifying the trade-off - it is
> > helpful and important regardless of what we end up deciding.
> >
> > On Tue, Nov 8, 2016 at 3:12 PM, Sean McCauliff
> >  wrote:
> > > On Tue, Nov 8, 2016 at 2:15 PM, Gwen Shapira 
> wrote:
> > >
> > >> Since Kafka specifically targets high-throughput, low-latency
> > >> use-cases, I don't think we should trade them off that easily.
> > >>
> > >
> > > I find these kind of design goals not to be really helpful unless it's
> > > quantified in someway.  Because it's always possible to argue against
> > > something as either being not performant or just an implementation
> > detail.
> > >
> > > This is a single-threaded benchmark, so all the measurements are per
> > > thread.
> > >
> > > For 1M messages/s/thread  if header keys are int and you had even a
> > single
> > > header key, value pair then it's still about 2^-2 microseconds which
> > means
> > > you only have another 0.75 microseconds to do everything else you want
> to
> > > do with a message (1M messages/s means 1 micro second per message).
> With
> > > string header keys there is still 0.5 micro seconds to process a
> message.
> > >
> > >
> > >
> > > I love strings as much as the next guy (we had them in Flume), but I
> > >> was convinced by Magnus/Michael/Radai that strings don't actually have
> > >> strong benefits as opposed to ints (you'll need a string registry
> > >> anyway - otherwise, how will you know what does the "profile_id"
> > >> header refers to?) and I want to keep closer to our original design
> > >> goals for Kafka.
> > >>
> > >
> > > "confluent.profile_id"
> > >
> > >
> > >>
> > >> 

[jira] [Commented] (KAFKA-4381) Add per partition lag metric to KafkaConsumer.

2016-11-08 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649280#comment-15649280
 ] 

Ewen Cheslack-Postava commented on KAFKA-4381:
--

Yeah, I don't want things to get bogged down in KIPs either. I just figure 
following the rule of "any public interface" makes it easy for everyone to know 
whether they need a KIP or not (e.g. monitoring is listed on the KIP page since 
they are user facing). We definitely play fast and loose with this elsewhere 
anyway though, e.g. command line tools often see user facing changes w/o KIPs, 
and most of them will be uncontroversial anyway. On the other hand, I've caught 
important backwards incompatible changes that almost made it through as well, 
so...

> Add per partition lag metric to KafkaConsumer.
> --
>
> Key: KAFKA-4381
> URL: https://issues.apache.org/jira/browse/KAFKA-4381
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.2.0
>
>
> Currently KafkaConsumer only has a metric of max lag across all the 
> partitions. It would be useful to know per partition lag as well.
> I remember there was a ticket created before but did not find it. So I am 
> creating this ticket.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3901) KStreamTransformValues$KStreamTransformValuesProcessor#process() forwards null values

2016-11-08 Thread Dmitry Minkovsky (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649270#comment-15649270
 ] 

Dmitry Minkovsky edited comment on KAFKA-3901 at 11/9/16 12:08 AM:
---

Also affects 0.10.1.0. How about a PR?


was (Author: dminkovsky):
Also affects 0.10.1.0 <3

> KStreamTransformValues$KStreamTransformValuesProcessor#process() forwards 
> null values
> -
>
> Key: KAFKA-3901
> URL: https://issues.apache.org/jira/browse/KAFKA-3901
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
>
> Did this get missed in [KAFKA-3519: Refactor Transformer's transform / 
> punctuate to return nullable 
> value|https://github.com/apache/kafka/commit/40fd456649b5df29d030da46865b5e7e0ca6db15#diff-338c230fd5a15d98550230007651a224]?
>  I think it may have, because that processor's #punctuate() does not forward 
> null. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3901) KStreamTransformValues$KStreamTransformValuesProcessor#process() forwards null values

2016-11-08 Thread Dmitry Minkovsky (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649270#comment-15649270
 ] 

Dmitry Minkovsky commented on KAFKA-3901:
-

Also affects 0.10.1.0 <3

> KStreamTransformValues$KStreamTransformValuesProcessor#process() forwards 
> null values
> -
>
> Key: KAFKA-3901
> URL: https://issues.apache.org/jira/browse/KAFKA-3901
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
>
> Did this get missed in [KAFKA-3519: Refactor Transformer's transform / 
> punctuate to return nullable 
> value|https://github.com/apache/kafka/commit/40fd456649b5df29d030da46865b5e7e0ca6db15#diff-338c230fd5a15d98550230007651a224]?
>  I think it may have, because that processor's #punctuate() does not forward 
> null. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #1028

2016-11-08 Thread Apache Jenkins Server
See 

Changes:

[me] MINOR: some trace logging for streams debugging

--
[...truncated 3875 lines...]

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow 
STARTED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooLow PASSED

kafka.integration.AutoOffsetResetTest > 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Nacho Solis
I think it's well known I've been pushing for ints (and I could switch to
16 bit shorts if pressed).

- efficient (space)
- efficient (processing)
- easily partitionable


However, if the only thing that is keeping us from adopting headers is the
use of strings vs ints as keys, then I would cave in and accept strings. If
we do so, I would like to limit string keys to 128 bytes in length.  This
way 1) I could use a 3 letter string if I wanted (effectively using 4 total
bytes), 2) limit overall impact of possible keys (don't really want people
to send a 16K header string key).

Nacho


On Tue, Nov 8, 2016 at 3:35 PM, Gwen Shapira  wrote:

> Forgot to mention: Thank you for quantifying the trade-off - it is
> helpful and important regardless of what we end up deciding.
>
> On Tue, Nov 8, 2016 at 3:12 PM, Sean McCauliff
>  wrote:
> > On Tue, Nov 8, 2016 at 2:15 PM, Gwen Shapira  wrote:
> >
> >> Since Kafka specifically targets high-throughput, low-latency
> >> use-cases, I don't think we should trade them off that easily.
> >>
> >
> > I find these kind of design goals not to be really helpful unless it's
> > quantified in someway.  Because it's always possible to argue against
> > something as either being not performant or just an implementation
> detail.
> >
> > This is a single-threaded benchmark, so all the measurements are per
> > thread.
> >
> > For 1M messages/s/thread  if header keys are int and you had even a
> single
> > header key, value pair then it's still about 2^-2 microseconds which
> means
> > you only have another 0.75 microseconds to do everything else you want to
> > do with a message (1M messages/s means 1 micro second per message).  With
> > string header keys there is still 0.5 micro seconds to process a message.
> >
> >
> >
> > I love strings as much as the next guy (we had them in Flume), but I
> >> was convinced by Magnus/Michael/Radai that strings don't actually have
> >> strong benefits as opposed to ints (you'll need a string registry
> >> anyway - otherwise, how will you know what does the "profile_id"
> >> header refers to?) and I want to keep closer to our original design
> >> goals for Kafka.
> >>
> >
> > "confluent.profile_id"
> >
> >
> >>
> >> If someone likes strings in the headers and doesn't do millions of
> >> messages a sec, they probably have lots of other systems they can use
> >> instead.
> >>
> >
> > None of them will scale like Kafka.  Horizontal scaling is still good.
> >
> >
> >>
> >>
> >> On Tue, Nov 8, 2016 at 1:22 PM, Sean McCauliff
> >>  wrote:
> >> > +1 for String keys.
> >> >
> >> > I've been doing some benchmarking and it seems like the speedup for
> using
> >> > integer keys is about 2-5 depending on the length of the strings and
> what
> >> > collections are being used.  The overall amount of time spent parsing
> a
> >> set
> >> > of header key, value pairs probably does not matter unless you are
> >> getting
> >> > close to 1M messages per consumer.  In which case probably don't use
> >> > headers.  There is also the option to use very short strings; some
> that
> >> are
> >> > even shorter than integers.
> >> >
> >> > Partitioning the string key space will be easier than partitioning an
> >> > integer key space. We won't need a global registry.  Kafka internally
> can
> >> > reserve some prefix like "_" as its namespace.  Everyone else can use
> >> their
> >> > company or project name as namespace prefix and life should be good.
> >> >
> >> > Here's the link to some of the benchmarking info:
> >> > https://docs.google.com/document/d/1tfT-
> 6SZdnKOLyWGDH82kS30PnUkmgb7nPL
> >> dw6p65pAI/edit?usp=sharing
> >> >
> >> >
> >> >
> >> > --
> >> > Sean McCauliff
> >> > Staff Software Engineer
> >> > Kafka
> >> >
> >> > smccaul...@linkedin.com
> >> > linkedin.com/in/sean-mccauliff-b563192
> >> >
> >> > On Mon, Nov 7, 2016 at 11:51 PM, Michael Pearce <
> michael.pea...@ig.com>
> >> > wrote:
> >> >
> >> >> +1 on this slimmer version of our proposal
> >> >>
> >> >> I def think the Id space we can reduce from the proposed
> int32(4bytes)
> >> >> down to int16(2bytes) it saves on space and as headers we wouldn't
> >> expect
> >> >> the number of headers being used concurrently being that high.
> >> >>
> >> >> I would wonder if we should make the value byte array length still
> int32
> >> >> though as This is the standard Max array length in Java saying that
> it
> >> is a
> >> >> header and I guess limiting the size is sensible and would work for
> all
> >> the
> >> >> use cases we have in mind so happy with limiting this.
> >> >>
> >> >> Do people generally concur on Magnus's slimmer version? Anyone see
> any
> >> >> issues if we moved from int32 to int16?
> >> >>
> >> >> Re configurable ids per plugin over a global registry also would work
> >> for
> >> >> us.  As such if this has better consensus over the proposed global
> >> registry
> >> >> I'd be happy to 

[jira] [Commented] (KAFKA-4381) Add per partition lag metric to KafkaConsumer.

2016-11-08 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649233#comment-15649233
 ] 

Guozhang Wang commented on KAFKA-4381:
--

To me, a KIP is designed to solicit broader feedback from the community as well 
as to push proposers to think carefully about the pros / cons of different 
approaches and the compatibility story for certain changes, so a change like 
this sounds like overkill for a KIP. But when talking to other committers a 
while ago I realized people have different opinions about this, so I'm OK to 
just apply the strict by-the-book rule for such things as well, unless over time 
people feel it is hindering the development pace and we can re-visit.

> Add per partition lag metric to KafkaConsumer.
> --
>
> Key: KAFKA-4381
> URL: https://issues.apache.org/jira/browse/KAFKA-4381
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.2.0
>
>
> Currently KafkaConsumer only has a metric of max lag across all the 
> partitions. It would be useful to know per partition lag as well.
> I remember there was a ticket created before but did not find it. So I am 
> creating this ticket.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Issue Comment Deleted] (KAFKA-4364) Sink tasks expose secrets in DEBUG logging

2016-11-08 Thread Ryan P (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan P updated KAFKA-4364:
--
Comment: was deleted

(was: https://github.com/apache/kafka/pull/2115)

> Sink tasks expose secrets in DEBUG logging
> --
>
> Key: KAFKA-4364
> URL: https://issues.apache.org/jira/browse/KAFKA-4364
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ryan P
>Assignee: Ryan P
>
> As it stands today worker tasks print secrets such as Key/Trust store 
> passwords to their respective logs. 
> https://github.com/confluentinc/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L213-L214
> i.e.
> [2016-11-01 12:50:59,254] DEBUG Initializing connector test-sink with config 
> {consumer.ssl.truststore.password=password, 
> connector.class=io.confluent.connect.jdbc.JdbcSinkConnector, 
> connection.password=password, producer.security.protocol=SSL, 
> producer.ssl.truststore.password=password, topics=orders, tasks.max=1, 
> consumer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> producer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> connection.user=connect, name=test-sink, auto.create=true, 
> consumer.security.protocol=SSL, 
> connection.url=jdbc:postgresql://localhost/test} 
> (org.apache.kafka.connect.runtime.WorkerConnector:71)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4364) Sink tasks expose secrets in DEBUG logging

2016-11-08 Thread Ryan P (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649205#comment-15649205
 ] 

Ryan P commented on KAFKA-4364:
---

https://github.com/apache/kafka/pull/2115

> Sink tasks expose secrets in DEBUG logging
> --
>
> Key: KAFKA-4364
> URL: https://issues.apache.org/jira/browse/KAFKA-4364
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ryan P
>Assignee: Ryan P
>
> As it stands today worker tasks print secrets such as Key/Trust store 
> passwords to their respective logs. 
> https://github.com/confluentinc/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L213-L214
> i.e.
> [2016-11-01 12:50:59,254] DEBUG Initializing connector test-sink with config 
> {consumer.ssl.truststore.password=password, 
> connector.class=io.confluent.connect.jdbc.JdbcSinkConnector, 
> connection.password=password, producer.security.protocol=SSL, 
> producer.ssl.truststore.password=password, topics=orders, tasks.max=1, 
> consumer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> producer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> connection.user=connect, name=test-sink, auto.create=true, 
> consumer.security.protocol=SSL, 
> connection.url=jdbc:postgresql://localhost/test} 
> (org.apache.kafka.connect.runtime.WorkerConnector:71)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4364) Sink tasks expose secrets in DEBUG logging

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649203#comment-15649203
 ] 

ASF GitHub Bot commented on KAFKA-4364:
---

GitHub user rnpridgeon opened a pull request:

https://github.com/apache/kafka/pull/2115

KAFKA-4364: Remove secrets from DEBUG logging

leverage fix from KAFKA-2690 to remove secrets from task logging 
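
For context, a minimal sketch of the general mechanism this can lean on - declaring 
secret settings with {{ConfigDef.Type.PASSWORD}} so their parsed values print masked 
(illustrative only, not the contents of this PR):

{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class PasswordMaskingSketch {
    public static void main(String[] args) {
        // Declare the secret setting as Type.PASSWORD ...
        ConfigDef def = new ConfigDef()
            .define("connection.password", Type.PASSWORD, Importance.HIGH, "JDBC password");

        // ... and the parsed value is a Password object whose toString() is masked,
        // so logging the parsed config no longer leaks the secret.
        Map<String, Object> parsed = def.parse(Collections.singletonMap("connection.password", "secret"));
        System.out.println(parsed); // prints {connection.password=[hidden]}
    }
}
{code}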

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rnpridgeon/kafka KAFKA-4364

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2115.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2115


commit fe1f1f2cca6cd1b3255ef89eac2d6062a17e7079
Author: rnpridgeon 
Date:   2016-11-08T23:35:09Z

KAFKA-4364: Remove secrets from DEBUG logging




> Sink tasks expose secrets in DEBUG logging
> --
>
> Key: KAFKA-4364
> URL: https://issues.apache.org/jira/browse/KAFKA-4364
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ryan P
>Assignee: Ryan P
>
> As it stands today worker tasks print secrets such as Key/Trust store 
> passwords to their respective logs. 
> https://github.com/confluentinc/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L213-L214
> i.e.
> [2016-11-01 12:50:59,254] DEBUG Initializing connector test-sink with config 
> {consumer.ssl.truststore.password=password, 
> connector.class=io.confluent.connect.jdbc.JdbcSinkConnector, 
> connection.password=password, producer.security.protocol=SSL, 
> producer.ssl.truststore.password=password, topics=orders, tasks.max=1, 
> consumer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> producer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> connection.user=connect, name=test-sink, auto.create=true, 
> consumer.security.protocol=SSL, 
> connection.url=jdbc:postgresql://localhost/test} 
> (org.apache.kafka.connect.runtime.WorkerConnector:71)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2115: KAFKA-4364: Remove secrets from DEBUG logging

2016-11-08 Thread rnpridgeon
GitHub user rnpridgeon opened a pull request:

https://github.com/apache/kafka/pull/2115

KAFKA-4364: Remove secrets from DEBUG logging

leverage fix from KAFKA-2690 to remove secrets from task logging 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rnpridgeon/kafka KAFKA-4364

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2115.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2115


commit fe1f1f2cca6cd1b3255ef89eac2d6062a17e7079
Author: rnpridgeon 
Date:   2016-11-08T23:35:09Z

KAFKA-4364: Remove secrets from DEBUG logging




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4393) Improve invalid/negative TS handling

2016-11-08 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-4393:
--

 Summary: Improve invalid/negative TS handling
 Key: KAFKA-4393
 URL: https://issues.apache.org/jira/browse/KAFKA-4393
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax
 Fix For: 0.10.2.0


Currently, Kafka Streams does not handle invalid/negative timestamps returned 
from the {{TimestampExtractor}} gracefully, but fails with an exception. Input 
record timestamps determine output record timestamps and {{KafkaProducer}} does 
not allow negative timestamps to get written.

Besides the exception issue described above, negative timestamps also cannot be 
handled in a meaningful way by any time-based (i.e., window) operators like 
window aggregates and joins.

Thus, we want to change Streams to an auto-drop behavior for negative timestamps 
and not process those records at all (without any further user notification 
about the dropped records). To guard the user against silently dropping messages, 
we change the default extractor {{ConsumerRecordTimestampExtractor}} to check the 
extracted record metadata timestamp and raise an exception if it is negative. 
Furthermore, we add a "drop-and-log" extractor, as this seems to be a common 
behavior users might want. For any other behavior, users can still provide a 
custom {{TimestampExtractor}} implementation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Gwen Shapira
Forgot to mention: Thank you for quantifying the trade-off - it is
helpful and important regardless of what we end up deciding.

On Tue, Nov 8, 2016 at 3:12 PM, Sean McCauliff
 wrote:
> On Tue, Nov 8, 2016 at 2:15 PM, Gwen Shapira  wrote:
>
>> Since Kafka specifically targets high-throughput, low-latency
>> use-cases, I don't think we should trade them off that easily.
>>
>
> I find these kind of design goals not to be really helpful unless it's
> quantified in someway.  Because it's always possible to argue against
> something as either being not performant or just an implementation detail.
>
> This is a single-threaded benchmark, so all the measurements are per
> thread.
>
> For 1M messages/s/thread  if header keys are int and you had even a single
> header key, value pair then it's still about 2^-2 microseconds which means
> you only have another 0.75 microseconds to do everything else you want to
> do with a message (1M messages/s means 1 micro second per message).  With
> string header keys there is still 0.5 micro seconds to process a message.
>
>
>
> I love strings as much as the next guy (we had them in Flume), but I
>> was convinced by Magnus/Michael/Radai that strings don't actually have
>> strong benefits as opposed to ints (you'll need a string registry
>> anyway - otherwise, how will you know what does the "profile_id"
>> header refers to?) and I want to keep closer to our original design
>> goals for Kafka.
>>
>
> "confluent.profile_id"
>
>
>>
>> If someone likes strings in the headers and doesn't do millions of
>> messages a sec, they probably have lots of other systems they can use
>> instead.
>>
>
> None of them will scale like Kafka.  Horizontal scaling is still good.
>
>
>>
>>
>> On Tue, Nov 8, 2016 at 1:22 PM, Sean McCauliff
>>  wrote:
>> > +1 for String keys.
>> >
>> > I've been doing some benchmarking and it seems like the speedup for using
>> > integer keys is about 2-5 depending on the length of the strings and what
>> > collections are being used.  The overall amount of time spent parsing a
>> set
>> > of header key, value pairs probably does not matter unless you are
>> getting
>> > close to 1M messages per consumer.  In which case probably don't use
>> > headers.  There is also the option to use very short strings; some that
>> are
>> > even shorter than integers.
>> >
>> > Partitioning the string key space will be easier than partitioning an
>> > integer key space. We won't need a global registry.  Kafka internally can
>> > reserve some prefix like "_" as its namespace.  Everyone else can use
>> their
>> > company or project name as namespace prefix and life should be good.
>> >
>> > Here's the link to some of the benchmarking info:
>> > https://docs.google.com/document/d/1tfT-6SZdnKOLyWGDH82kS30PnUkmgb7nPL
>> dw6p65pAI/edit?usp=sharing
>> >
>> >
>> >
>> > --
>> > Sean McCauliff
>> > Staff Software Engineer
>> > Kafka
>> >
>> > smccaul...@linkedin.com
>> > linkedin.com/in/sean-mccauliff-b563192
>> >
>> > On Mon, Nov 7, 2016 at 11:51 PM, Michael Pearce 
>> > wrote:
>> >
>> >> +1 on this slimmer version of our proposal
>> >>
>> >> I def think the Id space we can reduce from the proposed int32(4bytes)
>> >> down to int16(2bytes) it saves on space and as headers we wouldn't
>> expect
>> >> the number of headers being used concurrently being that high.
>> >>
>> >> I would wonder if we should make the value byte array length still int32
>> >> though as This is the standard Max array length in Java saying that it
>> is a
>> >> header and I guess limiting the size is sensible and would work for all
>> the
>> >> use cases we have in mind so happy with limiting this.
>> >>
>> >> Do people generally concur on Magnus's slimmer version? Anyone see any
>> >> issues if we moved from int32 to int16?
>> >>
>> >> Re configurable ids per plugin over a global registry also would work
>> for
>> >> us.  As such if this has better consensus over the proposed global
>> registry
>> >> I'd be happy to change that.
>> >>
>> >> I was already sold on ints over strings for keys ;)
>> >>
>> >> Cheers
>> >> Mike
>> >>
>> >> 
>> >> From: Magnus Edenhill 
>> >> Sent: Monday, November 7, 2016 10:10:21 PM
>> >> To: dev@kafka.apache.org
>> >> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
>> >>
>> >> Hi,
>> >>
>> >> I'm +1 for adding generic message headers, but I do share the concerns
>> >> previously aired on this thread and during the KIP meeting.
>> >>
>> >> So let me propose a slimmer alternative that does not require any sort
>> of
>> >> global header registry, does not affect broker performance or
>> operations,
>> >> and adds as little overhead as possible.
>> >>
>> >>
>> >> Message
>> >> 
>> >> The protocol Message type is extended with a Headers array consting of
>> >> Tags, where a Tag is defined as:
>> >>int16 Id
>> >> 

[jira] [Comment Edited] (KAFKA-4391) On Windows, Kafka server stops with uncaught exception after coming back from sleep

2016-11-08 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649179#comment-15649179
 ] 

huxi edited comment on KAFKA-4391 at 11/8/16 11:31 PM:
---

It might be due to the fact that the network and disk are deactivated by default 
while the PC hibernates. Could you check whether that is the case?


was (Author: huxi_2b):
It might because of the fact that network and disk will be deactivated during 
the pc hibernating by default. Could you have a check?

> On Windows, Kafka server stops with uncaught exception after coming back from 
> sleep
> ---
>
> Key: KAFKA-4391
> URL: https://issues.apache.org/jira/browse/KAFKA-4391
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Windows 10, jdk1.8.0_111
>Reporter: Yiquan Zhou
>
> Steps to reproduce:
> 1. start the zookeeper
> $ bin\windows\zookeeper-server-start.bat config/zookeeper.properties
> 2. start the Kafka server with the default properties
> $ bin\windows\kafka-server-start.bat config/server.properties
> 3. put Windows into sleep mode for 1-2 hours
> 4. activate Windows again, an exception occurs in Kafka server console and 
> the server is stopped:
> {code:title=kafka console log}
> [2016-11-08 21:45:35,185] INFO Client session timed out, have not heard from 
> server in 10081379ms for sessionid 0x1584514da47, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:40,698] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-11-08 21:45:43,029] INFO Opening socket connection to server 
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,044] INFO Socket connection established to 
> 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,158] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1584514da47 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,158] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-11-08 21:45:43,236] INFO Initiating client connection, 
> connectString=localhost:2181 sessionTimeout=6000 
> watcher=org.I0Itec.zkclient.ZkClient@11ca437b (org.apache.zookeeper.ZooKeeper)
> [2016-11-08 21:45:43,280] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> log4j:ERROR Failed to rename [/controller.log] to 
> [/controller.log.2016-11-08-18].
> [2016-11-08 21:45:43,421] INFO Opening socket connection to server 
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,483] INFO Socket connection established to 
> 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,811] INFO Session establishment complete on server 
> 127.0.0.1/127.0.0.1:2181, sessionid = 0x1584514da470001, negotiated timeout = 
> 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,827] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> log4j:ERROR Failed to rename [/server.log] to [/server.log.2016-11-08-18].
> [2016-11-08 21:45:43,827] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,014] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,014] INFO 0 successfully elected as leader 
> (kafka.server.ZookeeperLeaderElector)
> log4j:ERROR Failed to rename [/state-change.log] to 
> [/state-change.log.2016-11-08-18].
> [2016-11-08 21:45:44,421] INFO re-registering broker info in ZK for broker 0 
> (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:44,436] INFO Creating /brokers/ids/0 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,686] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,686] INFO Registered broker 0 at path /brokers/ids/0 
> with addresses: PLAINTEXT -> EndPoint(192.168.0.15,9092,PLAINTEXT) 
> (kafka.utils.ZkUtils)
> [2016-11-08 21:45:44,686] INFO done re-registering broker 
> (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:44,686] INFO Subscribing to /brokers/topics path to watch 
> for new topics (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:45,046] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
> [2016-11-08 21:45:45,061] INFO New leader is 0 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2016-11-08 21:45:47,325] ERROR Uncaught exception in scheduled task 
> 'kafka-recovery-point-checkpoint' 

[jira] [Commented] (KAFKA-4391) On Windows, Kafka server stops with uncaught exception after coming back from sleep

2016-11-08 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15649179#comment-15649179
 ] 

huxi commented on KAFKA-4391:
-

It might be because the network and disk are deactivated by default while 
the PC hibernates. Could you check whether that is the case?

> On Windows, Kafka server stops with uncaught exception after coming back from 
> sleep
> ---
>
> Key: KAFKA-4391
> URL: https://issues.apache.org/jira/browse/KAFKA-4391
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
> Environment: Windows 10, jdk1.8.0_111
>Reporter: Yiquan Zhou
>
> Steps to reproduce:
> 1. start the zookeeper
> $ bin\windows\zookeeper-server-start.bat config/zookeeper.properties
> 2. start the Kafka server with the default properties
> $ bin\windows\kafka-server-start.bat config/server.properties
> 3. put Windows into sleep mode for 1-2 hours
> 4. activate Windows again, an exception occurs in Kafka server console and 
> the server is stopped:
> {code:title=kafka console log}
> [2016-11-08 21:45:35,185] INFO Client session timed out, have not heard from 
> server in 10081379ms for sessionid 0x1584514da47, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:40,698] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-11-08 21:45:43,029] INFO Opening socket connection to server 
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,044] INFO Socket connection established to 
> 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,158] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1584514da47 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,158] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-11-08 21:45:43,236] INFO Initiating client connection, 
> connectString=localhost:2181 sessionTimeout=6000 
> watcher=org.I0Itec.zkclient.ZkClient@11ca437b (org.apache.zookeeper.ZooKeeper)
> [2016-11-08 21:45:43,280] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> log4j:ERROR Failed to rename [/controller.log] to 
> [/controller.log.2016-11-08-18].
> [2016-11-08 21:45:43,421] INFO Opening socket connection to server 
> 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL 
> (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,483] INFO Socket connection established to 
> 127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,811] INFO Session establishment complete on server 
> 127.0.0.1/127.0.0.1:2181, sessionid = 0x1584514da470001, negotiated timeout = 
> 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-11-08 21:45:43,827] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> log4j:ERROR Failed to rename [/server.log] to [/server.log.2016-11-08-18].
> [2016-11-08 21:45:43,827] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,014] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,014] INFO 0 successfully elected as leader 
> (kafka.server.ZookeeperLeaderElector)
> log4j:ERROR Failed to rename [/state-change.log] to 
> [/state-change.log.2016-11-08-18].
> [2016-11-08 21:45:44,421] INFO re-registering broker info in ZK for broker 0 
> (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:44,436] INFO Creating /brokers/ids/0 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,686] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-11-08 21:45:44,686] INFO Registered broker 0 at path /brokers/ids/0 
> with addresses: PLAINTEXT -> EndPoint(192.168.0.15,9092,PLAINTEXT) 
> (kafka.utils.ZkUtils)
> [2016-11-08 21:45:44,686] INFO done re-registering broker 
> (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:44,686] INFO Subscribing to /brokers/topics path to watch 
> for new topics (kafka.server.KafkaHealthcheck)
> [2016-11-08 21:45:45,046] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
> [2016-11-08 21:45:45,061] INFO New leader is 0 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2016-11-08 21:45:47,325] ERROR Uncaught exception in scheduled task 
> 'kafka-recovery-point-checkpoint' (kafka.utils.KafkaScheduler)
> java.io.IOException: File rename from 
> D:\tmp\kafka-logs\recovery-point-offset-checkpoint.tmp to 
> D:\tmp\kafka-logs\recovery-point-offset-checkpoint failed.
> at 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Gwen Shapira
The benefit of having "high throughput, low latency" as a design goal
is that when we are trying to choose between two designs (int key vs
string key), we make sure we only trade off performance to get a benefit
somewhere else.
I know it is obvious, but when you said: "The overall amount of time
spent parsing a set of header key, value pairs probably does not
matter unless you are getting close to 1M messages per consumer.  In
which case probably don't use headers."
Well, my first reaction is that this is exactly what we are trying to
do here - give headers to consumers that do close to 1M events per sec
(which is what Magnus is writing).

As an aside, the overhead isn't just parsing, it is the message size
itself (i.e. I'll be fine with fixed-size 32-bit strings...)

Pretty much any feature we add, we are trying to make sure it either
doesn't impact performance or that it gives enough benefits in return.
See the very long discussions on timestamps, consumer coordination,
SSL, etc.

And that's where the issue is: I (and a few more of us) don't see a
significant benefit for strings vs ints. I do agree there is a benefit
in namespacing keys (which I think is what you meant by
"confluent.profile_id"), but this can still be done with just ints.

On Tue, Nov 8, 2016 at 3:12 PM, Sean McCauliff
 wrote:
> On Tue, Nov 8, 2016 at 2:15 PM, Gwen Shapira  wrote:
>
>> Since Kafka specifically targets high-throughput, low-latency
>> use-cases, I don't think we should trade them off that easily.
>>
>
> I find these kind of design goals not to be really helpful unless it's
> quantified in someway.  Because it's always possible to argue against
> something as either being not performant or just an implementation detail.
>
> This is a single threaded benchmarks so all the measurements are per
> thread.
>
> For 1M messages/s/thread  if header keys are int and you had even a single
> header key, value pair then it's still about 2^-2 microseconds which means
> you only have another 0.75 microseconds to do everything else you want to
> do with a message (1M messages/s means 1 micro second per message).  With
> string header keys there is still 0.5 micro seconds to process a message.
>
>
>
> I love strings as much as the next guy (we had them in Flume), but I
>> was convinced by Magnus/Michael/Radai that strings don't actually have
>> strong benefits as opposed to ints (you'll need a string registry
>> anyway - otherwise, how will you know what does the "profile_id"
>> header refers to?) and I want to keep closer to our original design
>> goals for Kafka.
>>
>
> "confluent.profile_id"
>
>
>>
>> If someone likes strings in the headers and doesn't do millions of
>> messages a sec, they probably have lots of other systems they can use
>> instead.
>>
>
> None of them will scale like Kafka.  Horizontal scaling is still good.
>
>
>>
>>
>> On Tue, Nov 8, 2016 at 1:22 PM, Sean McCauliff
>>  wrote:
>> > +1 for String keys.
>> >
>> > I've been doing some bechmarking and it seems like the speedup for using
>> > integer keys is about 2-5 depending on the length of the strings and what
>> > collections are being used.  The overall amount of time spent parsing a
>> set
>> > of header key, value pairs probably does not matter unless you are
>> getting
>> > close to 1M messages per consumer.  In which case probably don't use
>> > headers.  There is also the option to use very short strings; some that
>> are
>> > even shorter than integers.
>> >
>> > Partitioning the string key space will be easier than partitioning an
>> > integer key space. We won't need a global registry.  Kafka internally can
>> > reserve some prefix like "_" as its namespace.  Everyone else can use
>> their
>> > company or project name as namespace prefix and life should be good.
>> >
>> > Here's the link to some of the benchmarking info:
>> > https://docs.google.com/document/d/1tfT-6SZdnKOLyWGDH82kS30PnUkmgb7nPL
>> dw6p65pAI/edit?usp=sharing
>> >
>> >
>> >
>> > --
>> > Sean McCauliff
>> > Staff Software Engineer
>> > Kafka
>> >
>> > smccaul...@linkedin.com
>> > linkedin.com/in/sean-mccauliff-b563192
>> >
>> > On Mon, Nov 7, 2016 at 11:51 PM, Michael Pearce 
>> > wrote:
>> >
>> >> +1 on this slimmer version of our proposal
>> >>
>> >> I def think the Id space we can reduce from the proposed int32(4bytes)
>> >> down to int16(2bytes) it saves on space and as headers we wouldn't
>> expect
>> >> the number of headers being used concurrently being that high.
>> >>
>> >> I would wonder if we should make the value byte array length still int32
>> >> though as This is the standard Max array length in Java saying that it
>> is a
>> >> header and I guess limiting the size is sensible and would work for all
>> the
>> >> use cases we have in mind so happy with limiting this.
>> >>
>> >> Do people generally concur on Magnus's slimmer version? Anyone see any
>> >> issues if we moved from 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Sean McCauliff
On Tue, Nov 8, 2016 at 2:15 PM, Gwen Shapira  wrote:

> Since Kafka specifically targets high-throughput, low-latency
> use-cases, I don't think we should trade them off that easily.
>

I find these kinds of design goals not really helpful unless they are
quantified in some way, because it's always possible to argue against
something as either not performant or just an implementation detail.

These are single-threaded benchmarks, so all the measurements are per
thread.

For 1M messages/s/thread, if header keys are ints and you have even a single
header key/value pair, parsing still costs about 0.25 (2^-2) microseconds, which
means you only have another 0.75 microseconds to do everything else you want
to do with a message (1M messages/s means 1 microsecond per message).  With
string header keys there are still 0.5 microseconds left to process a message.



I love strings as much as the next guy (we had them in Flume), but I
> was convinced by Magnus/Michael/Radai that strings don't actually have
> strong benefits as opposed to ints (you'll need a string registry
> anyway - otherwise, how will you know what does the "profile_id"
> header refers to?) and I want to keep closer to our original design
> goals for Kafka.
>

"confluent.profile_id"


>
> If someone likes strings in the headers and doesn't do millions of
> messages a sec, they probably have lots of other systems they can use
> instead.
>

None of them will scale like Kafka.  Horizontal scaling is still good.


>
>
> On Tue, Nov 8, 2016 at 1:22 PM, Sean McCauliff
>  wrote:
> > +1 for String keys.
> >
> > I've been doing some bechmarking and it seems like the speedup for using
> > integer keys is about 2-5 depending on the length of the strings and what
> > collections are being used.  The overall amount of time spent parsing a
> set
> > of header key, value pairs probably does not matter unless you are
> getting
> > close to 1M messages per consumer.  In which case probably don't use
> > headers.  There is also the option to use very short strings; some that
> are
> > even shorter than integers.
> >
> > Partitioning the string key space will be easier than partitioning an
> > integer key space. We won't need a global registry.  Kafka internally can
> > reserve some prefix like "_" as its namespace.  Everyone else can use
> their
> > company or project name as namespace prefix and life should be good.
> >
> > Here's the link to some of the benchmarking info:
> > https://docs.google.com/document/d/1tfT-6SZdnKOLyWGDH82kS30PnUkmgb7nPL
> dw6p65pAI/edit?usp=sharing
> >
> >
> >
> > --
> > Sean McCauliff
> > Staff Software Engineer
> > Kafka
> >
> > smccaul...@linkedin.com
> > linkedin.com/in/sean-mccauliff-b563192
> >
> > On Mon, Nov 7, 2016 at 11:51 PM, Michael Pearce 
> > wrote:
> >
> >> +1 on this slimmer version of our proposal
> >>
> >> I def think the Id space we can reduce from the proposed int32(4bytes)
> >> down to int16(2bytes) it saves on space and as headers we wouldn't
> expect
> >> the number of headers being used concurrently being that high.
> >>
> >> I would wonder if we should make the value byte array length still int32
> >> though as This is the standard Max array length in Java saying that it
> is a
> >> header and I guess limiting the size is sensible and would work for all
> the
> >> use cases we have in mind so happy with limiting this.
> >>
> >> Do people generally concur on Magnus's slimmer version? Anyone see any
> >> issues if we moved from int32 to int16?
> >>
> >> Re configurable ids per plugin over a global registry also would work
> for
> >> us.  As such if this has better concensus over the proposed global
> registry
> >> I'd be happy to change that.
> >>
> >> I was already sold on ints over strings for keys ;)
> >>
> >> Cheers
> >> Mike
> >>
> >> 
> >> From: Magnus Edenhill 
> >> Sent: Monday, November 7, 2016 10:10:21 PM
> >> To: dev@kafka.apache.org
> >> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
> >>
> >> Hi,
> >>
> >> I'm +1 for adding generic message headers, but I do share the concerns
> >> previously aired on this thread and during the KIP meeting.
> >>
> >> So let me propose a slimmer alternative that does not require any sort
> of
> >> global header registry, does not affect broker performance or
> operations,
> >> and adds as little overhead as possible.
> >>
> >>
> >> Message
> >> 
> >> The protocol Message type is extended with a Headers array consting of
> >> Tags, where a Tag is defined as:
> >>int16 Id
> >>int16 Len  // binary_data length
> >>binary_data[Len]  // opaque binary data
> >>
> >>
> >> Ids
> >> ---
> >> The Id space is not centrally managed, so whenever an application needs
> to
> >> add headers, or use an eco-system plugin that does, its Id allocation
> will
> >> need to be manually configured.
> >> This moves the allocation concern 

[jira] [Updated] (KAFKA-4364) Sink tasks expose secrets in DEBUG logging

2016-11-08 Thread Ryan P (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan P updated KAFKA-4364:
--
Assignee: Ryan P  (was: Ewen Cheslack-Postava)

> Sink tasks expose secrets in DEBUG logging
> --
>
> Key: KAFKA-4364
> URL: https://issues.apache.org/jira/browse/KAFKA-4364
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ryan P
>Assignee: Ryan P
>
> As it stands today worker tasks print secrets such as Key/Trust store 
> passwords to their respective logs. 
> https://github.com/confluentinc/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L213-L214
> i.e.
> [2016-11-01 12:50:59,254] DEBUG Initializing connector test-sink with config 
> {consumer.ssl.truststore.password=password, 
> connector.class=io.confluent.connect.jdbc.JdbcSinkConnector, 
> connection.password=password, producer.security.protocol=SSL, 
> producer.ssl.truststore.password=password, topics=orders, tasks.max=1, 
> consumer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> producer.ssl.truststore.location=/tmp/truststore/kafka.trustore.jks, 
> connection.user=connect, name=test-sink, auto.create=true, 
> consumer.security.protocol=SSL, 
> connection.url=jdbc:postgresql://localhost/test} 
> (org.apache.kafka.connect.runtime.WorkerConnector:71)
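
A minimal sketch of the kind of masking this calls for, assuming a hypothetical 
ConfigRedactor helper (this is not the actual change made in Kafka Connect): 
password-like values are replaced before the map reaches the DEBUG statement.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not the actual Connect fix): masks password-like
// values in a config map before it is handed to a DEBUG log statement.
public class ConfigRedactor {

    public static Map<String, String> redact(Map<String, String> config) {
        Map<String, String> safe = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey().toLowerCase();
            boolean secret = key.contains("password") || key.contains("secret");
            safe.put(entry.getKey(), secret ? "[hidden]" : entry.getValue());
        }
        return safe;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connection.password", "password");
        config.put("topics", "orders");
        // Prints {connection.password=[hidden], topics=orders}
        System.out.println(redact(config));
    }
}
{code}

The log statement above would then receive ConfigRedactor.redact(config) instead of 
the raw config map.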



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1882: MINOR: some trace logging for streams debugging

2016-11-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1882


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2114: MINOR: add upgrade guide for Kafka Streams API

2016-11-08 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/2114

MINOR: add upgrade guide for Kafka Streams API



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka updateDocUpgradeSection

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2114.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2114


commit efd3e2aa36072ffd4f659d3bf3f8c22a02e74398
Author: Matthias J. Sax 
Date:   2016-11-08T22:44:54Z

MINOR: add upgrade guide for Kafka Streams API




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4392) Failed to lock the state directory due to an unexpected exception

2016-11-08 Thread Ara Ebrahimi (JIRA)
Ara Ebrahimi created KAFKA-4392:
---

 Summary: Failed to lock the state directory due to an unexpected 
exception
 Key: KAFKA-4392
 URL: https://issues.apache.org/jira/browse/KAFKA-4392
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Ara Ebrahimi
Assignee: Guozhang Wang


This happened on streaming startup, on a clean installation with no existing 
folder. I was starting 4 instances of our streaming app on 4 machines and 
one threw this exception. It seems to me there's a race condition somewhere when 
instances discover each other, or something like that.

2016-11-02 15:43:47 INFO  StreamRunner:59 - Started http server successfully.
2016-11-02 15:44:50 ERROR StateDirectory:147 - Failed to lock the state 
directory due to an unexpected exception
java.nio.file.NoSuchFileException: 
/data/1/kafka-streams/myapp-streams/7_21/.lock
at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at 
sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:177)
at java.nio.channels.FileChannel.open(FileChannel.java:287)
at java.nio.channels.FileChannel.open(FileChannel.java:335)
at 
org.apache.kafka.streams.processor.internals.StateDirectory.getOrCreateFileChannel(StateDirectory.java:176)
at 
org.apache.kafka.streams.processor.internals.StateDirectory.lock(StateDirectory.java:90)
at 
org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:140)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(StreamThread.java:552)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:459)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
^C
[arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/7_21/
ls: cannot access /data/1/kafka-streams/myapp-streams/7_21/: No such file or 
directory
[arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/
total 4
drwxr-xr-x 74 root root 4096 Nov  2 15:44 .
drwxr-xr-x  3 root root   27 Nov  2 15:43 ..
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_1
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_13
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_14
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_16
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_2
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_22
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_28
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_3
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_31
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_5
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_7
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_8
drwxr-xr-x  3 root root   32 Nov  2 15:43 0_9
drwxr-xr-x  3 root root   32 Nov  2 15:43 1_1
drwxr-xr-x  3 root root   32 Nov  2 15:43 1_10
drwxr-xr-x  3 root root   32 Nov  2 15:43 1_14
drwxr-xr-x  3 root root   32 Nov  2 15:43 1_15
drwxr-xr-x  3 root root   32 Nov  2 15:43 1_16
drwxr-xr-x  3 root root   32 Nov  2 15:43 1_17
drwxr-xr-x  3 root root   32 Nov  2 15:43 1_18
drwxr-xr-x  3 root root   32 Nov  2 15:43 1_3
drwxr-xr-x  3 root root   32 Nov  2 15:43 1_5
drwxr-xr-x  3 root root   60 Nov  2 15:43 2_1
drwxr-xr-x  3 root root   60 Nov  2 15:43 2_10
drwxr-xr-x  3 root root   60 Nov  2 15:43 2_12
drwxr-xr-x  3 root root   60 Nov  2 15:43 2_20
drwxr-xr-x  3 root root   60 Nov  2 15:43 2_24
drwxr-xr-x  3 root root   61 Nov  2 15:43 3_10
drwxr-xr-x  3 root root   61 Nov  2 15:43 3_11
drwxr-xr-x  3 root root   61 Nov  2 15:43 3_19
drwxr-xr-x  3 root root   61 Nov  2 15:43 3_20
drwxr-xr-x  3 root root   61 Nov  2 15:43 3_25
drwxr-xr-x  3 root root   61 Nov  2 15:43 3_26
drwxr-xr-x  3 root root   61 Nov  2 15:43 3_3
drwxr-xr-x  3 root root   64 Nov  2 15:43 4_11
drwxr-xr-x  3 root root   64 Nov  2 15:43 4_12
drwxr-xr-x  3 root root   64 Nov  2 15:43 4_18
drwxr-xr-x  3 root root   64 Nov  2 15:43 4_19
drwxr-xr-x  3 root root   64 Nov  2 15:43 4_24
drwxr-xr-x  3 root root   64 Nov  2 15:43 4_25
drwxr-xr-x  3 root root   64 Nov  2 15:43 4_26
drwxr-xr-x  3 root root   64 Nov  2 15:43 4_4
drwxr-xr-x  3 root root   64 Nov  2 15:43 4_9
drwxr-xr-x  3 root root   58 Nov  2 15:43 5_1
drwxr-xr-x  3 root root   58 Nov  2 15:43 5_10
drwxr-xr-x  3 root root   58 Nov  2 15:43 5_11
drwxr-xr-x  3 root root   58 Nov  2 15:43 5_13
drwxr-xr-x  3 root root   58 Nov  2 15:43 5_15
drwxr-xr-x  3 root root   58 Nov  2 15:43 5_17
drwxr-xr-x  3 root root   58 Nov  2 15:43 5_18
drwxr-xr-x  2 root root   18 Nov  2 15:43 6_13
drwxr-xr-x  2 root root   18 Nov  2 15:43 6_15
drwxr-xr-x  2 root root   18 Nov  2 15:43 6_18
drwxr-xr-x  2 root root   18 Nov  2 15:43 6_19
drwxr-xr-x  2 root root   18 Nov  2 15:43 

[GitHub] kafka pull request #2113: KAFKA-4376: Cross compile to Scala 2.12.0

2016-11-08 Thread leachbj
GitHub user leachbj opened a pull request:

https://github.com/apache/kafka/pull/2113

KAFKA-4376: Cross compile to Scala 2.12.0



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/leachbj/kafka 2.12.0-trunk-build

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2113


commit fec890b02023a0ee760343773cf305e10fb9
Author: Bernard Leach 
Date:   2016-11-01T22:01:34Z

KAFKA-4376: Cross compile to Scala 2.12.0




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Gwen Shapira
Since Kafka specifically targets high-throughput, low-latency
use-cases, I don't think we should trade them off that easily.

I love strings as much as the next guy (we had them in Flume), but I
was convinced by Magnus/Michael/Radai that strings don't actually have
strong benefits as opposed to ints (you'll need a string registry
anyway - otherwise, how will you know what the "profile_id"
header refers to?) and I want to keep closer to our original design
goals for Kafka.

If someone likes strings in the headers and doesn't do millions of
messages a sec, they probably have lots of other systems they can use
instead.


On Tue, Nov 8, 2016 at 1:22 PM, Sean McCauliff
 wrote:
> +1 for String keys.
>
> I've been doing some bechmarking and it seems like the speedup for using
> integer keys is about 2-5 depending on the length of the strings and what
> collections are being used.  The overall amount of time spent parsing a set
> of header key, value pairs probably does not matter unless you are getting
> close to 1M messages per consumer.  In which case probably don't use
> headers.  There is also the option to use very short strings; some that are
> even shorter than integers.
>
> Partitioning the string key space will be easier than partitioning an
> integer key space. We won't need a global registry.  Kafka internally can
> reserve some prefix like "_" as its namespace.  Everyone else can use their
> company or project name as namespace prefix and life should be good.
>
> Here's the link to some of the benchmarking info:
> https://docs.google.com/document/d/1tfT-6SZdnKOLyWGDH82kS30PnUkmgb7nPLdw6p65pAI/edit?usp=sharing
>
>
>
> --
> Sean McCauliff
> Staff Software Engineer
> Kafka
>
> smccaul...@linkedin.com
> linkedin.com/in/sean-mccauliff-b563192
>
> On Mon, Nov 7, 2016 at 11:51 PM, Michael Pearce 
> wrote:
>
>> +1 on this slimmer version of our proposal
>>
>> I def think the Id space we can reduce from the proposed int32(4bytes)
>> down to int16(2bytes) it saves on space and as headers we wouldn't expect
>> the number of headers being used concurrently being that high.
>>
>> I would wonder if we should make the value byte array length still int32
>> though as This is the standard Max array length in Java saying that it is a
>> header and I guess limiting the size is sensible and would work for all the
>> use cases we have in mind so happy with limiting this.
>>
>> Do people generally concur on Magnus's slimmer version? Anyone see any
>> issues if we moved from int32 to int16?
>>
>> Re configurable ids per plugin over a global registry also would work for
>> us.  As such if this has better concensus over the proposed global registry
>> I'd be happy to change that.
>>
>> I was already sold on ints over strings for keys ;)
>>
>> Cheers
>> Mike
>>
>> 
>> From: Magnus Edenhill 
>> Sent: Monday, November 7, 2016 10:10:21 PM
>> To: dev@kafka.apache.org
>> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
>>
>> Hi,
>>
>> I'm +1 for adding generic message headers, but I do share the concerns
>> previously aired on this thread and during the KIP meeting.
>>
>> So let me propose a slimmer alternative that does not require any sort of
>> global header registry, does not affect broker performance or operations,
>> and adds as little overhead as possible.
>>
>>
>> Message
>> 
>> The protocol Message type is extended with a Headers array consting of
>> Tags, where a Tag is defined as:
>>int16 Id
>>int16 Len  // binary_data length
>>binary_data[Len]  // opaque binary data
>>
>>
>> Ids
>> ---
>> The Id space is not centrally managed, so whenever an application needs to
>> add headers, or use an eco-system plugin that does, its Id allocation will
>> need to be manually configured.
>> This moves the allocation concern from the global space down to
>> organization level and avoids the risk for id conflicts.
>> Example pseudo-config for some app:
>> sometrackerplugin.tag.sourcev3.id=1000
>> dbthing.tag.tablename.id=1001
>> myschemareg.tag.schemaname.id=1002
>> myschemareg.tag.schemaversion.id=1003
>>
>>
>> Each header-writing or header-reading plugin must provide means (typically
>> through configuration) to specify the tag for each header it uses. Defaults
>> should be avoided.
>> A consumer silently ignores tags it does not have a mapping for (since the
>> binary_data can't be parsed without knowing what it is).
>>
>> Id range 0..999 is reserved for future use by the broker and must not be
>> used by plugins.
>>
>>
>>
>> Broker
>> -
>> The broker does not process the tags (other than the standard protocol
>> syntax verification), it simply stores and forwards them as opaque data.
>>
>> Standard message translation (removal of Headers) kicks in for older
>> clients.
>>
>>
>> Why not string ids?
>> -
>> String ids might seem 

Build failed in Jenkins: kafka-trunk-jdk7 #1677

2016-11-08 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] MINOR: remove commented out code and System.out.println

--
[...truncated 3888 lines...]

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout STARTED

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout FAILED
java.lang.AssertionError: Message set should have 1 message
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.producer.ProducerTest.testAsyncSendCanCorrectlyFailWithTimeout(ProducerTest.scala:312)

kafka.producer.ProducerTest > testSendNullMessage STARTED

kafka.producer.ProducerTest > testSendNullMessage PASSED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo STARTED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo PASSED

kafka.producer.ProducerTest > testSendWithDeadBroker STARTED

kafka.producer.ProducerTest > testSendWithDeadBroker PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile STARTED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset PASSED

kafka.tools.ConsoleConsumerTest > testDefaultConsumer STARTED

kafka.tools.ConsoleConsumerTest > testDefaultConsumer PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig PASSED

kafka.tools.ConsoleProducerTest > testParseKeyProp STARTED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.MirrorMakerTest > 
testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage STARTED

kafka.tools.MirrorMakerTest > 
testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.common.ConfigTest > testInvalidGroupIds STARTED

kafka.common.ConfigTest > testInvalidGroupIds PASSED

kafka.common.ConfigTest > testInvalidClientIds STARTED

kafka.common.ConfigTest > testInvalidClientIds PASSED

kafka.common.ZkNodeChangeNotificationListenerTest > testProcessNotification 
STARTED

kafka.common.ZkNodeChangeNotificationListenerTest > testProcessNotification 
PASSED

kafka.common.TopicTest > testInvalidTopicNames STARTED


Re: [DISCUSS] KAFKA-4345: Run decktape test for each pull request

2016-11-08 Thread Raghav Kumar Gautam
On Mon, Nov 7, 2016 at 3:20 PM, Ewen Cheslack-Postava 
wrote:

> On Mon, Nov 7, 2016 at 10:30 AM Raghav Kumar Gautam 
> wrote:
>
> > Hi Ewen,
> >
> > Thanks for the feedback. Answers are inlined.
> >
> > On Sun, Nov 6, 2016 at 8:46 PM, Ewen Cheslack-Postava  >
> > wrote:
> >
> > > Yeah, I'm all for getting these to run more frequently and on lighter
> > > weight infrastructure. (By the way, I also saw the use of docker; I'd
> > > really like to get a "native" docker cluster type into ducktape at some
> > > point so all you have to do is bake the image and then spawn containers
> > on
> > > demand.)
> > >
> > I completely agree, supporting docker integration in ducktape would be
> the
> > ideal solution of the problem.
> >
> >
> > >
> > > A few things. First, it'd be nice to know if we can chain these with
> > normal
> > > PR builds or something like that. Even starting the system tests when
> we
> > > don't know the unit tests will pass seems like it'd be wasteful.
> > >
> > If we do chaining one problem that it will bring is that the turn around
> > time will suffer. It would take 1.5 hrs to run unit tests then another
> 1.5
> > hrs to run decktape tests. Also, don't dev run relevant unit tests before
> > they submit a patch ?
> >
>
> Yeah, I get that. Turnaround time will obviously suffer from serializing
> anything. Here the biggest problem today is that Jenkins builds are not as
> highly parallelized as most users run the tests locally, and the large
> number of integration tests that are baked into the unit tests mean they
> take quite a long time. While running the tests locally has been creeping
> up quite a bit recently, it's still at least < 15min on a relatively recent
> MBP. Ideally we could just get the Jenkins builds to finish faster...
>

I investigated a little bit and it seems that the unit tests are not entirely
stable, so it does not make sense to chain the builds serially for now.

For example:
https://github.com/apache/kafka/pull/2107 (the unit tests were passing
after the first commit and failing after the second, even though the second
commit only changed comments)
https://github.com/apache/kafka/pull/2108
https://github.com/apache/kafka/pull/2099
https://github.com/apache/kafka/pull/2093


> > >
> > > Second, agreed on getting things stable before turning this on across
> the
> > > board.
> >
> > I have done some work for stabilizing the tests. But I need help from
> kafka
> > community to take this further. It will be great if someone can guide me
> on
> > how to do this ? Should we start with a subset of tests that are stable
> and
> > enable others as we make progress ? Who are the people that can I work
> with
> > on this problem ?
> >
>
> It'll probably be a variety of people because it depends on the components
> that are unstable. For example, just among committers, different folks know
> different areas of the code (and especially system tests) to different
> degrees. I can probably help across the board in terms of ducktape/system
> test stuff, but for any individual test you'll probably just want to git
> blame to figure out who might be best to ask for help.
>
> I can take a pass at this patch and see how much makes sense to commit
> immediately. If we don't immediately start getting feedback on failing
> tests and can instead make progress by requesting them manually on only
> some PRs or something like that, then that seems like it could be
> reasonable.
>
> My biggest concern, just taking a quick pass at the changes, is that we're
> doing a lot of renaming of tests just to split them up rather than by
> logical grouping. If we need to do this, it seems much better to add a
> small amount of tooling to ducktape to execute subsets of tests (e.g. split
> across N subsets of the tests). It requires more coordination between
> ducktape and getting this landed, but feels like a much cleaner solution,
> and one that could eventually take advantage of additional information
> (e.g. if it knows avg runtime from previous runs, then it can divide them
> based on that instead of only considering the # of tests).


I agree that the ideal solution would be to add support for this in
ducktape. But since that is going to be a big change, can we do it in a
separate JIRA?
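
The partitioning logic itself is small; a minimal round-robin sketch, assuming 
the splitting would live in tooling rather than in the tests themselves (class 
and method names are invented for illustration):

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the "split the suite across N subsets" idea discussed
// above: a simple round-robin partition of test names. Real support would live
// in ducktape itself; this only illustrates the partitioning step.
public class TestSplitter {

    public static List<List<String>> split(List<String> tests, int subsets) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < subsets; i++)
            buckets.add(new ArrayList<>());
        for (int i = 0; i < tests.size(); i++)
            buckets.get(i % subsets).add(tests.get(i));
        return buckets;
    }

    public static void main(String[] args) {
        List<String> tests = Arrays.asList("replication_test", "connect_test",
                "streams_smoke_test", "upgrade_test", "security_test");
        // Prints [[replication_test, streams_smoke_test, security_test], [connect_test, upgrade_test]]
        System.out.println(split(tests, 2));
    }
}
{code}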


> > > Confluent runs these tests nightly on full VMs in AWS and
> > > historically, besides buggy logic in tests, underprovisioned resources
> > tend
> > > to be the biggest source of flakiness in tests.
> > >
> >  Good to know that I am not the only one worrying about this problem :-)
> >
> > Finally, should we be checking w/ infra and/or Travis folks before
> enabling
> > > something this expensive? Are the Storm integration tests of comparable
> > > cost? There are some in-flight patches for parallelizing test runs of
> > > ducktape tests (which also results in better utilization). But even
> with
> > > those changes, the full test run is still quite a few VM-hours per PR
> 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Sean McCauliff
+1 for String keys.

I've been doing some benchmarking and it seems like the speedup for using
integer keys is about 2-5x, depending on the length of the strings and what
collections are being used.  The overall amount of time spent parsing a set
of header key, value pairs probably does not matter unless you are getting
close to 1M messages per consumer.  In which case probably don't use
headers.  There is also the option to use very short strings, some of them
even shorter than integers.

Partitioning the string key space will be easier than partitioning an
integer key space. We won't need a global registry.  Kafka internally can
reserve some prefix like "_" as its namespace.  Everyone else can use their
company or project name as a namespace prefix and life should be good.
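
A hypothetical illustration of that namespacing convention (Kafka had no 
record-header API at this point, so a plain Map stands in for whatever container 
a KIP-82 implementation would expose; key names other than "confluent.profile_id" 
are invented):

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of prefix-namespaced string header keys as
// suggested above; the Map is only a stand-in for a future header container.
public class NamespacedHeaderKeys {
    public static void main(String[] args) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("_kafka.internal.example", "reserved".getBytes()); // "_" prefix reserved for Kafka
        headers.put("confluent.profile_id", "12345".getBytes());       // vendor namespace
        headers.put("myproject.trace_id", "abc-123".getBytes());       // project namespace
        headers.forEach((k, v) -> System.out.println(k + " -> " + new String(v)));
    }
}
{code}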

Here's the link to some of the benchmarking info:
https://docs.google.com/document/d/1tfT-6SZdnKOLyWGDH82kS30PnUkmgb7nPLdw6p65pAI/edit?usp=sharing



--
Sean McCauliff
Staff Software Engineer
Kafka

smccaul...@linkedin.com
linkedin.com/in/sean-mccauliff-b563192

On Mon, Nov 7, 2016 at 11:51 PM, Michael Pearce 
wrote:

> +1 on this slimmer version of our proposal
>
> I def think the Id space we can reduce from the proposed int32(4bytes)
> down to int16(2bytes) it saves on space and as headers we wouldn't expect
> the number of headers being used concurrently being that high.
>
> I would wonder if we should make the value byte array length still int32
> though as This is the standard Max array length in Java saying that it is a
> header and I guess limiting the size is sensible and would work for all the
> use cases we have in mind so happy with limiting this.
>
> Do people generally concur on Magnus's slimmer version? Anyone see any
> issues if we moved from int32 to int16?
>
> Re configurable ids per plugin over a global registry also would work for
> us.  As such if this has better concensus over the proposed global registry
> I'd be happy to change that.
>
> I was already sold on ints over strings for keys ;)
>
> Cheers
> Mike
>
> 
> From: Magnus Edenhill 
> Sent: Monday, November 7, 2016 10:10:21 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
>
> Hi,
>
> I'm +1 for adding generic message headers, but I do share the concerns
> previously aired on this thread and during the KIP meeting.
>
> So let me propose a slimmer alternative that does not require any sort of
> global header registry, does not affect broker performance or operations,
> and adds as little overhead as possible.
>
>
> Message
> 
> The protocol Message type is extended with a Headers array consting of
> Tags, where a Tag is defined as:
>int16 Id
>int16 Len  // binary_data length
>binary_data[Len]  // opaque binary data
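
A minimal Java sketch of this Tag layout, assuming the slimmer format exactly as 
written above (int16 Id, int16 Len, then Len bytes of opaque data); it is 
illustrative only and not part of any Kafka API:

{code:java}
import java.nio.ByteBuffer;

// Illustrative sketch of the proposed Tag layout: int16 Id, int16 Len,
// then Len bytes of opaque data. Not an actual Kafka protocol class.
final class Tag {
    final short id;
    final byte[] data;

    Tag(short id, byte[] data) {
        if (data.length > Short.MAX_VALUE)
            throw new IllegalArgumentException("Len is an int16 in this format");
        this.id = id;
        this.data = data;
    }

    int sizeInBytes() {
        return 2 + 2 + data.length;            // Id + Len + binary_data
    }

    void writeTo(ByteBuffer buffer) {
        buffer.putShort(id);                   // int16 Id
        buffer.putShort((short) data.length);  // int16 Len
        buffer.put(data);                      // binary_data[Len]
    }

    static Tag readFrom(ByteBuffer buffer) {
        short id = buffer.getShort();
        int len = buffer.getShort();
        byte[] data = new byte[len];
        buffer.get(data);
        return new Tag(id, data);
    }
}
{code}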
>
>
> Ids
> ---
> The Id space is not centrally managed, so whenever an application needs to
> add headers, or use an eco-system plugin that does, its Id allocation will
> need to be manually configured.
> This moves the allocation concern from the global space down to
> organization level and avoids the risk for id conflicts.
> Example pseudo-config for some app:
> sometrackerplugin.tag.sourcev3.id=1000
> dbthing.tag.tablename.id=1001
> myschemareg.tag.schemaname.id=1002
> myschemareg.tag.schemaversion.id=1003
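
A hypothetical sketch of how a plugin could resolve its tag id from such a 
config; the class, method, and lookup logic below are invented purely for 
illustration:

{code:java}
import java.util.Properties;

// Hypothetical example of how a header-writing plugin could resolve its tag
// ids from configuration keys shaped like the pseudo-config quoted above
// (e.g. dbthing.tag.tablename.id=1001). Not an actual Kafka or plugin API.
public class TagIdConfig {

    public static short tagId(Properties config, String plugin, String tagName) {
        String key = plugin + ".tag." + tagName + ".id";
        String value = config.getProperty(key);
        if (value == null)
            throw new IllegalStateException("No tag id configured for " + key);
        return Short.parseShort(value);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("dbthing.tag.tablename.id", "1001");
        System.out.println(tagId(config, "dbthing", "tablename")); // prints 1001
    }
}
{code}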
>
>
> Each header-writing or header-reading plugin must provide means (typically
> through configuration) to specify the tag for each header it uses. Defaults
> should be avoided.
> A consumer silently ignores tags it does not have a mapping for (since the
> binary_data can't be parsed without knowing what it is).
>
> Id range 0..999 is reserved for future use by the broker and must not be
> used by plugins.
>
>
>
> Broker
> -
> The broker does not process the tags (other than the standard protocol
> syntax verification), it simply stores and forwards them as opaque data.
>
> Standard message translation (removal of Headers) kicks in for older
> clients.
>
>
> Why not string ids?
> -
> String ids might seem like a good idea, but:
>  * does not really solve uniqueness
>  * consumes a lot of space (2 byte string length + string, per header) to
> be meaningful
>  * doesn't really say anything how to parse the tag's data, so it is in
> effect useless on its own.
>
>
> Regards,
> Magnus
>
>
>
>
> 2016-11-07 18:32 GMT+01:00 Michael Pearce :
>
> > Hi Roger,
> >
> > Thanks for the support.
> >
> > I think the key thing is to have a common key space to make an ecosystem,
> > there does have to be some level of contract for people to play nicely.
> >
> > Having map or as per current proposed in kip of having a
> > numerical key space of  map is a level of the contract that
> > most people would expect.
> >
> > I think the example in a previous comment someone else made linking to
> AWS
> > blog and also 

Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2016-11-08 Thread Rajini Sivaram
Jun,

Have added a sub-section on delegation token support to the KIP.

Thank you,

Rajini

On Tue, Nov 8, 2016 at 8:07 PM, Jun Rao  wrote:

> Hi, Rajini,
>
> That makes sense. Could you document this potential future extension in the
> KIP?
>
> Jun
>
> On Tue, Nov 8, 2016 at 11:17 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > Jun,
> >
> > 11. SCRAM messages have an optional extensions field which is a list of
> > key=value pairs. We can add an extension key to the first client message
> to
> > indicate delegation token. Broker can then obtain credentials and
> principal
> > using a different code path for delegation tokens.
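
For illustration, the messages being discussed have the RFC 5802 shape sketched 
below; the "tokenauth" extension attribute is a hypothetical placeholder for a 
delegation-token flag, and neither the attribute name nor the example values are 
defined by this KIP:

{code:java}
// Illustrative only: the general shape of SCRAM messages (RFC 5802), with a
// hypothetical "tokenauth" extension showing where a delegation-token flag
// could be carried in the first client message.
public class ScramMessageShapes {
    public static void main(String[] args) {
        // client-first-message: gs2 header, username, client nonce, optional extensions
        String clientFirst = "n,,n=alice,r=fyko+d2lbbFgONRv9qkxdawL,tokenauth=true";

        // server-first-message: combined nonce, base64-encoded salt, iteration count
        String serverFirst = "r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096";

        System.out.println(clientFirst);
        System.out.println(serverFirst);
    }
}
{code}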
> >
> > On Tue, Nov 8, 2016 at 6:38 PM, Jun Rao  wrote:
> >
> > > Magnus,
> > >
> > > Thanks for the input. If you don't feel strongly the need to bump up
> the
> > > version of SaslHandshake, we can leave the version unchanged.
> > >
> > > Rajini,
> > >
> > > 11. Yes, we could send the HMAC as the SCRAM password for the
> delegation
> > > token. Do we need something to indicate that this SCRAM token is
> special
> > > (i.e., delegation token) so that we can generate the correct
> > > KafkaPrincipal? The delegation token logic can be added later. I am
> > asking
> > > just so that we have enough in the design of SCRAM to add the
> delegation
> > > token logic later.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Nov 8, 2016 at 4:42 AM, Rajini Sivaram <
> > > rajinisiva...@googlemail.com
> > > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > 10. *s=<salt>* and *i=<iterations>* come from the SCRAM standard (they
> > > > are transferred during SCRAM auth). SCRAM messages look like (for
> > > > example) *r=<nonce>,s=<salt>,i=<iterations>*. StoredKey and ServerKey are
> > > > not transferred in SCRAM messages, so I picked two keys that are unused
> > > > in SCRAM.
> > > >
> > > > 11. SCRAM (like DIGEST-MD5 or PLAIN) uses a shared secret/password
> for
> > > > authentication along with a username and an optional
> authorization-id.
> > > > Kafka uses the username as the identity (Kafka principal) for
> > > > authentication and authorization. KIP-48 doesn't mention
> KafkaPrincipal
> > > in
> > > > the section "Authentication using Token", but a delegation token is
> > > > associated with a Kafka principal. Since delegation tokens are
> acquired
> > > on
> > > > behalf of a KafkaPrincipal and the principal is included in the token
> > as
> > > > the token owner,  clients authenticating with delegation tokens could
> > use
> > > > the token owner as username and the token HMAC as shared
> > secret/password.
> > > >
> > > > If necessary, any other form of token identifier may be used as
> > username
> > > as
> > > > well as long as it contains sufficient information for the broker to
> > > > retrieve/compute the principal and HMAC for authentication. The
> server
> > > > callback handler can be updated when delegation tokens are
> implemented
> > to
> > > > generate Kafka principal accordingly.
> > > >
> > > >
> > > > On Tue, Nov 8, 2016 at 1:03 AM, Jun Rao  wrote:
> > > >
> > > > > Hi, Rajini,
> > > > >
> > > > > A couple of other questions on the KIP.
> > > > >
> > > > > 10. For the config values stored in ZK, are those keys (s, t, k, i,
> > > etc)
> > > > > stored under scram-sha-256 standard?
> > > > >
> > > > > 11. Could KIP-48 (delegation token) use this KIP to send delegation
> > > > tokens?
> > > > > In KIP-48, the client sends a HMAC as the delegation token to the
> > > server.
> > > > > Not sure how this gets mapped to the username/password in this KIP.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Oct 4, 2016 at 6:43 AM, Rajini Sivaram <
> > > > > rajinisiva...@googlemail.com
> > > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I have just created KIP-84 to add SCRAM-SHA-1 and SCRAM-SHA-256
> > SASL
> > > > > > mechanisms to Kafka:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 84%3A+Support+SASL+SCRAM+mechanisms
> > > > > >
> > > > > >
> > > > > > Comments and suggestions are welcome.
> > > > > >
> > > > > > Thank you...
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Rajini
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > >
> >
> >
> >
> > --
> > Regards,
> >
> > Rajini
> >
>



-- 
Regards,

Rajini


[jira] [Updated] (KAFKA-4391) On Windows, Kafka server stops with uncaught exception after coming back from sleep

2016-11-08 Thread Yiquan Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yiquan Zhou updated KAFKA-4391:
---
Description: 
Steps to reproduce:
1. start the zookeeper
$ bin\windows\zookeeper-server-start.bat config/zookeeper.properties

2. start the Kafka server with the default properties
$ bin\windows\kafka-server-start.bat config/server.properties

3. put Windows into sleep mode for 1-2 hours

4. activate Windows again, an exception occurs in Kafka server console and the 
server is stopped:
{code:title=kafka console log}
[2016-11-08 21:45:35,185] INFO Client session timed out, have not heard from 
server in 10081379ms for sessionid 0x1584514da47, closing socket connection 
and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:40,698] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2016-11-08 21:45:43,029] INFO Opening socket connection to server 
127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,044] INFO Socket connection established to 
127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,158] INFO Unable to reconnect to ZooKeeper service, 
session 0x1584514da47 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,158] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2016-11-08 21:45:43,236] INFO Initiating client connection, 
connectString=localhost:2181 sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@11ca437b (org.apache.zookeeper.ZooKeeper)
[2016-11-08 21:45:43,280] INFO EventThread shut down 
(org.apache.zookeeper.ClientCnxn)
log4j:ERROR Failed to rename [/controller.log] to 
[/controller.log.2016-11-08-18].
[2016-11-08 21:45:43,421] INFO Opening socket connection to server 
127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,483] INFO Socket connection established to 
127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,811] INFO Session establishment complete on server 
127.0.0.1/127.0.0.1:2181, sessionid = 0x1584514da470001, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,827] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
log4j:ERROR Failed to rename [/server.log] to [/server.log.2016-11-08-18].
[2016-11-08 21:45:43,827] INFO Creating /controller (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2016-11-08 21:45:44,014] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2016-11-08 21:45:44,014] INFO 0 successfully elected as leader 
(kafka.server.ZookeeperLeaderElector)
log4j:ERROR Failed to rename [/state-change.log] to 
[/state-change.log.2016-11-08-18].
[2016-11-08 21:45:44,421] INFO re-registering broker info in ZK for broker 0 
(kafka.server.KafkaHealthcheck)
[2016-11-08 21:45:44,436] INFO Creating /brokers/ids/0 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2016-11-08 21:45:44,686] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2016-11-08 21:45:44,686] INFO Registered broker 0 at path /brokers/ids/0 with 
addresses: PLAINTEXT -> EndPoint(192.168.0.15,9092,PLAINTEXT) 
(kafka.utils.ZkUtils)
[2016-11-08 21:45:44,686] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck)
[2016-11-08 21:45:44,686] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck)
[2016-11-08 21:45:45,046] INFO [ReplicaFetcherManager on broker 0] Removed 
fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2016-11-08 21:45:45,061] INFO New leader is 0 
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-11-08 21:45:47,325] ERROR Uncaught exception in scheduled task 
'kafka-recovery-point-checkpoint' (kafka.utils.KafkaScheduler)
java.io.IOException: File rename from 
D:\tmp\kafka-logs\recovery-point-offset-checkpoint.tmp to 
D:\tmp\kafka-logs\recovery-point-offset-checkpoint failed.
at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:66)
at 
kafka.log.LogManager.kafka$log$LogManager$$checkpointLogsInDir(LogManager.scala:326)
at 
kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:317)
at 
kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:317)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
kafka.log.LogManager.checkpointRecoveryPointOffsets(LogManager.scala:317)
at 
kafka.log.LogManager$$anonfun$startup$3.apply$mcV$sp(LogManager.scala:201)
at 

[jira] [Created] (KAFKA-4391) On Windows, Kafka server stops with uncaught exception after coming back from sleep

2016-11-08 Thread Yiquan Zhou (JIRA)
Yiquan Zhou created KAFKA-4391:
--

 Summary: On Windows, Kafka server stops with uncaught exception 
after coming back from sleep
 Key: KAFKA-4391
 URL: https://issues.apache.org/jira/browse/KAFKA-4391
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.1
 Environment: Windows 10, jdk1.8.0_111
Reporter: Yiquan Zhou


Steps to reproduce:
1. start the zookeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties

2. start the Kafka server with the default properties
$ bin/kafka-server-start.sh config/server.properties

3. put Windows into sleep mode for 1-2 hours

4. wake Windows up again; an exception occurs in the Kafka server console and the 
server stops:
{code:title=kafka console log}
[2016-11-08 21:45:35,185] INFO Client session timed out, have not heard from 
server in 10081379ms for sessionid 0x1584514da47, closing socket connection 
and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:40,698] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2016-11-08 21:45:43,029] INFO Opening socket connection to server 
127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,044] INFO Socket connection established to 
127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,158] INFO Unable to reconnect to ZooKeeper service, 
session 0x1584514da47 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,158] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2016-11-08 21:45:43,236] INFO Initiating client connection, 
connectString=localhost:2181 sessionTimeout=6000 
watcher=org.I0Itec.zkclient.ZkClient@11ca437b (org.apache.zookeeper.ZooKeeper)
[2016-11-08 21:45:43,280] INFO EventThread shut down 
(org.apache.zookeeper.ClientCnxn)
log4j:ERROR Failed to rename [/controller.log] to 
[/controller.log.2016-11-08-18].
[2016-11-08 21:45:43,421] INFO Opening socket connection to server 
127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,483] INFO Socket connection established to 
127.0.0.1/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,811] INFO Session establishment complete on server 
127.0.0.1/127.0.0.1:2181, sessionid = 0x1584514da470001, negotiated timeout = 
6000 (org.apache.zookeeper.ClientCnxn)
[2016-11-08 21:45:43,827] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
log4j:ERROR Failed to rename [/server.log] to [/server.log.2016-11-08-18].
[2016-11-08 21:45:43,827] INFO Creating /controller (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2016-11-08 21:45:44,014] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2016-11-08 21:45:44,014] INFO 0 successfully elected as leader 
(kafka.server.ZookeeperLeaderElector)
log4j:ERROR Failed to rename [/state-change.log] to 
[/state-change.log.2016-11-08-18].
[2016-11-08 21:45:44,421] INFO re-registering broker info in ZK for broker 0 
(kafka.server.KafkaHealthcheck)
[2016-11-08 21:45:44,436] INFO Creating /brokers/ids/0 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2016-11-08 21:45:44,686] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2016-11-08 21:45:44,686] INFO Registered broker 0 at path /brokers/ids/0 with 
addresses: PLAINTEXT -> EndPoint(192.168.0.15,9092,PLAINTEXT) 
(kafka.utils.ZkUtils)
[2016-11-08 21:45:44,686] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck)
[2016-11-08 21:45:44,686] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck)
[2016-11-08 21:45:45,046] INFO [ReplicaFetcherManager on broker 0] Removed 
fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2016-11-08 21:45:45,061] INFO New leader is 0 
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-11-08 21:45:47,325] ERROR Uncaught exception in scheduled task 
'kafka-recovery-point-checkpoint' (kafka.utils.KafkaScheduler)
java.io.IOException: File rename from 
D:\tmp\kafka-logs\recovery-point-offset-checkpoint.tmp to 
D:\tmp\kafka-logs\recovery-point-offset-checkpoint failed.
at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:66)
at 
kafka.log.LogManager.kafka$log$LogManager$$checkpointLogsInDir(LogManager.scala:326)
at 
kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:317)
at 
kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:317)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at 

Build failed in Jenkins: kafka-trunk-jdk7 #1676

2016-11-08 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-4284: Make Partitioner a Closeable and close it when closing the

--
[...truncated 14180 lines...]

org.apache.kafka.streams.kstream.KStreamBuilderTest > testMerge PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > testFrom STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > testFrom PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldThrowExceptionWhenTopicNamesAreNull STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldThrowExceptionWhenTopicNamesAreNull PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldThrowExceptionWhenNoTopicPresent STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldThrowExceptionWhenNoTopicPresent PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > testNewName STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > testNewName PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
shouldHaveSaneEqualsAndHashCode STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
shouldHaveSaneEqualsAndHashCode PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowSizeMustNotBeZero 
STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowSizeMustNotBeZero 
PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > advanceIntervalMustNotBeZero 
STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > advanceIntervalMustNotBeZero 
PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowSizeMustNotBeNegative 
STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowSizeMustNotBeNegative 
PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
advanceIntervalMustNotBeNegative STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
advanceIntervalMustNotBeNegative PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
advanceIntervalMustNotBeLargerThanWindowSize STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
advanceIntervalMustNotBeLargerThanWindowSize PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowsForTumblingWindows 
STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowsForTumblingWindows 
PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowsForHoppingWindows 
STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowsForHoppingWindows 
PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
windowsForBarelyOverlappingHoppingWindows STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
windowsForBarelyOverlappingHoppingWindows PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > defaultSerdeShouldBeConfigured 
STARTED

org.apache.kafka.streams.StreamsConfigTest > defaultSerdeShouldBeConfigured 
PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfValueSerdeConfigFails STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfValueSerdeConfigFails PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldBeSupportNonPrefixedConsumerConfigs 

Build failed in Jenkins: kafka-trunk-jdk8 #1027

2016-11-08 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] MINOR: remove commented out code and System.out.println

--
[...truncated 3875 lines...]

kafka.utils.CommandLineUtilsTest > testParseSingleArg STARTED

kafka.utils.CommandLineUtilsTest > testParseSingleArg PASSED

kafka.utils.CommandLineUtilsTest > testParseArgs STARTED

kafka.utils.CommandLineUtilsTest > testParseArgs PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid PASSED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr STARTED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr PASSED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition STARTED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.utils.JsonTest > testJsonEncoding STARTED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask PASSED

kafka.utils.SchedulerTest > testNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart STARTED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler STARTED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler PASSED

kafka.utils.SchedulerTest > testPeriodicTask STARTED

kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testClusterIdentifierJsonParsing STARTED

kafka.utils.ZkUtilsTest > testClusterIdentifierJsonParsing PASSED

kafka.utils.IteratorTemplateTest > testIterator STARTED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.utils.UtilsTest > testGenerateUuidAsBase64 STARTED

kafka.utils.UtilsTest > testGenerateUuidAsBase64 PASSED

kafka.utils.UtilsTest > testAbs STARTED

kafka.utils.UtilsTest > testAbs PASSED

kafka.utils.UtilsTest > testReplaceSuffix STARTED

kafka.utils.UtilsTest > testReplaceSuffix PASSED

kafka.utils.UtilsTest > testCircularIterator STARTED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReadBytes STARTED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testCsvList STARTED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testReadInt STARTED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID STARTED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID PASSED

kafka.utils.UtilsTest > testCsvMap STARTED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testInLock STARTED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testSwallow STARTED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.utils.ByteBoundedBlockingQueueTest > testByteBoundedBlockingQueue STARTED

kafka.utils.ByteBoundedBlockingQueueTest > testByteBoundedBlockingQueue PASSED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic STARTED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic PASSED

kafka.producer.AsyncProducerTest > testQueueTimeExpired STARTED

kafka.producer.AsyncProducerTest > testQueueTimeExpired PASSED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents STARTED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents PASSED

kafka.producer.AsyncProducerTest > testBatchSize STARTED

kafka.producer.AsyncProducerTest > testBatchSize PASSED

kafka.producer.AsyncProducerTest > testSerializeEvents STARTED

kafka.producer.AsyncProducerTest > testSerializeEvents PASSED

kafka.producer.AsyncProducerTest > testProducerQueueSize STARTED

kafka.producer.AsyncProducerTest > testProducerQueueSize PASSED

kafka.producer.AsyncProducerTest > testRandomPartitioner STARTED

kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED

kafka.producer.AsyncProducerTest > testInvalidConfiguration STARTED

kafka.producer.AsyncProducerTest > testInvalidConfiguration PASSED

kafka.producer.AsyncProducerTest > testInvalidPartition STARTED

kafka.producer.AsyncProducerTest > testInvalidPartition PASSED

kafka.producer.AsyncProducerTest > testNoBroker STARTED

kafka.producer.AsyncProducerTest > testNoBroker PASSED

kafka.producer.AsyncProducerTest > testProduceAfterClosed STARTED

kafka.producer.AsyncProducerTest > testProduceAfterClosed PASSED

kafka.producer.AsyncProducerTest > testJavaProducer STARTED

kafka.producer.AsyncProducerTest > 

[GitHub] kafka pull request #2092: MINOR: remove commented out code and System.out.pr...

2016-11-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2092


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2016-11-08 Thread Jun Rao
Hi, Rajini,

That makes sense. Could you document this potential future extension in the
KIP?

Jun

On Tue, Nov 8, 2016 at 11:17 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Jun,
>
> 11. SCRAM messages have an optional extensions field which is a list of
> key=value pairs. We can add an extension key to the first client message to
> indicate delegation token. Broker can then obtain credentials and principal
> using a different code path for delegation tokens.
>
> On Tue, Nov 8, 2016 at 6:38 PM, Jun Rao  wrote:
>
> > Magnus,
> >
> > Thanks for the input. If you don't feel strongly the need to bump up the
> > version of SaslHandshake, we can leave the version unchanged.
> >
> > Rajini,
> >
> > 11. Yes, we could send the HMAC as the SCRAM password for the delegation
> > token. Do we need something to indicate that this SCRAM token is special
> > (i.e., delegation token) so that we can generate the correct
> > KafkaPrincipal? The delegation token logic can be added later. I am
> asking
> > just so that we have enough in the design of SCRAM to add the delegation
> > token logic later.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Nov 8, 2016 at 4:42 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com
> > > wrote:
> >
> > > Hi Jun,
> > >
> > > 10. *s=<salt>* and *i=<iterations>* come from the SCRAM standard (they are
> > > transferred during SCRAM auth). SCRAM messages look like (for example)
> > > *r=<nonce>,s=<salt>,i=<iterations>*. StoredKey and ServerKey are not
> > > transferred in SCRAM messages, so I picked two keys that are unused in
> > > SCRAM.
> > >
> > > 11. SCRAM (like DIGEST-MD5 or PLAIN) uses a shared secret/password for
> > > authentication along with a username and an optional authorization-id.
> > > Kafka uses the username as the identity (Kafka principal) for
> > > authentication and authorization. KIP-48 doesn't mention KafkaPrincipal
> > in
> > > the section "Authentication using Token", but a delegation token is
> > > associated with a Kafka principal. Since delegation tokens are acquired
> > on
> > > behalf of a KafkaPrincipal and the principal is included in the token
> as
> > > the token owner,  clients authenticating with delegation tokens could
> use
> > > the token owner as username and the token HMAC as shared
> secret/password.
> > >
> > > If necessary, any other form of token identifier may be used as
> username
> > as
> > > well as long as it contains sufficient information for the broker to
> > > retrieve/compute the principal and HMAC for authentication. The server
> > > callback handler can be updated when delegation tokens are implemented
> to
> > > generate Kafka principal accordingly.
> > >
> > >
> > > On Tue, Nov 8, 2016 at 1:03 AM, Jun Rao  wrote:
> > >
> > > > Hi, Rajini,
> > > >
> > > > A couple of other questions on the KIP.
> > > >
> > > > 10. For the config values stored in ZK, are those keys (s, t, k, i,
> > etc)
> > > > stored under scram-sha-256 standard?
> > > >
> > > > 11. Could KIP-48 (delegation token) use this KIP to send delegation
> > > tokens?
> > > > In KIP-48, the client sends a HMAC as the delegation token to the
> > server.
> > > > Not sure how this gets mapped to the username/password in this KIP.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Oct 4, 2016 at 6:43 AM, Rajini Sivaram <
> > > > rajinisiva...@googlemail.com
> > > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I have just created KIP-84 to add SCRAM-SHA-1 and SCRAM-SHA-256
> SASL
> > > > > mechanisms to Kafka:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 84%3A+Support+SASL+SCRAM+mechanisms
> > > > >
> > > > >
> > > > > Comments and suggestions are welcome.
> > > > >
> > > > > Thank you...
> > > > >
> > > > > Regards,
> > > > >
> > > > > Rajini
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Regards,
> > >
> > > Rajini
> > >
> >
>
>
>
> --
> Regards,
>
> Rajini
>


Jenkins build is back to normal : kafka-trunk-jdk8 #1026

2016-11-08 Thread Apache Jenkins Server
See 



Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2016-11-08 Thread Rajini Sivaram
Jun,

11. SCRAM messages have an optional extensions field which is a list of
key=value pairs. We can add an extension key to the first client message to
indicate delegation token. Broker can then obtain credentials and principal
using a different code path for delegation tokens.
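
For illustration only, a client-first-message carrying such an extension might look
like the sketch below (RFC 5802 allows trailing key=value extensions; the "tokenauth"
key and the helper names are hypothetical, not part of Kafka or this KIP):

import java.nio.charset.StandardCharsets;

public class ScramFirstMessageSketch {
    // Builds a simplified SCRAM client-first-message. The gs2 header "n,," means
    // no channel binding and no authzid. A real implementation must also escape
    // '=' and ',' in the username (saslname encoding), which is omitted here.
    static String clientFirstMessage(String username, String nonce, boolean delegationToken) {
        StringBuilder bare = new StringBuilder("n=").append(username).append(",r=").append(nonce);
        if (delegationToken) {
            bare.append(",tokenauth=true"); // hypothetical extension the broker could inspect
        }
        return "n,," + bare;
    }

    public static void main(String[] args) {
        String message = clientFirstMessage("token-owner", "fyko+d2lbbFgONRv9qkxdawL", true);
        System.out.println(new String(message.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8));
    }
}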

On Tue, Nov 8, 2016 at 6:38 PM, Jun Rao  wrote:

> Magnus,
>
> Thanks for the input. If you don't feel strongly the need to bump up the
> version of SaslHandshake, we can leave the version unchanged.
>
> Rajini,
>
> 11. Yes, we could send the HMAC as the SCRAM password for the delegation
> token. Do we need something to indicate that this SCRAM token is special
> (i.e., delegation token) so that we can generate the correct
> KafkaPrincipal? The delegation token logic can be added later. I am asking
> just so that we have enough in the design of SCRAM to add the delegation
> token logic later.
>
> Thanks,
>
> Jun
>
>
> On Tue, Nov 8, 2016 at 4:42 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com
> > wrote:
>
> > Hi Jun,
> >
> > 10. *s=<salt>* and *i=<iterations>* come from the SCRAM standard (they are
> > transferred during SCRAM auth). SCRAM messages look like (for example)
> > *r=<nonce>,s=<salt>,i=<iterations>*. StoredKey and ServerKey are not
> > transferred in SCRAM messages, so I picked two keys that are unused in
> > SCRAM.
> >
> > 11. SCRAM (like DIGEST-MD5 or PLAIN) uses a shared secret/password for
> > authentication along with a username and an optional authorization-id.
> > Kafka uses the username as the identity (Kafka principal) for
> > authentication and authorization. KIP-48 doesn't mention KafkaPrincipal
> in
> > the section "Authentication using Token", but a delegation token is
> > associated with a Kafka principal. Since delegation tokens are acquired
> on
> > behalf of a KafkaPrincipal and the principal is included in the token as
> > the token owner,  clients authenticating with delegation tokens could use
> > the token owner as username and the token HMAC as shared secret/password.
> >
> > If necessary, any other form of token identifier may be used as username
> as
> > well as long as it contains sufficient information for the broker to
> > retrieve/compute the principal and HMAC for authentication. The server
> > callback handler can be updated when delegation tokens are implemented to
> > generate Kafka principal accordingly.
> >
> >
> > On Tue, Nov 8, 2016 at 1:03 AM, Jun Rao  wrote:
> >
> > > Hi, Rajini,
> > >
> > > A couple of other questions on the KIP.
> > >
> > > 10. For the config values stored in ZK, are those keys (s, t, k, i,
> etc)
> > > stored under scram-sha-256 standard?
> > >
> > > 11. Could KIP-48 (delegation token) use this KIP to send delegation
> > tokens?
> > > In KIP-48, the client sends a HMAC as the delegation token to the
> server.
> > > Not sure how this gets mapped to the username/password in this KIP.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Oct 4, 2016 at 6:43 AM, Rajini Sivaram <
> > > rajinisiva...@googlemail.com
> > > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have just created KIP-84 to add SCRAM-SHA-1 and SCRAM-SHA-256 SASL
> > > > mechanisms to Kafka:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 84%3A+Support+SASL+SCRAM+mechanisms
> > > >
> > > >
> > > > Comments and suggestions are welcome.
> > > >
> > > > Thank you...
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > >
> >
> >
> >
> > --
> > Regards,
> >
> > Rajini
> >
>



-- 
Regards,

Rajini


[GitHub] kafka pull request #2112: MINOR: fix typos and incorrect docs

2016-11-08 Thread xvrl
GitHub user xvrl opened a pull request:

https://github.com/apache/kafka/pull/2112

MINOR: fix typos and incorrect docs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xvrl/kafka minor-doc-fixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2112.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2112


commit 7e0401f5346ca95bdf149a3df421ac76c2c22041
Author: Xavier Léauté 
Date:   2016-11-08T19:13:18Z

MINOR: fix typos and incorrect docs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread Nacho Solis
From Roger's description:

5a- separate metadata field, built in serialization
5c- separate metadata field, custom serialization
5b- custom serialization (inside V)
5d- built in serialization (inside V)

I added 5d for completeness.

From this perspective I would choose

5a > 5c > 5d > 5b

In all of these cases, I would like to make sure that the broker
(eventually) has the ability to deal with the headers.

- Custom serialization
I'm not in favor of custom serialization for the whole metadata/header
block. This is because I think that we will have multiple headers or
metadata blobs by different teams (either internal or external), with
different goals and different requirements. They will work independently of
each other. They will not try to coordinate a common format. The audit team
and the security team and the performance team and the monitoring team and
the application might not work off the same needs.  The one need that they
share is sending data along with the message.   To put all of their data
together, we will need a common header system.

Obviously the kafka team (the team in charge of running the kafka system,
say, the linkedin-kafka-team) can write a wrapper or a custom serializer
that somehow provides a set of functions for all these teams to work
together.  So technically we're not limited. However, if we want to share
our plugins with some other team, say the acme-kafka-team, then we would have to
have compatible serializers.  This is doable but not an easy task.

From my point of view we have 2 options:
A- We use a built in serializer for the headers. Each plugin/module can
then serialize their internal data however they want, but the set format
itself is common.  This would allow us to work together. Plugins are shared
and evolve from collective efforts.
B1- We use a custom serializer for headers.  We have balkanization of
headers and no cooperation
B2- We use a custom serializer for headers.  One such serializer becomes
popular, effectively providing a wrapper to the open source clients that
provides header support. Various companies/entities start using this and
form a community around this. Plugins are shared and evolve from collective
efforts.

I believe that given B2 offers collective power, it will overtake B1.
Effectively, we would reach the same situation as A, but it will take a little
more time and make the code more difficult to manage.


Isn't this the same reason Connect is inside Apache Kafka?  And now there
is a set of Kafka Connectors (https://www.confluent.io/product/connectors/)
that take advantage of the fact that Connect defines a common framework.


To be clear, I think my main goal would be for Apache Kafka to offer a
Client API to add and remove headers per message.  If we can offer this as
a standard (in other words, part of Apache Kafka open source), then we have
achieved 80% of the work. The community will benefit as a whole. Whether this
is done via a container inside V, natively in the protocol, with a way to
override the serializer, or with broker-side understanding of the headers is
secondary (though I obviously have opinions about those: no, yes, no, yes).
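
Purely as an illustration of the shape such a client API could take (none of these
names exist in Kafka, and this is not a proposal from the KIP):

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical record wrapper exposing per-message header add/remove operations.
class HeaderedRecord<K, V> {
    private final K key;
    private final V value;
    private final Map<String, byte[]> headers = new LinkedHashMap<>();

    HeaderedRecord(K key, V value) {
        this.key = key;
        this.value = value;
    }

    HeaderedRecord<K, V> addHeader(String name, byte[] data) {
        headers.put(name, data);
        return this;
    }

    HeaderedRecord<K, V> removeHeader(String name) {
        headers.remove(name);
        return this;
    }

    Map<String, byte[]> headers() { return headers; }
    K key() { return key; }
    V value() { return value; }
}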

If we don't want to include this in Apache open source, then the people
that want it will have to write their own (if they haven't done so
already). With time, they will end up writing a common wrapper, the common
wrapper will get shared plugins, people will start using the shared
plugins, the wrapper will become more popular than the regular clients and
eventually there will be a fork or a merge back.

Yes, it is possible that not everybody wants headers; so far, we haven't met
many (any?) of those people. At most we've seen people that are happy with
their own implementation or hack around the issue.  I'm pretty certain that
if they had had headers to start with they wouldn't be in the situation
they are today.

Even if the current people don't want to change from their current system,
the new people will probably use it. LinkedIn for certain would use it.

Make Kafka great again!  [1]


Nacho

[1] to be clear that's a joke... it's election day in the US


On Tue, Nov 8, 2016 at 9:48 AM, radai  wrote:

> both 5a and 5c would involve a wire format change, so any arguments about
> needing an upgrade path bumping protocol version etc apply equally to both.
> so the "cost" (in terms of impact of a wire format change) is the same.
>
> 5c, to me, means doing all the work (more exactly incurring all the cost)
> but getting very few of the benefits. a universal, agreed-upon structure
> for headers (specifically their keys) is, in my opinion, a basic
> requirement to reap the full benefits of headers - an active ecosystem of
> composable, re-usable, 3rd-party extensions to kafka.
>
> as for what exactly those keys are (int vs string) - since using ints is
> such a giant sticking point and given kafka usually operates with batching
> and compression and does not achieve 

Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2016-11-08 Thread Jun Rao
Magnus,

Thanks for the input. If you don't feel strongly the need to bump up the
version of SaslHandshake, we can leave the version unchanged.

Rajini,

11. Yes, we could send the HMAC as the SCRAM password for the delegation
token. Do we need something to indicate that this SCRAM token is special
(i.e., delegation token) so that we can generate the correct
KafkaPrincipal? The delegation token logic can be added later. I am asking
just so that we have enough in the design of SCRAM to add the delegation
token logic later.

Thanks,

Jun


On Tue, Nov 8, 2016 at 4:42 AM, Rajini Sivaram  wrote:

> Hi Jun,
>
> 10. *s=<salt>* and *i=<iterations>* come from the SCRAM standard (they are
> transferred during SCRAM auth). SCRAM messages look like (for example)
> *r=<nonce>,s=<salt>,i=<iterations>*. StoredKey and ServerKey are not
> transferred in SCRAM messages, so I picked two keys that are unused in
> SCRAM.
>
> 11. SCRAM (like DIGEST-MD5 or PLAIN) uses a shared secret/password for
> authentication along with a username and an optional authorization-id.
> Kafka uses the username as the identity (Kafka principal) for
> authentication and authorization. KIP-48 doesn't mention KafkaPrincipal in
> the section "Authentication using Token", but a delegation token is
> associated with a Kafka principal. Since delegation tokens are acquired on
> behalf of a KafkaPrincipal and the principal is included in the token as
> the token owner,  clients authenticating with delegation tokens could use
> the token owner as username and the token HMAC as shared secret/password.
>
> If necessary, any other form of token identifier may be used as username as
> well as long as it contains sufficient information for the broker to
> retrieve/compute the principal and HMAC for authentication. The server
> callback handler can be updated when delegation tokens are implemented to
> generate Kafka principal accordingly.
>
>
> On Tue, Nov 8, 2016 at 1:03 AM, Jun Rao  wrote:
>
> > Hi, Rajini,
> >
> > A couple of other questions on the KIP.
> >
> > 10. For the config values stored in ZK, are those keys (s, t, k, i, etc)
> > stored under scram-sha-256 standard?
> >
> > 11. Could KIP-48 (delegation token) use this KIP to send delegation
> tokens?
> > In KIP-48, the client sends a HMAC as the delegation token to the server.
> > Not sure how this gets mapped to the username/password in this KIP.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Oct 4, 2016 at 6:43 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com
> > > wrote:
> >
> > > Hi all,
> > >
> > > I have just created KIP-84 to add SCRAM-SHA-1 and SCRAM-SHA-256 SASL
> > > mechanisms to Kafka:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 84%3A+Support+SASL+SCRAM+mechanisms
> > >
> > >
> > > Comments and suggestions are welcome.
> > >
> > > Thank you...
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> >
>
>
>
> --
> Regards,
>
> Rajini
>


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-08 Thread radai
I've updated the KIP page to specify that the new config would co-exist with
"queued.max.requests" to minimize the impact on compatibility.

On Tue, Nov 8, 2016 at 7:02 AM, radai  wrote:

> My personal opinion on this is that control of memory was always the
> intent behind queued.max.requests and so this KIP could completely obsolete
> it.
> For now its probably safest to leave it as-is (making memory-bound
> "opt-in") and revisit this at a later date
>
> On Mon, Nov 7, 2016 at 2:32 PM, Gwen Shapira  wrote:
>
>> Hey Radai,
>>
>> Looking at the proposal, it looks like a major question is still
>> unresolved?
>> "This configuration parameter can either replace queued.max.requests
>> completely, or co-exist with it (by way of either-or or respecting
>> both bounds and not picking up new requests when either is hit)."
>>
>> On Mon, Nov 7, 2016 at 1:08 PM, radai  wrote:
>> > Hi,
>> >
>> > I would like to initiate a vote on KIP-72:
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+
>> Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
>> >
>> > The kip allows specifying a limit on the amount of memory allocated for
>> > reading incoming requests into. This is useful for "sizing" a broker and
>> > avoiding OOMEs under heavy load (as actually happens occasionally at
>> > linkedin).
>> >
>> > I believe I've addressed most (all?) concerns brought up during the
>> > discussion.
>> >
>> > To the best of my understanding this vote is about the goal and
>> > public-facing changes related to the new proposed behavior, but as for
>> > implementation, i have the code up here:
>> >
>> > https://github.com/radai-rosenblatt/kafka/tree/broker-memory
>> -pool-with-muting
>> >
>> > and I've stress-tested it to work properly (meaning it chugs along and
>> > throttles under loads that would DOS 10.0.1.0 code).
>> >
>> > I also believe that the primitives and "pattern"s introduced in this KIP
>> > (namely the notion of a buffer pool and retrieving from / releasing to
>> said
>> > pool instead of allocating memory) are generally useful beyond the
>> scope of
>> > this KIP for both performance issues (allocating lots of short-lived
>> large
>> > buffers is a performance bottleneck) and other areas where memory limits
>> > are a problem (KIP-81)
>> >
>> > Thank you,
>> >
>> > Radai.
>>
>>
>>
>> --
>> Gwen Shapira
>> Product Manager | Confluent
>> 650.450.2760 | @gwenshap
>> Follow us: Twitter | blog
>>
>
>


[GitHub] kafka-site issue #29: Update the website repo link in code.html to point to ...

2016-11-08 Thread becketqin
Github user becketqin commented on the issue:

https://github.com/apache/kafka-site/pull/29
  
@ijuma Does the updated page look good to you?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4284) Partitioner never closed by producer

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15648262#comment-15648262
 ] 

ASF GitHub Bot commented on KAFKA-4284:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2000


> Partitioner never closed by producer
> 
>
> Key: KAFKA-4284
> URL: https://issues.apache.org/jira/browse/KAFKA-4284
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.0.1
>Reporter: Theo Hultberg
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> Partitioners are never closed by the producer, even though the Partitioner 
> interface has a close method.
> I looked at KAFKA-2091 and it seems like the close method has been there from 
> the beginning, but never been used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4284) Partitioner never closed by producer

2016-11-08 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-4284:
-
   Resolution: Fixed
Fix Version/s: 0.10.2.0
   Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2000
[https://github.com/apache/kafka/pull/2000]

> Partitioner never closed by producer
> 
>
> Key: KAFKA-4284
> URL: https://issues.apache.org/jira/browse/KAFKA-4284
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.0.1
>Reporter: Theo Hultberg
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> Partitioners are never closed by the producer, even though the Partitioner 
> interface has a close method.
> I looked at KAFKA-2091 and it seems like the close method has been there from 
> the beginning, but never been used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2000: KAFKA-4284: Make Partitioner a Closeable and close...

2016-11-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2000


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-08 Thread radai
Both 5a and 5c would involve a wire format change, so any arguments about
needing an upgrade path, bumping the protocol version, etc. apply equally to both.
So the "cost" (in terms of impact of a wire format change) is the same.

5c, to me, means doing all the work (more exactly, incurring all the cost)
but getting very few of the benefits. A universal, agreed-upon structure
for headers (specifically their keys) is, in my opinion, a basic
requirement to reap the full benefits of headers - an active ecosystem of
composable, re-usable, 3rd-party extensions to Kafka.

As for what exactly those keys are (int vs string) - since using ints is
such a giant sticking point, and given that Kafka usually operates with batching
and compression and does not achieve high enough iops for it to make a
noticeable difference in CPU consumption, I'm willing to go with string keys
just to get that out of the way.
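
To make the size comparison concrete, here is a small illustrative sketch (not the
KIP-82 wire format, and not code from the KIP) of what a single header costs under
the int16-keyed tag layout quoted further down versus a string-keyed layout:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeaderEncodingSketch {
    // int16 id + int16 length + opaque payload (per the tag layout quoted below)
    static ByteBuffer encodeIntKeyedTag(short id, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + value.length);
        buf.putShort(id);
        buf.putShort((short) value.length);
        buf.put(value);
        buf.flip();
        return buf;
    }

    // int16 key length + UTF-8 key + int32 value length + payload (one possible
    // string-keyed layout, purely for comparison)
    static ByteBuffer encodeStringKeyedHeader(String key, byte[] value) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + keyBytes.length + 4 + value.length);
        buf.putShort((short) keyBytes.length);
        buf.put(keyBytes);
        buf.putInt(value.length);
        buf.put(value);
        buf.flip();
        return buf;
    }
}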

On Mon, Nov 7, 2016 at 11:51 PM, Michael Pearce 
wrote:

> +1 on this slimmer version of our proposal
>
> I def think the Id space we can reduce from the proposed int32(4bytes)
> down to int16(2bytes) it saves on space and as headers we wouldn't expect
> the number of headers being used concurrently being that high.
>
> I would wonder if we should make the value byte array length still int32
> though as This is the standard Max array length in Java saying that it is a
> header and I guess limiting the size is sensible and would work for all the
> use cases we have in mind so happy with limiting this.
>
> Do people generally concur on Magnus's slimmer version? Anyone see any
> issues if we moved from int32 to int16?
>
> Re configurable ids per plugin over a global registry also would work for
> us.  As such if this has better concensus over the proposed global registry
> I'd be happy to change that.
>
> I was already sold on ints over strings for keys ;)
>
> Cheers
> Mike
>
> 
> From: Magnus Edenhill 
> Sent: Monday, November 7, 2016 10:10:21 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
>
> Hi,
>
> I'm +1 for adding generic message headers, but I do share the concerns
> previously aired on this thread and during the KIP meeting.
>
> So let me propose a slimmer alternative that does not require any sort of
> global header registry, does not affect broker performance or operations,
> and adds as little overhead as possible.
>
>
> Message
> 
> The protocol Message type is extended with a Headers array consisting of
> Tags, where a Tag is defined as:
>int16 Id
>int16 Len  // binary_data length
>binary_data[Len]  // opaque binary data
>
>
> Ids
> ---
> The Id space is not centrally managed, so whenever an application needs to
> add headers, or use an eco-system plugin that does, its Id allocation will
> need to be manually configured.
> This moves the allocation concern from the global space down to
> organization level and avoids the risk for id conflicts.
> Example pseudo-config for some app:
> sometrackerplugin.tag.sourcev3.id=1000
> dbthing.tag.tablename.id=1001
> myschemareg.tag.schemaname.id=1002
> myschemareg.tag.schemaversion.id=1003
>
>
> Each header-writing or header-reading plugin must provide means (typically
> through configuration) to specify the tag for each header it uses. Defaults
> should be avoided.
> A consumer silently ignores tags it does not have a mapping for (since the
> binary_data can't be parsed without knowing what it is).
>
> Id range 0..999 is reserved for future use by the broker and must not be
> used by plugins.
>
>
>
> Broker
> -
> The broker does not process the tags (other than the standard protocol
> syntax verification), it simply stores and forwards them as opaque data.
>
> Standard message translation (removal of Headers) kicks in for older
> clients.
>
>
> Why not string ids?
> -
> String ids might seem like a good idea, but:
>  * does not really solve uniqueness
>  * consumes a lot of space (2 byte string length + string, per header) to
> be meaningful
>  * doesn't really say anything about how to parse the tag's data, so it is in
> effect useless on its own.
>
>
> Regards,
> Magnus
>
>
>
>
> 2016-11-07 18:32 GMT+01:00 Michael Pearce :
>
> > Hi Roger,
> >
> > Thanks for the support.
> >
> > I think the key thing is to have a common key space to make an ecosystem,
> > there does have to be some level of contract for people to play nicely.
> >
> > Having a map, or as per the current proposal in the KIP a numerical key
> > space (a map of int keys to byte[] values), is a level of the contract that
> > most people would expect.
> >
> > I think the example in a previous comment someone else made linking to
> AWS
> > blog and also implemented api where originally they didn’t have a header
> > space but now they do, where keys are uniform but the value can be
> string,
> > int, anything 

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-08 Thread Mayuresh Gharat
I think the migration can be done in 2 stages:

1) In the first stage, the broker treats both the attribute flag and a null
value as a tombstone for log compaction.
2) In the second stage, we move on to supporting only the attribute flag for
log compaction.

I agree with Becket that for older clients (consumers) the broker might
have to down-convert a message that has the attribute flag set for log
compaction but has a non-null value. But this should only be needed in the
first stage. Once all the clients have upgraded (clients start recognizing
the attribute flag), we can move the broker to stage 2.
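
To make the two stages concrete, here is a hypothetical sketch of the retention
check (the Record shape and method names below are illustrative, not the actual
Kafka classes):

// Illustrative only; not Kafka code.
interface Record {
    boolean hasTombstoneAttribute(); // the proposed attribute bit
    byte[] value();                  // null for legacy tombstones
}

class TombstonePolicy {
    // Stage 1: honour both the new attribute and the legacy null-value convention.
    static boolean isDeleteMarkerStage1(Record r) {
        return r.hasTombstoneAttribute() || r.value() == null;
    }

    // Stage 2 (once all clients recognize the attribute): attribute only.
    static boolean isDeleteMarkerStage2(Record r) {
        return r.hasTombstoneAttribute();
    }
}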

Thanks,

Mayuresh

On Tue, Nov 8, 2016 at 12:06 AM, Michael Pearce 
wrote:

> Also we can add further guidance:
>
> To  avoid the below caveat to organisations by promoting of upgrading all
> consumers first before relying on producing tombstone messages with data
>
> Sent using OWA for iPhone
> 
> From: Michael Pearce
> Sent: Tuesday, November 8, 2016 8:03:32 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
>
> Thanks Jun on the feedback, I think I understand the issue/point now.
>
> We def can add that on older client version if tombstone marker make the
> value null to preserve behaviour.
>
> There is one caveats to this:
>
> * we have to be clear that data is lost if reading via old client/message
> format - I don't think this is a big issue as mostly the idea/use case is
> around meta data transport as such would only be as bad as current situation
>
> Re having configurable broker this was to handle cases like you described
> but in another way by allowing organisation choose the behaviour of the
> compaction per broker or per topic so they could manage their transition to
> using tombstone markers.
>
> On hind sight it maybe easier to just upgrade and downgrade the messages
> on version as you propose.
>
>
>
>
>
>
> Sent using OWA for iPhone
> 
> From: Jun Rao 
> Sent: Tuesday, November 8, 2016 12:34:41 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag
>
> For the use case, one potential use case is for schema registration. For
> example, in Avro, a null value corresponds to a Null schema. So, if you
> want to be able to keep the schema id in a delete message, the value can't
> be null. We could get around this issue by specializing null value during
> schema registration though.
>
> Now for the proposed changes. We probably should preserve client
> compatibility. If a client application is sending a null value to a
> compacted topic, ideally, it should work the same after the client
> upgrades.
>
> I am not sure about making the tombstone marker configurable, especially at
> the topic level. Should we allow users to change the config values back and
> forth, and what would be the implication?
>
> Thanks,
>
> Jun
>
> On Mon, Nov 7, 2016 at 10:48 AM, Becket Qin  wrote:
>
> > Hi Michael,
> >
> > Yes, changing the logic in the log cleaner makes sense. There could be
> some
> > other thing worth thinking (e.g. the message size change after
> conversion),
> > though.
> >
> > The scenario I was thinking is the following:
> > Imagine a distributed caching system built on top of Kafka. A user is
> > consuming from a topic and it is guaranteed that if the user consume to
> the
> > log end it will get the latest value for all the keys. Currently if the
> > consumer sees a null value it knows the key has been removed. Now let's
> say
> > we rolled out this change. And the producer applies a message with the
> > tombstone flag set, but the value was not null. When we append that
> message
> > to the log I suppose we will not do the down conversion if the broker has
> > set the message.format.version to the latest. Because the log cleaner
> won't
> > touch the active log segment, so that message will be sitting in the
> active
> > segment as is. Now when a consumer that hasn't upgraded yet consumes that
> > tombstone message in the active segment, it seems that the broker will
> need
> > to down convert that message to remove the value, right? In this case, we
> > cannot wait for the log cleaner to do the down conversion because that
> > message may have already been consumed before the log compaction happens.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Mon, Nov 7, 2016 at 9:59 AM, Michael Pearce 
> > wrote:
> >
> > > Hi Becket,
> > >
> > > We were thinking more about having the logic that’s in the method
> > > shouldRetainMessage configurable via http://kafka.apache.org/
> > > documentation.html#brokerconfigs  at a broker/topic level. And then
> > scrap
> > > auto converting the message, and allow organisations to manage the
> > rollout
> > > of enabling of the feature.
> > > (this isn’t in documentation but in response to the discussion thread
> as
> > > an 

[jira] [Created] (KAFKA-4390) Replace MessageSet usage with client-side equivalents

2016-11-08 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-4390:
--

 Summary: Replace MessageSet usage with client-side equivalents
 Key: KAFKA-4390
 URL: https://issues.apache.org/jira/browse/KAFKA-4390
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Currently we have two separate implementations of Kafka's message format and 
log structure, one on the client side and one on the server side. Once 
KAFKA-2066 is merged, we will only be using the client side objects for direct 
serialization/deserialization in the request APIs, but we will still be using the 
server-side MessageSet objects everywhere else. Ideally, we can update this 
code to use the client objects everywhere so that future message format changes 
only need to be made in one place. This would eliminate the potential for 
implementation differences and gives us a uniform API for accessing the 
low-level log structure.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4353) Add semantic types to Kafka Connect

2016-11-08 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15647999#comment-15647999
 ] 

Randall Hauch commented on KAFKA-4353:
--

Logical types and semantic types are not the same thing, and they don't carry 
the same weight. The point of semantic types is not so much that every 
programming language has constructs for them, but rather that a *source* 
accessed by a connector has this concept and wants to capture it. Whether or 
not consumers choose or will do anything with this extra semantic information 
is beside the point, because as soon as it's available consumers *can* do 
something with it. In this way, semantic types are very different than logical 
types that build into the converters the conversion logic to and from 
programming language constructs.

Sure, source connectors can define their own semantic types by simply creating a 
schema based upon a primitive and giving it a name. Debezium is doing precisely 
this for JSON, XML, UUIDs, and temporal types so that its source connectors can 
include as much information as possible about the data captured in the event 
messages. The problem with this is that sink connectors written by other 
communities or organizations are not likely to know about Debezium's semantic 
types. The bottom line is that having some standard semantic types will mean 
that more connectors are developed to support them, and that people can much 
more easily mix and match source and sink connectors.

JSON is an excellent example. Source connectors can capture that {{STRING}} 
fields are in fact JSON documents, arrays, or scalars, and sink connectors 
pushing data into systems that *do* have some notion of JSON could take the 
{{STRING}} values and parse them into JSON representation before using them. I 
concede that it's maybe not useful to have lots of similar temporal semantic 
types with different units, but at a minimum I do think it is useful to have 
semantic types for year, days, and ISO 8601 timestamps. 

Really, semantic types are just a convention of using the existing schema 
system but with well-known schema names. Perhaps it's less useful for Kafka 
Connect software to define the few constants and trivial utility methods, and 
more useful to treat it as a protocol that multiple organizations can 
collaborate on and support.
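
As a concrete (purely illustrative) sketch of that convention using Connect's existing 
SchemaBuilder, a JSON semantic type would just be a named STRING schema; the schema 
name below is an example, not an agreed standard:
{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Example only: a "semantic" JSON type is a STRING schema with a well-known name
// that sink connectors can choose to recognize.
public class JsonSemanticType {
    public static final String LOGICAL_NAME = "example.connect.data.Json";

    public static SchemaBuilder builder() {
        return SchemaBuilder.string()
                .name(LOGICAL_NAME)
                .version(1)
                .doc("A JSON document, array, or scalar serialized as a string");
    }

    public static final Schema SCHEMA = builder().build();
}
{code}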

> Add semantic types to Kafka Connect
> ---
>
> Key: KAFKA-4353
> URL: https://issues.apache.org/jira/browse/KAFKA-4353
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.1
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>
> Kafka Connect's schema system defines several _core types_ that consist of:
> * STRUCT
> * ARRAY
> * MAP
> plus these _primitive types_:
> * INT8
> * INT16
> * INT32
> * INT64
> * FLOAT32
> * FLOAT64
> * BOOLEAN
> * STRING
> * BYTES
> The {{Schema}} for these core types define several attributes, but they do 
> not have a name.
> Kafka Connect also defines several _logical types_ that are specializations 
> of the primitive types and _do_ have schema names _and_ are automatically 
> mapped to/from Java objects:
> || Schema Name || Primitive Type || Java value class || Description ||
> | o.k.c.d.Decimal | {{BYTES}} | {{java.math.BigDecimal}} | An 
> arbitrary-precision signed decimal number. |
> | o.k.c.d.Date | {{INT32}} | {{java.util.Date}} | A date representing a 
> calendar day with no time of day or timezone. The {{java.util.Date}} value's 
> hours, minutes, seconds, milliseconds are set to 0. The underlying 
> representation is an integer representing the number of standardized days 
> (based on a number of milliseconds with 24 hours/day, 60 minutes/hour, 60 
> seconds/minute, 1000 milliseconds/second with n) since Unix epoch. |
> | o.k.c.d.Time | {{INT32}} | {{java.util.Date}} | A time representing a 
> specific point in a day, not tied to any specific date. Only the 
> {{java.util.Date}} value's hours, minutes, seconds, and milliseconds can be 
> non-zero. This effectively makes it a point in time during the first day 
> after the Unix epoch. The underlying representation is an integer 
> representing the number of milliseconds after midnight. |
> | o.k.c.d.Timestamp | {{INT64}} | {{java.util.Date}} | A timestamp 
> representing an absolute time, without timezone information. The underlying 
> representation is a long representing the number of milliseconds since Unix 
> epoch. |
> where "o.k.c.d" is short for {{org.kafka.connect.data}}. [~ewencp] has stated 
> in the past that adding more logical types is challenging and generally 
> undesirable, since everyone use Kafka Connect values have to deal with all 
> new logical types.
> This proposal adds standard _semantic_ types that are somewhere between the 
> core types and 

Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2016-11-08 Thread Magnus Edenhill
Hey,

I'm probably going to argue against bumping the SaslHandshake version, since it
is redundant given the existing SaslHandshakeResponse.enabled_mechanisms field.


With SaslHandshake version bump:
 * Client performs ApiVersionRequest
 * If SaslHandshake >= v1:   use SCRAM
 * If SaslHandshake < v1:  use some fallback mechanism (PLAIN, GSSAPI, ..)
 * Send SaslHandshakeRequest with selected mechanism
 * Broker returns Ok, or error if the mechanism is unsupported or disabled.
 * On error, use some fallback mechanism. [*]

Without SaslHandshake version bump:
 * Client sends SaslHandshake mechanism=SCRAM
 * Broker returns Ok, or error if the mechanism is unsupported or disabled.
 * On error, use some fallback mechanism [*]

This goes to show that in both scenarios the client will need some logic to
handle an unsupported/disabled mechanism,
so bumping the SaslHandshake version doesn't really help much (unless the
API version reflects the configured authenticator).

[*]: I don't think any client implements auth mechanism fallback, and I have
yet to see a real-world use-case for it,
so if the mechanism configured on the client isn't available on the broker
that is typically a terminal error.

And for error reporting to the user:
all it needs to say is that the client's configured mechanism is not
available on the broker, but these enabled_mechanisms are.
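
As a sketch of that client-side handling (class and method names are illustrative,
not the Kafka client internals):

import java.util.List;

class MechanismCheck {
    // Fails fast with a useful message when the configured mechanism is not enabled
    // on the broker, using the enabled_mechanisms list from SaslHandshakeResponse.
    static void verify(String configuredMechanism, List<String> enabledMechanisms) {
        if (!enabledMechanisms.contains(configuredMechanism)) {
            throw new IllegalStateException("SASL mechanism " + configuredMechanism
                    + " is not enabled on the broker; enabled mechanisms: " + enabledMechanisms);
        }
    }
}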

My two cents,
Magnus

2016-11-08 13:42 GMT+01:00 Rajini Sivaram :

> Hi Jun,
>
> 10. *s=<salt>* and *i=<iterations>* come from the SCRAM standard (they are
> transferred during SCRAM auth). SCRAM messages look like (for example)
> *r=<nonce>,s=<salt>,i=<iterations>*. StoredKey and ServerKey are not
> transferred in SCRAM messages, so I picked two keys that are unused in
> SCRAM.
>
> 11. SCRAM (like DIGEST-MD5 or PLAIN) uses a shared secret/password for
> authentication along with a username and an optional authorization-id.
> Kafka uses the username as the identity (Kafka principal) for
> authentication and authorization. KIP-48 doesn't mention KafkaPrincipal in
> the section "Authentication using Token", but a delegation token is
> associated with a Kafka principal. Since delegation tokens are acquired on
> behalf of a KafkaPrincipal and the principal is included in the token as
> the token owner,  clients authenticating with delegation tokens could use
> the token owner as username and the token HMAC as shared secret/password.
>
> If necessary, any other form of token identifier may be used as username as
> well as long as it contains sufficient information for the broker to
> retrieve/compute the principal and HMAC for authentication. The server
> callback handler can be updated when delegation tokens are implemented to
> generate Kafka principal accordingly.
>
>
> On Tue, Nov 8, 2016 at 1:03 AM, Jun Rao  wrote:
>
> > Hi, Rajini,
> >
> > A couple of other questions on the KIP.
> >
> > 10. For the config values stored in ZK, are those keys (s, t, k, i, etc)
> > stored under scram-sha-256 standard?
> >
> > 11. Could KIP-48 (delegation token) use this KIP to send delegation
> tokens?
> > In KIP-48, the client sends a HMAC as the delegation token to the server.
> > Not sure how this gets mapped to the username/password in this KIP.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Oct 4, 2016 at 6:43 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com
> > > wrote:
> >
> > > Hi all,
> > >
> > > I have just created KIP-84 to add SCRAM-SHA-1 and SCRAM-SHA-256 SASL
> > > mechanisms to Kafka:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-84%3A+Support+SASL+SCRAM+mechanisms
> > >
> > >
> > > Comments and suggestions are welcome.
> > >
> > > Thank you...
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> >
>
>
>
> --
> Regards,
>
> Rajini
>


[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15647817#comment-15647817
 ] 

ASF GitHub Bot commented on KAFKA-4322:
---

GitHub user markcshelton reopened a pull request:

https://github.com/apache/kafka/pull/2105

KAFKA-4322 StateRestoreCallback begin and end indication

This adds a begin and end callback to StateRestoreCallback.

The contribution is my original work and I license the work to Apache Kafka 
under Kafka's open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markcshelton/kafka KAFKA-4322

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2105


commit 9f59ea6ff2aac94bdd84eaafda36c860b12e1dcd
Author: Mark Shelton 
Date:   2016-11-04T20:57:37Z

changes KAFKA-4322 StateRestoreCallback begin and end indication




> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Guozhang Wang
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.
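
As an illustration of the proposal, a sketch of what an extended callback might look like; restore() matches the existing single-method Streams interface, while the begin/end hooks, their names, and the default-method style are hypothetical here, not the final API:

{code}
// Sketch only: restore() mirrors org.apache.kafka.streams.processor.StateRestoreCallback;
// the begin/end hooks and the interface name are illustrative, not an actual Kafka API.
interface ExtendedStateRestoreCallback {
    void restore(byte[] key, byte[] value); // existing: called once per restored key-value pair
    default void beginRestore() {}          // proposed: called before any keys are restored
    default void endRestore() {}            // proposed: called once restoration is complete
}

// Example use: report how many keys were restored and do one final piece of work at the end.
class CountingRestoreCallback implements ExtendedStateRestoreCallback {
    private long restored;

    @Override
    public void beginRestore() {
        restored = 0;
    }

    @Override
    public void restore(byte[] key, byte[] value) {
        restored++;
    }

    @Override
    public void endRestore() {
        System.out.println("Restored " + restored + " keys");
    }
}
{code}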



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15647816#comment-15647816
 ] 

ASF GitHub Bot commented on KAFKA-4322:
---

Github user markcshelton closed the pull request at:

https://github.com/apache/kafka/pull/2105


> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Guozhang Wang
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2105: KAFKA-4322 StateRestoreCallback begin and end indi...

2016-11-08 Thread markcshelton
GitHub user markcshelton reopened a pull request:

https://github.com/apache/kafka/pull/2105

KAFKA-4322 StateRestoreCallback begin and end indication

This adds a begin and end callback to StateRestoreCallback.

The contribution is my original work and I license the work to Apache Kafka 
under Kafka's open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markcshelton/kafka KAFKA-4322

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2105


commit 9f59ea6ff2aac94bdd84eaafda36c860b12e1dcd
Author: Mark Shelton 
Date:   2016-11-04T20:57:37Z

changes KAFKA-4322 StateRestoreCallback begin and end indication




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2105: KAFKA-4322 StateRestoreCallback begin and end indi...

2016-11-08 Thread markcshelton
Github user markcshelton closed the pull request at:

https://github.com/apache/kafka/pull/2105


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-08 Thread radai
My personal opinion on this is that control of memory was always the intent
behind queued.max.requests, so this KIP could completely obsolete it.
For now it's probably safest to leave it as-is (making the memory bound
"opt-in") and revisit this at a later date.

On Mon, Nov 7, 2016 at 2:32 PM, Gwen Shapira  wrote:

> Hey Radai,
>
> Looking at the proposal, it looks like a major question is still
> unresolved?
> "This configuration parameter can either replace queued.max.requests
> completely, or co-exist with it (by way of either-or or respecting
> both bounds and not picking up new requests when either is hit)."
>
> On Mon, Nov 7, 2016 at 1:08 PM, radai  wrote:
> > Hi,
> >
> > I would like to initiate a vote on KIP-72:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
> >
> > The kip allows specifying a limit on the amount of memory allocated for
> > reading incoming requests into. This is useful for "sizing" a broker and
> > avoiding OOMEs under heavy load (as actually happens occasionally at
> > linkedin).
> >
> > I believe I've addressed most (all?) concerns brought up during the
> > discussion.
> >
> > To the best of my understanding this vote is about the goal and
> > public-facing changes related to the new proposed behavior, but as for
> > implementation, I have the code up here:
> >
> > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting
> >
> > and I've stress-tested it to work properly (meaning it chugs along and
> > throttles under loads that would DOS 10.0.1.0 code).
> >
> > I also believe that the primitives and "pattern"s introduced in this KIP
> > (namely the notion of a buffer pool and retrieving from / releasing to
> said
> > pool instead of allocating memory) are generally useful beyond the scope
> of
> > this KIP for both performance issues (allocating lots of short-lived
> large
> > buffers is a performance bottleneck) and other areas where memory limits
> > are a problem (KIP-81)
> >
> > Thank you,
> >
> > Radai.
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>
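
For context, a minimal sketch of the bounded-pool pattern described in the quoted proposal (acquire request buffer memory from a fixed-capacity pool and release it after processing, instead of allocating freely); the class and method names are illustrative and this is not the implementation from the linked branch:

{code}
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of a bounded memory pool for incoming requests.
class BoundedMemoryPool {
    private final AtomicLong availableBytes;

    BoundedMemoryPool(long capacityBytes) {
        this.availableBytes = new AtomicLong(capacityBytes);
    }

    /** Returns a buffer of the requested size, or null if the pool is exhausted; on null the
     *  caller should mute the socket and retry later rather than allocate and risk an OOME. */
    ByteBuffer tryAllocate(int sizeBytes) {
        while (true) {
            long available = availableBytes.get();
            if (available < sizeBytes)
                return null;
            if (availableBytes.compareAndSet(available, available - sizeBytes))
                return ByteBuffer.allocate(sizeBytes);
        }
    }

    /** Returns the buffer's memory to the pool so that muted sockets can be unmuted. */
    void release(ByteBuffer buffer) {
        availableBytes.addAndGet(buffer.capacity());
    }
}
{code}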


[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-08 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15647603#comment-15647603
 ] 

Ismael Juma commented on KAFKA-4362:


This looks like a duplicate of KAFKA-4268. Since this JIRA has more 
information, maybe we can close the other one in favour of this one.

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2016-11-08 Thread Rajini Sivaram
Hi Jun,

10. *s=<salt>* and *i=<iterations>* come from the SCRAM standard (they are
transferred during SCRAM auth). SCRAM messages look like (for example)
*r=<nonce>,s=<salt>,i=<iterations>*. StoredKey and ServerKey are not
transferred in SCRAM messages, so I picked two keys that are unused in
SCRAM.

11. SCRAM (like DIGEST-MD5 or PLAIN) uses a shared secret/password for
authentication along with a username and an optional authorization-id.
Kafka uses the username as the identity (Kafka principal) for
authentication and authorization. KIP-48 doesn't mention KafkaPrincipal in
the section "Authentication using Token", but a delegation token is
associated with a Kafka principal. Since delegation tokens are acquired on
behalf of a KafkaPrincipal and the principal is included in the token as
the token owner,  clients authenticating with delegation tokens could use
the token owner as username and the token HMAC as shared secret/password.

If necessary, any other form of token identifier may be used as username as
well as long as it contains sufficient information for the broker to
retrieve/compute the principal and HMAC for authentication. The server
callback handler can be updated when delegation tokens are implemented to
generate Kafka principal accordingly.
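
For readers unfamiliar with the credential fields mentioned in point 10 above, a sketch of how they relate per RFC 5802 (the SCRAM standard): SaltedPassword = Hi(password, salt, i), ClientKey = HMAC(SaltedPassword, "Client Key"), StoredKey = H(ClientKey), ServerKey = HMAC(SaltedPassword, "Server Key"). The code below is plain JCE illustrating the standard, not Kafka's implementation:

{code}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

class ScramCredentialDerivation {
    static byte[] hi(char[] password, byte[] salt, int iterations) throws Exception {
        // Hi() in RFC 5802 is PBKDF2 with HMAC as the PRF.
        PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
        return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(spec).getEncoded();
    }

    static byte[] hmac(byte[] key, String message) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        byte[] salt = "example-salt".getBytes(StandardCharsets.UTF_8); // normally random bytes
        int iterations = 4096;
        byte[] saltedPassword = hi("secret".toCharArray(), salt, iterations);
        byte[] clientKey = hmac(saltedPassword, "Client Key");
        byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
        byte[] serverKey = hmac(saltedPassword, "Server Key");
        // Only the salt, iteration count, StoredKey and ServerKey need to be persisted;
        // the password itself is never stored.
        System.out.println(storedKey.length + " " + serverKey.length);
    }
}
{code}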


On Tue, Nov 8, 2016 at 1:03 AM, Jun Rao  wrote:

> Hi, Rajini,
>
> A couple of other questions on the KIP.
>
> 10. For the config values stored in ZK, are those keys (s, t, k, i, etc)
> stored under scram-sha-256 standard?
>
> 11. Could KIP-48 (delegation token) use this KIP to send delegation tokens?
> In KIP-48, the client sends a HMAC as the delegation token to the server.
> Not sure how this gets mapped to the username/password in this KIP.
>
> Thanks,
>
> Jun
>
> On Tue, Oct 4, 2016 at 6:43 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com
> > wrote:
>
> > Hi all,
> >
> > I have just created KIP-84 to add SCRAM-SHA-1 and SCRAM-SHA-256 SASL
> > mechanisms to Kafka:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-84%3A+Support+SASL+SCRAM+mechanisms
> >
> >
> > Comments and suggestions are welcome.
> >
> > Thank you...
> >
> > Regards,
> >
> > Rajini
> >
>



-- 
Regards,

Rajini


RE: Kafka Connect key.converter and value.converter properties for Avro encoding

2016-11-08 Thread david.franklin
Hi Ewen,

Thanks for the additional insight.

Because I have no Connect schema (only an Avro schema) am I safe to just use 
the byte[] <-> (Avro Schema, SpecificRecord) conversion?  This seems to work 
with the, admittedly limited, testing I've done so far.  Without converting my 
Avro schema into a Connect schema I don't see that I have any other option.

I'm a bit puzzled and concerned about the Kafka null values.  Am I unwittingly 
generating them or do they arise from the Kafka framework?  You mentioned 
compaction but I thought that was a purely internal operation on the commit 
log that didn't give rise to events.  I've never encountered null events in my 
previous experience with Kafka.

Regards,
David

-Original Message-
From: Ewen Cheslack-Postava [mailto:e...@confluent.io] 
Sent: 08 November 2016 04:38
To: dev@kafka.apache.org
Subject: Re: Kafka Connect key.converter and value.converter properties for 
Avro encoding

On Mon, Nov 7, 2016 at 7:14 AM,  wrote:

> Hi Ewen,
>
> Sorry but I didn't understand much of that.
>
> I currently have an implementation of the Converter interface that 
> uses Avro's BinaryEncoder/Decoder, SpecificDatumReader/Writer.
>
> The main mismatch I faced is that I need to use org.apache.avro.Schema 
> for serialization whereas the Converter interface requires a 
> org.apache.kafka.connect.data.Schema schema.
>

This was the main point I was trying to get at. The schemas that are used in 
the Converter interface (parameter to fromConnectData, return value of
toConnectData) are Connect schemas. The values that go with those schemas are 
*not* in a serialization-specific runtime format, i.e. you cannot just use 
SpecificRecords. Instead they are in the format of the Connect data API. The 
equivalent of Generic/SpecificRecord would be Struct ( 
http://docs.confluent.io/3.0.1/connect/javadocs/index.html?org/apache/kafka/connect/data/Struct.html
).

Although the only requirement of the Converter interface is that you
convert:

byte[] <-> (Connect Schema, Connect data value)

in practice many Converters will be decomposed into the following:

byte[] <-> (serialization format specific schema, serialization format specific 
runtime data) <-> (Connect Schema, Connect data value)

i.e. in the case of Avro

byte[] <-> (Avro Schema, SpecificRecord) <-> (Connect Schema, Connect data
value)
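
A skeleton of that decomposition might look like the following. The Converter interface, Schema and SchemaAndValue are the real Connect API; the avro*/connectTo* helper methods are hypothetical placeholders for whatever Avro <-> Connect translation the implementer supplies:

{code}
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class SketchAvroConverter implements Converter {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // read serializer settings, schema registry location, etc.
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (value == null)
            return null;                                   // preserve tombstones on compacted topics
        Object avroValue = connectToAvro(schema, value);   // (Connect Schema, value) -> Avro runtime object
        return serializeAvro(topic, avroValue);            // Avro runtime object -> byte[]
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] bytes) {
        if (bytes == null)
            return new SchemaAndValue(null, null);         // a null in Kafka has no schema to recover
        Object avroValue = deserializeAvro(topic, bytes);  // byte[] -> Avro runtime object
        return avroToConnect(avroValue);                   // Avro runtime object -> (Connect Schema, value)
    }

    // Placeholder translation steps -- not a real library API.
    private Object connectToAvro(Schema schema, Object value) { return value; }
    private byte[] serializeAvro(String topic, Object avroValue) { return new byte[0]; }
    private Object deserializeAvro(String topic, byte[] bytes) { return null; }
    private SchemaAndValue avroToConnect(Object avroValue) { return new SchemaAndValue(null, null); }
}
{code}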



>
> In the absence of a transformer to interconvert between these Schema 
> representations (are any available?) I have, for now, gone for the 
> slightly fragile approach of inferring the schema from the topic name 
> (we currently have a topic per event type).  This means I ignore the 
> schema parameter in fromConnectData and return a null schema in toConnectData.
>

Our AvroConverter has a class that does conversion of Avro <-> Connect:
https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java
This is combined with our normal Kafka de/serializers to perform the complete 
conversion. Note, however, that this API is internal and not guaranteed to be 
stable.


>
> With this I can create a simple Kafka consumer that correctly reads 
> these binary Avro encoded events generated by my Kafka Connect source, 
> once I've set the Kafka value.deserializer property to my serializer 
> class which implements Deserializer, which in turn 
> (re)uses my Kafka Connect converter class internally.
>
> However, I've noticed something odd: the fromConnectData  invocations 
> come in 2 forms:
>
> 1. schema = null, record = null
> 2. schema = Schema{BYTES}, record = a JSON structure


> Schema{BYTES} is, I presume, because I specify Schema.BYTES_SCHEMA as 
> the 4th arg to the SourceRecord ctr.
>
> Any idea why form 1 occurs?
>

Form 1 isn't even the only form that can occur. Connect actually supports 
schemaless data as well, where you would see (schema=null, record=<the data>). 
(Though you will never see Structs since they require a schema; 
however, complex data can be represented as maps & lists.)

However, for that particular case you are almost definitely seeing this from 
converting a null value in Kafka. If the value in Kafka is null, there's no way 
to know what the intended schema was, so it has to be left blank. However, 
since null is used in compacted topics for deletion, it is important to 
actually translate null values in Connect to be true nulls in Kafka.


>
> Thanks again,
> David
>
>
>
>
>
>
>
> -Original Message-
> From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
> Sent: 07 November 2016 04:35
> To: dev@kafka.apache.org
> Subject: Re: Kafka Connect key.converter and value.converter 
> properties for Avro encoding
>
> You won't be accepting/returning SpecificRecords directly when working 
> with Connect's API. Connect intentionally uses an interface that is 
> different from Kafka serializers because we deal with structured data 
> that the connectors need to be able to understand. We 

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-08 Thread Michael Pearce
Also we can add further guidance:

To avoid the caveat below, guide organisations to upgrade all 
consumers first before relying on producing tombstone messages that carry data.

Sent using OWA for iPhone

From: Michael Pearce
Sent: Tuesday, November 8, 2016 8:03:32 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

Thanks Jun on the feedback, I think I understand the issue/point now.

We definitely can add that: on older client versions, if the tombstone marker is 
set, make the value null to preserve behaviour.

There is one caveat to this:

* we have to be clear that data is lost if reading via an old client/message 
format - I don't think this is a big issue, as the idea/use case is mostly 
around metadata transport, so it would only be as bad as the current situation

Re having this configurable on the broker: this was to handle cases like you 
described, but in another way, by allowing organisations to choose the 
compaction behaviour per broker or per topic so they could manage their 
transition to using tombstone markers.

In hindsight it may be easier to just upgrade and downgrade the messages based 
on version as you propose.






Sent using OWA for iPhone

From: Jun Rao 
Sent: Tuesday, November 8, 2016 12:34:41 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

For the use case, one potential use case is for schema registration. For
example, in Avro, a null value corresponds to a Null schema. So, if you
want to be able to keep the schema id in a delete message, the value can't
be null. We could get around this issue by specializing null value during
schema registration though.

Now for the proposed changes. We probably should preserve client
compatibility. If a client application is sending a null value to a
compacted topic, ideally, it should work the same after the client upgrades.

I am not sure about making the tombstone marker configurable, especially at
the topic level. Should we allow users to change the config values back and
forth, and what would be the implication?

Thanks,

Jun

On Mon, Nov 7, 2016 at 10:48 AM, Becket Qin  wrote:

> Hi Michael,
>
> Yes, changing the logic in the log cleaner makes sense. There could be some
> other thing worth thinking (e.g. the message size change after conversion),
> though.
>
> The scenario I was thinking is the following:
> Imagine a distributed caching system built on top of Kafka. A user is
> consuming from a topic and it is guaranteed that if the user consumes to the
> log end it will get the latest value for all the keys. Currently if the
> consumer sees a null value it knows the key has been removed. Now let's say
> we rolled out this change. And the producer applies a message with the
> tombstone flag set, but the value was not null. When we append that message
> to the log I suppose we will not do the down conversion if the broker has
> set the message.format.version to the latest. Because the log cleaner won't
> touch the active log segment, so that message will be sitting in the active
> segment as is. Now when a consumer that hasn't upgraded yet consumes that
> tombstone message in the active segment, it seems that the broker will need
> to down convert that message to remove the value, right? In this case, we
> cannot wait for the log cleaner to do the down conversion because that
> message may have already been consumed before the log compaction happens.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Nov 7, 2016 at 9:59 AM, Michael Pearce 
> wrote:
>
> > Hi Becket,
> >
> > We were thinking more about having the logic that’s in the method
> > shouldRetainMessage configurable via
> > http://kafka.apache.org/documentation.html#brokerconfigs at a broker/topic
> > level. And then scrap auto converting the message, and allow organisations
> > to manage the rollout of enabling of the feature.
> > (this isn’t in documentation but in response to the discussion thread as
> > an alternative approach to roll out the feature)
> >
> > Does this make any more sense?
> >
> > Thanks
> > Mike
> >
> > On 11/3/16, 2:27 PM, "Becket Qin"  wrote:
> >
> > Hi Michael,
> >
> > Do you mean using a new configuration, or is it just the existing
> > message.format.version config? It seems the message.format.version config
> > is enough in this case. And the default value would always be the latest
> > version.
> >
> > > Message version migration would be handled as like in KIP-32
> >
> > Also just want to confirm on this. Today if an old consumer consumes a
> > log compacted topic and sees an empty value, it knows that is a tombstone.
> > After we start to use the attribute bit, a tombstone message can have a
> > non-empty value. So by "like in KIP-32" you mean we will remove the
> > value
> > 
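
For context on the thread above, a minimal sketch (not from the discussion) of today's convention, where a delete on a compacted topic is signalled only by a null value; the KIP-87 alternative of a separate tombstone attribute is described in comments because no such producer API exists yet:

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Today: deletion on a compacted topic is signalled by sending a null value, so the consumer's
// only tombstone signal is value == null and a delete cannot carry a payload (e.g. a schema id).
public class TombstoneToday {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Producer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            producer.send(new ProducerRecord<>("compacted-topic", "user-42", null)); // delete this key
        }
        // Under KIP-87 the delete marker would instead be an attribute bit on the message, so a
        // tombstone could still carry a non-null value -- but, as discussed above, old clients
        // reading such a message would need the value stripped to preserve today's semantics.
    }
}
{code}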

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-08 Thread Michael Pearce
Thanks Jun on the feedback, I think I understand the issue/point now.

We definitely can add that: on older client versions, if the tombstone marker is 
set, make the value null to preserve behaviour.

There is one caveat to this:

* we have to be clear that data is lost if reading via an old client/message 
format - I don't think this is a big issue, as the idea/use case is mostly 
around metadata transport, so it would only be as bad as the current situation

Re having this configurable on the broker: this was to handle cases like you 
described, but in another way, by allowing organisations to choose the 
compaction behaviour per broker or per topic so they could manage their 
transition to using tombstone markers.

In hindsight it may be easier to just upgrade and downgrade the messages based 
on version as you propose.






Sent using OWA for iPhone

From: Jun Rao 
Sent: Tuesday, November 8, 2016 12:34:41 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

For the use case, one potential use case is for schema registration. For
example, in Avro, a null value corresponds to a Null schema. So, if you
want to be able to keep the schema id in a delete message, the value can't
be null. We could get around this issue by specializing null value during
schema registration though.

Now for the proposed changes. We probably should preserve client
compatibility. If a client application is sending a null value to a
compacted topic, ideally, it should work the same after the client upgrades.

I am not sure about making the tombstone marker configurable, especially at
the topic level. Should we allow users to change the config values back and
forth, and what would be the implication?

Thanks,

Jun

On Mon, Nov 7, 2016 at 10:48 AM, Becket Qin  wrote:

> Hi Michael,
>
> Yes, changing the logic in the log cleaner makes sense. There could be some
> other thing worth thinking (e.g. the message size change after conversion),
> though.
>
> The scenario I was thinking is the following:
> Imagine a distributed caching system built on top of Kafka. A user is
> consuming from a topic and it is guaranteed that if the user consumes to the
> log end it will get the latest value for all the keys. Currently if the
> consumer sees a null value it knows the key has been removed. Now let's say
> we rolled out this change. And the producer applies a message with the
> tombstone flag set, but the value was not null. When we append that message
> to the log I suppose we will not do the down conversion if the broker has
> set the message.format.version to the latest. Because the log cleaner won't
> touch the active log segment, so that message will be sitting in the active
> segment as is. Now when a consumer that hasn't upgraded yet consumes that
> tombstone message in the active segment, it seems that the broker will need
> to down convert that message to remove the value, right? In this case, we
> cannot wait for the log cleaner to do the down conversion because that
> message may have already been consumed before the log compaction happens.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Nov 7, 2016 at 9:59 AM, Michael Pearce 
> wrote:
>
> > Hi Becket,
> >
> > We were thinking more about having the logic that’s in the method
> > shouldRetainMessage configurable via
> > http://kafka.apache.org/documentation.html#brokerconfigs at a broker/topic
> > level. And then scrap auto converting the message, and allow organisations
> > to manage the rollout of enabling of the feature.
> > (this isn’t in documentation but in response to the discussion thread as
> > an alternative approach to roll out the feature)
> >
> > Does this make any more sense?
> >
> > Thanks
> > Mike
> >
> > On 11/3/16, 2:27 PM, "Becket Qin"  wrote:
> >
> > Hi Michael,
> >
> > Do you mean using a new configuration, or is it just the existing
> > message.format.version config? It seems the message.format.version config
> > is enough in this case. And the default value would always be the latest
> > version.
> >
> > > Message version migration would be handled as like in KIP-32
> >
> > Also just want to confirm on this. Today if an old consumer consumes a
> > log compacted topic and sees an empty value, it knows that is a tombstone.
> > After we start to use the attribute bit, a tombstone message can have a
> > non-empty value. So by "like in KIP-32" you mean we will remove the
> > value to down convert the message if the consumer version is old, right?
> >
> > Thanks.
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Nov 2, 2016 at 1:37 AM, Michael Pearce <
> michael.pea...@ig.com>
> > wrote:
> >
> > > Hi Joel , et al.
> > >
> > > Any comments on the below idea to handle roll out / compatibility
> of
> > this
> > > feature, using a configuration?
> >