Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-23 Thread Dong Lin
Hey Jun,

I think there is a simpler design that doesn't need to add a "create" flag
in LeaderAndIsrRequest and also removes the need for the controller to
track/update which replicas are created. The idea is for each broker to
persist the created replicas in a per-broker-per-topic znode. When a replica
is created or deleted, the broker updates the znode accordingly. When a
broker receives a LeaderAndIsrRequest, it learns the "create" flag from its
cache of these znodes' data. When a broker starts, it does need to read a
number of znodes proportional to the number of topics on its disks. But the
controller still needs to learn about offline replicas from LeaderAndIsrResponse.
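
For illustration only, a minimal sketch of how a broker could persist the
created replicas in such a znode using the plain ZooKeeper client. The path
layout, JSON payload, and helper class below are hypothetical and not part of
the KIP:

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreatedReplicaStore {
    private final ZooKeeper zk;
    private final int brokerId;

    public CreatedReplicaStore(ZooKeeper zk, int brokerId) {
        this.zk = zk;
        this.brokerId = brokerId;
    }

    // Example payload: {"version":1,"partitions":[0,2,5]}
    // Assumes the parent path already exists; error handling is omitted.
    public void recordCreated(String topic, String partitionsJson)
            throws KeeperException, InterruptedException {
        String path = "/brokers/ids/" + brokerId + "/created-replicas/" + topic;
        byte[] data = partitionsJson.getBytes(StandardCharsets.UTF_8);
        if (zk.exists(path, false) == null) {
            zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            zk.setData(path, data, -1);   // -1 skips the version check
        }
    }
}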

I think this is better than the current design. Do you have any concern
with this design?

Thanks,
Dong


On Thu, Feb 23, 2017 at 7:12 PM, Dong Lin  wrote:

> Hey Jun,
>
> Sure, here is my explanation.
>
> Design B would not work if it doesn't store created replicas in ZK.
> For example, say broker B is healthy when it is shut down. At this moment no
> offline replica is written in ZK for this broker. Suppose a log directory is
> damaged while the broker is offline; then when this broker starts, it won't
> know which replicas are in the bad log directory. And it won't be able to
> specify those offline replicas in /failed-log-directory either.
>
> Let's say design B stores created replicas in ZK. Then the next problem is
> that, in the scenario where multiple log directories are damaged while the
> broker is offline, when the broker starts it won't be able to know the exact
> list of offline replicas on each bad log directory. All it knows is the
> offline replicas across all those bad log directories. Thus it is impossible
> for the broker to specify offline replicas per log directory in this scenario.
>
> I agree with your observation that, if the admin replaces dir1 with a
> good empty disk but leaves dir2 untouched, design A won't create the replica
> whereas design B can. But I am not sure that is a problem which
> we want to optimize for. It seems reasonable for the admin to fix both log
> directories in practice. If the admin fixes only one of the two log
> directories, we can say it is a partial fix and Kafka won't re-create any
> offline replicas on dir1 and dir2. Similar to the extra round of
> LeaderAndIsrRequest in the case of log directory failure, I think this is
> also a pretty minor issue with design B.
>
> Thanks,
> Dong
>
>
> On Thu, Feb 23, 2017 at 6:46 PM, Jun Rao  wrote:
>
>> Hi, Dong,
>>
>> My replies are inlined below.
>>
>> On Thu, Feb 23, 2017 at 4:47 PM, Dong Lin  wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks for your reply! Let me first comment on the things that you
>> listed as
>> > advantage of B over A.
>> >
>> > 1) No change in LeaderAndIsrRequest protocol.
>> >
>> > I agree with this.
>> >
>> > 2) Step 1. One less round of LeaderAndIsrRequest and no additional ZK
>> > writes to record the created flag.
>> >
>> > I don't think this is true. There will be one round of LeaderAndIsrRequest
>> > in both A and B. In design A the controller needs to write to ZK once to
>> > record this replica as created. In design B the broker needs to write to
>> > ZooKeeper once to record this replica as created. So there is the same
>> > number of LeaderAndIsrRequests and ZK writes.
>> >
>> > Broker needs to record created replica in design B so that when it
>> > bootstraps with failed log directory, the broker can derive the offline
>> > replicas as the difference between created replicas and replicas found
>> on
>> > good log directories.
>> >
>> >
>> Design B actually doesn't write created replicas in ZK. When a broker
>> starts up, all offline replicas are stored in the /failed-log-directory
>> path in ZK. So if a replica is not there and is not in the live log
>> directories either, it's never created. Does this work?
>>
>>
>>
>> > 3) Step 2. One less round of LeaderAndIsrRequest and no additional
>> logic to
>> > handle LeaderAndIsrResponse.
>> >
>> > While I agree there is one less round of LeaderAndIsrRequest in design
>> B, I
>> > don't think one additional LeaderAndIsrRequest to handle log directory
>> > failure is a big deal given that it doesn't happen frequently.
>> >
>> > Also, while there is no additional logic to handle LeaderAndIsrResponse
>> in
>> > design B, I actually think this is something that controller should do
>> > anyway. Say the broker stops responding to any requests without removing
>> > itself from ZooKeeper; the only way for the controller to realize this and
>> > re-elect the leader is to send a request to this broker and handle the
>> > response. It is a problem that we don't do this as of now.
>> >
>> > 4) Step 6. Additional ZK reads proportional to # of failed log
>> directories,
>> > instead of # of partitions.
>> >
>> > If one znode is able to describe all topic partitions in a log
>> directory,
>> > then the existing znode /brokers/topics/[topic] should be able to
>> describe
>> > created replicas in addition to the assigned replicas for every
>> partition
>> > of the 

Re: Consumption on an explicitly (dynamically) created topic has a 5 minute delay

2017-02-23 Thread Jaikiran Pai
James, thank you very much for this explanation; I now understand the 
situation much more clearly. I wasn't aware that the consumer's 
metadata.max.age.ms could play a role in this. I was under the 
impression that the 5 minute timeout was some broker-level config which 
was triggering this consumer group re-evaluation.


Perhaps changing the metadata.max.age.ms might be what we will end up 
doing, but before doing that, I will check how the 
consumer.partitionsFor API (the one which you noted in your reply) 
behaves in practice. The javadoc on that method states that it will 
trigger a refresh of the metadata if none is found for the topic. 
There's also a note that it throws a TimeoutException if the metadata 
isn't available for the topic after a timeout period. I'll experiment a 
bit with this API to see if I can be assured of an empty list if the 
topic metadata (after a fetch) isn't available. That way, I can add some 
logic in our application which calls this API a fixed number of times 
before the consumer is considered "ready". If it does throw 
a TimeoutException, I think I'll just catch it and repeat the call up to 
the fixed number of times.
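
A minimal sketch of that idea, assuming the consumer is already constructed.
The helper name, attempt count, and sleep interval are made up for
illustration, and whether partitionsFor actually throws a TimeoutException
depends on the client version, as noted above:

import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;

public class TopicReadinessCheck {
    private static final int MAX_ATTEMPTS = 10;

    // Returns true once partition metadata for the topic is available.
    static boolean waitForTopicMetadata(KafkaConsumer<?, ?> consumer, String topic)
            throws InterruptedException {
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            try {
                // partitionsFor() triggers a metadata refresh if the topic is unknown
                List<PartitionInfo> partitions = consumer.partitionsFor(topic);
                if (partitions != null && !partitions.isEmpty()) {
                    return true;   // metadata available, safe to subscribe and poll
                }
            } catch (TimeoutException e) {
                // metadata not available yet; fall through and retry
            }
            Thread.sleep(1000);
        }
        return false;
    }
}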


Thanks again, this was really helpful - I was running out of ideas to 
come up with a clean enough workaround to this problem. Your explanation 
has given me a couple of ideas which I consider clean enough to try a 
few things. Once I have a consistent working solution for this, I'll send 
a note on what approach I settled on.



-Jaikiran

On Friday 24 February 2017 12:08 PM, James Cheng wrote:

On Feb 23, 2017, at 10:03 PM, Jaikiran Pai  wrote:

(Re)posting this from the user mailing list to dev mailing list, hoping for 
some inputs from the Kafka dev team:

We are on Kafka 0.10.0.1 (server and client) and use Java consumer/producer 
APIs. We have an application where we create Kafka topics dynamically (using 
the AdminUtils Java API) and then start producing/consuming on those topics. 
The issue we frequently run into is this:

1. Application process creates a topic "foo-bar" via AdminUtils.createTopic. 
This is successfully completed.
2. Same application process then creates a consumer (using the new Java consumer 
API) on that foo-bar topic as a next step.
3. The consumer that gets created in step#2, however, doesn't seem to be 
enrolled in the consumer group for this topic because of this (notice the last line 
in the log):

2017-02-21 00:58:43,359 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
2017-02-21 00:58:43,360 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): 
foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received 
group coordinator response ClientResponse(receivedTimeMs=1487667523542, 
disconnected=false, request=ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f,
 
request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
 body={group_id=my-app-group}), createdTimeMs=1487667523378, 
sendTimeMs=1487667523529), 
responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered 
coordinator localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking 
previously assigned partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining 
group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending 
JoinGroup 
({group_id=my-app-group,session_timeout=3,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
 lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received 
successful join group response for group my-app-group: 
{error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,m

Re: Consumption on an explicitly (dynamically) created topic has a 5 minute delay

2017-02-23 Thread James Cheng

> On Feb 23, 2017, at 10:03 PM, Jaikiran Pai  wrote:
> 
> (Re)posting this from the user mailing list to dev mailing list, hoping for 
> some inputs from the Kafka dev team:
> 
> We are on Kafka 0.10.0.1 (server and client) and use Java consumer/producer 
> APIs. We have an application where we create Kafka topics dynamically (using 
> the AdminUtils Java API) and then start producing/consuming on those topics. 
> The issue we frequently run into is this:
> 
> 1. Application process creates a topic "foo-bar" via AdminUtils.createTopic. 
> This is successfully completed.
> 2. Same application process then creates a consumer (using the new Java consumer 
> API) on that foo-bar topic as a next step.
> 3. The consumer that gets created in step#2, however, doesn't seem to be 
> enrolled in the consumer group for this topic because of this (notice the last 
> line in the log):
> 
> 2017-02-21 00:58:43,359 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
> 2017-02-21 00:58:43,360 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): 
> foo-bar
> 2017-02-21 00:58:43,543 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received 
> group coordinator response ClientResponse(receivedTimeMs=1487667523542, 
> disconnected=false, request=ClientRequest(expectResponse=true, 
> callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f,
>  
> request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1},
>  body={group_id=my-app-group}), createdTimeMs=1487667523378, 
> sendTimeMs=1487667523529), 
> responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
> 2017-02-21 00:58:43,543 [Thread-6] INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered 
> coordinator localhost:9092 (id: 2147483647 rack: null) for group my-app-group.
> 2017-02-21 00:58:43,545 [Thread-6] INFO 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking 
> previously assigned partitions [] for group my-app-group
> 2017-02-21 00:58:43,545 [Thread-6] INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
> (Re-)joining group my-app-group
> 2017-02-21 00:58:43,548 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending 
> JoinGroup 
> ({group_id=my-app-group,session_timeout=3,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
>  lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
> 2017-02-21 00:58:43,548 [Thread-6] DEBUG 
> org.apache.kafka.common.metrics.Metrics - Added sensor with name 
> node-2147483647.bytes-sent
> 2017-02-21 00:58:43,549 [Thread-6] DEBUG 
> org.apache.kafka.common.metrics.Metrics - Added sensor with name 
> node-2147483647.bytes-received
> 2017-02-21 00:58:43,549 [Thread-6] DEBUG 
> org.apache.kafka.common.metrics.Metrics - Added sensor with name 
> node-2147483647.latency
> 2017-02-21 00:58:43,552 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received 
> successful join group response for group my-app-group: 
> {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0
>  lim=59 cap=59]}]}
> 2017-02-21 00:58:43,552 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing 
> assignment for group my-app-group using strategy range with subscriptions 
> {consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
> *2017-02-21 00:58:43,552 [Thread-6] DEBUG 
> org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - 
> Skipping assignment for topic foo-bar since no metadata is available*
> 
> 
> 4. A few seconds later, a separate process produces some messages (via the Java 
> producer API) on the foo-bar topic.
> 5. The consumer created in step#2 (although it is waiting for messages) on the 
> foo-bar topic _doesn't_ consume these messages.
> 6. *5 minutes later* the Kafka server triggers a consumer rebalance which 
> then successfully assigns partition(s) of this foo-bar topic to the consumer 
> created in step#2, and the consumer starts consuming these messages.
> 
> This 5 minute delay in consuming messages from this dynamically created topic 
> is what we want to avoid. Is there any way I can deterministically do/force 
> creation of a dynamic topic and be assured that, upon completion of that call, 
> I can create a consumer and start consuming from that topic such that it can 
> receive messages as soon as the messages are produced on that topic, without 
> having to wait for a 5 minute delay (or w

[jira] [Created] (KAFKA-4798) Javadocs for 0.10.2 are missing from kafka.apache.org

2017-02-23 Thread James Cheng (JIRA)
James Cheng created KAFKA-4798:
--

 Summary: Javadocs for 0.10.2 are missing from kafka.apache.org
 Key: KAFKA-4798
 URL: https://issues.apache.org/jira/browse/KAFKA-4798
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 0.10.2.0
Reporter: James Cheng


The javadocs on kafka.apache.org are missing for 0.10.2

The following link yields a 404:
http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

If you look in http://kafka.apache.org/0102/, there is no javadoc directory.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Consumption on an explicitly (dynamically) created topic has a 5 minute delay

2017-02-23 Thread Jaikiran Pai
(Re)posting this from the user mailing list to dev mailing list, hoping 
for some inputs from the Kafka dev team:


We are on Kafka 0.10.0.1 (server and client) and use Java 
consumer/producer APIs. We have an application where we create Kafka 
topics dynamically (using the AdminUtils Java API) and then start 
producing/consuming on those topics. The issue we frequently run into is 
this:


1. Application process creates a topic "foo-bar" via 
AdminUtils.createTopic. This is successfully completed.
2. Same application process then creates a consumer (using the new Java 
consumer API) on that foo-bar topic as a next step.
3. The consumer that gets created in step#2, however, doesn't seem to be 
enrolled in the consumer group for this topic because of this (notice the 
last line in the log):


2017-02-21 00:58:43,359 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
2017-02-21 00:58:43,360 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to 
topic(s): foo-bar
2017-02-21 00:58:43,543 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Received group coordinator response 
ClientResponse(receivedTimeMs=1487667523542, disconnected=false, 
request=ClientRequest(expectResponse=true, 
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@50aad50f, 
request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, 
body={group_id=my-app-group}), createdTimeMs=1487667523378, 
sendTimeMs=1487667523529), 
responseBody={error_code=0,coordinator={node_id=0,host=localhost,port=9092}})
2017-02-21 00:58:43,543 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for 
group my-app-group.
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Revoking previously assigned partitions [] for group my-app-group
2017-02-21 00:58:43,545 [Thread-6] INFO 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
(Re-)joining group my-app-group
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Sending JoinGroup 
({group_id=my-app-group,session_timeout=3,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 
lim=59 cap=59]}]}) to coordinator localhost:9092 (id: 2147483647 rack: null)
2017-02-21 00:58:43,548 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-sent
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.bytes-received
2017-02-21 00:58:43,549 [Thread-6] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node-2147483647.latency
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - 
Received successful join group response for group my-app-group: 
{error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,members=[{member_id=consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d,member_metadata=java.nio.HeapByteBuffer[pos=0 
lim=59 cap=59]}]}
2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - 
Performing assignment for group my-app-group using strategy range with 
subscriptions 
{consumer-1-1453e523-402a-43fe-87e8-795ae4c68c5d=Subscription(topics=[foo-bar])}
*2017-02-21 00:58:43,552 [Thread-6] DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor - 
Skipping assignment for topic foo-bar since no metadata is available*



4. A few seconds later, a separate process produces some messages (via the 
Java producer API) on the foo-bar topic.
5. The consumer created in step#2 (although it is waiting for messages) on 
the foo-bar topic _doesn't_ consume these messages.
6. *5 minutes later* the Kafka server triggers a consumer rebalance 
which then successfully assigns partition(s) of this foo-bar topic to the 
consumer created in step#2, and the consumer starts consuming these messages.


This 5 minute delay in consuming messages from this dynamically created 
topic is what we want to avoid. Is there any way I can deterministically 
do/force creation of a dynamic topic and be assured that, upon completion 
of that call, I can create a consumer and start consuming from that topic 
such that it can receive messages as soon as the messages are produced 
on that topic, without having to wait for a 5 minute delay (or whatever 
the rebalance configuration is)? In essence, is there a way to ensure 
that the Kafka consumer gets the topic metadata for a topic that was 
created successfully by the same application, immediately?
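
For context, the consumer's metadata.max.age.ms defaults to 300000 ms (5
minutes), which matches the delay described above. A purely illustrative
sketch of a consumer configured with a much lower refresh interval (the
values are examples, not recommendations):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LowMetadataAgeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-app-group");
        // Refresh topic metadata every 5 seconds instead of the 5-minute default,
        // so a newly created topic is noticed sooner. Example value only.
        props.put("metadata.max.age.ms", "5000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("foo-bar"));
    }
}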

Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-23 Thread Matthias J. Sax
I just read the updated KIP once more.

I would suggest renaming --to-duration to --by-duration

Or as a second idea, rename --to-duration to --shift-by-duration and at
the same time rename --shift-offset-by to --shift-by-offset

Not sure what the best option is, but naming would be more consistent IMHO.



-Matthias
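
For reference, the duration values discussed in the quoted thread below use
java.time.Duration's ISO-8601 format ('PnDTnHnMnS'). A small, purely
illustrative sketch of turning such a value into a reset timestamp:

import java.time.Duration;
import java.time.Instant;

public class DurationResetSketch {
    public static void main(String[] args) {
        Duration d = Duration.parse("PT30M");          // e.g. "30 minutes ago"
        Instant resetPoint = Instant.now().minus(d);   // timestamp to reset offsets to
        System.out.println("Reset offsets to timestamp: " + resetPoint.toEpochMilli());
    }
}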

On 2/23/17 4:42 PM, Jorge Esteban Quilcate Otoya wrote:
> Hi All,
> 
> If there are no more concerns, I'd like to start the vote for this KIP.
> 
> Thanks!
> Jorge.
> 
> El jue., 23 feb. 2017 a las 22:50, Jorge Esteban Quilcate Otoya (<
> quilcate.jo...@gmail.com>) escribió:
> 
>> Oh ok :)
>>
>> So, we can keep `--topic t1:1,2,3`
>>
>> I think with this one we have most of the feedback applied. I will update
>> the KIP with this change.
>>
>> El jue., 23 feb. 2017 a las 22:38, Matthias J. Sax ()
>> escribió:
>>
>> Sounds reasonable.
>>
>> If we have multiple --topic arguments, it also does not matter whether we use
>> t1:1,2 or t2=1,2
>>
>> I just suggested '=' because I wanted to use ':' to chain multiple topics.
>>
>>
>> -Matthias
>>
>> On 2/23/17 10:49 AM, Jorge Esteban Quilcate Otoya wrote:
>>> Yeap, `--topic t1=1,2` LGTM
>>>
>>> I don't have an idea either about getting rid of the repeated --topic, but
>>> --group is also repeated in the case of deletion, so it could be OK to have
>>> repeated --topic arguments.
>>>
>>> El jue., 23 feb. 2017 a las 19:14, Matthias J. Sax (<
>> matth...@confluent.io>)
>>> escribió:
>>>
 So you suggest to merge "scope options" --topics, --topic, and
 --partitions into a single option? Sounds good to me.

 I like the compact way to express it, i.e., topicname:list-of-partitions
 with "all partitions" if no partitions are specified. It's quite
 intuitive to use.

 Just wondering if we could get rid of the repeated --topic option; it's
 somewhat verbose. I have no good idea though how to improve it.

 If you concatenate multiple topics, we need one more character that is
 not allowed in topic names to separate the topics:

> invalidChars = {'/', '\\', ',', '\u', ':', '"', '\'', ';', '*',
 '?', ' ', '\t', '\r', '\n', '='};

 maybe

 --topics t1=1,2,3:t2:t3=3

 use '=' to specify partitions (instead of ':' as you proposed) and ':'
 to separate topics? All other characters seem to be worse to use to me.
 But maybe you have a better idea.



 -Matthias


 On 2/23/17 3:15 AM, Jorge Esteban Quilcate Otoya wrote:
> @Matthias about the point 9:
>
> What about keeping only the --topic option, and support this format:
>
> `--topic t1:0,1,2 --topic t2 --topic t3:2`
>
> In this case topics t1, t2, and t3 will be selected: topic t1 with
> partitions 0,1 and 2; topic t2 with all its partitions; and topic t3,
 with
> only partition 2.
>
> Jorge.
>
> El mar., 21 feb. 2017 a las 11:11, Jorge Esteban Quilcate Otoya (<
> quilcate.jo...@gmail.com>) escribió:
>
>> Thanks for the feedback Matthias.
>>
>> * 1. You're right. I'll reorder the scenarios.
>>
>> * 2. Agree. I'll update the KIP.
>>
>> * 3. I like it, updating to `reset-offsets`
>>
>> * 4. Agree, removing the `reset-` part
>>
>> * 5. Yes, 1.e option without --execute or --export will print out
 current
>> offset, and the new offset, that will be the same. The use-case of
>> this
>> option is to use it in combination with --export mostly and have a
 current
>> 'checkpoint' to reset later. I will add to the KIP how the output
>> should
 look.
>>
>> * 6. Considering 4., I will update it to `--to-offset`
>>
>> * 7. I like the idea to unify these options (plus, minus).
 `shift-offsets-by` is a good option, but I would like some more
>> feedback
>> here about the name. I will update the KIP in the meantime.
>>
>> * 8. Yes, discussed in 9.
>>
 * 9. Agree. I'd love some feedback here. `topic` is already used by
>> `delete`, and we can add `--all-topics` to consider all
 topics/partitions
>> assigned to a group. How could we define specific topics/partitions?
>>
 * 10. Haven't thought about it, but makes sense.
>> ,, would be enough.
>>
>> * 11. Agree. Solved with 10.
>>
>> Also, I have a couple of changes to mention:
>>
 1. I have added a reference to the branch where I'm working on this KIP.
>>
>> 2. About the period scenario `--to-period`. I will change it to
>> `--to-duration` given that duration (
>> https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html)
>> follows this format: 'PnDTnHnMnS' and does not consider daylight
>> saving
 effects.
>>
>>
>>
>> El mar., 21 feb. 2017 a las 2:47, Matthias J. Sax (<
 matth...@confluent.io>)
>> escribió:
>>
>> Hi,
>>
>> thanks for updating the KIP. Couple of follow up comments:
>>

[jira] [Updated] (KAFKA-4797) Add lag in time JMX metrics for fetcher threads

2017-02-23 Thread Jun Ma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Ma updated KAFKA-4797:
--
Attachment: (was: maxLag.png)

> Add lag in time JMX metrics for fetcher threads
> --
>
> Key: KAFKA-4797
> URL: https://issues.apache.org/jira/browse/KAFKA-4797
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Jun Ma
> Attachments: maxLag.jpg
>
>
> Since 0.9.0.0, the parameter replica.lag.max.messages was removed; replica lag 
> is now represented as a lag in time between follower and leader replicas. But the 
> metric kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica 
> still represents the max lag in messages between follower and leader 
> replicas. We should change it to represent the lag in time between follower and 
> leader replicas.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4797) Add lag in time JMX metrics for fetcher threads

2017-02-23 Thread Jun Ma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Ma updated KAFKA-4797:
--
Attachment: maxLag.jpg

> Add lag in time JMX metrics for fetcher threads
> --
>
> Key: KAFKA-4797
> URL: https://issues.apache.org/jira/browse/KAFKA-4797
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Jun Ma
> Attachments: maxLag.jpg, maxLag.png
>
>
> Since 0.9.0.0, the parameter replica.lag.max.messages was removed; replica lag 
> is now represented as a lag in time between follower and leader replicas. But the 
> metric kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica 
> still represents the max lag in messages between follower and leader 
> replicas. We should change it to represent the lag in time between follower and 
> leader replicas.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4797) Add lag in seconds JMX metrics for fetcher threads

2017-02-23 Thread Jun Ma (JIRA)
Jun Ma created KAFKA-4797:
-

 Summary: Add lag in seconds JMX metrics for fetcher threads
 Key: KAFKA-4797
 URL: https://issues.apache.org/jira/browse/KAFKA-4797
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Affects Versions: 0.10.2.0, 0.10.1.1, 0.10.1.0, 0.10.0.0, 0.9.0.1, 0.9.0.0
Reporter: Jun Ma
 Attachments: maxLag.png

Since 0.9.0.0, the parameter replica.lag.max.messages was removed; replica lag 
is now represented as a lag in time between follower and leader replicas. But the 
metric kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica still 
represents the max lag in messages between follower and leader replicas. We 
should change it to represent the lag in time between follower and leader replicas.
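
For reference, a small sketch of reading the existing gauge over JMX. The
MBean name is the one quoted in this ticket; the JMX port and the "Value"
attribute name are assumptions (Yammer-metrics gauges are usually exposed
that way):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class MaxLagProbe {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");  // example broker JMX port
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName name = new ObjectName(
                    "kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica");
            Object value = conn.getAttribute(name, "Value");
            System.out.println("MaxLag (messages): " + value);
        }
    }
}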



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4797) Add lag in time JMX metrics for fetcher threads

2017-02-23 Thread Jun Ma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Ma updated KAFKA-4797:
--
Summary: Add lag in time JMX metrics for fetcher threads  (was: Add lag in 
seconds JMX metrics for fetcher threads)

> Add lag in time JMX metrics for fetcher threads
> --
>
> Key: KAFKA-4797
> URL: https://issues.apache.org/jira/browse/KAFKA-4797
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Jun Ma
> Attachments: maxLag.png
>
>
> Since 0.9.0.0, the parameter replica.lag.max.messages was removed; replica lag 
> is now represented as a lag in time between follower and leader replicas. But the 
> metric kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica 
> still represents the max lag in messages between follower and leader 
> replicas. We should change it to represent the lag in time between follower and 
> leader replicas.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4766) Document lz4 and lz4hc in confluence

2017-02-23 Thread Lee Dongjin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-4766:
--

Assignee: Lee Dongjin

> Document lz4 and lz4hc in confluence
> 
>
> Key: KAFKA-4766
> URL: https://issues.apache.org/jira/browse/KAFKA-4766
> Project: Kafka
>  Issue Type: Sub-task
>  Components: documentation
>Affects Versions: 0.8.2.0
>Reporter: Daniel Pinyol
>Assignee: Lee Dongjin
> Fix For: 0.8.2.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Compression does not 
> mention that lz4 and lz4hc compressions are supported 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4766) Document lz4 and lz4hc in confluence

2017-02-23 Thread Lee Dongjin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881869#comment-15881869
 ] 

Lee Dongjin commented on KAFKA-4766:


[~ijuma] Could you review the wiki update I did for this issue?

> Document lz4 and lz4hc in confluence
> 
>
> Key: KAFKA-4766
> URL: https://issues.apache.org/jira/browse/KAFKA-4766
> Project: Kafka
>  Issue Type: Sub-task
>  Components: documentation
>Affects Versions: 0.8.2.0
>Reporter: Daniel Pinyol
> Fix For: 0.8.2.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Compression does not 
> mention that lz4 and lz4hc compressions are supported 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-23 Thread Dong Lin
Hey Jun,

Sure, here is my explanation.

Design B would not work if it doesn't store created replicas in ZK. For
example, say broker B is healthy when it is shut down. At this moment no
offline replica is written in ZK for this broker. Suppose a log directory is
damaged while the broker is offline; then when this broker starts, it won't
know which replicas are in the bad log directory. And it won't be able to
specify those offline replicas in /failed-log-directory either.

Let's say design B stores created replicas in ZK. Then the next problem is
that, in the scenario where multiple log directories are damaged while the
broker is offline, when the broker starts it won't be able to know the exact
list of offline replicas on each bad log directory. All it knows is the
offline replicas across all those bad log directories. Thus it is impossible
for the broker to specify offline replicas per log directory in this scenario.

I agree with your observation that, if the admin replaces dir1 with a
good empty disk but leaves dir2 untouched, design A won't create the replica
whereas design B can. But I am not sure that is a problem which
we want to optimize for. It seems reasonable for the admin to fix both log
directories in practice. If the admin fixes only one of the two log
directories, we can say it is a partial fix and Kafka won't re-create any
offline replicas on dir1 and dir2. Similar to the extra round of
LeaderAndIsrRequest in the case of log directory failure, I think this is
also a pretty minor issue with design B.

Thanks,
Dong


On Thu, Feb 23, 2017 at 6:46 PM, Jun Rao  wrote:

> Hi, Dong,
>
> My replies are inlined below.
>
> On Thu, Feb 23, 2017 at 4:47 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks for your reply! Let me first comment on the things that you listed
> as
> > advantage of B over A.
> >
> > 1) No change in LeaderAndIsrRequest protocol.
> >
> > I agree with this.
> >
> > 2) Step 1. One less round of LeaderAndIsrRequest and no additional ZK
> > writes to record the created flag.
> >
> > I don't think this is true. There will be one round of
> LeaderAndIsrRequest
> > in both A and B. In design A the controller needs to write to ZK once to
> > record this replica as created. In design B the broker needs to write to
> > ZooKeeper once to record this replica as created. So there is the same number
> > of LeaderAndIsrRequests and ZK writes.
> >
> > Broker needs to record created replica in design B so that when it
> > bootstraps with failed log directory, the broker can derive the offline
> > replicas as the difference between created replicas and replicas found on
> > good log directories.
> >
> >
> Design B actually doesn't write created replicas in ZK. When a broker
> starts up, all offline replicas are stored in the /failed-log-directory
> path in ZK. So if a replica is not there and is not in the live log
> directories either, it's never created. Does this work?
>
>
>
> > 3) Step 2. One less round of LeaderAndIsrRequest and no additional logic
> to
> > handle LeaderAndIsrResponse.
> >
> > While I agree there is one less round of LeaderAndIsrRequest in design
> B, I
> > don't think one additional LeaderAndIsrRequest to handle log directory
> > failure is a big deal given that it doesn't happen frequently.
> >
> > Also, while there is no additional logic to handle LeaderAndIsrResponse
> in
> > design B, I actually think this is something that controller should do
> > anyway. Say the broker stops responding to any requests without removing
> > itself from ZooKeeper; the only way for the controller to realize this and
> > re-elect the leader is to send a request to this broker and handle the
> > response. It is a problem that we don't do this as of now.
> >
> > 4) Step 6. Additional ZK reads proportional to # of failed log
> directories,
> > instead of # of partitions.
> >
> > If one znode is able to describe all topic partitions in a log directory,
> > then the existing znode /brokers/topics/[topic] should be able to
> describe
> > created replicas in addition to the assigned replicas for every partition
> > of the topic. In this case, design A requires no additional ZK reads
> > whereas design B requires ZK reads proportional to # of failed log directories.
> >
> > 5) Step 3. In design A, if a broker is restarted and the failed log
> > directory is unreadable, the broker doesn't know which replicas are on
> the
> > failed log directory. So, when the broker receives the LeadAndIsrRequest
> > with created = false, it's a bit hard for the broker to decide whether it
> > should create the missing replica on other log directories. This is
> easier
> > in design B since the list of failed replicas is persisted in ZK.
> >
> > I don't understand why it is hard for the broker to make a decision in design
> A.
> > With design A, if a broker is started with a failed log directory and it
> > receives LeaderAndIsrRequest with created=false for a replica that can
> not
> > be found on any good log directory, broker will not create this replica.
> Is
> > there any drawback with this

Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-23 Thread Jun Rao
Hi, Dong,

My replies are inlined below.

On Thu, Feb 23, 2017 at 4:47 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for your reply! Let me first comment on the things that you listed as
> advantage of B over A.
>
> 1) No change in LeaderAndIsrRequest protocol.
>
> I agree with this.
>
> 2) Step 1. One less round of LeaderAndIsrRequest and no additional ZK
> writes to record the created flag.
>
> I don't think this is true. There will be one round of LeaderAndIsrRequest
> in both A and B. In design A the controller needs to write to ZK once to
> record this replica as created. In design B the broker needs to write to
> ZooKeeper once to record this replica as created. So there is the same number
> of LeaderAndIsrRequests and ZK writes.
>
> Broker needs to record created replica in design B so that when it
> bootstraps with failed log directory, the broker can derive the offline
> replicas as the difference between created replicas and replicas found on
> good log directories.
>
>
Design B actually doesn't write created replicas in ZK. When a broker
starts up, all offline replicas are stored in the /failed-log-directory
path in ZK. So if a replica is not there and is not in the live log
directories either, it's never created. Does this work?
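
As a purely illustrative sketch (the helper class and payload handling are
hypothetical), a restarting broker could read those znodes back roughly like
this:

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

public class FailedLogDirReader {
    // Lists each failed log directory and the JSON of the replicas stored under it.
    static void printFailedDirs(ZooKeeper zk, int brokerId)
            throws KeeperException, InterruptedException {
        String base = "/brokers/ids/" + brokerId + "/failed-log-directory";
        List<String> dirs = zk.getChildren(base, false);
        for (String dir : dirs) {
            byte[] data = zk.getData(base + "/" + dir, false, null);
            System.out.println(dir + " => " + new String(data, StandardCharsets.UTF_8));
        }
    }
}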



> 3) Step 2. One less round of LeaderAndIsrRequest and no additional logic to
> handle LeaderAndIsrResponse.
>
> While I agree there is one less round of LeaderAndIsrRequest in design B, I
> don't think one additional LeaderAndIsrRequest to handle log directory
> failure is a big deal given that it doesn't happen frequently.
>
> Also, while there is no additional logic to handle LeaderAndIsrResponse in
> design B, I actually think this is something that controller should do
> anyway. Say the broker stops responding to any requests without removing
> itself from ZooKeeper; the only way for the controller to realize this and
> re-elect the leader is to send a request to this broker and handle the
> response. It is a problem that we don't do this as of now.
>
> 4) Step 6. Additional ZK reads proportional to # of failed log directories,
> instead of # of partitions.
>
> If one znode is able to describe all topic partitions in a log directory,
> then the existing znode /brokers/topics/[topic] should be able to describe
> created replicas in addition to the assigned replicas for every partition
> of the topic. In this case, design A requires no additional ZK reads
> whereas design B requires ZK reads proportional to # of failed log directories.
>
> 5) Step 3. In design A, if a broker is restarted and the failed log
> directory is unreadable, the broker doesn't know which replicas are on the
> failed log directory. So, when the broker receives the LeadAndIsrRequest
> with created = false, it's a bit hard for the broker to decide whether it
> should create the missing replica on other log directories. This is easier
> in design B since the list of failed replicas is persisted in ZK.
>
> I don't understand why it is hard for the broker to make a decision in design A.
> With design A, if a broker is started with a failed log directory and it
> receives LeaderAndIsrRequest with created=false for a replica that can not
> be found on any good log directory, broker will not create this replica. Is
> there any drawback with this approach?
>
>
>
I was thinking about this case. Suppose two log directories dir1 and dir2
fail. The admin replaces dir1 with an empty new disk. The broker is
restarted with dir1 alive, and dir2 still failing. Now, when receiving a
LeaderAndIsrRequest including a replica that was previously in dir1, the
broker won't be able to create that replica when it could.



> Here is my summary of pros and cons of design B as compared to design A.
>
> pros:
>
> 1) No change to LeaderAndIsrRequest.
> 2) One less round of LeaderAndIsrRequest in case of log directory failure.
>
> cons:
>
> 1) It is impossible for the broker to figure out the log directory of offline
> replicas for failed-log-directory/[directory] if multiple log directories
> are unreadable when the broker starts.
>
>
Hmm, I am not sure that I get this point. If multiple log directories fail,
design B stores each directory under /failed-log-directory, right?

Thanks,

Jun



> 2) The znode size limit of failed-log-directory/[directory] essentially
> limits the number of topic partitions that can exist on a log directory. It
> becomes more of a problem when a broker is configured to use multiple log
> directories each of which is a RAID-10 of large capacity. While this may
> not be a problem in practice with additional requirement (e.g. don't use
> more than one log directory if using RAID-10), ideally we want to avoid
> such limit.
>
> 3) Extra ZK read of failed-log-directory/[directory] when broker starts
>
>
> My main concern with the design B is the use of znode
> /brokers/ids/[brokerId]/failed-log-directory/[directory]. I don't really
> think other pros/cons of design B matter to us. Does my summary make sense?
>
> Thanks,
> Dong
>
>
> On Thu, Fe

[jira] [Commented] (KAFKA-4796) Fix some findbugs warnings in Kafka Java client

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881667#comment-15881667
 ] 

ASF GitHub Bot commented on KAFKA-4796:
---

GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2593

KAFKA-4796: Fix some findbugs warnings in Kafka Java client



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4796

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2593.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2593


commit 250473dd3405085fa62b6ede444ca1682fb56bf3
Author: Colin P. Mccabe 
Date:   2017-02-24T00:55:04Z

KAFKA-4796: Fix some findbugs warnings in Kafka Java client




> Fix some findbugs warnings in Kafka Java client
> ---
>
> Key: KAFKA-4796
> URL: https://issues.apache.org/jira/browse/KAFKA-4796
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> Fix some findbugs warnings in Kafka Java client



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2593: KAFKA-4796: Fix some findbugs warnings in Kafka Ja...

2017-02-23 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/2593

KAFKA-4796: Fix some findbugs warnings in Kafka Java client



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-4796

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2593.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2593


commit 250473dd3405085fa62b6ede444ca1682fb56bf3
Author: Colin P. Mccabe 
Date:   2017-02-24T00:55:04Z

KAFKA-4796: Fix some findbugs warnings in Kafka Java client




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4796) Fix some findbugs warnings in Kafka Java client

2017-02-23 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-4796:
--

 Summary: Fix some findbugs warnings in Kafka Java client
 Key: KAFKA-4796
 URL: https://issues.apache.org/jira/browse/KAFKA-4796
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


Fix some findbugs warnings in Kafka Java client



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-23 Thread Dong Lin
Hey Jun,

Thanks for your reply! Let me first comment on the things that you listed as
advantage of B over A.

1) No change in LeaderAndIsrRequest protocol.

I agree with this.

2) Step 1. One less round of LeaderAndIsrRequest and no additional ZK
writes to record the created flag.

I don't think this is true. There will be one round of LeaderAndIsrRequest
in both A and B. In design A the controller needs to write to ZK once to
record this replica as created. In design B the broker needs to write to
ZooKeeper once to record this replica as created. So there is the same number
of LeaderAndIsrRequests and ZK writes.

The broker needs to record created replicas in design B so that when it
bootstraps with a failed log directory, it can derive the offline
replicas as the difference between created replicas and the replicas found on
good log directories.
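
A minimal sketch of the set difference described above (the topic-partition
names and the helper are illustrative only):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class OfflineReplicaDerivation {
    // offline = created replicas minus replicas found on good log directories
    static Set<String> deriveOffline(Set<String> created, Set<String> foundOnGoodDirs) {
        Set<String> offline = new HashSet<>(created);
        offline.removeAll(foundOnGoodDirs);
        return offline;
    }

    public static void main(String[] args) {
        Set<String> created = new HashSet<>(Arrays.asList("t1-0", "t1-1", "t2-0"));
        Set<String> good = new HashSet<>(Arrays.asList("t1-0"));
        System.out.println(deriveOffline(created, good));   // [t1-1, t2-0], order may vary
    }
}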

3) Step 2. One less round of LeaderAndIsrRequest and no additional logic to
handle LeaderAndIsrResponse.

While I agree there is one less round of LeaderAndIsrRequest in design B, I
don't think one additional LeaderAndIsrRequest to handle log directory
failure is a big deal given that it doesn't happen frequently.

Also, while there is no additional logic to handle LeaderAndIsrResponse in
design B, I actually think this is something that controller should do
anyway. Say the broker stops responding to any requests without removing
itself from ZooKeeper; the only way for the controller to realize this and
re-elect the leader is to send a request to this broker and handle the
response. It is a problem that we don't do this as of now.

4) Step 6. Additional ZK reads proportional to # of failed log directories,
instead of # of partitions.

If one znode is able to describe all topic partitions in a log directory,
then the existing znode /brokers/topics/[topic] should be able to describe
created replicas in addition to the assigned replicas for every partition
of the topic. In this case, design A requires no additional ZK reads
whereas design B requires ZK reads proportional to # of failed log directories.

5) Step 3. In design A, if a broker is restarted and the failed log
directory is unreadable, the broker doesn't know which replicas are on the
failed log directory. So, when the broker receives the LeadAndIsrRequest
with created = false, it's a bit hard for the broker to decide whether it
should create the missing replica on other log directories. This is easier
in design B since the list of failed replicas is persisted in ZK.

I don't understand why it is hard for the broker to make a decision in design A.
With design A, if a broker is started with a failed log directory and it
receives a LeaderAndIsrRequest with created=false for a replica that cannot
be found on any good log directory, the broker will not create this replica. Is
there any drawback with this approach?


Here is my summary of pros and cons of design B as compared to design A.

pros:

1) No change to LeaderAndIsrRequest.
2) One less round of LeaderAndIsrRequest in case of log directory failure.

cons:

1) It is impossible for the broker to figure out the log directory of offline
replicas for failed-log-directory/[directory] if multiple log directories
are unreadable when the broker starts.

2) The znode size limit of failed-log-directory/[directory] essentially
limits the number of topic partitions that can exist on a log directory. It
becomes more of a problem when a broker is configured to use multiple log
directories each of which is a RAID-10 of large capacity. While this may
not be a problem in practice with additional requirement (e.g. don't use
more than one log directory if using RAID-10), ideally we want to avoid
such limit.

3) Extra ZK reads of failed-log-directory/[directory] when the broker starts


My main concern with design B is the use of the znode
/brokers/ids/[brokerId]/failed-log-directory/[directory]. I don't really
think other pros/cons of design B matter to us. Does my summary make sense?

Thanks,
Dong


On Thu, Feb 23, 2017 at 2:20 PM, Jun Rao  wrote:

> Hi, Dong,
>
> Just so that we are on the same page. Let me spec out the alternative
> design a bit more and then compare. Let's call the current design A and the
> alternative design B.
>
> Design B:
>
> New ZK path
> failed log directory path (persistent): This is created by a broker when a
> log directory fails and is potentially removed when the broker is
> restarted.
> /brokers/ids/[brokerId]/failed-log-directory/directory1 => { json of the
> replicas in the log directory }.
>
> *1. Topic gets created*
> - Works the same as before.
>
> *2. A log directory stops working on a broker during runtime*
>
> - The controller watches the path /failed-log-directory for the new znode.
>
> - The broker detects an offline log directory during runtime and marks
> affected replicas as offline in memory.
>
> - The broker writes the failed directory and all replicas in the failed
> directory under /failed-log-directory/directory1.
>
> - The controller reads /failed-log-directory/directory1 and 

[VOTE] KIP-122: Add Reset Consumer Group Offsets tooling

2017-02-23 Thread Jorge Esteban Quilcate Otoya
Hi All,

It seems that there are no further concerns with KIP-122.
At this point we would like to start the voting process.

The KIP can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling


Thanks!

Jorge.


Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-23 Thread Jorge Esteban Quilcate Otoya
Hi All,

If there are no more concerns, I'd like to start the vote for this KIP.

Thanks!
Jorge.
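
As background for the vote, the `--topic t1:0,1,2` scope format settled on in
the discussion quoted below could be parsed roughly as follows (an
illustrative sketch only, not the KIP's implementation):

import java.util.ArrayList;
import java.util.List;

public class TopicScopeParser {
    public static void main(String[] args) {
        parse("t1:0,1,2");   // topic t1, partitions 0, 1, 2
        parse("t2");         // topic t2, all partitions
    }

    static void parse(String arg) {
        String[] parts = arg.split(":", 2);
        String topic = parts[0];
        List<Integer> partitions = new ArrayList<>();
        if (parts.length == 2 && !parts[1].isEmpty()) {
            for (String p : parts[1].split(",")) {
                partitions.add(Integer.parseInt(p.trim()));
            }
        }
        System.out.println("topic=" + topic
                + (partitions.isEmpty() ? " (all partitions)" : " partitions=" + partitions));
    }
}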

El jue., 23 feb. 2017 a las 22:50, Jorge Esteban Quilcate Otoya (<
quilcate.jo...@gmail.com>) escribió:

> Oh ok :)
>
> So, we can keep `--topic t1:1,2,3`
>
> I think with this one we have most of the feedback applied. I will update
> the KIP with this change.
>
> El jue., 23 feb. 2017 a las 22:38, Matthias J. Sax ()
> escribió:
>
> Sounds reasonable.
>
> If we have multiple --topic arguments, it also does not matter whether we use
> t1:1,2 or t2=1,2
>
> I just suggested '=' because I wanted to use ':' to chain multiple topics.
>
>
> -Matthias
>
> On 2/23/17 10:49 AM, Jorge Esteban Quilcate Otoya wrote:
> > Yeap, `--topic t1=1,2` LGTM
> >
> > I don't have an idea either about getting rid of the repeated --topic, but
> > --group is also repeated in the case of deletion, so it could be OK to have
> > repeated --topic arguments.
> >
> > El jue., 23 feb. 2017 a las 19:14, Matthias J. Sax (<
> matth...@confluent.io>)
> > escribió:
> >
> >> So you suggest to merge "scope options" --topics, --topic, and
> >> --partitions into a single option? Sounds good to me.
> >>
> >> I like the compact way to express it, i.e., topicname:list-of-partitions
> >> with "all partitions" if no partitions are specified. It's quite
> >> intuitive to use.
> >>
> >> Just wondering if we could get rid of the repeated --topic option; it's
> >> somewhat verbose. I have no good idea though how to improve it.
> >>
> >> If you concatenate multiple topics, we need one more character that is
> >> not allowed in topic names to separate the topics:
> >>
> >>> invalidChars = {'/', '\\', ',', '\u', ':', '"', '\'', ';', '*',
> >> '?', ' ', '\t', '\r', '\n', '='};
> >>
> >> maybe
> >>
> >> --topics t1=1,2,3:t2:t3=3
> >>
> >> use '=' to specify partitions (instead of ':' as you proposed) and ':'
> >> to separate topics? All other characters seem to be worse to use to me.
> >> But maybe you have a better idea.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 2/23/17 3:15 AM, Jorge Esteban Quilcate Otoya wrote:
> >>> @Matthias about the point 9:
> >>>
> >>> What about keeping only the --topic option, and support this format:
> >>>
> >>> `--topic t1:0,1,2 --topic t2 --topic t3:2`
> >>>
> >>> In this case topics t1, t2, and t3 will be selected: topic t1 with
> >>> partitions 0,1 and 2; topic t2 with all its partitions; and topic t3,
> >> with
> >>> only partition 2.
> >>>
> >>> Jorge.
> >>>
> >>> El mar., 21 feb. 2017 a las 11:11, Jorge Esteban Quilcate Otoya (<
> >>> quilcate.jo...@gmail.com>) escribió:
> >>>
>  Thanks for the feedback Matthias.
> 
>  * 1. You're right. I'll reorder the scenarios.
> 
>  * 2. Agree. I'll update the KIP.
> 
>  * 3. I like it, updating to `reset-offsets`
> 
>  * 4. Agree, removing the `reset-` part
> 
>  * 5. Yes, 1.e option without --execute or --export will print out
> >> current
>  offset, and the new offset, that will be the same. The use-case of
> this
>  option is to use it in combination with --export mostly and have a
> >> current
>  'checkpoint' to reset later. I will add to the KIP how the output
> should
>  looks like.
> 
>  * 6. Considering 4., I will update it to `--to-offset`
> 
>  * 7. I like the idea to unify these options (plus, minus).
>  `shift-offsets-by` is a good option, but I will like some more
> feedback
>  here about the name. I will update the KIP in the meantime.
> 
>  * 8. Yes, discussed in 9.
> 
>  * 9. Agree. I'll love some feedback here. `topic` is already used by
>  `delete`, and we can add `--all-topics` to consider all
> >> topics/partitions
>  assigned to a group. How could we define specific topics/partitions?
> 
>  * 10. Haven't thought about it, but make sense.
>  ,, would be enough.
> 
>  * 11. Agree. Solved with 10.
> 
>  Also, I have a couple of changes to mention:
> 
>  1. I have add a reference to the branch where I'm working on this KIP.
> 
>  2. About the period scenario `--to-period`. I will change it to
>  `--to-duration` given that duration (
>  https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html)
>  follows this format: 'PnDTnHnMnS' and does not consider daylight
> saving
>  efects.
> 
> 
> 
>  El mar., 21 feb. 2017 a las 2:47, Matthias J. Sax (<
> >> matth...@confluent.io>)
>  escribió:
> 
>  Hi,
> 
>  thanks for updating the KIP. Couple of follow up comments:
> 
>  * Nit: Why is "Reset to Earliest" and "Reset to Latest" a "reset by
>  time" option -- IMHO it belongs to "reset by position"?
> 
> 
>  * Nit: Description of "Reset to Earliest"
> 
> > using Kafka Consumer's `auto.offset.reset` to `earliest`
> 
>  I think this is strictly speaking not correct (as auto.offset.reset
> only
>  triggered if no valid offset is found, but this tool e

Build failed in Jenkins: kafka-trunk-jdk8 #1296

2017-02-23 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: Reduce stream thread metrics overhead

--
[...truncated 1.42 MB...]

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testRestoreWithDefaultSerdes PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > testRestore 
STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > testRestore 
PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testPutGetRange STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testPutGetRange PASSED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testPutGetRangeWithDefaultSerdes STARTED

org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreTest > 
testPutGetRangeWithDefaultSerdes PASSED

org.apache.kafka.streams.KeyValueTest > shouldHaveSaneEqualsAndHashCode STARTED

org.apache.kafka.streams.KeyValueTest > shouldHaveSaneEqualsAndHashCode PASSED

org.apache.kafka.streams.KafkaStreamsTest > testIllegalMetricsConfig STARTED

org.apache.kafka.streams.KafkaStreamsTest > testIllegalMetricsConfig PASSED

org.apache.kafka.streams.KafkaStreamsTest > shouldNotGetAllTasksWhenNotRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > shouldNotGetAllTasksWhenNotRunning 
PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
testInitializesAndDestroysMetricsReporters STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
testInitializesAndDestroysMetricsReporters PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testLegalMetricsConfig STARTED

org.apache.kafka.streams.KafkaStreamsTest > testLegalMetricsConfig PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndSerializerWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetTaskWithKeyAndSerializerWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldReturnFalseOnCloseWhenThreadsHaventTerminated STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldReturnFalseOnCloseWhenThreadsHaventTerminated PASSED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning STARTED

org.apache.kafka.streams.KafkaStreamsTest > 
shouldNotGetAllTasksWithStoreWhenNotRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartOnceClosed PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCleanup PASSED

org.apache.kafka.streams.KafkaStreamsTest > testNumberDefaultMetrics STARTED

org.apache.kafka.streams.KafkaStreamsTest > testNumberDefaultMetrics PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCloseIsIdempotent PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning 
STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotCleanupWhileRunning PASSED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice STARTED

org.apache.kafka.streams.KafkaStreamsTest > testCannotStartTwice PASSED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin STARTED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableLeftJoin PASSED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableJoin STARTED

org.apache.kafka.streams.integration.GlobalKTableIntegrationTest > 
shouldKStreamGlobalKTableJoin PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingPattern STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingPattern PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingTopic STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowExceptionOverlappingTopic PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowStreamsExceptionNoResetSpecified STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldThrowStreamsExceptionNoResetSpecified PASSED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldOnlyReadRecordsWhereEarliestSpecified STARTED

org.apache.kafka.streams.integration.KStreamsFineGrainedAutoResetIntegrationTest
 > shouldOnlyReadRecordsWhereEarliestSpecified PASSED

org.apache.

[jira] [Created] (KAFKA-4795) Confusion around topic deletion

2017-02-23 Thread Vahid Hashemian (JIRA)
Vahid Hashemian created KAFKA-4795:
--

 Summary: Confusion around topic deletion
 Key: KAFKA-4795
 URL: https://issues.apache.org/jira/browse/KAFKA-4795
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.0
Reporter: Vahid Hashemian
Assignee: Vahid Hashemian


Topic deletion works like this in 0.10.2.0:
# {{bin/zookeeper-server-start.sh config/zookeeper.properties}}
# {{bin/kafka-server-start.sh config/server.properties}} (uses default 
{{server.properties}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
--replication-factor 1 --partitions 1}} (creates the topic {{test}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
(reports {{Topic test is marked for deletion. Note: This will have no impact if 
delete.topic.enable is not set to true.}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns {{test}})

Previously, the last command above returned {{test - marked for deletion}}, 
which matched the output statement of the {{--delete}} topic command.

Continuing with the above scenario,
# stop the broker
# add the broker config {{delete.topic.enable=true}} in the config file
# {{bin/kafka-server-start.sh config/server.properties}} (this does not remove 
the topic {{test}}, as if the topic was never marked for deletion).
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test}} 
(reports {{Topic test is marked for deletion. Note: This will have no impact if 
delete.topic.enable is not set to true.}})
# {{bin/kafka-topics.sh --zookeeper localhost:2181 --list}} (returns no topics).

It seems that the "marked for deletion" state for topics no longer exists.

I opened this JIRA so I can get a confirmation on the expected topic deletion 
behavior, because in any case, I think the user experience could be improved 
(either there is a bug in the code, or the command's output statement is 
misleading).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread Rajini Sivaram
Thank you all for the feedback.

Ismael #1. It makes sense not to throttle inter-broker requests like
LeaderAndIsr etc. The simplest way to ensure that clients cannot use these
requests to bypass quotas for DoS attacks is to ensure that ACLs prevent
clients from using these requests and unauthorized requests are included
towards quotas.

Ismael #2, Jay #1 : I was thinking that these quotas can return a separate
throttle time, and all utilization based quotas could use the same field
(we won't add another one for network thread utilization for instance). But
perhaps it makes sense to keep byte rate quotas separate in produce/fetch
responses to provide separate metrics? Agree with Ismael that the name of
the existing field should be changed if we have two. Happy to switch to a
single combined throttle time if that is sufficient.

Ismael #4, #5, #6: Will update KIP. Will use dot separated name for new
property. Replication quotas use dot separated, so it will be consistent
with all properties except byte rate quotas.

Radai: #1 Request processing time rather than request rate was chosen
because the time per request can vary significantly between requests, as
mentioned in the discussion and the KIP.
#2 Two separate quotas for heartbeats/regular requests feel like more
configuration and more metrics. Since most users would set quotas higher
than the expected usage and quotas are more of a safety net, a single quota
should work in most cases.
 #3 The number of requests in purgatory is limited by the number of active
connections since only one request per connection will be throttled at a
time.
#4 As with byte rate quotas, to use the full allocated quotas,
clients/users would need to use partitions that are distributed across the
cluster. The alternative of using cluster-wide quotas instead of per-broker
quotas would be far too complex to implement.

Dong : We currently have two ClientQuotaManagers for quota types Fetch and
Produce. A new one will be added for IOThread, which manages quotas for I/O
thread utilization. This will not update the Fetch or Produce queue-size,
but will have a separate metric for the queue-size.  I wasn't planning to
add any additional metrics apart from the equivalent ones for existing
quotas as part of this KIP. Ratio of byte-rate to I/O thread utilization
could be slightly misleading since it depends on the sequence of requests.
But we can look into more metrics after the KIP is implemented if required.

I think we need to limit the maximum delay since all requests are
throttled. If a client has a quota of 0.001 units and a single request used
50ms, we don't want to delay all requests from the client by 50 seconds,
throwing the client out of all its consumer groups. The issue is only if a
user is allocated a quota that is insufficient to process one large
request. The expectation is that the units allocated per user will be much
higher than the time taken to process one request and the limit should
seldom be applied. Agree this needs proper documentation.
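
To make the arithmetic above concrete, here is a rough back-of-the-envelope sketch
(illustrative only -- the window size and the formula are assumptions, not the exact
computation in the KIP):

    public class ThrottleDelayExample {
        public static void main(String[] args) {
            double quotaUnits    = 0.001;   // client allowed 0.1% of one I/O thread
            double requestTimeMs = 50.0;    // a single request occupied an I/O thread for 50 ms
            double windowMs      = 30_000;  // assumed quota window of 30 seconds

            // Bringing the client back under quota would naively imply ~50 seconds of delay.
            double impliedDelayMs = requestTimeMs / quotaUnits;           // 50,000 ms
            // Capping the delay at the window keeps the client from being thrown out of its groups.
            double appliedDelayMs = Math.min(impliedDelayMs, windowMs);   // 30,000 ms

            System.out.printf("implied=%.0f ms, applied=%.0f ms%n", impliedDelayMs, appliedDelayMs);
        }
    }
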

Regards,

Rajini


On Thu, Feb 23, 2017 at 8:04 PM, radai  wrote:

> @jun: i wasnt concerned about tying up a request processing thread, but
> IIUC the code does still read the entire request out, which might add-up to
> a non-negligible amount of memory.
>
> On Thu, Feb 23, 2017 at 11:55 AM, Dong Lin  wrote:
>
> > Hey Rajini,
> >
> > The current KIP says that the maximum delay will be reduced to window
> size
> > if it is larger than the window size. I have a concern with this:
> >
> > 1) This essentially means that the user is allowed to exceed their quota
> > over a long period of time. Can you provide an upper bound on this
> > deviation?
> >
> > 2) What is the motivation for capping the maximum delay by the window size? I
> > am wondering if there is a better alternative to address the problem.
> >
> > 3) It means that the existing metric-related config will have a more
> > direct impact on the mechanism of this io-thread-unit-based quota. This
> > may be an important change depending on the answer to 1) above. We
> probably
> > need to document this more explicitly.
> >
> > Dong
> >
> >
> > On Thu, Feb 23, 2017 at 10:56 AM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > Yeah you are right. I thought it wasn't because at LinkedIn it will be
> > too
> > > much pressure on inGraph to expose those per-clientId metrics so we
> ended
> > > up printing them periodically to local log. Never mind if it is not a
> > > general problem.
> > >
> > > Hey Rajini,
> > >
> > > - I agree with Jay that we probably don't want to add a new field for
> > > every quota ProduceResponse or FetchResponse. Is there any use-case for
> > > having separate throttle-time fields for byte-rate-quota and
> > > io-thread-unit-quota? You probably need to document this as interface
> > > change if you plan to add new field in any request.
> > >
> > > - I don't think IOThread belongs to quotaType. The existing quota types
> > > (i.e. Produce/Fetch/LeaderReplication/Follo

[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881456#comment-15881456
 ] 

Matthias J. Sax commented on KAFKA-4791:


Ack. My bad. I thought [~clouTrix] calls 
{{connectStateStoreNameToSourceTopics}}, but he doesn't... Calling 
{{addStateStore}} is of course fine. [~bbejeck] feel free to start working on 
this. :)

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set<String> findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set<String> sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-23 Thread Jun Rao
Hi, Dong,

Just so that we are on the same page. Let me spec out the alternative
design a bit more and then compare. Let's call the current design A and the
alternative design B.

Design B:

New ZK path
failed log directory path (persistent): This is created by a broker when a
log directory fails and is potentially removed when the broker is
restarted.
/brokers/ids/[brokerId]/failed-log-directory/directory1 => { json of the
replicas in the log directory }.
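
To make this concrete, the payload could look something like the following
(purely illustrative; the exact json schema is not specified here):

    /brokers/ids/1/failed-log-directory/directory1 =>
      {"version":1,"replicas":[{"topic":"t1","partition":0},{"topic":"t2","partition":3}]}
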

*1. Topic gets created*
- Works the same as before.

*2. A log directory stops working on a broker during runtime*

- The controller watches the path /failed-log-directory for the new znode.

- The broker detects an offline log directory during runtime and marks
affected replicas as offline in memory.

- The broker writes the failed directory and all replicas in the failed
directory under /failed-log-directory/directory1.

- The controller reads /failed-log-directory/directory1 and stores in
memory a list of failed replicas due to disk failures.

- The controller moves those replicas due to disk failure to offline state
and triggers the state change in replica state machine.


*3. Broker is restarted*

- The broker reads /brokers/ids/[brokerId]/failed-log-directory, if any.

- For each failed log directory read from ZK: if the log directory exists in
log.dirs and is now accessible, or if it no longer exists in log.dirs, the
broker removes that log directory from failed-log-directory. Otherwise, the
broker loads the replicas in the failed log directory into memory as offline.

- The controller handles the failed log directory change event, if needed
(same as #2).

- The controller handles the broker registration event.


*6. Controller failover*
- Controller reads all child paths under /failed-log-directory to rebuild
the list of failed replicas due to disk failures. Those replicas will be
transitioned to the offline state during controller initialization.

Comparing this with design A, I think the following are the things that
design B simplifies.
* No change in LeaderAndIsrRequest protocol.
* Step 1. One less round of LeaderAndIsrRequest and no additional ZK writes
to record the created flag.
* Step 2. One less round of LeaderAndIsrRequest and no additional logic to
handle LeaderAndIsrResponse.
* Step 6. Additional ZK reads proportional to # of failed log directories,
instead of # of partitions.
* Step 3. In design A, if a broker is restarted and the failed log
directory is unreadable, the broker doesn't know which replicas are on the
failed log directory. So, when the broker receives the LeaderAndIsrRequest
with created = false, it's a bit hard for the broker to decide whether it
should create the missing replica on other log directories. This is easier
in design B since the list of failed replicas is persisted in ZK.

Now, for some of the other things that you mentioned.

* What happens if a log directory is renamed?
I think this can be handled in the same way as non-existing log directory
during broker restart.

* What happens if replicas are moved manually across disks?
Good point. Well, if all log directories are available, the failed log
directory path will be cleared. In the rarer case that a log directory is
still offline and one of the replicas registered in the failed log
directory shows up in another available log directory, I am not quite sure.
Perhaps the simplest approach is to just error out and let the admin fix
things manually?

Thanks,

Jun



On Wed, Feb 22, 2017 at 3:39 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks much for the explanation. I have some questions about 21 but that is
> less important than 20. 20 would require considerable change to the KIP and
> probably requires weeks to discuss again. Thus I would like to be very sure
> that we agree on the problems with the current design as you mentioned and
> there is no foreseeable problem with the alternate design.
>
> Please see my detailed response below. To summarize my points, I couldn't
> figure out any non-trivial drawback of the current design as compared to the
> alternative design; and I couldn't figure out a good way to store offline
> replicas in the alternative design. Can you see if these points make sense?
> Thanks in advance for your time!!
>
>
> 1) The alternative design requires slightly more dependency on ZK. While
> both solutions store created replicas in the ZK, the alternative design
> would also store offline replicas in ZK but the current design doesn't.
> Thus
>
> 2) I am not sure that we should store offline replicas in znode
> /brokers/ids/[brokerId]/failed-log-directory/[directory]. We probably
> don't
> want to expose log directory path in zookeeper based on the concept that we
> should only store logical information (e.g. topic, brokerId) in zookeeper's
> path name. More specifically, we probably don't want to rename path in
> zookeeper simply because a user renamed a log directory. And we probably don't
> want to read/write these znode just because user man

[jira] [Updated] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-02-23 Thread Florian Hussonnois (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florian Hussonnois updated KAFKA-4794:
--
Affects Version/s: 0.10.2.0

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>
> Currently the offsets storage is only accessible from SourceTask, to be able to 
> properly initialize tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task, it would be helpful to have access to an OffsetStorageReader instance 
> from the SourceConnector.
> In that way, we could have a background thread that could request a task 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage for detecting and for 
> streaming new files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new input files are detected, a task reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores source offsets for the last sent record. The source offsets 
> data are:
>  - the size of file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (i.e. 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks have finished, the connector can stop them or assign new files 
> by requesting a task reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is to detect slow or failed tasks and, if necessary, to be able to restart all 
> tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-02-23 Thread Florian Hussonnois (JIRA)
Florian Hussonnois created KAFKA-4794:
-

 Summary: Add access to OffsetStorageReader from SourceConnector
 Key: KAFKA-4794
 URL: https://issues.apache.org/jira/browse/KAFKA-4794
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Florian Hussonnois
Priority: Minor


Currently the offsets storage is only accessible from SourceTask, to be able to 
properly initialize tasks after a restart, a crash or a reconfiguration request.

To implement more complex connectors that need to track the progression of each 
task, it would be helpful to have access to an OffsetStorageReader instance from 
the SourceConnector.

In that way, we could have a background thread that could request a task 
reconfiguration based on source offsets.

This improvement proposal comes from a customer project that needs to 
periodically scan directories on a shared storage for detecting and for 
streaming new files into Kafka.

The connector implementation is pretty straightforward.

The connector uses a background thread to periodically scan directories. When 
new input files are detected, a task reconfiguration is requested. Then the 
connector assigns a file subset to each task. 

Each task stores source offsets for the last sent record. The source offsets 
data are:
 - the size of file
 - the bytes offset
 - the bytes size 

Tasks become idle when the assigned files are completed (i.e. 
recordBytesOffsets + recordBytesSize = fileBytesSize).

Then, the connector should be able to track offsets for each assigned file. 
When all tasks have finished, the connector can stop them or assign new files 
by requesting a task reconfiguration. 

Moreover, another advantage of monitoring source offsets from the connector is 
to detect slow or failed tasks and, if necessary, to be able to restart all tasks.

If you think this improvement is OK, I can work on a pull request.

Thanks,
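
A rough sketch of the background-monitoring idea (only an illustration: it assumes the
connector could somehow be handed an OffsetStorageReader, which is exactly the capability
this ticket asks for, and the offset key names are taken from the example above):

    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.connect.storage.OffsetStorageReader;

    public class SourceProgressMonitor implements AutoCloseable {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        public SourceProgressMonitor(OffsetStorageReader offsetReader,
                                     Collection<Map<String, Object>> sourcePartitions,
                                     Runnable requestTaskReconfiguration) {
            scheduler.scheduleAtFixedRate(() -> {
                Map<Map<String, Object>, Map<String, Object>> offsets = offsetReader.offsets(sourcePartitions);
                // A file is "done" when recordBytesOffsets + recordBytesSize >= fileBytesSize.
                boolean allDone = offsets.values().stream().allMatch(offset ->
                        offset != null
                        && asLong(offset, "recordBytesOffsets") + asLong(offset, "recordBytesSize")
                           >= asLong(offset, "fileBytesSize"));
                if (allDone) {
                    // e.g. ConnectorContext#requestTaskReconfiguration() to assign new files
                    requestTaskReconfiguration.run();
                }
            }, 30, 30, TimeUnit.SECONDS);
        }

        private static long asLong(Map<String, Object> offset, String key) {
            Object value = offset.get(key);
            return value instanceof Number ? ((Number) value).longValue() : 0L;
        }

        @Override
        public void close() {
            scheduler.shutdownNow();
        }
    }
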



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-23 Thread Jorge Esteban Quilcate Otoya
Oh ok :)

So, we can keep `--topic t1:1,2,3`

I think with this one we have most of the feedback applied. I will update
the KIP with this change.
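
Just to illustrate how I picture the parsing of the repeated --topic values (a rough
sketch, not the actual tool code; class and method names are made up):

    import java.util.*;

    public class TopicArgParser {
        // "t1:0,1,2" -> t1 with partitions 0,1,2; "t2" -> t2 with all partitions (empty list).
        public static Map<String, List<Integer>> parse(List<String> topicArgs) {
            Map<String, List<Integer>> scope = new LinkedHashMap<>();
            for (String arg : topicArgs) {
                String[] parts = arg.split(":", 2);
                List<Integer> partitions = new ArrayList<>();
                if (parts.length == 2 && !parts[1].isEmpty()) {
                    for (String p : parts[1].split(",")) {
                        partitions.add(Integer.parseInt(p.trim()));
                    }
                }
                scope.put(parts[0], partitions);   // empty list = all partitions of the topic
            }
            return scope;
        }

        public static void main(String[] args) {
            System.out.println(parse(Arrays.asList("t1:0,1,2", "t2", "t3:2")));
            // prints: {t1=[0, 1, 2], t2=[], t3=[2]}
        }
    }
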

El jue., 23 feb. 2017 a las 22:38, Matthias J. Sax ()
escribió:

> Sounds reasonable.
>
> If we have multiple --topic arguments, it does also not matter if we use
> t1:1,2 or t2=1,2
>
> I just suggested '=' because I wanted use ':' to chain multiple topics.
>
>
> -Matthias
>
> On 2/23/17 10:49 AM, Jorge Esteban Quilcate Otoya wrote:
> > Yeap, `--topic t1=1,2` LGTM
> >
> > Don't have an idea either about getting rid of the repeated --topic, but
> --group
> > is also repeated in the case of deletion, so it could be ok to have
> > repeated --topic arguments.
> >
> > El jue., 23 feb. 2017 a las 19:14, Matthias J. Sax (<
> matth...@confluent.io>)
> > escribió:
> >
> >> So you suggest to merge "scope options" --topics, --topic, and
> >> --partitions into a single option? Sounds good to me.
> >>
> >> I like the compact way to express it, ie, topicname:list-of-partitions
> >> with "all partitions" if not partitions are specified. It's quite
> >> intuitive to use.
> >>
> >> Just wondering, if we could get rid of the repeated --topic option; it's
> >> somewhat verbose. Have no good idea though how to improve it.
> >>
> >> If you concatenate multiple topics, we need one more character that is
> >> not allowed in topic names to separate the topics:
> >>
> >>> invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*',
> >> '?', ' ', '\t', '\r', '\n', '='};
> >>
> >> maybe
> >>
> >> --topics t1=1,2,3:t2:t3=3
> >>
> >> use '=' to specify partitions (instead of ':' as you proposed) and ':'
> >> to separate topics? All other characters seem to be worse to use to me.
> >> But maybe you have a better idea.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 2/23/17 3:15 AM, Jorge Esteban Quilcate Otoya wrote:
> >>> @Matthias about the point 9:
> >>>
> >>> What about keeping only the --topic option, and support this format:
> >>>
> >>> `--topic t1:0,1,2 --topic t2 --topic t3:2`
> >>>
> >>> In this case topics t1, t2, and t3 will be selected: topic t1 with
> >>> partitions 0,1 and 2; topic t2 with all its partitions; and topic t3,
> >> with
> >>> only partition 2.
> >>>
> >>> Jorge.
> >>>
> >>> El mar., 21 feb. 2017 a las 11:11, Jorge Esteban Quilcate Otoya (<
> >>> quilcate.jo...@gmail.com>) escribió:
> >>>
>  Thanks for the feedback Matthias.
> 
>  * 1. You're right. I'll reorder the scenarios.
> 
>  * 2. Agree. I'll update the KIP.
> 
>  * 3. I like it, updating to `reset-offsets`
> 
>  * 4. Agree, removing the `reset-` part
> 
>  * 5. Yes, 1.e option without --execute or --export will print out
> >> current
>  offset, and the new offset, that will be the same. The use-case of
> this
>  option is to use it in combination with --export mostly and have a
> >> current
>  'checkpoint' to reset later. I will add to the KIP how the output
> should
>  look like.
> 
>  * 6. Considering 4., I will update it to `--to-offset`
> 
>  * 7. I like the idea to unify these options (plus, minus).
>  `shift-offsets-by` is a good option, but I would like some more
> feedback
>  here about the name. I will update the KIP in the meantime.
> 
>  * 8. Yes, discussed in 9.
> 
>  * 9. Agree. I'd love some feedback here. `topic` is already used by
>  `delete`, and we can add `--all-topics` to consider all
> >> topics/partitions
>  assigned to a group. How could we define specific topics/partitions?
> 
>  * 10. Haven't thought about it, but it makes sense.
>  <topic>,<partition>,<offset> would be enough.
> 
>  * 11. Agree. Solved with 10.
> 
>  Also, I have a couple of changes to mention:
> 
>  1. I have added a reference to the branch where I'm working on this KIP.
> 
>  2. About the period scenario `--to-period`. I will change it to
>  `--to-duration` given that duration (
>  https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html)
>  follows this format: 'PnDTnHnMnS' and does not consider daylight
> saving
>  effects.
> 
> 
> 
>  El mar., 21 feb. 2017 a las 2:47, Matthias J. Sax (<
> >> matth...@confluent.io>)
>  escribió:
> 
>  Hi,
> 
>  thanks for updating the KIP. Couple of follow up comments:
> 
>  * Nit: Why is "Reset to Earliest" and "Reset to Latest" a "reset by
>  time" option -- IMHO it belongs to "reset by position"?
> 
> 
>  * Nit: Description of "Reset to Earliest"
> 
> > using Kafka Consumer's `auto.offset.reset` to `earliest`
> 
>  I think this is strictly speaking not correct (as auto.offset.reset is
> only
>  triggered if no valid offset is found, but this tool explicitly
> modifies
>  committed offsets), and should be phrased as
> 
> > using Kafka Consumer's #seekToBeginning()
> 
>  -> similar issue for description of "Reset to Latest"
> 
> 
> >>

[jira] [Commented] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2017-02-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881339#comment-15881339
 ] 

Matthias J. Sax commented on KAFKA-4601:


[~mihbor] Thanks for your comment. I guess it's related, but IMHO it should be its 
own JIRA. Would you mind creating one? I understand your need and had a similar 
thought like this once, too. But it's a "dangerous" feature, too. We can discuss 
the pros/cons in the new JIRA and whether we should allow this or not. Thanks.

> Avoid duplicated repartitioning in KStream DSL
> --
>
> Key: KAFKA-4601
> URL: https://issues.apache.org/jira/browse/KAFKA-4601
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: performance
>
> Consider the following DSL:
> {code}
> KStream<String, String> source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1").map(..);
> KTable<String, Long> counts = source
> .groupByKey()
> .count("Counts");
> KStream<String, String> sink = source.leftJoin(counts, ..);
> {code}
> The resulted topology looks like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
>   KSTREAM-FILTER-04:
>   children:   
> [KSTREAM-SINK-03]
>   KSTREAM-SINK-03:
>   topic:  X-Counts-repartition
>   KSTREAM-FILTER-07:
>   children:   
> [KSTREAM-SINK-06]
>   KSTREAM-SINK-06:
>   topic:  
> X-KSTREAM-MAP-01-repartition
> ProcessorTopology:
>   KSTREAM-SOURCE-08:
>   topics: 
> [X-KSTREAM-MAP-01-repartition]
>   children:   
> [KSTREAM-LEFTJOIN-09]
>   KSTREAM-LEFTJOIN-09:
>   states: [Counts]
>   KSTREAM-SOURCE-05:
>   topics: [X-Counts-repartition]
>   children:   
> [KSTREAM-AGGREGATE-02]
>   KSTREAM-AGGREGATE-02:
>   states: [Counts]
> {code}
> I.e. there are two repartition topics, one for the aggregate and one for the 
> join, which not only introduce unnecessary overheads but also mess up the 
> processing ordering (users are expecting each record to go through 
> aggregation first then the join operator). And in order to get the following 
> simpler topology users today need to add a {{through}} operator after {{map}} 
> manually to enforce repartitioning.
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-SINK-02]
>   KSTREAM-SINK-02:
>   topic:  topic 2
> ProcessorTopology:
>   KSTREAM-SOURCE-03:
>   topics: [topic 2]
>   children:   
> [KSTREAM-AGGREGATE-04, KSTREAM-LEFTJOIN-05]
>   KSTREAM-AGGREGATE-04:
>   states: [Counts]
>   KSTREAM-LEFTJOIN-05:
>   states: [Counts]
> {code} 
> This kind of optimization should be automatic in Streams, which we can 
> consider doing when extending from one-operator-at-a-time translation.
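
For reference, a sketch of the manual {{through}} workaround mentioned above, written
against the 0.10.x DSL (topic names and the trivial mapper are made up, and method
signatures are as I recall them, so treat this as illustrative):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class ManualRepartitionExample {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();

            KStream<String, String> source = builder
                    .stream(Serdes.String(), Serdes.String(), "topic1")
                    .map((key, value) -> new KeyValue<>(key, value))      // stand-in for the real mapping
                    .through(Serdes.String(), Serdes.String(), "topic2"); // force a single repartition here

            // Both downstream operators now read from the single repartition topic "topic2".
            KTable<String, Long> counts = source.groupByKey().count("Counts");
            KStream<String, String> joined = source.leftJoin(counts, (value, count) -> value + ":" + count);
            // (building and starting KafkaStreams is omitted in this sketch)
        }
    }
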



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-23 Thread Matthias J. Sax
Sounds reasonable.

If we have multiple --topic arguments, it does also not matter if we use
t1:1,2 or t2=1,2

I just suggested '=' because I wanted use ':' to chain multiple topics.


-Matthias

On 2/23/17 10:49 AM, Jorge Esteban Quilcate Otoya wrote:
> Yeap, `--topic t1=1,2` LGTM
> 
> Don't have an idea either about getting rid of the repeated --topic, but --group
> is also repeated in the case of deletion, so it could be ok to have
> repeated --topic arguments.
> 
> El jue., 23 feb. 2017 a las 19:14, Matthias J. Sax ()
> escribió:
> 
>> So you suggest to merge "scope options" --topics, --topic, and
>> --partitions into a single option? Sounds good to me.
>>
>> I like the compact way to express it, ie, topicname:list-of-partitions
>> with "all partitions" if not partitions are specified. It's quite
>> intuitive to use.
>>
>> Just wondering, if we could get rid of the repeated --topic option; it's
>> somewhat verbose. Have no good idea though how to improve it.
>>
>> If you concatenate multiple topics, we need one more character that is
>> not allowed in topic names to separate the topics:
>>
>>> invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*',
>> '?', ' ', '\t', '\r', '\n', '='};
>>
>> maybe
>>
>> --topics t1=1,2,3:t2:t3=3
>>
>> use '=' to specify partitions (instead of ':' as you proposed) and ':'
>> to separate topics? All other characters seem to be worse to use to me.
>> But maybe you have a better idea.
>>
>>
>>
>> -Matthias
>>
>>
>> On 2/23/17 3:15 AM, Jorge Esteban Quilcate Otoya wrote:
>>> @Matthias about the point 9:
>>>
>>> What about keeping only the --topic option, and support this format:
>>>
>>> `--topic t1:0,1,2 --topic t2 --topic t3:2`
>>>
>>> In this case topics t1, t2, and t3 will be selected: topic t1 with
>>> partitions 0,1 and 2; topic t2 with all its partitions; and topic t3,
>> with
>>> only partition 2.
>>>
>>> Jorge.
>>>
>>> El mar., 21 feb. 2017 a las 11:11, Jorge Esteban Quilcate Otoya (<
>>> quilcate.jo...@gmail.com>) escribió:
>>>
 Thanks for the feedback Matthias.

 * 1. You're right. I'll reorder the scenarios.

 * 2. Agree. I'll update the KIP.

 * 3. I like it, updating to `reset-offsets`

 * 4. Agree, removing the `reset-` part

 * 5. Yes, 1.e option without --execute or --export will print out
>> current
 offset, and the new offset, that will be the same. The use-case of this
 option is to use it in combination with --export mostly and have a
>> current
 'checkpoint' to reset later. I will add to the KIP how the output should
 look like.

 * 6. Considering 4., I will update it to `--to-offset`

 * 7. I like the idea to unify these options (plus, minus).
 `shift-offsets-by` is a good option, but I would like some more feedback
 here about the name. I will update the KIP in the meantime.

 * 8. Yes, discussed in 9.

 * 9. Agree. I'd love some feedback here. `topic` is already used by
 `delete`, and we can add `--all-topics` to consider all
>> topics/partitions
 assigned to a group. How could we define specific topics/partitions?

 * 10. Haven't thought about it, but it makes sense.
 <topic>,<partition>,<offset> would be enough.

 * 11. Agree. Solved with 10.

 Also, I have a couple of changes to mention:

 1. I have added a reference to the branch where I'm working on this KIP.

 2. About the period scenario `--to-period`. I will change it to
 `--to-duration` given that duration (
 https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html)
 follows this format: 'PnDTnHnMnS' and does not consider daylight saving
 effects.



 El mar., 21 feb. 2017 a las 2:47, Matthias J. Sax (<
>> matth...@confluent.io>)
 escribió:

 Hi,

 thanks for updating the KIP. Couple of follow up comments:

 * Nit: Why is "Reset to Earliest" and "Reset to Latest" a "reset by
 time" option -- IMHO it belongs to "reset by position"?


 * Nit: Description of "Reset to Earliest"

> using Kafka Consumer's `auto.offset.reset` to `earliest`

 I think this is strictly speaking not correct (as auto.offset.reset is only
 triggered if no valid offset is found, but this tool explicitly modifies
 committed offsets), and should be phrased as

> using Kafka Consumer's #seekToBeginning()

 -> similar issue for description of "Reset to Latest"


 * Main option: rename to --reset-offsets (plural instead of singular)


 * Scenario Options: I would remove "reset" from all options, because the
 main argument "--reset-offset" says already what to do:

> bin/kafka-consumer-groups.sh --reset-offset --reset-to-datetime XXX

 better (IMHO):

> bin/kafka-consumer-groups.sh --reset-offsets --to-datetime XXX



 * Option 1.e ("print and export current offset") is not intuitive to use
 IMHO. The main option is "

Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-02-23 Thread Jay Kreps
Hey Becket,

Yeah that makes sense.

I agree that you'd really have to both fix the estimation (i.e. make it per
topic or make it better estimate the high percentiles) AND have the
recovery mechanism. If you are underestimating often and then paying a high
recovery price that won't fly.

I think you take my main point though, which is just that I hate to expose
these super low level options to users because it is so hard to explain to
people what it means and how they should set it. So if it is possible to
make either some combination of better estimation and splitting or better
tolerance of overage that would be preferrable.

-Jay

On Thu, Feb 23, 2017 at 11:51 AM, Becket Qin  wrote:

> @Dong,
>
> Thanks for the comments. The default behavior of the producer won't change.
> If the users want to use the uncompressed message size, they probably will
> also bump up the batch size to somewhere close to the max message size.
> This would be in the document. BTW the default batch size is 16K which is
> pretty small.
>
> @Jay,
>
> Yeah, we actually had debated quite a bit internally what is the best
> solution to this.
>
> I completely agree it is a bug. In practice we usually leave some headroom
> to allow the compressed size to grow a little if the the original messages
> are not compressible, for example, 1000 KB instead of exactly 1 MB. It is
> likely safe enough.
>
> The major concern for the rejected alternative is performance. It largely
> depends on how frequently we need to split a batch, i.e. how likely the
> estimation is to go off. If we only need to do the split work occasionally, the
> cost would be amortized so we don't need to worry about it too much.
> However, it looks that for a producer with shared topics, the estimation is
> always off. As an example, consider two topics, one with compression ratio
> 0.6 the other 0.2, assuming exactly same traffic, the average compression
> ratio would be roughly 0.4, which is not right for either of the topics. So
> almost half of the batches (of the topics with 0.6 compression ratio) will
> end up larger than the configured batch size. When it comes to more topics
> such as mirror maker, this becomes more unpredictable. To avoid frequent
> rejection / split of the batches, we need to configured the batch size
> pretty conservatively. This could actually hurt the performance because we
> are shoehorning the messages that are highly compressible into a small batch so
> that the other topics that are not that compressible will not become too
> large with the same batch size. At LinkedIn, our batch size is configured
> to 64 KB because of this. I think we may actually have better batching if
> we just use the uncompressed message size and 800 KB batch size.
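
To put rough numbers on that (just an illustration of the shared-estimate issue, not
producer code -- the batch size and ratios are the ones from the example above):

    public class BatchEstimate {
        public static void main(String[] args) {
            double batchSizeBytes = 16 * 1024;   // configured batch.size
            double estimatedRatio = 0.4;         // single shared estimate across both topics
            // Uncompressed bytes the producer would accept per batch under that estimate.
            double uncompressedBudget = batchSizeBytes / estimatedRatio;

            for (double actualRatio : new double[]{0.6, 0.2}) {
                double actualBatchBytes = uncompressedBudget * actualRatio;
                System.out.printf("actual ratio %.1f -> batch ends up %.0f bytes (limit %.0f)%n",
                        actualRatio, actualBatchBytes, batchSizeBytes);
                // 0.6 overshoots the limit (~24.5 KB vs 16 KB); 0.2 underfills it (~8 KB).
            }
        }
    }
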
>
> We did not think about loosening the message size restriction, but that
> sounds a viable solution given that the consumer now can fetch oversized
> messages. One concern would be that on the broker side oversized messages
> will bring more memory pressure. With KIP-92, we may mitigate that, but the
> memory allocation for large messages may not be very GC friendly. I need to
> think about this a little more.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Wed, Feb 22, 2017 at 8:57 PM, Jay Kreps  wrote:
>
> > Hey Becket,
> >
> > I get the problem we want to solve with this, but I don't think this is
> > something that makes sense as a user controlled knob that everyone
> sending
> > data to kafka has to think about. It is basically a bug, right?
> >
> > First, as a technical question is it true that using the uncompressed
> size
> > for batching actually guarantees that you observe the limit? I think that
> > implies that compression always makes the messages smaller, which I think is
> > usually true but is not guaranteed, right? e.g. if someone encrypts their
> > data which tends to randomize it and then enables compressesion, it could
> > slightly get bigger?
> >
> > I also wonder if the rejected alternatives you describe couldn't be made
> to
> > work: basically try to be a bit better at estimation and recover when we
> > guess wrong. I don't think the memory usage should be a problem: isn't it
> > the same memory usage the consumer of that topic would need? And can't
> you
> > do the splitting and recompression in a streaming fashion? If we can make
> > the estimation rate low and the recovery cost is just ~2x the normal cost
> > for that batch that should be totally fine, right? (It's technically true
> > you might have to split more than once, but since you halve it each time
> I
> > think should you get a number of halvings that is logarithmic in the miss
> > size, which, with better estimation you'd hope would be super duper
> small).
> >
> > Alternatively maybe we could work on the other side of the problem and
> try
> > to make it so that a small miss on message size isn't a big problem. I
> > think original issue was that max size and fetch size were tightly
> coupled
> > and the way memory in the consumer

[GitHub] kafka pull request #2591: HOTFIX: use explicit version in upgrade.html

2017-02-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2591


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-125: ZookeeperConsumerConnector to KafkaConsumer Migration and Rollback

2017-02-23 Thread Dong Lin
Yeah, I agree it is a bit complex to do that approach for a one-time
migration. Probably not worth it. Here is another idea to reduce, but not
eliminate, the amount of message duplication during migration. I am fine
with not doing it. Just want to see the opinion from open source community.

The problem with the current solution is that, when we toggle the zookeeper
path in order to migrate from MEZKCC, with 50% probability the old owner of
the partition may receive the notification later than the new partition owner.
Thus the new partition owner may resume from the offset committed by the old
owner 5 sec ago, assuming the auto-commit interval is 10 sec. The messages
produced in this 5 sec window may be consumed multiple times. This amount
is even larger if the consumer is bootstrapping.

One way to mitigate this problem is for the MEZKCC to sleep for a
configurable amount of time after it receives the zookeeper notification but
before it starts to fetch offsets and consume messages. This seems like an
easy change that allows the user to trade off between message duplication
and consumer downtime.
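
A tiny sketch of what I have in mind (the config and class names here are made up,
just to show the shape of the change):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class MigrationSwitchHandler {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private final long migrationSwitchDelayMs;   // hypothetical new consumer config

        public MigrationSwitchHandler(long migrationSwitchDelayMs) {
            this.migrationSwitchDelayMs = migrationSwitchDelayMs;
        }

        // Called when the MEZKCC sees the /consumers/<group>/migration/mode znode change.
        public void onMigrationModeChange(Runnable fetchOffsetsAndStartConsuming) {
            // Give the previous owner time to deliver its final auto-commit before we fetch
            // offsets, trading a little extra downtime for fewer duplicated messages.
            scheduler.schedule(fetchOffsetsAndStartConsuming, migrationSwitchDelayMs, TimeUnit.MILLISECONDS);
        }
    }
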



On Thu, Feb 23, 2017 at 11:20 AM, Joel Koshy  wrote:

> Regarding (2) - yes that's a good point. @Onur - I think the KIP should
> explicitly call this out.
> It is something that we did consider and decided against optimizing for.
> i.e., we just wrote that off as a minor caveat of the upgrade path in that
> there will be a few duplicates, but not too many given that we expect the
> period of duplicate ownership to be minimal. Although it could be addressed
> as you described, it does add complexity to an already-rather-complex
> migration path. Given that it is a transition state (i.e., migration) we
> felt it would be better and sufficient to keep it only as complex as it
> needs to be.
>
> On Mon, Feb 20, 2017 at 4:45 PM, Onur Karaman <
> onurkaraman.apa...@gmail.com>
> wrote:
>
> > Regarding 1: We won't lose the offset from zookeeper upon partition
> > transfer from OZKCC/MDZKCC to MEZKCC because MEZKCC has
> > "dual.commit.enabled" set to true as well as "offsets.storage" set to
> > kafka. The combination of these configs results in the consumer fetching
> > offsets from both kafka and zookeeper and just picking the greater of the
> > two.
> >
> > On Mon, Feb 20, 2017 at 4:33 PM, Dong Lin  wrote:
> >
> > > Hey Onur,
> > >
> > > Thanks for the well-written KIP! I have two questions below.
> > >
> > > 1) In the process of migrating from OZKCCs and MDZKCCs to MEZKCCs, we
> > will
> > have a mix of OZKCCs, MDZKCCs and MEZKCCs. OZKCC and MDZKCC will only
> > commit
> > > to zookeeper and MDZKCC will use kafka-based offset storage. Would we
> > lose
> > > offset committed to zookeeper by a MDZKCC if a partition ownership if
> > > transferred from a MDZKCC to a MEZKCC?
> > >
> > > 2) Suppose every process in the group is running MEZKCC. Each MEZKCC
> has
> > a
> > > zookeeper-based partition assignment and kafka-based partition
> > assignment.
> > > Is it guaranteed that these two assignments are exactly the same across
> > > processes? If not, say the zookeeper-based assignment assigns p1, p2 to
> > > process 1, and p3 to process 2. And kafka-based assignment assigns p1,
> p3
> > > to process 1, and p2 to process 2. Say process 1 receives the
> > > notification to switch to kafka-based assignment before process 2, is
> > it
> > > possible that during a short period of time p3 will be consumed by both
> > > processes?
> > >
> > > This period is probably short and I am not sure how many messages may
> be
> > > duplicated as a result. But it seems possible to avoid this completely
> > > according to an idea that Becket suggested in a previous discussion.
> The
> > > znode /consumers//migration/mode can contain a sequence
> number
> > > that increment for each switch. Say the znode is toggled to kafka with
> > > sequence number 2, each MEZKCC will commit offset to with number 2 in
> the
> > > metadata for partitions that it currently owns according to the
> zk-based
> > > partition assignment, and then periodically fetches the committed
> offset
> > > and the metadata for the partitions that it should own according to the
> > > kafka-based partition assignment. Each MEZKCC only starts consumption
> > when
> > > the metadata has incremented to the number 2.
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Feb 20, 2017 at 12:04 PM, Onur Karaman <
> > > onurkaraman.apa...@gmail.com
> > > > wrote:
> > >
> > > > Hey everyone.
> > > >
> > > > I made a KIP that provides a mechanism for migrating from
> > > > ZookeeperConsumerConnector to KafkaConsumer as well as a mechanism
> for
> > > > rolling back from KafkaConsumer to ZookeeperConsumerConnector:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-125%
> > > > 3A+ZookeeperConsumerConnector+to+KafkaConsumer+Migration+
> and+Rollback
> > > >
> > > > Comments are welcome.
> > > >
> > > > - Onur
> > > >
> > >
> >
>


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881280#comment-15881280
 ] 

Damian Guy commented on KAFKA-4791:
---

[~mjsax] this is a bug. Bart is calling {{addStateStore}} which is, and will 
remain, a public API. He has just kindly pointed out where the problem lies.

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set<String> findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set<String> sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-23 Thread Michael Pearce
So currently the KIP has:

append
filter (get by key, but without implying the O(1) performance expectation that 
a get method would carry)

I agree that supporting some form of

remove
replace

is standard in this kind of API and for the benefit of making a usable API, we 
should explore adding these.

If we’re avoiding the put/get styled interface, as there seems to be contention with 
this naming, are we happy with naming these as I mentioned?
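
To make the append-style shape concrete, here is a rough sketch (my own illustration,
not the KIP-82 interface) of an append that chains the existing headers with the new
ones instead of copying them:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    final class Header {
        final String key;
        final byte[] value;
        Header(String key, byte[] value) { this.key = key; this.value = value; }
    }

    final class Headers implements Iterable<Header> {
        // Chunks of headers chained together; appending adds a chunk instead of copying elements.
        private final List<Iterable<Header>> chunks;

        Headers(Iterable<Header> initial) { this.chunks = Collections.singletonList(initial); }
        private Headers(List<Iterable<Header>> chunks) { this.chunks = chunks; }

        // Returns a new Headers view; only the small list of chunk references is copied.
        Headers append(Iterable<Header> headers) {
            List<Iterable<Header>> next = new ArrayList<>(chunks);
            next.add(headers);
            return new Headers(Collections.unmodifiableList(next));
        }

        @Override
        public Iterator<Header> iterator() {
            Iterator<Iterable<Header>> outer = chunks.iterator();
            return new Iterator<Header>() {
                private Iterator<Header> inner = Collections.emptyIterator();

                public boolean hasNext() {
                    while (!inner.hasNext() && outer.hasNext()) {
                        inner = outer.next().iterator();
                    }
                    return inner.hasNext();
                }

                public Header next() {
                    if (!hasNext()) throw new NoSuchElementException();
                    return inner.next();
                }
            };
        }
    }
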


On 23/02/2017, 12:16, "radai"  wrote:

append-only would mean that if (for whatever reason) i want to replace a
header or strip it out i'd need to copy the whole record.



On Wed, Feb 22, 2017 at 5:09 PM, Michael Pearce 
wrote:

> Im happy to compromise to keep it mutable but move to an append style api.
> (as in guava interables concat)
>
> class Headers {
>Headers append(Iterable<Header> headers);
> }
>
>
> I don’t think we’d want prepend, this would give the idea of guaranteed
> ordering, when in actual fact we don’t provide that guarantee (e.g. one
> client can put headerA, then headerB, but another could put headerB then
> headerA, this shouldn’t cause issues), Also what if we changed to a 
hashmap
> for the internal implementation, it’s just a bucket of entries with no ordering.
> I think we just need to provide an api to add/append headers.
>
> This ok? If so ill update KIP to record this.
>
> Cheers
> Mike
>
> On 23/02/2017, 00:37, "Jason Gustafson"  wrote:
>
> The point about usability is fair. It's also reasonable to expect that
> common use cases such as appending headers should be done efficiently.
>
> Perhaps we could compromise with something like this?
>
> class Headers {
>  Headers append(Iterable<Header> headers);
>  Headers prepend(Iterable<Header> headers);
> }
>
> That retains ease of use while still giving ourselves some flexibility
> in
> the implementation.
>
> -Jason
>
>
> On Wed, Feb 22, 2017 at 3:03 PM, Michael Pearce  >
> wrote:
>
> > I wasn’t referring to the headers needing to be copied, im meaning
> the
> > fact we’d be forcing a new producer record to be created, with all
> the
> > contents copied.
> >
> > i.e what will happen is utility method will be created or end up
> being
> > used, which does this, and returns the new ProducerRecord instance.
> >
> > ProducerRecord  addHeader(ProducerRecord record, Header header){
> > Return New ProducerRecord(record.key, record.value,
> record.timestamp…..,
> > record.headers.concat(header))
> > }
> >
> > To me this seems ugly, but will be inevitable if we don’t make 
adding
> > headers to existing records a simple clean method call.
> >
> >
> >
> > On 22/02/2017, 22:57, "Michael Pearce" 
> wrote:
> >
> > Lazy init can achieve/avoid that.
> >
> > Re the concat, why don’t we implement that inside the Headers
> rather
> > than causing everyone to implement this as adding headers in
> interceptors
> > will be a dominant use case. We want a user friendly API. Having as
> a user
> > having to code this instead of having the headers handle this for me
> seems
> > redundant.
> >
> > On 22/02/2017, 22:34, "Jason Gustafson" 
> wrote:
> >
> > I thought the argument was against creating the extra 
objects
> > unnecessarily
> > (i.e. if they were not accessed). And note that making the
> Headers
> > immutable doesn't necessarily mean that they need to be
> copied:
> > you can do
> > a trick like Guava's Iterables.concat to add additional
> headers
> > without
> > changing the underlying collections.
> >
> > -Jason
> >
> > On Wed, Feb 22, 2017 at 2:22 PM, Michael Pearce <
> > michael.pea...@ig.com>
> > wrote:
> >
> > > If the argument for not having a map holding the key, 
value
> > pairs is due
> > > to garbage creation of HashMap entries, forcing the
> creation of
> > a whole new
> > > producer record to simply add a header, surely is creating
> a lot
> > more?
> > > 
> > > From: Jason Gustafson 
> > > Sent: Wednesday, February 22, 2017 10:09 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
> > >
> > > The current producer interceptor API is this:
> > >
> 

[jira] [Created] (KAFKA-4793) Kafka Connect: POST /connectors/(string: name)/restart doesn't start failed tasks

2017-02-23 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-4793:
---

 Summary: Kafka Connect: POST /connectors/(string: name)/restart 
doesn't start failed tasks
 Key: KAFKA-4793
 URL: https://issues.apache.org/jira/browse/KAFKA-4793
 Project: Kafka
  Issue Type: Improvement
Reporter: Gwen Shapira


Sometimes tasks stop due to repeated failures. Users will want to restart the 
connector and have it retry after fixing an issue. 

We expected "POST /connectors/(string: name)/restart" to cause retry of failed 
tasks, but this doesn't appear to be the case.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread radai
@jun: i wasnt concerned about tying up a request processing thread, but
IIUC the code does still read the entire request out, which might add-up to
a non-negligible amount of memory.

On Thu, Feb 23, 2017 at 11:55 AM, Dong Lin  wrote:

> Hey Rajini,
>
> The current KIP says that the maximum delay will be reduced to window size
> if it is larger than the window size. I have a concern with this:
>
> 1) This essentially means that the user is allowed to exceed their quota
> over a long period of time. Can you provide an upper bound on this
> deviation?
>
> 2) What is the motivation for capping the maximum delay by the window size? I
> am wondering if there is a better alternative to address the problem.
>
> 3) It means that the existing metric-related config will have a more
> direct impact on the mechanism of this io-thread-unit-based quota. This
> may be an important change depending on the answer to 1) above. We probably
> need to document this more explicitly.
>
> Dong
>
>
> On Thu, Feb 23, 2017 at 10:56 AM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Yeah you are right. I thought it wasn't because at LinkedIn it will be
> too
> > much pressure on inGraph to expose those per-clientId metrics so we ended
> > up printing them periodically to local log. Never mind if it is not a
> > general problem.
> >
> > Hey Rajini,
> >
> > - I agree with Jay that we probably don't want to add a new field for
> > every quota ProduceResponse or FetchResponse. Is there any use-case for
> > having separate throttle-time fields for byte-rate-quota and
> > io-thread-unit-quota? You probably need to document this as interface
> > change if you plan to add new field in any request.
> >
> > - I don't think IOThread belongs to quotaType. The existing quota types
> > (i.e. Produce/Fetch/LeaderReplication/FollowerReplication) identify the
> > types of requests that are throttled, not the quota mechanism that is
> applied.
> >
> > - If a request is throttled due to this io-thread-unit-based quota, is
> the
> > existing queue-size metric in ClientQuotaManager incremented?
> >
> > - In the interest of providing a guideline for admins to decide
> > io-thread-unit-based quota and for users to understand its impact on their
> > traffic, would it be useful to have a metric that shows the overall
> > byte-rate per io-thread-unit? Can we also show this a per-clientId
> metric?
> >
> > Thanks,
> > Dong
> >
> >
> > On Thu, Feb 23, 2017 at 9:25 AM, Jun Rao  wrote:
> >
> >> Hi, Ismael,
> >>
> >> For #3, typically, an admin won't configure more io threads than CPU
> >> cores,
> >> but it's possible for an admin to start with fewer io threads than cores
> >> and grow that later on.
> >>
> >> Hi, Dong,
> >>
> >> I think the throttleTime sensor on the broker tells the admin whether a
> >> user/clentId is throttled or not.
> >>
> >> Hi, Radi,
> >>
> >> The reasoning for delaying the throttled requests on the broker instead
> of
> >> returning an error immediately is that the latter has no way to prevent
> >> the
> >> client from retrying immediately, which will make things worse. The
> >> delaying logic is based off a delay queue. A separate expiration thread
> >> just waits on the next to be expired request. So, it doesn't tie up a
> >> request handler thread.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Thu, Feb 23, 2017 at 9:07 AM, Ismael Juma  wrote:
> >>
> >> > Hi Jay,
> >> >
> >> > Regarding 1, I definitely like the simplicity of keeping a single
> >> throttle
> >> > time field in the response. The downside is that the client metrics
> >> will be
> >> > more coarse grained.
> >> >
> >> > Regarding 3, we have `leader.imbalance.per.broker.percentage` and
> >> > `log.cleaner.min.cleanable.ratio`.
> >> >
> >> > Ismael
> >> >
> >> > On Thu, Feb 23, 2017 at 4:43 PM, Jay Kreps  wrote:
> >> >
> >> > > A few minor comments:
> >> > >
> >> > >1. Isn't it the case that the throttling time response field
> should
> >> > have
> >> > >the total time your request was throttled irrespective of the
> >> quotas
> >> > > that
> >> > >caused that. Limiting it to byte rate quota doesn't make sense,
> >> but I
> >> > > also
> >> > >I don't think we want to end up adding new fields in the response
> >> for
> >> > > every
> >> > >single thing we quota, right?
> >> > >2. I don't think we should make this quota specifically about io
> >> > >threads. Once we introduce these quotas people set them and
> expect
> >> > them
> >> > > to
> >> > >be enforced (and if they aren't it may cause an outage). As a
> >> result
> >> > > they
> >> > >are a bit more sensitive than normal configs, I think. The
> current
> >> > > thread
> >> > >pools seem like something of an implementation detail and not the
> >> > level
> >> > > the
> >> > >user-facing quotas should be involved with. I think it might be
> >> better
> >> > > to
> >> > >make this a general request-time throttle with no mention in the
> >> > naming
> >> > >about I/O threads and simply ac

Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread Dong Lin
Hey Rajini,

The current KIP says that the maximum delay will be reduced to window size
if it is larger than the window size. I have a concern with this:

1) This essentially means that the user is allowed to exceed their quota
over a long period of time. Can you provide an upper bound on this
deviation?

2) What is the motivation for capping the maximum delay by the window size? I
am wondering if there is a better alternative to address the problem.

3) It means that the existing metric-related config will have a more
direct impact on the mechanism of this io-thread-unit-based quota. This
may be an important change depending on the answer to 1) above. We probably
need to document this more explicitly.

Dong


On Thu, Feb 23, 2017 at 10:56 AM, Dong Lin  wrote:

> Hey Jun,
>
> Yeah you are right. I thought it wasn't because at LinkedIn it will be too
> much pressure on inGraph to expose those per-clientId metrics so we ended
> up printing them periodically to local log. Never mind if it is not a
> general problem.
>
> Hey Rajini,
>
> - I agree with Jay that we probably don't want to add a new field for
> every quota ProduceResponse or FetchResponse. Is there any use-case for
> having separate throttle-time fields for byte-rate-quota and
> io-thread-unit-quota? You probably need to document this as interface
> change if you plan to add new field in any request.
>
> - I don't think IOThread belongs to quotaType. The existing quota types
> (i.e. Produce/Fetch/LeaderReplication/FollowerReplication) identify the
> type of request that are throttled, not the quota mechanism that is applied.
>
> - If a request is throttled due to this io-thread-unit-based quota, is the
> existing queue-size metric in ClientQuotaManager incremented?
>
> - In the interest of providing guide line for admin to decide
> io-thread-unit-based quota and for user to understand its impact on their
> traffic, would it be useful to have a metric that shows the overall
> byte-rate per io-thread-unit? Can we also show this a per-clientId metric?
>
> Thanks,
> Dong
>
>
> On Thu, Feb 23, 2017 at 9:25 AM, Jun Rao  wrote:
>
>> Hi, Ismael,
>>
>> For #3, typically, an admin won't configure more io threads than CPU
>> cores,
>> but it's possible for an admin to start with fewer io threads than cores
>> and grow that later on.
>>
>> Hi, Dong,
>>
>> I think the throttleTime sensor on the broker tells the admin whether a
>> user/clentId is throttled or not.
>>
>> Hi, Radi,
>>
>> The reasoning for delaying the throttled requests on the broker instead of
>> returning an error immediately is that the latter has no way to prevent
>> the
>> client from retrying immediately, which will make things worse. The
>> delaying logic is based off a delay queue. A separate expiration thread
>> just waits on the next to be expired request. So, it doesn't tie up a
>> request handler thread.
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, Feb 23, 2017 at 9:07 AM, Ismael Juma  wrote:
>>
>> > Hi Jay,
>> >
>> > Regarding 1, I definitely like the simplicity of keeping a single
>> throttle
>> > time field in the response. The downside is that the client metrics
>> will be
>> > more coarse grained.
>> >
>> > Regarding 3, we have `leader.imbalance.per.broker.percentage` and
>> > `log.cleaner.min.cleanable.ratio`.
>> >
>> > Ismael
>> >
>> > On Thu, Feb 23, 2017 at 4:43 PM, Jay Kreps  wrote:
>> >
>> > > A few minor comments:
>> > >
>> > >1. Isn't it the case that the throttling time response field should
>> > have
>> > >the total time your request was throttled irrespective of the
>> quotas
>> > > that
>> > >caused that. Limiting it to byte rate quota doesn't make sense,
>> but I
>> > > also
>> > >I don't think we want to end up adding new fields in the response
>> for
>> > > every
>> > >single thing we quota, right?
>> > >2. I don't think we should make this quota specifically about io
>> > >threads. Once we introduce these quotas people set them and expect
>> > them
>> > > to
>> > >be enforced (and if they aren't it may cause an outage). As a
>> result
>> > > they
>> > >are a bit more sensitive than normal configs, I think. The current
>> > > thread
>> > >pools seem like something of an implementation detail and not the
>> > level
>> > > the
>> > >user-facing quotas should be involved with. I think it might be
>> better
>> > > to
>> > >make this a general request-time throttle with no mention in the
>> > naming
>> > >about I/O threads and simply acknowledge the current limitation
>> (which
>> > > we
>> > >may someday fix) in the docs that this covers only the time after
>> the
>> > >thread is read off the network.
>> > >3. As such I think the right interface to the user would be
>> something
>> > >like percent_request_time and be in {0,...100} or
>> request_time_ratio
>> > > and be
>> > >in {0.0,...,1.0} (I think "ratio" is the terminology we used if the
>> > > scale
>> > >is between 0 and 1 in the other metrics, right

Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-02-23 Thread Becket Qin
@Dong,

Thanks for the comments. The default behavior of the producer won't change.
If the users want to use the uncompressed message size, they probably will
also bump up the batch size to somewhere close to the max message size.
This would be in the document. BTW the default batch size is 16K which is
pretty small.

@Jay,

Yeah, we actually had debated quite a bit internally what is the best
solution to this.

I completely agree it is a bug. In practice we usually leave some headroom
to allow the compressed size to grow a little if the original messages
are not compressible, for example, 1000 KB instead of exactly 1 MB. It is
likely safe enough.

The major concern for the rejected alternative is performance. It largely
depends on how frequently we need to split a batch, i.e. how likely the
estimation can go off. If we only need to do the split work occasionally, the
cost would be amortized so we don't need to worry about it too much.
However, it looks like for a producer with shared topics, the estimation is
always off. As an example, consider two topics, one with compression ratio
0.6 and the other 0.2, assuming exactly the same traffic, the average compression
ratio would be roughly 0.4, which is not right for either of the topics. So
almost half of the batches (of the topics with 0.6 compression ratio) will
end up larger than the configured batch size. When it comes to more topics
such as mirror maker, this becomes more unpredictable. To avoid frequent
rejection / split of the batches, we need to configure the batch size
pretty conservatively. This could actually hurt the performance because we
are shoehorning the messages that are highly compressible into a small batch so
that the other topics that are not that compressible will not become too
large with the same batch size. At LinkedIn, our batch size is configured
to 64 KB because of this. I think we may actually have better batching if
we just use the uncompressed message size and 800 KB batch size.
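A small sketch of the arithmetic above, assuming two topics with true compression ratios 0.6 and 0.2, a shared running estimate of 0.4, and a 16 KB batch size limit; the numbers are illustrative only:

{code}
public class CompressionEstimateSketch {
    public static void main(String[] args) {
        int batchSizeLimit = 16 * 1024;      // configured batch.size
        double sharedEstimate = 0.4;         // running average estimate across both topics
        double[] trueRatios = {0.6, 0.2};    // per-topic actual compression ratios

        // Uncompressed bytes the producer accepts into one batch using the shared estimate.
        int acceptedUncompressed = (int) (batchSizeLimit / sharedEstimate);

        for (double ratio : trueRatios) {
            int actualCompressed = (int) (acceptedUncompressed * ratio);
            System.out.printf("true ratio %.1f -> compressed batch %d bytes (%s limit of %d)%n",
                    ratio, actualCompressed,
                    actualCompressed > batchSizeLimit ? "exceeds" : "within", batchSizeLimit);
        }
        // The 0.6 topic produces batches roughly 50% over the limit, while the 0.2 topic
        // produces batches only half full -- the mis-batching described above.
    }
}
{code}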

We did not think about loosening the message size restriction, but that
sounds like a viable solution given that the consumer can now fetch oversized
messages. One concern would be that on the broker side oversized messages
will bring more memory pressure. With KIP-92, we may mitigate that, but the
memory allocation for large messages may not be very GC friendly. I need to
think about this a little more.

Thanks,

Jiangjie (Becket) Qin


On Wed, Feb 22, 2017 at 8:57 PM, Jay Kreps  wrote:

> Hey Becket,
>
> I get the problem we want to solve with this, but I don't think this is
> something that makes sense as a user controlled knob that everyone sending
> data to kafka has to think about. It is basically a bug, right?
>
> First, as a technical question is it true that using the uncompressed size
> for batching actually guarantees that you observe the limit? I think that
> implies that compression always makes the messages smaller, which i think
> usually true but is not guaranteed, right? e.g. if someone encrypts their
> data which tends to randomize it and then enables compressesion, it could
> slightly get bigger?
>
> I also wonder if the rejected alternatives you describe couldn't be made to
> work: basically try to be a bit better at estimation and recover when we
> guess wrong. I don't think the memory usage should be a problem: isn't it
> the same memory usage the consumer of that topic would need? And can't you
> do the splitting and recompression in a streaming fashion? If we an make
> the estimation rate low and the recovery cost is just ~2x the normal cost
> for that batch that should be totally fine, right? (It's technically true
> you might have to split more than once, but since you halve it each time I
> think should you get a number of halvings that is logarithmic in the miss
> size, which, with better estimation you'd hope would be super duper small).
>
> Alternatively maybe we could work on the other side of the problem and try
> to make it so that a small miss on message size isn't a big problem. I
> think original issue was that max size and fetch size were tightly coupled
> and the way memory in the consumer worked you really wanted fetch size to
> be as small as possible because you'd use that much memory per fetched
> partition and the consumer would get stuck if its fetch size wasn't big
> enough. I think we made some progress on that issue and maybe more could be
> done there so that a small bit of fuzziness around the size would not be an
> issue?
>
> -Jay
>
>
>
> On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin  wrote:
>
> > Hi folks,
> >
> > I would like to start the discussion thread on KIP-126. The KIP propose
> > adding a new configuration to KafkaProducer to allow batching based on
> > uncompressed message size.
> >
> > Comments are welcome.
> >
> > The KIP wiki is following:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+size
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin

[GitHub] kafka pull request #2592: MINOR: fixed javadoc typo in KafkaProducer::partit...

2017-02-23 Thread jpdaigle
GitHub user jpdaigle opened a pull request:

https://github.com/apache/kafka/pull/2592

MINOR: fixed javadoc typo in KafkaProducer::partitionsFor



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tripadvisor/kafka minor_typo_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2592.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2592


commit dd20d58f562e5d2d09d750b2ad96798d469fb39b
Author: Jean-Philippe Daigle 
Date:   2017-02-23T19:43:05Z

minor: fixed a typo in KafkaProducer






Re: [DISCUSS] KIP-125: ZookeeperConsumerConnector to KafkaConsumer Migration and Rollback

2017-02-23 Thread Joel Koshy
Regarding (2) - yes that's a good point. @Onur - I think the KIP should
explicitly call this out.
It is something that we did consider and decided against optimizing for,
i.e., we just wrote that off as a minor caveat of the upgrade path in that
there will be a few duplicates, but not too many given that we expect the
period of duplicate ownership to be minimal. Although it could be addressed
as you described, it does add complexity to an already-rather-complex
migration path. Given that it is a transition state (i.e., migration), we
felt it would be better and sufficient to keep it only as complex as it
needs to be.
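A minimal sketch of the old-consumer settings Onur describes below (dual.commit.enabled together with kafka offsets.storage, so the consumer fetches offsets from both stores and takes the greater one); the ZooKeeper address and group id are placeholders:

{code}
import java.util.Properties;

public class MezkccConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // placeholder
        props.put("group.id", "my-group");                  // placeholder
        // Commit to both ZooKeeper and Kafka, fetch from both and pick the greater offset.
        props.put("dual.commit.enabled", "true");
        props.put("offsets.storage", "kafka");
        System.out.println(props);
    }
}
{code}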

On Mon, Feb 20, 2017 at 4:45 PM, Onur Karaman 
wrote:

> Regarding 1: We won't lose the offset from zookeeper upon partition
> transfer from OZKCC/MDZKCC to MEZKCC because MEZKCC has
> "dual.commit.enabled" set to true as well as "offsets.storage" set to
> kafka. The combination of these configs results in the consumer fetching
> offsets from both kafka and zookeeper and just picking the greater of the
> two.
>
> On Mon, Feb 20, 2017 at 4:33 PM, Dong Lin  wrote:
>
> > Hey Onur,
> >
> > Thanks for the well-written KIP! I have two questions below.
> >
> > 1) In the process of migrating from OZKCCs and MDZKCCs to MEZKCCs, we
> will
> > may a mix of OZKCCs, MDZKCCs and MEZKCCs. OZKCC and MDZKCC will only
> commit
> > to zookeeper and MDZKCC will use kafka-based offset storage. Would we
> lose
> > offset committed to zookeeper by a MDZKCC if a partition ownership if
> > transferred from a MDZKCC to a MEZKCC?
> >
> > 2) Suppose every process in the group is running MEZKCC. Each MEZKCC has
> a
> > zookeeper-based partition assignment and kafka-based partition
> assignment.
> > Is it guaranteed that these two assignments are exactly the same across
> > processes? If not, say the zookeeper-based assignment assigns p1, p2 to
> > process 1, and p3 to process 2. And kafka-based assignment assigns p1, p3
> > to process 1, and p2 to process 2. Say process 1 handles receives the
> > notification to switch to kafka-based notification before process 2, it
> is
> > possible that during a short period of time p3 will be consumed by both
> > processes?
> >
> > This period is probably short and I am not sure how many messages may be
> > duplicated as a result. But it seems possible to avoid this completely
> > according to an idea that Becket suggested in a previous discussion. The
> > znode /consumers//migration/mode can contain a sequence number
> > that increment for each switch. Say the znode is toggled to kafka with
> > sequence number 2, each MEZKCC will commit offset to with number 2 in the
> > metadata for partitions that it currently owns according to the zk-based
> > partition assignment, and then periodically fetches the committed offset
> > and the metadata for the partitions that it should own according to the
> > kafka-based partition assignment. Each MEZKCC only starts consumption
> when
> > the metadata has incremented to the number 2.
> >
> > Thanks,
> > Dong
> >
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Feb 20, 2017 at 12:04 PM, Onur Karaman <
> > onurkaraman.apa...@gmail.com
> > > wrote:
> >
> > > Hey everyone.
> > >
> > > I made a KIP that provides a mechanism for migrating from
> > > ZookeeperConsumerConnector to KafkaConsumer as well as a mechanism for
> > > rolling back from KafkaConsumer to ZookeeperConsumerConnector:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-125%
> > > 3A+ZookeeperConsumerConnector+to+KafkaConsumer+Migration+and+Rollback
> > >
> > > Comments are welcome.
> > >
> > > - Onur
> > >
> >
>


[jira] [Comment Edited] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Bart Vercammen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881043#comment-15881043
 ] 

Bart Vercammen edited comment on KAFKA-4791 at 2/23/17 7:18 PM:


This then means that {{addStateStore}} should not be used from TopologyBuilder ?
So, in order to connect my stores to the processors I'll need to create them 
inside the processor's {{init}} block and register them manually to the 
processor-context?
Or am I seeing this wrong?

As I use the store as a {{QueryableStore}} later on, I do need to fetch it from 
the topology builder, so it needs to be registered there with {{addStateStore}}, 
no?


was (Author: cloutrix):
This then means that {{addStateStore}} should not be used from TopologyBuilder ?
So, in order to connect my stores to the processors I'll need to create them 
inside the processor's {{init}} block and register them manually to the 
processor-context?
Or am I seeing this wrong?

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...





[jira] [Created] (KAFKA-4792) Kafka Connect: Add ByteArray Converter

2017-02-23 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-4792:
---

 Summary: Kafka Connect: Add ByteArray Converter
 Key: KAFKA-4792
 URL: https://issues.apache.org/jira/browse/KAFKA-4792
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Gwen Shapira


We currently have JSON and String converters.
Some data sources have non-string data (like database BLOBs) and some Kafka 
users have binary data they want to land in binary format in the target system.

We need a non-converting converter that will allow just dumping bytes. 
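A rough sketch of what such a pass-through converter could look like, assuming the standard Connect {{Converter}} interface; this is only an illustration of the idea, not a final implementation:

{code}
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

public class ByteArrayConverter implements Converter {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed for a pass-through converter.
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (value == null)
            return null;
        if (!(value instanceof byte[]))
            throw new DataException("ByteArrayConverter only supports byte[] values");
        return (byte[]) value;   // hand the bytes straight to Kafka, no serialization
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        // Expose the raw bytes to the sink side as an optional BYTES schema.
        return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
    }
}
{code}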






[jira] [Commented] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL

2017-02-23 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881030#comment-15881030
 ] 

Michal Borowiecki commented on KAFKA-4601:
--

Don't know if this belongs in this ticket or warrants a separate one, but I'd 
suggest, instead of trying to rely on kstreams doing more automatic 
optimization, it would be good to provide users more control over the 
repartitioning. 
My use case is as follows (unrelated bits omitted for brevity):
{code}
KTable loggedInCustomers = builder
.stream("customerLogins")
.groupBy((key, activity) -> 
activity.getCustomerRef())
.reduce((first,second) -> second, loginStore());

builder
.stream("balanceUpdates")
.map((key, activity) -> new KeyValue<>(
activity.getCustomerRef(),
activity))
.join(loggedInCustomers, (activity, session) -> ...
.to("sessions");
{code}
Both "groupBy" and "map" in the underlying implementation set the 
repartitionRequired flag (since the key changes), and the aggregation/join that 
follows will create the repartitioned topic.
However, in our case I know that both input streams are already partitioned by 
the customerRef value, which I'm mapping into the key (because it's required by 
the join operation).
So there are 2 unnecessary intermediate topics created with their associated 
overhead, while the ultimate goal is simply to do a join on a value that we 
already use to partition the original streams anyway.
(Note, we don't have the option to re-implement the original input streams to 
make customerRef the message key.)

I think it would be better to allow the user to decide (from their knowledge of 
the incoming streams) whether a repartition is mandatory on aggregation and 
join operations (overloaded version of the methods with the repartitionRequired 
flag exposed maybe?)
An alternative would be to allow users to perform a join on a value other than 
the key (a keyValueMapper parameter to join, like the one used for joins with 
global tables), but I expect that to be more involved and error-prone to use 
for people who don't understand the partitioning requirements well (whereas 
it's safe for global tables).


> Avoid duplicated repartitioning in KStream DSL
> --
>
> Key: KAFKA-4601
> URL: https://issues.apache.org/jira/browse/KAFKA-4601
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: performance
>
> Consider the following DSL:
> {code}
> Stream source = builder.stream(Serdes.String(), 
> Serdes.String(), "topic1").map(..);
> KTable counts = source
> .groupByKey()
> .count("Counts");
> KStream sink = source.leftJoin(counts, ..);
> {code}
> The resulted topology looks like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-00:
>   topics: [topic1]
>   children:   [KSTREAM-MAP-01]
>   KSTREAM-MAP-01:
>   children:   
> [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
>   KSTREAM-FILTER-04:
>   children:   
> [KSTREAM-SINK-03]
>   KSTREAM-SINK-03:
>   topic:  X-Counts-repartition
>   KSTREAM-FILTER-07:
>   children:   
> [KSTREAM-SINK-06]
>   KSTREAM-SINK-06:
>   topic:  
> X-KSTREAM-MAP-01-repartition
> ProcessorTopology:
>   KSTREAM-SOURCE-08:
>   topics: 
> [X-KSTREAM-MAP-01-repartition]
>   children:   
> [KSTREAM-LEFTJOIN-09]
>   KSTREAM-LEFTJOIN-09:
>   states: [Counts]
>   KSTREAM-SOURCE-05:
>   topics: [X-Counts-repartition]
>   children:   
> [KSTREAM-AGGREGATE-02]
>   KSTREAM-AGGREGATE-02:
>   states: [Counts]
> {code}
> I.e. there are two 

[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Bart Vercammen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881043#comment-15881043
 ] 

Bart Vercammen commented on KAFKA-4791:
---

This then means that {{addStateStore}} should not be used from TopologyBuilder ?
So, in order to connect my stores to the processors I'll need to create them 
inside the processor's {{init}} block and register them manually to the 
processor-context?
Or am I seeing this wrong?

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...





Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-23 Thread Dong Lin
+1 (non-binding)
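For context, a minimal sketch of the logging conversion discussed below: wrapping whatever java.security.Principal the channel produced into a KafkaPrincipal so the request log keeps the "principalType + SEPARATOR + name" convention. It assumes the public KafkaPrincipal(principalType, name) constructor and USER_TYPE constant; everything else is illustrative:

{code}
import java.security.Principal;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

public class PrincipalLoggingSketch {
    // Wrap the channel's Principal purely so the request log keeps "principalType:name".
    static KafkaPrincipal forLogging(Principal channelPrincipal) {
        if (channelPrincipal instanceof KafkaPrincipal) {
            return (KafkaPrincipal) channelPrincipal;
        }
        return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channelPrincipal.getName());
    }

    public static void main(String[] args) {
        Principal p = () -> "CN=writer,OU=data";  // stand-in for a custom PrincipalBuilder result
        System.out.println(forLogging(p));        // prints User:CN=writer,OU=data
    }
}
{code}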

On Wed, Feb 22, 2017 at 10:52 PM, Manikumar 
wrote:

> +1 (non-binding)
>
> On Thu, Feb 23, 2017 at 3:27 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Hi Jun,
> >
> > Thanks a lot for the comments and reviews.
> > I agree we should log the username.
> > What I meant by creating KafkaPrincipal was, after this KIP we would not
> be
> > required to create KafkaPrincipal and if we want to maintain the old
> > logging, we will have to create it as we do today.
> > I will take care that we specify the Principal name in the log.
> >
> > Thanks again for all the reviews.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Wed, Feb 22, 2017 at 1:45 PM, Jun Rao  wrote:
> >
> > > Hi, Mayuresh,
> > >
> > > For logging the user name, we could do either way. We just need to make
> > > sure the expected user name is logged. Also, currently, we are already
> > > creating a KafkaPrincipal on every request. +1 on the latest KIP.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Feb 21, 2017 at 8:05 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com
> > > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for the comments.
> > > >
> > > > I will mention in the KIP : how this change doesn't affect the
> default
> > > > authorizer implementation.
> > > >
> > > > Regarding, Currently, we log the principal name in the request log in
> > > > RequestChannel, which has the format of "principalType + SEPARATOR +
> > > > name;".
> > > > It would be good if we can keep the same convention after this KIP.
> One
> > > way
> > > > to do that is to convert java.security.Principal to KafkaPrincipal
> for
> > > > logging the requests.
> > > > --- > This would mean we have to create a new KafkaPrincipal on each
> > > > request. Would it be OK to just specify the name of the principal.
> > > > Is there any major reason, we don't want to change the logging
> format?
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > >
> > > >
> > > > On Mon, Feb 20, 2017 at 10:18 PM, Jun Rao  wrote:
> > > >
> > > > > Hi, Mayuresh,
> > > > >
> > > > > Thanks for the updated KIP. A couple of more comments.
> > > > >
> > > > > 1. Do we convert java.security.Principal to KafkaPrincipal for
> > > > > authorization check in SimpleAclAuthorizer? If so, it would be
> useful
> > > to
> > > > > mention that in the wiki so that people can understand how this
> > change
> > > > > doesn't affect the default authorizer implementation.
> > > > >
> > > > > 2. Currently, we log the principal name in the request log in
> > > > > RequestChannel, which has the format of "principalType + SEPARATOR
> +
> > > > > name;".
> > > > > It would be good if we can keep the same convention after this KIP.
> > One
> > > > way
> > > > > to do that is to convert java.security.Principal to KafkaPrincipal
> > for
> > > > > logging the requests.
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Fri, Feb 17, 2017 at 5:35 PM, Mayuresh Gharat <
> > > > > gharatmayures...@gmail.com
> > > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > I have updated the KIP. Would you mind taking another look?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Mayuresh
> > > > > >
> > > > > > On Fri, Feb 17, 2017 at 4:42 PM, Mayuresh Gharat <
> > > > > > gharatmayures...@gmail.com
> > > > > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Sure sounds good to me.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Mayuresh
> > > > > > >
> > > > > > > On Fri, Feb 17, 2017 at 1:54 PM, Jun Rao 
> > wrote:
> > > > > > >
> > > > > > >> Hi, Mani,
> > > > > > >>
> > > > > > >> Good point on using PrincipalBuilder for SASL. It seems that
> > > > > > >> PrincipalBuilder already has access to Authenticator. So, we
> > could
> > > > > just
> > > > > > >> enable that in SaslChannelBuilder. We probably could do that
> in
> > a
> > > > > > separate
> > > > > > >> KIP?
> > > > > > >>
> > > > > > >> Hi, Mayuresh,
> > > > > > >>
> > > > > > >> If you don't think there is a concrete use case for using
> > > > > > >> PrincipalBuilder in
> > > > > > >> kafka-acls.sh, perhaps we could do the simpler approach for
> now?
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >>
> > > > > > >> Jun
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> On Fri, Feb 17, 2017 at 12:23 PM, Mayuresh Gharat <
> > > > > > >> gharatmayures...@gmail.com> wrote:
> > > > > > >>
> > > > > > >> > @Manikumar,
> > > > > > >> >
> > > > > > >> > Can you give an example how you are planning to use
> > > > > PrincipalBuilder?
> > > > > > >> >
> > > > > > >> > @Jun
> > > > > > >> > Yes, that is right. To give a brief overview, we just
> extract
> > > the
> > > > > cert
> > > > > > >> and
> > > > > > >> > hand it over to a third party library for creating a
> > Principal.
> > > So
> > > > > we
> > > > > > >> > cannot create a Principal from just a string.
> > > > > > >> > The main motive behind adding the PrincipalBuilder for
> > > > k

[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881018#comment-15881018
 ] 

Bill Bejeck commented on KAFKA-4791:


Fair enough.  I did not look into the issue at all, at first blush it seemed 
like a big that needed to be fixed asap.  But considering your comments and the 
forthcoming changes with KIP-120, I'll hold off.

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...





[GitHub] kafka pull request #2591: HOTFIX: use explicit version in upgrade.html

2017-02-23 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/2591

HOTFIX: use explicit version in upgrade.html



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka KHotfix-explicit-version

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2591.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2591


commit 38b85c7216d66d4cf1cc1d0d4485c78eed4569d3
Author: Guozhang Wang 
Date:   2017-02-23T19:06:56Z

use explicit version in upgrade.html






[jira] [Comment Edited] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881018#comment-15881018
 ] 

Bill Bejeck edited comment on KAFKA-4791 at 2/23/17 6:56 PM:
-

Fair enough.  I did not look into the issue at all, at first blush it seemed 
like a bug that needed to be fixed asap.  But considering your comments and the 
forthcoming changes with KIP-120, I'll hold off.


was (Author: bbejeck):
Fair enough.  I did not look into the issue at all, at first blush it seemed 
like a big that needed to be fixed asap.  But considering your comments and the 
forthcoming changes with KIP-120, I'll hold off.

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...





Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread Dong Lin
Hey Jun,

Yeah you are right. I thought it wasn't because at LinkedIn it would be too
much pressure on inGraph to expose those per-clientId metrics, so we ended
up printing them periodically to a local log. Never mind if it is not a
general problem.

Hey Rajini,

- I agree with Jay that we probably don't want to add a new field for every
quota in ProduceResponse or FetchResponse. Is there any use-case for having
separate throttle-time fields for byte-rate-quota and io-thread-unit-quota?
You probably need to document this as an interface change if you plan to add
a new field to any request.

- I don't think IOThread belongs to quotaType. The existing quota types
(i.e. Produce/Fetch/LeaderReplication/FollowerReplication) identify the
type of request that are throttled, not the quota mechanism that is applied.

- If a request is throttled due to this io-thread-unit-based quota, is the
existing queue-size metric in ClientQuotaManager incremented?

- In the interest of providing a guideline for admins to decide the
io-thread-unit-based quota and for users to understand its impact on their
traffic, would it be useful to have a metric that shows the overall
byte-rate per io-thread-unit? Can we also show this as a per-clientId metric?

Thanks,
Dong


On Thu, Feb 23, 2017 at 9:25 AM, Jun Rao  wrote:

> Hi, Ismael,
>
> For #3, typically, an admin won't configure more io threads than CPU cores,
> but it's possible for an admin to start with fewer io threads than cores
> and grow that later on.
>
> Hi, Dong,
>
> I think the throttleTime sensor on the broker tells the admin whether a
> user/clentId is throttled or not.
>
> Hi, Radi,
>
> The reasoning for delaying the throttled requests on the broker instead of
> returning an error immediately is that the latter has no way to prevent the
> client from retrying immediately, which will make things worse. The
> delaying logic is based off a delay queue. A separate expiration thread
> just waits on the next to be expired request. So, it doesn't tie up a
> request handler thread.
>
> Thanks,
>
> Jun
>
> On Thu, Feb 23, 2017 at 9:07 AM, Ismael Juma  wrote:
>
> > Hi Jay,
> >
> > Regarding 1, I definitely like the simplicity of keeping a single
> throttle
> > time field in the response. The downside is that the client metrics will
> be
> > more coarse grained.
> >
> > Regarding 3, we have `leader.imbalance.per.broker.percentage` and
> > `log.cleaner.min.cleanable.ratio`.
> >
> > Ismael
> >
> > On Thu, Feb 23, 2017 at 4:43 PM, Jay Kreps  wrote:
> >
> > > A few minor comments:
> > >
> > >1. Isn't it the case that the throttling time response field should
> > have
> > >the total time your request was throttled irrespective of the quotas
> > > that
> > >caused that. Limiting it to byte rate quota doesn't make sense, but
> I
> > > also
> > >I don't think we want to end up adding new fields in the response
> for
> > > every
> > >single thing we quota, right?
> > >2. I don't think we should make this quota specifically about io
> > >threads. Once we introduce these quotas people set them and expect
> > them
> > > to
> > >be enforced (and if they aren't it may cause an outage). As a result
> > > they
> > >are a bit more sensitive than normal configs, I think. The current
> > > thread
> > >pools seem like something of an implementation detail and not the
> > level
> > > the
> > >user-facing quotas should be involved with. I think it might be
> better
> > > to
> > >make this a general request-time throttle with no mention in the
> > naming
> > >about I/O threads and simply acknowledge the current limitation
> (which
> > > we
> > >may someday fix) in the docs that this covers only the time after
> the
> > >thread is read off the network.
> > >3. As such I think the right interface to the user would be
> something
> > >like percent_request_time and be in {0,...100} or request_time_ratio
> > > and be
> > >in {0.0,...,1.0} (I think "ratio" is the terminology we used if the
> > > scale
> > >is between 0 and 1 in the other metrics, right?)
> > >
> > > -Jay
> > >
> > > On Thu, Feb 23, 2017 at 3:45 AM, Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > > > Guozhang/Dong,
> > > >
> > > > Thank you for the feedback.
> > > >
> > > > Guozhang : I have updated the section on co-existence of byte rate
> and
> > > > request time quotas.
> > > >
> > > > Dong: I hadn't added much detail to the metrics and sensors since
> they
> > > are
> > > > going to be very similar to the existing metrics and sensors. To
> avoid
> > > > confusion, I have now added more detail. All metrics are in the group
> > > > "quotaType" and all sensors have names starting with "quotaType"
> (where
> > > > quotaType is Produce/Fetch/LeaderReplication/
> > > > FollowerReplication/*IOThread*).
> > > > So there will be no reuse of existing metrics/sensors. The new ones
> for
> > > > request processing time based throttling will be completely
> independent
> > 

Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-23 Thread Jorge Esteban Quilcate Otoya
Yeap, `--topic t1=1,2` LGTM.

I don't have an idea either about getting rid of the repeated --topic, but --group
is also repeated in the case of deletion, so it could be OK to have
repeated --topic arguments.
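A small sketch of how a repeated `--topic` value in this form could be parsed (topic name, optionally followed by '=' and a comma-separated partition list, where no '=' means all partitions); this only illustrates the proposed syntax, not the tool's implementation:

{code}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TopicScopeParser {
    // Parses one --topic value such as "t1=1,2" or "t2" (no '=' means all partitions).
    static void parse(String arg) {
        String[] parts = arg.split("=", 2);
        String topic = parts[0];
        List<Integer> partitions = new ArrayList<>();
        if (parts.length == 2 && !parts[1].isEmpty()) {
            for (String p : parts[1].split(",")) {
                partitions.add(Integer.parseInt(p.trim()));
            }
        }
        System.out.println(topic + " -> " + (partitions.isEmpty() ? "all partitions" : partitions));
    }

    public static void main(String[] args) {
        for (String arg : Arrays.asList("t1=1,2", "t2", "t3=2")) {
            parse(arg);
        }
    }
}
{code}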

El jue., 23 feb. 2017 a las 19:14, Matthias J. Sax ()
escribió:

> So you suggest to merge "scope options" --topics, --topic, and
> --partitions into a single option? Sound good to me.
>
> I like the compact way to express it, ie, topicname:list-of-partitions
> with "all partitions" if not partitions are specified. It's quite
> intuitive to use.
>
> Just wondering, if we could get rid of the repeated --topic option; it's
> somewhat verbose. Have no good idea though who to improve it.
>
> If you concatenate multiple topic, we need one more character that is
> not allowed in topic names to separate the topics:
>
> > invalidChars = {'/', '\\', ',', '\u', ':', '"', '\'', ';', '*',
> '?', ' ', '\t', '\r', '\n', '='};
>
> maybe
>
> --topics t1=1,2,3:t2:t3=3
>
> use '=' to specify partitions (instead of ':' as you proposed) and ':'
> to separate topics? All other characters seem to be worse to use to me.
> But maybe you have a better idea.
>
>
>
> -Matthias
>
>
> On 2/23/17 3:15 AM, Jorge Esteban Quilcate Otoya wrote:
> > @Matthias about the point 9:
> >
> > What about keeping only the --topic option, and support this format:
> >
> > `--topic t1:0,1,2 --topic t2 --topic t3:2`
> >
> > In this case topics t1, t2, and t3 will be selected: topic t1 with
> > partitions 0,1 and 2; topic t2 with all its partitions; and topic t3,
> with
> > only partition 2.
> >
> > Jorge.
> >
> > El mar., 21 feb. 2017 a las 11:11, Jorge Esteban Quilcate Otoya (<
> > quilcate.jo...@gmail.com>) escribió:
> >
> >> Thanks for the feedback Matthias.
> >>
> >> * 1. You're right. I'll reorder the scenarios.
> >>
> >> * 2. Agree. I'll update the KIP.
> >>
> >> * 3. I like it, updating to `reset-offsets`
> >>
> >> * 4. Agree, removing the `reset-` part
> >>
> >> * 5. Yes, 1.e option without --execute or --export will print out
> current
> >> offset, and the new offset, that will be the same. The use-case of this
> >> option is to use it in combination with --export mostly and have a
> current
> >> 'checkpoint' to reset later. I will add to the KIP how the output should
> >> looks like.
> >>
> >> * 6. Considering 4., I will update it to `--to-offset`
> >>
> >> * 7. I like the idea to unify these options (plus, minus).
> >> `shift-offsets-by` is a good option, but I will like some more feedback
> >> here about the name. I will update the KIP in the meantime.
> >>
> >> * 8. Yes, discussed in 9.
> >>
> >> * 9. Agree. I'll love some feedback here. `topic` is already used by
> >> `delete`, and we can add `--all-topics` to consider all
> topics/partitions
> >> assigned to a group. How could we define specific topics/partitions?
> >>
> >> * 10. Haven't thought about it, but make sense.
> >> ,, would be enough.
> >>
> >> * 11. Agree. Solved with 10.
> >>
> >> Also, I have a couple of changes to mention:
> >>
> >> 1. I have add a reference to the branch where I'm working on this KIP.
> >>
> >> 2. About the period scenario `--to-period`. I will change it to
> >> `--to-duration` given that duration (
> >> https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html)
> >> follows this format: 'PnDTnHnMnS' and does not consider daylight saving
> >> efects.
> >>
> >>
> >>
> >> El mar., 21 feb. 2017 a las 2:47, Matthias J. Sax (<
> matth...@confluent.io>)
> >> escribió:
> >>
> >> Hi,
> >>
> >> thanks for updating the KIP. Couple of follow up comments:
> >>
> >> * Nit: Why is "Reset to Earliest" and "Reset to Latest" a "reset by
> >> time" option -- IMHO it belongs to "reset by position"?
> >>
> >>
> >> * Nit: Description of "Reset to Earliest"
> >>
> >>> using Kafka Consumer's `auto.offset.reset` to `earliest`
> >>
> >> I think this is strictly speaking not correct (as auto.offset.reset only
> >> triggered if no valid offset is found, but this tool explicitly modified
> >> committed offset), and should be phrased as
> >>
> >>> using Kafka Consumer's #seekToBeginning()
> >>
> >> -> similar issue for description of "Reset to Latest"
> >>
> >>
> >> * Main option: rename to --reset-offsets (plural instead of singular)
> >>
> >>
> >> * Scenario Options: I would remove "reset" from all options, because the
> >> main argument "--reset-offset" says already what to do:
> >>
> >>> bin/kafka-consumer-groups.sh --reset-offset --reset-to-datetime XXX
> >>
> >> better (IMHO):
> >>
> >>> bin/kafka-consumer-groups.sh --reset-offsets --to-datetime XXX
> >>
> >>
> >>
> >> * Option 1.e ("print and export current offset") is not intuitive to use
> >> IMHO. The main option is "--reset-offset" but nothing happens if no
> >> scenario is specified. It is also not specified, what the output should
> >> look like?
> >>
> >> Furthermore, --describe should actually show currently committed offset
> >> for a group. So it seems to be redundant to have the same option in
> >> --reset-of

[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880971#comment-15880971
 ] 

Matthias J. Sax commented on KAFKA-4791:


\cc [~damianguy]

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...





[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880966#comment-15880966
 ] 

Matthias J. Sax commented on KAFKA-4791:


You should not call {{connectStateStoreNameToSourceTopics}}. It's a leaking 
internal API that will be removed. Cf. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API
I would close this as "not a problem". Right now, {{KStreamBuilder}} uses this 
for source {{KTable}}s, and a {{KTable}} is always read from a single topic, so 
there is no need to allow multiple topics or topic patterns. \cc [~guozhang]
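For reference, a minimal sketch of the usual pattern when the source uses fixed topic names: the store is registered once with {{addStateStore(dataStore, "myprocessor")}} and then fetched by name in {{init()}}; the store name "dataStore" and the String types are placeholders:

{code}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class MyProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        // The store was registered with topologyBuilder.addStateStore(dataStore, "myprocessor"),
        // so it can be looked up by name here instead of being created manually.
        store = (KeyValueStore<String, String>) context.getStateStore("dataStore");
    }

    @Override
    public void process(String key, String value) {
        store.put(key, value);
    }
}
{code}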

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...





Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-23 Thread Matthias J. Sax
So you suggest to merge "scope options" --topics, --topic, and
--partitions into a single option? Sounds good to me.

I like the compact way to express it, i.e., topicname:list-of-partitions
with "all partitions" if no partitions are specified. It's quite
intuitive to use.

Just wondering if we could get rid of the repeated --topic option; it's
somewhat verbose. I have no good idea though how to improve it.

If you concatenate multiple topics, we need one more character that is
not allowed in topic names to separate the topics:

> invalidChars = {'/', '\\', ',', '\u', ':', '"', '\'', ';', '*',
'?', ' ', '\t', '\r', '\n', '='};

maybe

--topics t1=1,2,3:t2:t3=3

use '=' to specify partitions (instead of ':' as you proposed) and ':'
to separate topics? All other characters seem to be worse to use to me.
But maybe you have a better idea.



-Matthias


On 2/23/17 3:15 AM, Jorge Esteban Quilcate Otoya wrote:
> @Matthias about the point 9:
> 
> What about keeping only the --topic option, and support this format:
> 
> `--topic t1:0,1,2 --topic t2 --topic t3:2`
> 
> In this case topics t1, t2, and t3 will be selected: topic t1 with
> partitions 0,1 and 2; topic t2 with all its partitions; and topic t3, with
> only partition 2.
> 
> Jorge.
> 
> El mar., 21 feb. 2017 a las 11:11, Jorge Esteban Quilcate Otoya (<
> quilcate.jo...@gmail.com>) escribió:
> 
>> Thanks for the feedback Matthias.
>>
>> * 1. You're right. I'll reorder the scenarios.
>>
>> * 2. Agree. I'll update the KIP.
>>
>> * 3. I like it, updating to `reset-offsets`
>>
>> * 4. Agree, removing the `reset-` part
>>
>> * 5. Yes, the 1.e option without --execute or --export will print out the
>> current offset and the new offset, which will be the same. The use-case of
>> this option is mostly to use it in combination with --export and have a
>> current 'checkpoint' to reset to later. I will add to the KIP what the
>> output should look like.
>>
>> * 6. Considering 4., I will update it to `--to-offset`
>>
>> * 7. I like the idea to unify these options (plus, minus).
>> `shift-offsets-by` is a good option, but I will like some more feedback
>> here about the name. I will update the KIP in the meantime.
>>
>> * 8. Yes, discussed in 9.
>>
>> * 9. Agree. I'd love some feedback here. `topic` is already used by
>> `delete`, and we can add `--all-topics` to consider all topics/partitions
>> assigned to a group. How could we define specific topics/partitions?
>>
>> * 10. Haven't thought about it, but it makes sense.
>> ,, would be enough.
>>
>> * 11. Agree. Solved with 10.
>>
>> Also, I have a couple of changes to mention:
>>
>> 1. I have added a reference to the branch where I'm working on this KIP.
>>
>> 2. About the period scenario `--to-period`. I will change it to
>> `--to-duration` given that duration (
>> https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html)
>> follows this format: 'PnDTnHnMnS' and does not consider daylight saving
>> effects.
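
For reference, this is how such a duration string is handled by java.time
(illustration only, not code from the KIP branch):

import java.time.Duration;

public class DurationExample {
    public static void main(String[] args) {
        // ISO-8601 duration: 2 days, 3 hours, 4 minutes; DST does not affect it.
        Duration d = Duration.parse("P2DT3H4M");
        System.out.println(d.toMillis());   // 183840000
    }
}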
>>
>>
>>
>> On Tue, Feb 21, 2017 at 2:47, Matthias J. Sax ()
>> wrote:
>>
>> Hi,
>>
>> thanks for updating the KIP. Couple of follow up comments:
>>
>> * Nit: Why is "Reset to Earliest" and "Reset to Latest" a "reset by
>> time" option -- IMHO it belongs to "reset by position"?
>>
>>
>> * Nit: Description of "Reset to Earliest"
>>
>>> using Kafka Consumer's `auto.offset.reset` to `earliest`
>>
>> I think this is strictly speaking not correct (as auto.offset.reset is only
>> triggered if no valid offset is found, but this tool explicitly modifies
>> committed offsets), and should be phrased as
>>
>>> using Kafka Consumer's #seekToBeginning()
>>
>> -> similar issue for description of "Reset to Latest"
>>
>>
>> * Main option: rename to --reset-offsets (plural instead of singular)
>>
>>
>> * Scenario Options: I would remove "reset" from all options, because the
>> main argument "--reset-offset" says already what to do:
>>
>>> bin/kafka-consumer-groups.sh --reset-offset --reset-to-datetime XXX
>>
>> better (IMHO):
>>
>>> bin/kafka-consumer-groups.sh --reset-offsets --to-datetime XXX
>>
>>
>>
>> * Option 1.e ("print and export current offset") is not intuitive to use
>> IMHO. The main option is "--reset-offset" but nothing happens if no
>> scenario is specified. It is also not specified what the output should
>> look like.
>>
>> Furthermore, --describe should actually show currently committed offset
>> for a group. So it seems to be redundant to have the same option in
>> --reset-offsets
>>
>>
>> * Option 2.a: I would rename to "--reset-to-offset" (or considering the
>> comment above to "--to-offset")
>>
>>
>> * Option 2.b and 2.c: I would unify to "--shift-offsets-by" (or similar)
>> and accept positive/negative values
>>
>>
>> * About Scope "all": maybe it's better to have an option "--all-topics"
>> (or similar). IMHO explicit arguments are preferable over implicit
>> settings to guard against accidental misuse of the tool.
>>
>>
>> * Scope: I also think, that "--topic" (singular) and "--topics" (plural)
>> are too similar

[GitHub] kafka pull request #2583: MINOR: Reduce metrics overheads

2017-02-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2583


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread Jun Rao
Hi, Ismael,

For #3, typically, an admin won't configure more io threads than CPU cores,
but it's possible for an admin to start with fewer io threads than cores
and grow that later on.

Hi, Dong,

I think the throttleTime sensor on the broker tells the admin whether a
user/clientId is throttled or not.

Hi, Radi,

The reasoning for delaying throttled requests on the broker instead of
returning an error immediately is that the latter has no way to prevent the
client from retrying immediately, which would make things worse. The
delaying logic is based on a delay queue. A separate expiration thread
just waits on the next request to expire, so it doesn't tie up a
request handler thread.
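
To make that concrete, here is a minimal sketch of the pattern using
java.util.concurrent.DelayQueue (class and method names are made up for
illustration; this is not the broker's actual quota code):

import java.util.concurrent.*;

class ThrottledItem implements Delayed {
    private final long expireAtMs;
    private final Runnable completeResponse;

    ThrottledItem(long delayMs, Runnable completeResponse) {
        this.expireAtMs = System.currentTimeMillis() + delayMs;
        this.completeResponse = completeResponse;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    void complete() { completeResponse.run(); }
}

public class ThrottleReaper {
    private final DelayQueue<ThrottledItem> queue = new DelayQueue<>();

    // Called from a request handler thread: park the response and return immediately.
    public void throttle(long delayMs, Runnable completeResponse) {
        queue.add(new ThrottledItem(delayMs, completeResponse));
    }

    // One background thread blocks on the queue and completes responses as they
    // expire, so request handler threads never sit out the throttle delay.
    public void start() {
        Thread reaper = new Thread(() -> {
            try {
                while (true) {
                    queue.take().complete();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "throttle-reaper");
        reaper.setDaemon(true);
        reaper.start();
    }
}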

Thanks,

Jun

On Thu, Feb 23, 2017 at 9:07 AM, Ismael Juma  wrote:

> Hi Jay,
>
> Regarding 1, I definitely like the simplicity of keeping a single throttle
> time field in the response. The downside is that the client metrics will be
> more coarse grained.
>
> Regarding 3, we have `leader.imbalance.per.broker.percentage` and
> `log.cleaner.min.cleanable.ratio`.
>
> Ismael
>
> On Thu, Feb 23, 2017 at 4:43 PM, Jay Kreps  wrote:
>
> > A few minor comments:
> >
> >1. Isn't it the case that the throttling time response field should
> have
> >the total time your request was throttled irrespective of the quotas
> > that
> >caused that. Limiting it to byte rate quota doesn't make sense, but I
> > also
> >I don't think we want to end up adding new fields in the response for
> > every
> >single thing we quota, right?
> >2. I don't think we should make this quota specifically about io
> >threads. Once we introduce these quotas people set them and expect
> them
> > to
> >be enforced (and if they aren't it may cause an outage). As a result
> > they
> >are a bit more sensitive than normal configs, I think. The current
> > thread
> >pools seem like something of an implementation detail and not the
> level
> > the
> >user-facing quotas should be involved with. I think it might be better
> > to
> >make this a general request-time throttle with no mention in the
> naming
> >about I/O threads and simply acknowledge the current limitation (which
> > we
> >may someday fix) in the docs that this covers only the time after the
> >thread is read off the network.
> >3. As such I think the right interface to the user would be something
> >like percent_request_time and be in {0,...100} or request_time_ratio
> > and be
> >in {0.0,...,1.0} (I think "ratio" is the terminology we used if the
> > scale
> >is between 0 and 1 in the other metrics, right?)
> >
> > -Jay
> >
> > On Thu, Feb 23, 2017 at 3:45 AM, Rajini Sivaram  >
> > wrote:
> >
> > > Guozhang/Dong,
> > >
> > > Thank you for the feedback.
> > >
> > > Guozhang : I have updated the section on co-existence of byte rate and
> > > request time quotas.
> > >
> > > Dong: I hadn't added much detail to the metrics and sensors since they
> > are
> > > going to be very similar to the existing metrics and sensors. To avoid
> > > confusion, I have now added more detail. All metrics are in the group
> > > "quotaType" and all sensors have names starting with "quotaType" (where
> > > quotaType is Produce/Fetch/LeaderReplication/
> > > FollowerReplication/*IOThread*).
> > > So there will be no reuse of existing metrics/sensors. The new ones for
> > > request processing time based throttling will be completely independent
> > of
> > > existing metrics/sensors, but will be consistent in format.
> > >
> > > The existing throttle_time_ms field in produce/fetch responses will not
> > be
> > > impacted by this KIP. That will continue to return byte-rate based
> > > throttling times. In addition, a new field request_throttle_time_ms
> will
> > be
> > > added to return request quota based throttling times. These will be
> > exposed
> > > as new metrics on the client-side.
> > >
> > > Since all metrics and sensors are different for each type of quota, I
> > > believe there is already sufficient metrics to monitor throttling on
> both
> > > client and broker side for each type of throttling.
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > > On Thu, Feb 23, 2017 at 4:32 AM, Dong Lin  wrote:
> > >
> > > > Hey Rajini,
> > > >
> > > > I think it makes a lot of sense to use io_thread_units as metric to
> > quota
> > > > user's traffic here. LGTM overall. I have some questions regarding
> > > sensors.
> > > >
> > > > - Can you be more specific in the KIP what sensors will be added? For
> > > > example, it will be useful to specify the name and attributes of
> these
> > > new
> > > > sensors.
> > > >
> > > > - We currently have throttle-time and queue-size for byte-rate based
> > > quota.
> > > > Are you going to have separate throttle-time and queue-size for
> > requests
> > > > throttled by io_thread_unit-based quota, or will they share the same
> > > > sensor?
> > > >
> > > > - Does the throttle-time in the ProduceResponse and FetchResp

Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread Ismael Juma
Hi Jay,

Regarding 1, I definitely like the simplicity of keeping a single throttle
time field in the response. The downside is that the client metrics will be
more coarse grained.

Regarding 3, we have `leader.imbalance.per.broker.percentage` and
`log.cleaner.min.cleanable.ratio`.

Ismael

On Thu, Feb 23, 2017 at 4:43 PM, Jay Kreps  wrote:

> A few minor comments:
>
>1. Isn't it the case that the throttling time response field should have
>the total time your request was throttled irrespective of the quotas
> that
>caused that. Limiting it to byte rate quota doesn't make sense, but I
> also
>I don't think we want to end up adding new fields in the response for
> every
>single thing we quota, right?
>2. I don't think we should make this quota specifically about io
>threads. Once we introduce these quotas people set them and expect them
> to
>be enforced (and if they aren't it may cause an outage). As a result
> they
>are a bit more sensitive than normal configs, I think. The current
> thread
>pools seem like something of an implementation detail and not the level
> the
>user-facing quotas should be involved with. I think it might be better
> to
>make this a general request-time throttle with no mention in the naming
>about I/O threads and simply acknowledge the current limitation (which
> we
>may someday fix) in the docs that this covers only the time after the
>thread is read off the network.
>3. As such I think the right interface to the user would be something
>like percent_request_time and be in {0,...100} or request_time_ratio
> and be
>in {0.0,...,1.0} (I think "ratio" is the terminology we used if the
> scale
>is between 0 and 1 in the other metrics, right?)
>
> -Jay
>
> On Thu, Feb 23, 2017 at 3:45 AM, Rajini Sivaram 
> wrote:
>
> > Guozhang/Dong,
> >
> > Thank you for the feedback.
> >
> > Guozhang : I have updated the section on co-existence of byte rate and
> > request time quotas.
> >
> > Dong: I hadn't added much detail to the metrics and sensors since they
> are
> > going to be very similar to the existing metrics and sensors. To avoid
> > confusion, I have now added more detail. All metrics are in the group
> > "quotaType" and all sensors have names starting with "quotaType" (where
> > quotaType is Produce/Fetch/LeaderReplication/
> > FollowerReplication/*IOThread*).
> > So there will be no reuse of existing metrics/sensors. The new ones for
> > request processing time based throttling will be completely independent
> of
> > existing metrics/sensors, but will be consistent in format.
> >
> > The existing throttle_time_ms field in produce/fetch responses will not
> be
> > impacted by this KIP. That will continue to return byte-rate based
> > throttling times. In addition, a new field request_throttle_time_ms will
> be
> > added to return request quota based throttling times. These will be
> exposed
> > as new metrics on the client-side.
> >
> > Since all metrics and sensors are different for each type of quota, I
> > believe there is already sufficient metrics to monitor throttling on both
> > client and broker side for each type of throttling.
> >
> > Regards,
> >
> > Rajini
> >
> >
> > On Thu, Feb 23, 2017 at 4:32 AM, Dong Lin  wrote:
> >
> > > Hey Rajini,
> > >
> > > I think it makes a lot of sense to use io_thread_units as metric to
> quota
> > > user's traffic here. LGTM overall. I have some questions regarding
> > sensors.
> > >
> > > - Can you be more specific in the KIP what sensors will be added? For
> > > example, it will be useful to specify the name and attributes of these
> > new
> > > sensors.
> > >
> > > - We currently have throttle-time and queue-size for byte-rate based
> > quota.
> > > Are you going to have separate throttle-time and queue-size for
> requests
> > > throttled by io_thread_unit-based quota, or will they share the same
> > > sensor?
> > >
> > > - Does the throttle-time in the ProduceResponse and FetchResponse
> > contains
> > > time due to io_thread_unit-based quota?
> > >
> > > - Currently kafka server doesn't provide any log or metrics that
> > tells
> > > whether any given clientId (or user) is throttled. This is not too bad
> > > because we can still check the client-side byte-rate metric to validate
> > > whether a given client is throttled. But with this io_thread_unit,
> there
> > > will be no way to validate whether a given client is slow because it
> has
> > > exceeded its io_thread_unit limit. It is necessary for users to be able
> to
> > > know this information to figure out whether they have reached their
> quota
> > > limit. How about we add log4j log on the server side to periodically
> > print
> > > the (client_id, byte-rate-throttle-time, io-thread-unit-throttle-time)
> so
> > > that kafka administrator can figure those users that have reached their
> > > limit and act accordingly?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Feb 22, 2017

[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2017-02-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880817#comment-15880817
 ] 

Jun Rao commented on KAFKA-2729:


[~prasincs], if the controller is partitioned off other brokers and ZK, the 
expected flow is the following: (1) ZK server detects that the old controller's 
session expires; (2) the controller path is removed by ZK; (3) a new controller 
is elected and changes leaders/isrs; (4) network is back on the old controller; 
(5) old controller receives ZK session expiration event; (6) old controller 
stops doing the controller stuff and resigns. Note that the old controller 
doesn't really know that it's no longer the controller until step (5). The gap 
we have now is that step (6) is not done in a timely fashion.

Are you deploying Kafka in the same data center? What kind of network 
partitions are you seeing? Typically, we expect network partitions are rare 
within the same data center. If there are short network glitches, one temporary 
fix is to increase the ZK session timeout to accommodate for that until the 
network issue is fixed.
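
For reference, the relevant broker setting is zookeeper.session.timeout.ms in 
server.properties; raising it to something like 30000 ms, for example, gives the 
session more headroom during short glitches. Treat the exact value as 
environment-specific.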

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the effected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread Jay Kreps
A few minor comments:

   1. Isn't it the case that the throttling time response field should have
   the total time your request was throttled, irrespective of the quotas that
   caused it? Limiting it to the byte rate quota doesn't make sense, but I also
   don't think we want to end up adding new fields in the response for every
   single thing we quota, right?
   2. I don't think we should make this quota specifically about io
   threads. Once we introduce these quotas people set them and expect them to
   be enforced (and if they aren't it may cause an outage). As a result they
   are a bit more sensitive than normal configs, I think. The current thread
   pools seem like something of an implementation detail and not the level the
   user-facing quotas should be involved with. I think it might be better to
   make this a general request-time throttle with no mention in the naming
   about I/O threads and simply acknowledge the current limitation (which we
   may someday fix) in the docs that this covers only the time after the
   thread is read off the network.
   3. As such I think the right interface to the user would be something
   like percent_request_time and be in {0,...100} or request_time_ratio and be
   in {0.0,...,1.0} (I think "ratio" is the terminology we used if the scale
   is between 0 and 1 in the other metrics, right?)

-Jay

On Thu, Feb 23, 2017 at 3:45 AM, Rajini Sivaram 
wrote:

> Guozhang/Dong,
>
> Thank you for the feedback.
>
> Guozhang : I have updated the section on co-existence of byte rate and
> request time quotas.
>
> Dong: I hadn't added much detail to the metrics and sensors since they are
> going to be very similar to the existing metrics and sensors. To avoid
> confusion, I have now added more detail. All metrics are in the group
> "quotaType" and all sensors have names starting with "quotaType" (where
> quotaType is Produce/Fetch/LeaderReplication/
> FollowerReplication/*IOThread*).
> So there will be no reuse of existing metrics/sensors. The new ones for
> request processing time based throttling will be completely independent of
> existing metrics/sensors, but will be consistent in format.
>
> The existing throttle_time_ms field in produce/fetch responses will not be
> impacted by this KIP. That will continue to return byte-rate based
> throttling times. In addition, a new field request_throttle_time_ms will be
> added to return request quota based throttling times. These will be exposed
> as new metrics on the client-side.
>
> Since all metrics and sensors are different for each type of quota, I
> believe there is already sufficient metrics to monitor throttling on both
> client and broker side for each type of throttling.
>
> Regards,
>
> Rajini
>
>
> On Thu, Feb 23, 2017 at 4:32 AM, Dong Lin  wrote:
>
> > Hey Rajini,
> >
> > I think it makes a lot of sense to use io_thread_units as metric to quota
> > user's traffic here. LGTM overall. I have some questions regarding
> sensors.
> >
> > - Can you be more specific in the KIP what sensors will be added? For
> > example, it will be useful to specify the name and attributes of these
> new
> > sensors.
> >
> > - We currently have throttle-time and queue-size for byte-rate based
> quota.
> > Are you going to have separate throttle-time and queue-size for requests
> > throttled by io_thread_unit-based quota, or will they share the same
> > sensor?
> >
> > - Does the throttle-time in the ProduceResponse and FetchResponse
> contains
> > time due to io_thread_unit-based quota?
> >
> > - Currently kafka server doesn't provide any log or metrics that
> tells
> > whether any given clientId (or user) is throttled. This is not too bad
> > because we can still check the client-side byte-rate metric to validate
> > whether a given client is throttled. But with this io_thread_unit, there
> > will be no way to validate whether a given client is slow because it has
> > exceeded its io_thread_unit limit. It is necessary for users to be able
> to
> > know this information to figure out whether they have reached their quota
> print
> > the (client_id, byte-rate-throttle-time, io-thread-unit-throttle-time) so
> > that kafka administrator can figure those users that have reached their
> > limit and act accordingly?
> >
> > Thanks,
> > Dong
> >
> >
> >
> >
> >
> > On Wed, Feb 22, 2017 at 4:46 PM, Guozhang Wang 
> wrote:
> >
> > > Made a pass over the doc, overall LGTM except a minor comment on the
> > > throttling implementation:
> > >
> > > Stated as "Request processing time throttling will be applied on top if
> > > necessary." I thought that it meant the request processing time
> > throttling
> > > is applied first, but continue reading I found it actually meant to
> apply
> > > produce / fetch byte rate throttling first.
> > >
> > > Also the last sentence "The remaining delay if any is applied to the
> > > response." is a bit confusing to me. Maybe rewording it a

[jira] [Resolved] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-4788.

Resolution: Fixed

> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>  Labels: regression
> Fix For: 0.10.3.0, 0.10.2.1
>
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880785#comment-15880785
 ] 

ASF GitHub Bot commented on KAFKA-4788:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2588


> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>  Labels: regression
> Fix For: 0.10.3.0, 0.10.2.1
>
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2588: KAFKA-4788: Revert "KAFKA-4092: retention.bytes sh...

2017-02-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2588


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880782#comment-15880782
 ] 

Bill Bejeck commented on KAFKA-4791:


picking this one up.

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set<String> findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set<String> sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-4791:
--

Assignee: Bill Bejeck

> Kafka Streams - unable to add state stores when using wildcard topics on the 
> source
> ---
>
> Key: KAFKA-4791
> URL: https://issues.apache.org/jira/browse/KAFKA-4791
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
> Environment: Java 8
>Reporter: Bart Vercammen
>Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with following 
> components :
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it 
> seems not possible to attach state stores when using wildcard topics on the 
> sources.
> Inside {{addStateStore}}, the processor gets connected to the state store 
> with {{connectProcessorAndStateStore}}, and there it will try to connect the 
> state store with the source topics from the processor: 
> {{connectStateStoreNameToSourceTopics}}  
> Here lies the problem: 
> {code}
> private Set<String> findSourceTopicsForProcessorParents(String [] 
> parents) {
> final Set<String> sourceTopics = new HashSet<>();
> for (String parent : parents) {
> NodeFactory nodeFactory = nodeFactories.get(parent);
> if (nodeFactory instanceof SourceNodeFactory) {
> sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()));
> } else if (nodeFactory instanceof ProcessorNodeFactory) {
> 
> sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory)
>  nodeFactory).parents));
> }
> }
> return sourceTopics;
> }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
> nodeFactory).getTopics()))}} will fail as there are no topics inside the 
> {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)
> I also tried to search for some unit tests inside the Kafka Streams project 
> that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on 
> wildcard topics, but no combination of both ...



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread radai
i don't think time/cpu% are easy to reason about. most user-facing quota
systems i know (especially the commercial ones) focus on things users
understand better - iops and bytes.

as for quotas and "overhead" requests like heartbeats - on the one hand
subjecting them to the quota may cause clients to die out. on the other not
subjecting them to the quota opens the broker up to DOS attacks. how about
giving overhead requests their own quota, separate from "real"
(user-initiated?) requests? slightly more complicated but i think solves
the issue?

how long are requests held in purgatory? wouldn't this, at some point, still
cause resources to be taken? wouldn't it be better (for high enough delay
values) to just return an error to the client (quota exceeded, try again in
3 seconds)?

how would these work across an entire cluster? if these are enforced
independently on every single broker you'd be hitting "monotonous" clients
(who interact with fewer partitions) much harder than clients who operate
across a lot of partitions.

On Thu, Feb 23, 2017 at 8:02 AM, Ismael Juma  wrote:

> Thanks for the KIP, Rajini. This is a welcome improvement and the KIP page
> covers it well. A few comments:
>
> 1. Can you expand a bit on the motivation for throttling requests that fail
> authorization for ClusterAction? Under what scenarios would this help?
>
> 2. I think we should rename `throttle_time_ms` in the new version of
> produce/fetch response to make it clear that it refers to the byte rate
> throttling. Also, it would be good to include the updated schema for the
> responses (we typically try to do that whenever we update protocol APIs).
>
> 3. I think I am OK with using absolute units, but I am not sure about the
> argument why it's better than a percentage. We are comparing request
> threads to CPUs, but they're not the same as increasing the number of
> request threads doesn't necessarily mean that the server can cope with more
> requests. In the example where we double the number of threads, all the
> existing users would still have the same capacity proportionally speaking
> so it seems intuitive to me. One thing that would be helpful, I think, is
> to describe a few scenarios where the setting needs to be adjusted and how
> users would go about doing it.
>
> 4. I think it's worth mentioning that TLS increases the load on the network
> thread significantly and for cases where there is mixed plaintext and TLS
> traffic, the existing byte rate throttling may not do a great job. I think
> it's OK to tackle this in a separate KIP, but worth mentioning the
> limitation.
>
> 5. We mention DoS attacks in the document. It may be worth mentioning that
> this mostly helps with clients that are not malicious. A malicious client
> could generate a large number of connections to counteract the delays that
> this KIP introduces. Kafka has connection limits per IP today, but not per
> user, so a distributed DoS could bypass those. This is not easy to solve at
> the Kafka level since the authentication step required to get the user may
> be costly enough that the brokers will eventually be overwhelmed.
>
> 6. It's unfortunate that the existing byte rate quota configs use
> underscores instead of dots (like every other config) as separators. It's
> reasonable for `io_thread_units` to use the same convention as the byte
> rate configs, but it's not great that we are adding to the inconsistency. I
> don't have any great solutions apart from perhaps accepting the dot
> notation for all these configs as well.
>
> Ismael
>
> On Fri, Feb 17, 2017 at 5:05 PM, Rajini Sivaram 
> wrote:
>
> > Hi all,
> >
> > I have just created KIP-124 to introduce request rate quotas to Kafka:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-124+-+
> > Request+rate+quotas
> >
> > The proposal is for a simple percentage request handling time quota that
> > can be allocated to **, ** or **. There
> > are a few other suggestions also under "Rejected alternatives". Feedback
> > and suggestions are welcome.
> >
> > Thank you...
> >
> > Regards,
> >
> > Rajini
> >
>


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread Ismael Juma
Thanks for the KIP, Rajini. This is a welcome improvement and the KIP page
covers it well. A few comments:

1. Can you expand a bit on the motivation for throttling requests that fail
authorization for ClusterAction? Under what scenarios would this help?

2. I think we should rename `throttle_time_ms` in the new version of
produce/fetch response to make it clear that it refers to the byte rate
throttling. Also, it would be good to include the updated schema for the
responses (we typically try to do that whenever we update protocol APIs).

3. I think I am OK with using absolute units, but I am not sure about the
argument why it's better than a percentage. We are comparing request
threads to CPUs, but they're not the same as increasing the number of
request threads doesn't necessarily mean that the server can cope with more
requests. In the example where we double the number of threads, all the
existing users would still have the same capacity proportionally speaking
so it seems intuitive to me. One thing that would be helpful, I think, is
to describe a few scenarios where the setting needs to be adjusted and how
users would go about doing it.

4. I think it's worth mentioning that TLS increases the load on the network
thread significantly and for cases where there is mixed plaintext and TLS
traffic, the existing byte rate throttling may not do a great job. I think
it's OK to tackle this in a separate KIP, but worth mentioning the
limitation.

5. We mention DoS attacks in the document. It may be worth mentioning that
this mostly helps with clients that are not malicious. A malicious client
could generate a large number of connections to counteract the delays that
this KIP introduces. Kafka has connection limits per IP today, but not per
user, so a distributed DoS could bypass those. This is not easy to solve at
the Kafka level since the authentication step required to get the user may
be costly enough that the brokers will eventually be overwhelmed.

6. It's unfortunate that the existing byte rate quota configs use
underscores instead of dots (like every other config) as separators. It's
reasonable for `io_thread_units` to use the same convention as the byte
rate configs, but it's not great that we are adding to the inconsistency. I
don't have any great solutions apart from perhaps accepting the dot
notation for all these configs as well.

Ismael

On Fri, Feb 17, 2017 at 5:05 PM, Rajini Sivaram 
wrote:

> Hi all,
>
> I have just created KIP-124 to introduce request rate quotas to Kafka:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-124+-+
> Request+rate+quotas
>
> The proposal is for a simple percentage request handling time quota that
> can be allocated to **, ** or **. There
> are a few other suggestions also under "Rejected alternatives". Feedback
> and suggestions are welcome.
>
> Thank you...
>
> Regards,
>
> Rajini
>


[jira] [Created] (KAFKA-4791) Kafka Streams - unable to add state stores when using wildcard topics on the source

2017-02-23 Thread Bart Vercammen (JIRA)
Bart Vercammen created KAFKA-4791:
-

 Summary: Kafka Streams - unable to add state stores when using 
wildcard topics on the source
 Key: KAFKA-4791
 URL: https://issues.apache.org/jira/browse/KAFKA-4791
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.1
 Environment: Java 8
Reporter: Bart Vercammen


I'm trying to build up a topology (using TopologyBuilder) with following 
components :
{code}
new TopologyBuilder()
  .addSource("ingest", Pattern.compile( ... ))
  .addProcessor("myprocessor", ..., "ingest")
  .addStateStore(dataStore, "myprocessor")
{code}

Somehow this does not seem to work.
When creating the topology with exact topic names, all works fine, but it seems 
not possible to attach state stores when using wildcard topics on the sources.

Inside {{addStateStore}}, the processor gets connected to the state store with 
{{connectProcessorAndStateStore}}, and there it will try to connect the state 
store with the source topics from the processor: 
{{connectStateStoreNameToSourceTopics}}  
Here lies the problem: 
{code}
private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
final Set<String> sourceTopics = new HashSet<>();
for (String parent : parents) {
NodeFactory nodeFactory = nodeFactories.get(parent);
if (nodeFactory instanceof SourceNodeFactory) {
sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
nodeFactory).getTopics()));
} else if (nodeFactory instanceof ProcessorNodeFactory) {

sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) 
nodeFactory).parents));
}
}
return sourceTopics;
}
{code}

The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) 
nodeFactory).getTopics()))}} will fail as there are no topics inside the 
{{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null)

I also tried to search for some unit tests inside the Kafka Streams project 
that cover this scenario, but alas, I was not able to find any.
Only some tests on state stores with exact topic names, and some tests on 
wildcard topics, but no combination of both ...
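
One possible direction for a fix, sketched against the snippet above (illustration 
only, not the actual patch; it assumes the same NodeFactory/SourceNodeFactory types):
{code}
// Sketch only: a null-safe variant of the loop above. Pattern-defined sources
// return null from getTopics(), so skip them here instead of dereferencing null.
private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
    final Set<String> sourceTopics = new HashSet<>();
    for (String parent : parents) {
        NodeFactory nodeFactory = nodeFactories.get(parent);
        if (nodeFactory instanceof SourceNodeFactory) {
            String[] topics = ((SourceNodeFactory) nodeFactory).getTopics();
            if (topics != null) {
                sourceTopics.addAll(Arrays.asList(topics));
            }
            // else: the source was defined by a Pattern; the matched topics are only
            // known at runtime, so the pattern itself would have to be tracked in
            // order to connect the state store to those topics.
        } else if (nodeFactory instanceof ProcessorNodeFactory) {
            sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
        }
    }
    return sourceTopics;
}
{code}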





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4779) Failure in kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py

2017-02-23 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880684#comment-15880684
 ] 

Rajini Sivaram commented on KAFKA-4779:
---

I couldn't recreate the failure, but the code for phase_two of security upgrade 
with different client_protocol and broker_protocol is currently a disruptive 
upgrade that stops produce and consume during the upgrade. As a result, the 
consumer can time out if the upgrade takes slightly longer than expected.

Non-disruptive upgrade of cluster to enable new security protocols is described 
in the docs (http://kafka.apache.org/documentation/#security_rolling_upgrade). 
The new protocols must be enabled first with incremental bounce. And then the 
inter-broker protocol is updated with incremental bounce. And finally, the old 
protocol is removed. When client_protocol (SASL_PLAINTEXT) and broker_protocol 
(SSL) are being updated to different protocols starting with PLAINTEXT, both 
SASL_PLAINTEXT and SSL must be enabled first before inter-broker protocol is 
changed to SSL. The test was enabling only SASL_PLAINTEXT. As a result 
inter-broker communication was broken during the upgrade, causing produce and 
consume to fail until the cluster got back to a good state. Since the purpose 
of the test is to verify non-disruptive upgrade, I have changed the test to 
enable both SASL_PLAINTEXT and SSL first so that the upgrade is performed 
without disrupting producers or consumers.
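
For illustration, the non-disruptive sequence for this particular case looks 
roughly like this (config names only; listener addresses omitted): first bounce 
each broker with listeners including PLAINTEXT, SASL_PLAINTEXT and SSL while 
security.inter.broker.protocol stays PLAINTEXT; then bounce again with 
security.inter.broker.protocol=SSL; a final bounce can drop the PLAINTEXT 
listener once clients have moved off it.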

> Failure in kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
> 
>
> Key: KAFKA-4779
> URL: https://issues.apache.org/jira/browse/KAFKA-4779
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Rajini Sivaram
>
> This test failed on 01/29, on both trunk and 0.10.2, error message:
> {noformat}
> The consumer has terminated, or timed out, on node ubuntu@worker3.
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py",
>  line 148, in test_rolling_upgrade_phase_two
> self.run_produce_consume_validate(self.roll_in_secured_settings, 
> client_protocol, broker_protocol)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 100, in run_produce_consume_validate
> self.stop_producer_and_consumer()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 87, in stop_producer_and_consumer
> self.check_alive()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in check_alive
> raise Exception(msg)
> Exception: The consumer has terminated, or timed out, on node ubuntu@worker3.
> {noformat}
> Looks like the console consumer times out: 
> {noformat}
> [2017-01-30 04:56:00,972] ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> kafka.consumer.ConsumerTimeoutException
> at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:90)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:120)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {noformat}
> A bunch of these security_rolling_upgrade tests failed, and in all cases, the 
> producer produced ~15k messages, of which ~7k were acked, and the consumer 
> only got around ~2600 before timing out. 
> There are a lot of messages like the following for different request types on 
> the producer and consumer:
> {noformat}
> [2017-01-30 05:13:35,954] WARN Received unknown topic or partition error in 
> produce request on partition test_topic-0. The topic/partition may not exist 
> or the user may not have Describe access to it 
> (org.apache.kafka.clients.producer.internals.Sende

[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880641#comment-15880641
 ] 

ASF GitHub Bot commented on KAFKA-4789:
---

GitHub user hrafzali opened a pull request:

https://github.com/apache/kafka/pull/2590

KAFKA-4789: Added support to ProcessorTopologyTestDriver to forward 
timestamps to internal topics

This resolves the issue in the ProcessorTopologyTestDriver that the 
extracted timestamp is not forwarded with the produced record to the internal 
topics.

JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-4789

The contribution is my original work and I license the work to the project 
under the project's open source license.

@guozhangwang @dguy


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hrafzali/kafka 
KAFKA-4789_ProcessorTopologyTestDriver_timestamp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2590.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2590


commit 579a65a691dd9d07e4bf945badfe18c261da8540
Author: Hamidreza Afzali 
Date:   2017-02-23T10:53:18Z

KAFKA-4789: Added support to ProcessorTopologyTestDriver to forward 
extracted timestamps to internal topics




> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>  Labels: unit-test
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2589: KAFKA-4779: Fix security upgrade system test to be...

2017-02-23 Thread rajinisivaram
GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/2589

KAFKA-4779: Fix security upgrade system test to be non-disruptive

The phase_two security upgrade test verifies upgrading inter-broker and 
client protocols to the same value as well as different values. The second case 
currently changes inter-broker protocol without first enabling the protocol, 
disrupting produce/consume until the whole cluster is updated. This commit 
changes the test to be a non-disruptive upgrade test that enables protocols 
first (simulating phase one of upgrade).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-4779

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2589.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2589


commit 71aa4c4fa8035695ac460797b84a6754576dc99f
Author: Rajini Sivaram 
Date:   2017-02-23T14:57:45Z

KAFKA-4779: Fix security upgrade system test to be non-disruptive




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4779) Failure in kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880642#comment-15880642
 ] 

ASF GitHub Bot commented on KAFKA-4779:
---

GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/2589

KAFKA-4779: Fix security upgrade system test to be non-disruptive

The phase_two security upgrade test verifies upgrading inter-broker and 
client protocols to the same value as well as different values. The second case 
currently changes inter-broker protocol without first enabling the protocol, 
disrupting produce/consume until the whole cluster is updated. This commit 
changes the test to be a non-disruptive upgrade test that enables protocols 
first (simulating phase one of upgrade).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-4779

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2589.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2589


commit 71aa4c4fa8035695ac460797b84a6754576dc99f
Author: Rajini Sivaram 
Date:   2017-02-23T14:57:45Z

KAFKA-4779: Fix security upgrade system test to be non-disruptive




> Failure in kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
> 
>
> Key: KAFKA-4779
> URL: https://issues.apache.org/jira/browse/KAFKA-4779
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Rajini Sivaram
>
> This test failed on 01/29, on both trunk and 0.10.2, error message:
> {noformat}
> The consumer has terminated, or timed out, on node ubuntu@worker3.
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py",
>  line 148, in test_rolling_upgrade_phase_two
> self.run_produce_consume_validate(self.roll_in_secured_settings, 
> client_protocol, broker_protocol)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 100, in run_produce_consume_validate
> self.stop_producer_and_consumer()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 87, in stop_producer_and_consumer
> self.check_alive()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in check_alive
> raise Exception(msg)
> Exception: The consumer has terminated, or timed out, on node ubuntu@worker3.
> {noformat}
> Looks like the console consumer times out: 
> {noformat}
> [2017-01-30 04:56:00,972] ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> kafka.consumer.ConsumerTimeoutException
> at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:90)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:120)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {noformat}
> A bunch of these security_rolling_upgrade tests failed, and in all cases, the 
> producer produced ~15k messages, of which ~7k were acked, and the consumer 
> only got around ~2600 before timing out. 
> There are a lot of messages like the following for different request types on 
> the producer and consumer:
> {noformat}
> [2017-01-30 05:13:35,954] WARN Received unknown topic or partition error in 
> produce request on partition test_topic-0. The topic/partition may not exist 
> or the user may not have Describe access to it 
> (org.apache.kafka.clients.producer.internals.Sender)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2590: KAFKA-4789: Added support to ProcessorTopologyTest...

2017-02-23 Thread hrafzali
GitHub user hrafzali opened a pull request:

https://github.com/apache/kafka/pull/2590

KAFKA-4789: Added support to ProcessorTopologyTestDriver to forward 
timestamps to internal topics

This resolves the issue in the ProcessorTopologyTestDriver that the 
extracted timestamp is not forwarded with the produced record to the internal 
topics.

JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-4789

The contribution is my original work and I license the work to the project 
under the project's open source license.

@guozhangwang @dguy


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hrafzali/kafka 
KAFKA-4789_ProcessorTopologyTestDriver_timestamp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2590.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2590


commit 579a65a691dd9d07e4bf945badfe18c261da8540
Author: Hamidreza Afzali 
Date:   2017-02-23T10:53:18Z

KAFKA-4789: Added support to ProcessorTopologyTestDriver to forward 
extracted timestamps to internal topics




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880637#comment-15880637
 ] 

ASF GitHub Bot commented on KAFKA-4789:
---

Github user hrafzali closed the pull request at:

https://github.com/apache/kafka/pull/2587


> ProcessorTopologyTestDriver does not forward extracted timestamps to internal 
> topics
> 
>
> Key: KAFKA-4789
> URL: https://issues.apache.org/jira/browse/KAFKA-4789
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Hamidreza Afzali
>  Labels: unit-test
>
> *Problem:*
> When using ProcessorTopologyTestDriver, the extracted timestamp is not 
> forwarded with the produced record to the internal topics.
> *Example:*
> {code}
> object Topology1 {
>   def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val outputTopic = "output"
> val stateStore = "count"
> val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 
> 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
> classOf[MyTimestampExtractor].getName)
> val windowedStringSerde = Serdes.serdeFrom(new 
> WindowedSerializer(Serdes.String.serializer),
>   new WindowedDeserializer(Serdes.String.deserializer))
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>   .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
>   .groupByKey(Serdes.String, Serdes.Integer)
>   .count(TimeWindows.of(1000L), stateStore)
>   .to(windowedStringSerde, Serdes.Long, outputTopic)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props), 
> builder, stateStore)
> inputs.foreach {
>   case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer, 
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic, 
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
>   }
> }
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2587: KAFKA-4789: Added support to ProcessorTopologyTest...

2017-02-23 Thread hrafzali
Github user hrafzali closed the pull request at:

https://github.com/apache/kafka/pull/2587


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4790) Kafka can't recover after a disk full

2017-02-23 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880567#comment-15880567
 ] 

Pengwei commented on KAFKA-4790:


The index file's max size is set to 1 MB:
log.index.size.max.bytes=1024000


The reason I found is the following:

1. The producer batches a lot of messages into Kafka (for example 512 KB per write), 
so every write exceeds the 4 KB index write interval and adds only one index entry; 
a full segment therefore ends up with roughly 2050 index entries.


2. At the same time the disk becomes full, and the broker dies before the recovery 
point is flushed to disk.

3. On restart, the recovery function re-checks every message and re-appends index 
entries, writing one entry for every 4 KB of messages. The rebuilt index therefore 
needs far more than the original ~2050 entries and can exceed the index file's 
maximum number of entries.
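
To make the arithmetic concrete, here is a rough sketch (it assumes the 8-byte offset 
index entries and the default 4 KB log.index.interval.bytes and 1 GB log.segment.bytes 
of these versions):

{code}
public class IndexOverflowEstimate {
  public static void main(String[] args) {
    long indexMaxBytes = 1_024_000L;   // log.index.size.max.bytes from above
    long entryBytes    = 8L;           // one offset index entry (relative offset + position)
    long maxEntries    = indexMaxBytes / entryBytes;  // 128,000 -> matches "full index (size = 128000)"

    long segmentBytes  = 1024L * 1024 * 1024;  // default log.segment.bytes
    long batchBytes    = 512L * 1024;          // producer write size from step 1
    long intervalBytes = 4096L;                // log.index.interval.bytes

    long entriesWhileAppending  = segmentBytes / batchBytes;     // ~2,048: one entry per large write
    long entriesWhileRecovering = segmentBytes / intervalBytes;  // ~262,144: one entry per 4 KB

    System.out.printf("max=%d appending=%d recovering=%d%n",
        maxEntries, entriesWhileAppending, entriesWhileRecovering);
  }
}
{code}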

> Kafka can't recover after a disk full
> -
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1
>Reporter: Pengwei
>  Labels: reliability
> Fix For: 0.10.2.1
>
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880453#comment-15880453
 ] 

Ismael Juma commented on KAFKA-4788:


[~omkreddy], I think KAFKA-4092 is valuable, but the implementation needs to be 
revisited. I submitted a PR to revert the change for now.

> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>  Labels: regression
> Fix For: 0.10.3.0, 0.10.2.1
>
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4779) Failure in kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py

2017-02-23 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-4779:
-

Assignee: Rajini Sivaram

> Failure in kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
> 
>
> Key: KAFKA-4779
> URL: https://issues.apache.org/jira/browse/KAFKA-4779
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Rajini Sivaram
>
> This test failed on 01/29, on both trunk and 0.10.2, error message:
> {noformat}
> The consumer has terminated, or timed out, on node ubuntu@worker3.
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/core/security_rolling_upgrade_test.py",
>  line 148, in test_rolling_upgrade_phase_two
> self.run_produce_consume_validate(self.roll_in_secured_settings, 
> client_protocol, broker_protocol)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 100, in run_produce_consume_validate
> self.stop_producer_and_consumer()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 87, in stop_producer_and_consumer
> self.check_alive()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.2/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in check_alive
> raise Exception(msg)
> Exception: The consumer has terminated, or timed out, on node ubuntu@worker3.
> {noformat}
> Looks like the console consumer times out: 
> {noformat}
> [2017-01-30 04:56:00,972] ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> kafka.consumer.ConsumerTimeoutException
> at kafka.consumer.NewShinyConsumer.receive(BaseConsumer.scala:90)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:120)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {noformat}
> A bunch of these security_rolling_upgrade tests failed, and in all cases, the 
> producer produced ~15k messages, of which ~7k were acked, and the consumer 
> only got around ~2600 before timing out. 
> There are a lot of messages like the following for different request types on 
> the producer and consumer:
> {noformat}
> [2017-01-30 05:13:35,954] WARN Received unknown topic or partition error in 
> produce request on partition test_topic-0. The topic/partition may not exist 
> or the user may not have Describe access to it 
> (org.apache.kafka.clients.producer.internals.Sender)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2588: KAFKA-4788: Revert "KAFKA-4092: retention.bytes sh...

2017-02-23 Thread ijuma
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/2588

KAFKA-4788: Revert "KAFKA-4092: retention.bytes should not be allowed to be 
less than segment.bytes"

The intent is good, but it needs to take into account broker configs as 
well.
See KAFKA-4788 for more details.

This reverts commit 4ca5abe8ee7578f602fb7653cb8a09640607ea85.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka kafka-4788

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2588.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2588


commit 56c197a904dbf7549534f6589e062c61e17e47b5
Author: Ismael Juma 
Date:   2017-02-23T13:19:06Z

Revert "KAFKA-4092: retention.bytes should not be allowed to be less than 
segment.bytes"

The intent is good, but it needs to take into account broker configs as 
well.
See KAFKA-4788 for more details.

This reverts commit 4ca5abe8ee7578f602fb7653cb8a09640607ea85.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880409#comment-15880409
 ] 

ASF GitHub Bot commented on KAFKA-4788:
---

GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/2588

KAFKA-4788: Revert "KAFKA-4092: retention.bytes should not be allowed to be 
less than segment.bytes"

The intent is good, but it needs to take into account broker configs as 
well.
See KAFKA-4788 for more details.

This reverts commit 4ca5abe8ee7578f602fb7653cb8a09640607ea85.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka kafka-4788

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2588.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2588


commit 56c197a904dbf7549534f6589e062c61e17e47b5
Author: Ismael Juma 
Date:   2017-02-23T13:19:06Z

Revert "KAFKA-4092: retention.bytes should not be allowed to be less than 
segment.bytes"

The intent is good, but it needs to take into account broker configs as 
well.
See KAFKA-4788 for more details.

This reverts commit 4ca5abe8ee7578f602fb7653cb8a09640607ea85.




> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>  Labels: regression
> Fix For: 0.10.3.0, 0.10.2.1
>
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Reopened] (KAFKA-4092) retention.bytes should not be allowed to be less than segment.bytes

2017-02-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reopened KAFKA-4092:


> retention.bytes should not be allowed to be less than segment.bytes
> ---
>
> Key: KAFKA-4092
> URL: https://issues.apache.org/jira/browse/KAFKA-4092
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Dustin Cote
>Assignee: Dustin Cote
>Priority: Minor
> Fix For: 0.10.2.0
>
>
> Right now retention.bytes can be as small as the user wants but it doesn't 
> really get acted on for the active segment if retention.bytes is smaller than 
> segment.bytes.  We shouldn't allow retention.bytes to be less than 
> segment.bytes and validate that at startup.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4790) Kafka can't recover after a disk full

2017-02-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4790:
---
Labels: reliability  (was: )

> Kafka can't recover after a disk full
> -
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1
>Reporter: Pengwei
>  Labels: reliability
> Fix For: 0.10.2.1
>
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880385#comment-15880385
 ] 

Manikumar edited comment on KAFKA-4788 at 2/23/17 1:03 PM:
---

[~ijuma]  While using kafka-topics.sh, we are doing topic-level config 
validation on the client (TopicCommand) side, not on the broker side. So we don't 
have access to the broker configs and the KAFKA-4092 validation won't work. This 
validation can be added to the CreateTopics/Admin request code path.

Also, the KAFKA-4092 change may not be required; it's just a config error, not a bug. 
We don't have this check for regular broker configs (without topic-level 
configs). Maybe we can revert the patch. 


was (Author: omkreddy):
[~ijuma]  While using kafka-topics.sh, we are doing topic-level config 
validation on the client (TopicCommand) side, not at broker side. So we don't 
have access to broker configs and KAFKA-4092 validation does not work. This 
validation can be added to Create Topic/Admin Request request code path.

Also, KAFKA-4092 change may not be required. it's just config error, not a bug. 
We don't have this check for default broker configs (without topic-level 
configs). Maybe we can revert the patch. 

> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>  Labels: regression
> Fix For: 0.10.3.0, 0.10.2.1
>
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880385#comment-15880385
 ] 

Manikumar commented on KAFKA-4788:
--

[~ijuma]  While using kafka-topics.sh, we are doing topic-level config 
validation on the client (TopicCommand) side, not on the broker side. So we don't 
have access to the broker configs and the KAFKA-4092 validation does not work. This 
validation can be added to the CreateTopics/Admin request code path.

Also, the KAFKA-4092 change may not be required; it's just a config error, not a bug. 
We don't have this check for default broker configs (without topic-level 
configs). Maybe we can revert the patch. 

> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>  Labels: regression
> Fix For: 0.10.3.0, 0.10.2.1
>
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4790) Kafka can't recover after a disk full

2017-02-23 Thread Pengwei (JIRA)
Pengwei created KAFKA-4790:
--

 Summary: Kafka can't recover after a disk full
 Key: KAFKA-4790
 URL: https://issues.apache.org/jira/browse/KAFKA-4790
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.1, 0.9.0.1
Reporter: Pengwei
 Fix For: 0.10.2.1


[2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
[2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
(kafka.log.Log)
[2017-02-23 18:43:59,297] ERROR There was an error in one of the threads during 
logs loading: java.lang.IllegalArgumentException: requirement failed: Attempt 
to append to a full index (size = 128000). (kafka.log.LogManager)
[2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. Prepare 
to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: requirement failed: Attempt to append to a 
full index (size = 128000).
at scala.Predef$.require(Predef.scala:219)
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
at kafka.log.LogSegment.recover(LogSegment.scala:191)
at kafka.log.Log.recoverLog(Log.scala:259)
at kafka.log.Log.loadSegments(Log.scala:234)
at kafka.log.Log.(Log.scala:92)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-1895) Investigate moving deserialization and decompression out of KafkaConsumer

2017-02-23 Thread Armin Braun (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880367#comment-15880367
 ] 

Armin Braun commented on KAFKA-1895:


I think the way the threading works is somewhat separate from this issue, at least 
in the current state of affairs. A clean solution here would enable a cleanly 
separated I/O thread, but it would not require one for now (I think we need a 
solution that works within the current threading framework, otherwise this turns 
into an endless discussion about a rewrite that is unrealistically large, at least 
as a single step).

Right now the problem isn't only that there is no access to the raw responses; it is 
also very much the way that raw data is handled internally, with ByteBuffers etc. 
being constantly reallocated. Implementing logic that can read messages of varying 
size into reused buffers, while still providing stable iteration over the read 
result, is what I would see as the main takeaway here. That in turn would enable 
providing the raw data to the user with a low (no :)?) GC footprint.
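
For illustration only (this is not the consumer's actual internals, just the general 
pattern described above): reading size-prefixed messages into a single reused buffer 
and handing out views instead of fresh allocations could look roughly like this:

{code}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.function.Consumer;

public class ReusedBufferReader {

  // Reads 4-byte size-prefixed messages from the channel into one reused buffer and
  // passes a read-only view of each message to the handler (no per-message copy).
  // Growing the buffer for oversized messages is omitted for brevity.
  public static void readAll(ReadableByteChannel channel, Consumer<ByteBuffer> handler)
      throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
    while (channel.read(buf) > 0) {
      buf.flip();
      while (buf.remaining() >= 4) {
        buf.mark();
        int size = buf.getInt();
        if (buf.remaining() < size) {   // partial message: wait for more bytes
          buf.reset();
          break;
        }
        ByteBuffer view = buf.slice();  // zero-copy view over the message bytes
        view.limit(size);
        handler.accept(view.asReadOnlyBuffer());
        buf.position(buf.position() + size);
      }
      buf.compact();                    // keep any partial message, reuse the buffer
    }
  }
}
{code}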

> Investigate moving deserialization and decompression out of KafkaConsumer
> -
>
> Key: KAFKA-1895
> URL: https://issues.apache.org/jira/browse/KAFKA-1895
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>
> The consumer implementation in KAFKA-1760 decompresses fetch responses and 
> deserializes them into ConsumerRecords which are then handed back as the 
> result of poll().
> There are several downsides to this:
> 1. It is impossible to scale serialization and decompression work beyond the 
> single thread running the KafkaConsumer.
> 2. The results can come back during the processing of other calls such as 
> commit() etc which can result in caching these records a little longer.
> An alternative would be to have ConsumerRecords wrap the actual compressed 
> serialized MemoryRecords chunks and do the deserialization during iteration. 
> This way you could scale this over a thread pool if needed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-23 Thread radai
Append-only would mean that if (for whatever reason) I want to replace a
header or strip it out, I'd need to copy the whole record.
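
For example, stripping one header out in an interceptor would look roughly like this
(a sketch only; it assumes the headers() accessor and the headers-carrying
ProducerRecord constructor proposed in this KIP, neither of which is final):

public ProducerRecord<byte[], byte[]> stripHeader(ProducerRecord<byte[], byte[]> record,
                                                  String key) {
    List<Header> kept = new ArrayList<>();
    for (Header h : record.headers())
        if (!h.key().equals(key))
            kept.add(h);
    // every other field has to be copied just to drop one header
    return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                                record.key(), record.value(), kept);
}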



On Wed, Feb 22, 2017 at 5:09 PM, Michael Pearce 
wrote:

> I'm happy to compromise to keep it mutable but move to an append-style API
> (as in Guava Iterables.concat).
>
> class Headers {
>Headers append(Iterable headers);
> }
>
>
> I don’t think we’d want prepend; this would give the idea of guaranteed
> ordering, when in actual fact we don’t provide that guarantee (e.g. one
> client can put headerA then headerB, but another could put headerB then
> headerA, and this shouldn’t cause issues). Also, what if we changed to a hashmap
> for the internal implementation? It's just a bucket of entries with no ordering.
> I think we just need to provide an API to add/append headers.
>
> Is this ok? If so I'll update the KIP to record this.
>
> Cheers
> Mike
>
> On 23/02/2017, 00:37, "Jason Gustafson"  wrote:
>
> The point about usability is fair. It's also reasonable to expect that
> common use cases such as appending headers should be done efficiently.
>
> Perhaps we could compromise with something like this?
>
> class Headers {
>  Headers append(Iterable headers);
>  Headers prepend(Iterable headers);
> }
>
> That retains ease of use while still giving ourselves some flexibility
> in
> the implementation.
>
> -Jason
>
>
> On Wed, Feb 22, 2017 at 3:03 PM, Michael Pearce  >
> wrote:
>
> > I wasn’t referring to the headers needing to be copied, im meaning
> the
> > fact we’d be forcing a new producer record to be created, with all
> the
> > contents copied.
> >
> > i.e what will happen is utility method will be created or end up
> being
> > used, which does this, and returns the new ProducerRecord instance.
> >
> > ProducerRecord  addHeader(ProducerRecord record, Header header){
> > Return New ProducerRecord(record.key, record.value,
> record.timestamp…..,
> > record.headers.concat(header))
> > }
> >
> > To me this seems ugly, but will be inevitable if we don’t make adding
> > headers to existing records a simple clean method call.
> >
> >
> >
> > On 22/02/2017, 22:57, "Michael Pearce" 
> wrote:
> >
> > Lazy init can achieve/avoid that.
> >
> > Re the concat, why don’t we implement that inside the Headers
> rather
> > than causing everyone to implement this as adding headers in
> interceptors
> > will be a dominant use case. We want a user friendly API. Having as
> a user
> > having to code this instead of having the headers handle this for me
> seems
> > redundant.
> >
> > On 22/02/2017, 22:34, "Jason Gustafson" 
> wrote:
> >
> > I thought the argument was against creating the extra objects
> > unnecessarily
> > (i.e. if they were not accessed). And note that making the
> Headers
> > immutable doesn't necessarily mean that they need to be
> copied:
> > you can do
> > a trick like Guava's Iterables.concat to add additional
> headers
> > without
> > changing the underlying collections.
> >
> > -Jason
> >
> > On Wed, Feb 22, 2017 at 2:22 PM, Michael Pearce <
> > michael.pea...@ig.com>
> > wrote:
> >
> > > If the argument for not having a map holding the key, value
> > pairs is due
> > > to garbage creation of HashMap entry's, forcing the
> creation of
> > a whole new
> > > producer record to simply add a head, surely is creating
> a-lot
> > more?
> > > 
> > > From: Jason Gustafson 
> > > Sent: Wednesday, February 22, 2017 10:09 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-82 - Add Record Headers
> > >
> > > The current producer interceptor API is this:
> > >
> > > ProducerRecord onSend(ProducerRecord record);
> > >
> > > So adding a header means creating a new ProducerRecord
> with a
> > new header
> > > added to the current headers and returning it. Would that
> not
> > work?
> > >
> > > -Jason
> > >
> > > On Wed, Feb 22, 2017 at 1:45 PM, Michael Pearce <
> > michael.pea...@ig.com>
> > > wrote:
> > >
> > > > So how would you have this work if not mutable where
> > interceptors would
> > > > add headers?
> > > >
> > > > Sent using OWA for iPhone
> > > > 
> > > > From: Jason Gustafson 
> > > > Sent: Wednesday, February 22, 2017 8:42:27 PM
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-82 - Add 

[jira] [Updated] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4788:
---
Fix Version/s: 0.10.2.1
   0.10.3.0

> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>  Labels: regression
> Fix For: 0.10.3.0, 0.10.2.1
>
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880300#comment-15880300
 ] 

Ismael Juma commented on KAFKA-4788:


[~omkreddy], would you like to provide a PR? I'll review it and merge it.

> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>  Labels: regression
> Fix For: 0.10.3.0, 0.10.2.1
>
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4788:
---
Labels: regression  (was: )

> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>  Labels: regression
> Fix For: 0.10.3.0, 0.10.2.1
>
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread Rajini Sivaram
Guozhang/Dong,

Thank you for the feedback.

Guozhang : I have updated the section on co-existence of byte rate and
request time quotas.

Dong: I hadn't added much detail to the metrics and sensors since they are
going to be very similar to the existing metrics and sensors. To avoid
confusion, I have now added more detail. All metrics are in the group
"quotaType" and all sensors have names starting with "quotaType" (where
quotaType is Produce/Fetch/LeaderReplication/FollowerReplication/*IOThread*).
So there will be no reuse of existing metrics/sensors. The new ones for
request processing time based throttling will be completely independent of
existing metrics/sensors, but will be consistent in format.

The existing throttle_time_ms field in produce/fetch responses will not be
impacted by this KIP. That will continue to return byte-rate based
throttling times. In addition, a new field request_throttle_time_ms will be
added to return request quota based throttling times. These will be exposed
as new metrics on the client-side.

Since all metrics and sensors are different for each type of quota, I
believe there are already sufficient metrics to monitor throttling on both
the client and broker side for each type of throttling.

Regards,

Rajini


On Thu, Feb 23, 2017 at 4:32 AM, Dong Lin  wrote:

> Hey Rajini,
>
> I think it makes a lot of sense to use io_thread_units as the metric to quota a
> user's traffic here. LGTM overall. I have some questions regarding sensors.
>
> - Can you be more specific in the KIP what sensors will be added? For
> example, it will be useful to specify the name and attributes of these new
> sensors.
>
> - We currently have throttle-time and queue-size for byte-rate based quota.
> Are you going to have separate throttle-time and queue-size for requests
> throttled by io_thread_unit-based quota, or will they share the same
> sensor?
>
> - Does the throttle-time in the ProduceResponse and FetchResponse contain
> time due to the io_thread_unit-based quota?
>
> - Currently the Kafka server doesn't provide any log or metrics that tell
> whether any given clientId (or user) is throttled. This is not too bad
> because we can still check the client-side byte-rate metric to validate
> whether a given client is throttled. But with this io_thread_unit, there
> will be no way to validate whether a given client is slow because it has
> exceeded its io_thread_unit limit. It is necessary for users to be able to
> know this information to figure out whether they have reached their quota
> limit. How about we add a log4j log on the server side to periodically print
> (client_id, byte-rate-throttle-time, io-thread-unit-throttle-time) so
> that the Kafka administrator can identify those users that have reached their
> limit and act accordingly?
>
> Thanks,
> Dong
>
>
>
>
>
> On Wed, Feb 22, 2017 at 4:46 PM, Guozhang Wang  wrote:
>
> > Made a pass over the doc, overall LGTM except a minor comment on the
> > throttling implementation:
> >
> > Stated as "Request processing time throttling will be applied on top if
> > necessary." I thought that it meant the request processing time
> throttling
> > is applied first, but continue reading I found it actually meant to apply
> > produce / fetch byte rate throttling first.
> >
> > Also the last sentence "The remaining delay if any is applied to the
> > response." is a bit confusing to me. Maybe rewording it a bit?
> >
> >
> > Guozhang
> >
> >
> > On Wed, Feb 22, 2017 at 3:24 PM, Jun Rao  wrote:
> >
> > > Hi, Rajini,
> > >
> > > Thanks for the updated KIP. The latest proposal looks good to me.
> > >
> > > Jun
> > >
> > > On Wed, Feb 22, 2017 at 2:19 PM, Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > > > Jun/Roger,
> > > >
> > > > Thank you for the feedback.
> > > >
> > > > 1. I have updated the KIP to use absolute units instead of
> percentage.
> > > The
> > > > property is called* io_thread_units* to align with the thread count
> > > > property *num.io.threads*. When we implement network thread
> utilization
> > > > quotas, we can add another property *network_thread_units.*
> > > >
> > > > 2. ControlledShutdown is already listed under the exempt requests.
> Jun,
> > > did
> > > > you mean a different request that needs to be added? The four
> requests
> > > > currently exempt in the KIP are StopReplica, ControlledShutdown,
> > > > LeaderAndIsr and UpdateMetadata. These are controlled using
> > ClusterAction
> > > > ACL, so it is easy to exclude and only throttle if unauthorized. I
> > wasn't
> > > > sure if there are other requests used only for inter-broker that
> needed
> > > to
> > > > be excluded.
> > > >
> > > > 3. I was thinking the smallest change would be to replace all
> > references
> > > to
> > > > *requestChannel.sendResponse()* with a local method
> > > > *sendResponseMaybeThrottle()* that does the throttling if any plus
> send
> > > > response. If we throttle first in *KafkaApis.handle()*, the time
> spent
> > > > within the method handli

[jira] [Commented] (KAFKA-4461) When using ProcessorTopologyTestDriver, the combination of map and .groupByKey does not produce any result

2017-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880293#comment-15880293
 ] 

ASF GitHub Bot commented on KAFKA-4461:
---

GitHub user hrafzali opened a pull request:

https://github.com/apache/kafka/pull/2587

KAFKA-4461: Added support to ProcessorTopologyTestDriver to forward 
timestamps to internal topics

This resolves the issue in the ProcessorTopologyTestDriver that the 
extracted timestamp is not forwarded with the produced record to the internal 
topics.

JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-4789

The contribution is my original work and I license the work to the project 
under the project's open source license.

@guozhangwang @dguy


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hrafzali/kafka 
KAFKA-4789_ProcessorTopologyTestDriver_timestamp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2587.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2587


commit ad6db45916f28971470c0522584c2dca02ffae76
Author: Hamidreza Afzali 
Date:   2017-02-23T10:53:18Z

KAFKA-4461: Added support to ProcessorTopologyTestDriver to forward 
extracted timestamps to internal topics.




> When using ProcessorTopologyTestDriver, the combination of map and 
> .groupByKey does not produce any result
> --
>
> Key: KAFKA-4461
> URL: https://issues.apache.org/jira/browse/KAFKA-4461
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Hamidreza Afzali
>Assignee: Adrian McCague
>  Labels: newbie, unit-test
> Fix For: 0.10.3.0
>
>
> *Problem*
> When using ProcessorTopologyTestDriver in the latest Kafka 0.10.1, the 
> combination of map and .groupByKey does not produce any result. However, it 
> works fine when using KStreamTestDriver.
> The topology looks like this:
> {code}
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
>  .map((k, v) => new KeyValue(fn(k), v))
>  .groupByKey(Serdes.String, Serdes.Integer)
>  .count(stateStore)
> {code}
> *Full examples*
> Examples for ProcessorTopologyTestDriver and KStreamTestDriver:
> https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13
> *Additional info*
> kafka-users mailing list:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201611.mbox/%3CCAHwHRrVq1APVkNhP3HVqxujxRJEP9FwHV2NRcvPPsHX7Wujzng%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2587: KAFKA-4461: Added support to ProcessorTopologyTest...

2017-02-23 Thread hrafzali
GitHub user hrafzali opened a pull request:

https://github.com/apache/kafka/pull/2587

KAFKA-4461: Added support to ProcessorTopologyTestDriver to forward 
timestamps to internal topics

This resolves the issue in the ProcessorTopologyTestDriver that the 
extracted timestamp is not forwarded with the produced record to the internal 
topics.

JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-4789

The contribution is my original work and I license the work to the project 
under the project's open source license.

@guozhangwang @dguy


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hrafzali/kafka 
KAFKA-4789_ProcessorTopologyTestDriver_timestamp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2587.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2587


commit ad6db45916f28971470c0522584c2dca02ffae76
Author: Hamidreza Afzali 
Date:   2017-02-23T10:53:18Z

KAFKA-4461: Added support to ProcessorTopologyTestDriver to forward 
extracted timestamps to internal topics.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880281#comment-15880281
 ] 

Manikumar commented on KAFKA-4788:
--

Looks like the KAFKA-4092 change introduced this issue. During validation, we are 
not including the original/broker props. 

> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-23 Thread Jorge Esteban Quilcate Otoya
@Matthias about the point 9:

What about keeping only the --topic option and supporting this format:

`--topic t1:0,1,2 --topic t2 --topic t3:2`

In this case topics t1, t2, and t3 will be selected: topic t1 with
partitions 0,1 and 2; topic t2 with all its partitions; and topic t3, with
only partition 2.
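
For example (illustrative only; the option names are still being discussed in this
thread, and the group name and datetime are just placeholders):

bin/kafka-consumer-groups.sh --reset-offsets --group my-group --to-datetime 2017-01-01T00:00:00 \
    --topic t1:0,1,2 --topic t2 --topic t3:2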

Jorge.

El mar., 21 feb. 2017 a las 11:11, Jorge Esteban Quilcate Otoya (<
quilcate.jo...@gmail.com>) escribió:

> Thanks for the feedback Matthias.
>
> * 1. You're right. I'll reorder the scenarios.
>
> * 2. Agree. I'll update the KIP.
>
> * 3. I like it, updating to `reset-offsets`
>
> * 4. Agree, removing the `reset-` part
>
> * 5. Yes, 1.e option without --execute or --export will print out current
> offset, and the new offset, that will be the same. The use-case of this
> option is to use it in combination with --export mostly and have a current
> 'checkpoint' to reset later. I will add to the KIP how the output should
> looks like.
>
> * 6. Considering 4., I will update it to `--to-offset`
>
> * 7. I like the idea to unify these options (plus, minus).
> `shift-offsets-by` is a good option, but I would like some more feedback
> here about the name. I will update the KIP in the meantime.
>
> * 8. Yes, discussed in 9.
>
> * 9. Agree. I'll love some feedback here. `topic` is already used by
> `delete`, and we can add `--all-topics` to consider all topics/partitions
> assigned to a group. How could we define specific topics/partitions?
>
> * 10. Haven't thought about it, but make sense.
> ,, would be enough.
>
> * 11. Agree. Solved with 10.
>
> Also, I have a couple of changes to mention:
>
> 1. I have added a reference to the branch where I'm working on this KIP.
>
> 2. About the period scenario `--to-period`. I will change it to
> `--to-duration` given that duration (
> https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html)
> follows this format: 'PnDTnHnMnS' and does not consider daylight saving
> effects.
>
>
>
> El mar., 21 feb. 2017 a las 2:47, Matthias J. Sax ()
> escribió:
>
> Hi,
>
> thanks for updating the KIP. Couple of follow up comments:
>
> * Nit: Why is "Reset to Earliest" and "Reset to Latest" a "reset by
> time" option -- IMHO it belongs to "reset by position"?
>
>
> * Nit: Description of "Reset to Earliest"
>
> > using Kafka Consumer's `auto.offset.reset` to `earliest`
>
> I think this is strictly speaking not correct (as auto.offset.reset is only
> triggered if no valid offset is found, but this tool explicitly modifies the
> committed offset), and should be phrased as
>
> > using Kafka Consumer's #seekToBeginning()
>
> -> similar issue for description of "Reset to Latest"
>
>
> * Main option: rename to --reset-offsets (plural instead of singular)
>
>
> * Scenario Options: I would remove "reset" from all options, because the
> main argument "--reset-offset" says already what to do:
>
> > bin/kafka-consumer-groups.sh --reset-offset --reset-to-datetime XXX
>
> better (IMHO):
>
> > bin/kafka-consumer-groups.sh --reset-offsets --to-datetime XXX
>
>
>
> * Option 1.e ("print and export current offset") is not intuitive to use
> IMHO. The main option is "--reset-offset" but nothing happens if no
> scenario is specified. It is also not specified what the output should
> look like.
>
> Furthermore, --describe should actually show currently committed offset
> for a group. So it seems to be redundant to have the same option in
> --reset-offsets
>
>
> * Option 2.a: I would rename to "--reset-to-offset" (or considering the
> comment above to "--to-offset")
>
>
> * Option 2.b and 2.c: I would unify to "--shift-offsets-by" (or similar)
> and accept positive/negative values
>
>
> * About Scope "all": maybe it's better to have an option "--all-topics"
> (or similar). IMHO explicit arguments are preferable over implicit
> setting to guard against accidental misuse of the tool.
>
>
> * Scope: I also think, that "--topic" (singular) and "--topics" (plural)
> are too similar and easy to use in a wrong way (ie, mix up) -- maybe we
> can have two options that are easier to distinguish.
>
>
> * I still think that JSON is not the best format (it's too verbose/hard
> to write for humans from scratch). A simple CSV format with implicit
> schema (topic,partition,offset) would be sufficient.
>
>
> * Why does the JSON contain "group_id" field -- there is parameter
> "--group" to specify the group ID. Would one overwrite the other (what
> order) or would there be an error if "--group" is used in combination
> with "--reset-from-file"?
>
>
>
> -Matthias
>
>
>
>
> On 2/17/17 6:43 AM, Jorge Esteban Quilcate Otoya wrote:
> > Hi,
> >
> > according to the feedback, I've updated the KIP:
> >
> > - We have added and ordered the scenarios, scopes and executions of the
> > Reset Offset tool.
> > - Consider it as an extension to the current `ConsumerGroupCommand` tool
> > - Execution will be possible without generating JSON files.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+O

[jira] [Commented] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Ciprian Pascu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880272#comment-15880272
 ] 

Ciprian Pascu commented on KAFKA-4788:
--

One addition: I have log.segment.bytes=2097152 in server.properties.

> Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' 
> not configured per topic.
> --
>
> Key: KAFKA-4788
> URL: https://issues.apache.org/jira/browse/KAFKA-4788
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.2.0
>Reporter: Ciprian Pascu
>
> In the previous version, if there was not topic level configuration 
> 'segment.bytes', then the corresponding value from the broker configuration 
> was used (for 'segment.bytes', this is the value configured for 
> 'log.segment.bytes'); in 0.10.2.0, if the configuration at topic level is 
> missing, then the value configured at broker level is not used, some kind of 
> default value for that broker level configuration is used (for 
> 'log.retention.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
> 1024').
> However, in the documentation it is said: 'A given server default config 
> value only applies to a topic if it does not have an explicit topic config 
> override.'; so, according to this, if there is, for example, no value 
> configured for 'segment.bytes', then the value configured for 
> 'log.segment.bytes' should be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Ciprian Pascu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880268#comment-15880268
 ] 

Ciprian Pascu commented on KAFKA-4788:
--

I am using a command like this to create the topic:
bin/kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 1 
--partitions 1 --topic test --config retention.bytes=2675303
The outcome is: Error while executing topic command : segment.bytes 1073741824 
is not less than or equal to retention.bytes 2675303
[2017-02-23 13:08:04,797] ERROR 
org.apache.kafka.common.errors.InvalidConfigurationException: segment.bytes 
1073741824 is not less than or equal to retention.bytes 2675303
 (kafka.admin.TopicCommand$)

This was working fine in version 0.10.1.1.
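
(For illustration, since the check apparently sees only the explicit topic-level
configs plus hard-coded defaults, presumably a command that also sets segment.bytes
to a value no larger than retention.bytes would pass the check, e.g. adding
--config segment.bytes=2097152 to the command above; but of course the broker-level
log.segment.bytes should be picked up without that.)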






[jira] [Created] (KAFKA-4789) ProcessorTopologyTestDriver does not forward extracted timestamps to internal topics

2017-02-23 Thread Hamidreza Afzali (JIRA)
Hamidreza Afzali created KAFKA-4789:
---

 Summary: ProcessorTopologyTestDriver does not forward extracted 
timestamps to internal topics
 Key: KAFKA-4789
 URL: https://issues.apache.org/jira/browse/KAFKA-4789
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Hamidreza Afzali


*Problem:*

When using ProcessorTopologyTestDriver, the extracted timestamp is not 
forwarded with the produced record to the internal topics.

*Example:*

{code}
// Imports assume the Kafka 0.10.2.x package layout; ProcessorTopologyTestDriver
// comes from the streams test artifact.
import java.util.{Properties, UUID}

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KeyValue, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStreamBuilder, TimeWindows}
import org.apache.kafka.streams.kstream.internals.{WindowedDeserializer, WindowedSerializer}
import org.apache.kafka.test.ProcessorTopologyTestDriver

object Topology1 {

  def main(args: Array[String]): Unit = {

    val inputTopic = "input"
    val outputTopic = "output"
    val stateStore = "count"
    // Keys carry the event timestamp after '@'; MyTimestampExtractor picks it up.
    val inputs = Seq[(String, Integer)](("A@145000", 1), ("B@145000", 2))

    val props = new Properties
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
      classOf[MyTimestampExtractor].getName)

    // Serde for the windowed string keys produced by the windowed count below.
    val windowedStringSerde = Serdes.serdeFrom(
      new WindowedSerializer(Serdes.String.serializer),
      new WindowedDeserializer(Serdes.String.deserializer))

    val builder = new KStreamBuilder
    builder.stream(Serdes.String, Serdes.Integer, inputTopic)
      .map[String, Integer]((k, v) => new KeyValue(k.split("@")(0), v))
      .groupByKey(Serdes.String, Serdes.Integer) // key change forces an internal repartition topic
      .count(TimeWindows.of(1000L), stateStore)
      .to(windowedStringSerde, Serdes.Long, outputTopic)

    val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props),
      builder, stateStore)
    inputs.foreach {
      case (key, value) =>
        driver.process(inputTopic, key, value, Serdes.String.serializer,
          Serdes.Integer.serializer)
        val record = driver.readOutput(outputTopic, Serdes.String.deserializer,
          Serdes.Long.deserializer)
        println(record)
    }
  }
}
{code}
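
MyTimestampExtractor is not included in the snippet; a minimal sketch of what 
such an extractor might look like, assuming the timestamp is the millisecond 
value after '@' in the key and the single-argument extract() signature of 
0.10.2.x (class name and parsing convention are illustrative):

{code}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.processor.TimestampExtractor

// Hypothetical extractor for keys of the form "key@timestampMs".
class MyTimestampExtractor extends TimestampExtractor {
  override def extract(record: ConsumerRecord[AnyRef, AnyRef]): Long =
    record.key match {
      case k: String if k.contains("@") => k.substring(k.indexOf('@') + 1).toLong
      case _ => record.timestamp() // fall back to the record's own timestamp
    }
}
{code}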





[jira] [Commented] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15880190#comment-15880190
 ] 

Ismael Juma commented on KAFKA-4788:


Thanks for the report. What was the most recent version where this worked for 
you and can you share the steps you took to create the topic?






[jira] [Created] (KAFKA-4788) Broker level configuration 'log.segment.bytes' not used when 'segment.bytes' not configured per topic.

2017-02-23 Thread Ciprian Pascu (JIRA)
Ciprian Pascu created KAFKA-4788:


 Summary: Broker level configuration 'log.segment.bytes' not used 
when 'segment.bytes' not configured per topic.
 Key: KAFKA-4788
 URL: https://issues.apache.org/jira/browse/KAFKA-4788
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.10.2.0
Reporter: Ciprian Pascu


In the previous version, if there was no topic-level 'segment.bytes' 
configuration, the corresponding value from the broker configuration was used 
(for 'segment.bytes', this is the value configured for 'log.segment.bytes'). 
In 0.10.2.0, if the topic-level configuration is missing, the value configured 
at broker level is not used; instead the hard-coded default for that setting is 
applied (for 'segment.bytes', this is 'val LogSegmentBytes = 1 * 1024 * 1024 * 
1024').
However, the documentation says: 'A given server default config value only 
applies to a topic if it does not have an explicit topic config override.' 
According to this, if there is, for example, no value configured for 
'segment.bytes', then the value configured for 'log.segment.bytes' should be 
used.
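
A minimal reproduction, pulling together the values reported in the comments 
above (single broker; addresses as in the report):

{code}
# server.properties
log.segment.bytes=2097152

# topic created without a segment.bytes override
bin/kafka-topics.sh --create --zookeeper x.x.x.x:2181 --replication-factor 1 \
  --partitions 1 --topic test --config retention.bytes=2675303

# Expected (per the documentation quoted above): the broker-level
# log.segment.bytes (2097152) applies, 2097152 <= 2675303, so creation succeeds.
# Observed on 0.10.2.0: the hard-coded 1 GB default is used and the command
# fails with "segment.bytes 1073741824 is not less than or equal to
# retention.bytes 2675303".
{code}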


