Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-08 Thread Jiangjie Qin
Hi Jay, thanks a lot for the comments.
I think this solution is better. We probably don’t need the data channel
anymore. It can be replaced with a list of producers if we need more sender
threads.
I’ll update the KIP page.

The reasoning behind the message handler is mainly efficiency. I’m
thinking that if something such as filtering/reformatting can be done once
in the pipeline for all the clients, it is probably better to do it in the
pipeline than to ask 100 clients to each do the same thing 100 times.

―Jiangjie (Becket) Qin


On 2/8/15, 4:59 PM, "Jay Kreps"  wrote:

>Yeah, I second Neha's comments. The current mm code has taken something
>pretty simple and made it pretty scary with callbacks and wait/notify
>stuff. Do we believe this works? I can't tell by looking at it which is
>kind of bad for something important like this. I don't mean this as
>criticism, I know the history: we added in memory queues to help with
>other
>performance problems without thinking about correctness, then we added
>stuff to work around the in-memory queues losing data, and so on.
>
>Can we instead do the opposite exercise and start with the basics of what
>mm should do and think about what deficiencies prevent this approach from
>working? Then let's make sure the currently in-flight work will remove
>these deficiencies. After all mm is kind of the prototypical kafka use
>case
>so if we can't make our clients do this, probably no one else can.
>
>I think mm should just be N independent threads each of which has their
>own
>consumer but share a producer and each of which looks like this:
>
>while(true) {
>val recs = consumer.poll(Long.MaxValue);
>for (rec <- recs)
>producer.send(rec, logErrorCallback)
>if(System.currentTimeMillis - lastCommit > commitInterval) {
>producer.flush()
>consumer.commit()
>lastCommit = System.currentTimeMillis
>}
>}
>
>This will depend on setting the retry count in the producer to something
>high with a largish backoff so that a failed send attempt doesn't drop
>data.
>
>We will need to use the callback to force a flush and offset commit on
>rebalance.
>
>This approach may have a few more TCP connections due to using multiple
>consumers but I think it is a lot easier to reason about and the total
>number of mm instances is always going to be small.
>
>Let's talk about where this simple approach falls short, I think that will
>help us understand your motivations for additional elements.
>
>Another advantage of this is that it is so simple I don't think we really
>even need to bother making mm extensible because writing your own code that
>does custom processing or transformation is just ten lines and no plug-in
>system is going to make it simpler.
>
>-Jay
>
>
>On Sun, Feb 8, 2015 at 2:40 PM, Neha Narkhede  wrote:
>
>> Few comments -
>>
>> 1. Why do we need the message handler? Do you have concrete use cases in
>> mind? If not, we should consider adding it in the future when/if we do
>>have
>> use cases for it. The purpose of the mirror maker is a simple tool for
>> setting up Kafka cluster replicas. I don't see why we need to include a
>> message handler for doing stream transformations or filtering. You can
>> always write a simple process for doing that once the data is copied as
>>is
>> in the target cluster
>> 2. Why keep both designs? We should prefer the simpler design unless it
>>is
>> not feasible due to the performance issue that we previously had. Did
>>you
>> get a chance to run some tests to see if that is really still a problem
>>or
>> not? It will be easier to think about the design and also make the KIP
>> complete if we make a call on the design first.
>> 3. Can you explain the need for keeping a list of unacked offsets per
>> partition? Consider adding a section on retries and how you plan to
>>handle
>> the case when the producer runs out of all retries.
>>
>> Thanks,
>> Neha
>>
>> On Sun, Feb 8, 2015 at 2:06 PM, Jiangjie Qin 
>> wrote:
>>
>> > Hi Neha,
>> >
>> > Yes, I’ve updated the KIP so the entire KIP is based on new consumer
>>now.
>> > I’ve put both designs with and without data channel in the KIP as I
>>still
>> > feel we might need the data channel to provide more flexibility,
>> > especially after message handler is introduced. I’ve put my thinking
>>of
>> > the pros and cons of the two designs in the KIP as well. It’ll be
>>great
>> if
>> > you can give a review and comment.
>> >
>> > Thanks.
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On 2/6/15, 7:30 PM, "Neha Narkhede"  wrote:
>> >
>> > >Hey Becket,
>> > >
>> > >What are the next steps on this KIP. As per your comment earlier on
>>the
>> > >thread -
>> > >
>> > >I do agree it makes more sense
>> > >> to avoid duplicate effort and plan based on new consumer. I’ll
>>modify
>> > >>the
>> > >> KIP.
>> > >
>> > >
>> > >Did you get a chance to think about the simplified design that we
>> proposed
>> > >earlier? Do you plan to update the KIP with that proposal?
>> > >
>> > >Thanks,
>> > >Neha

Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-08 Thread Jiangjie Qin
Thanks for the feedback, Neha. Please see inline replies.

―Jiangjie (Becket) Qin

On 2/8/15, 2:40 PM, "Neha Narkhede"  wrote:

>Few comments -
>
>1. Why do we need the message handler? Do you have concrete use cases in
>mind? If not, we should consider adding it in the future when/if we do
>have
>use cases for it. The purpose of the mirror maker is a simple tool for
>setting up Kafka cluster replicas. I don't see why we need to include a
>message handler for doing stream transformations or filtering. You can
>always write a simple process for doing that once the data is copied as is
>in the target cluster
We do have solid use cases for the message handler:
1. Format conversion. We have a use case where clients of the source cluster
use an internal schema and clients of the target cluster use a different
public schema.
2. Message filtering: Among the messages published to the source cluster,
some are private to source cluster clients and should not be exposed to
target cluster clients. It would be difficult to publish those messages
into different partitions because they need to be ordered.
I agree that we can always filter/convert messages after they are copied
to the target cluster, but that costs network bandwidth unnecessarily,
especially if it is a cross-colo mirror. With the handler, we can co-locate
the mirror maker with the source cluster and save that cost. Also, if there
are many downstream consumers consuming from the target cluster, it is much
more efficient to filter/reformat the messages before they reach the target
cluster than to have each of the consumers do this individually.
Another use case from the open source community is to have an "exact mirror",
which might need to modify the partition in ProducerRecord.
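
To make the filtering case concrete, a handler could look roughly like the
sketch below. The interface name and signature are only illustrative (the
KIP may define them differently); it assumes the handler sees raw byte[]
keys/values, since mirror maker does no de-serialization.

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical handler interface; the actual KIP may define a different one.
interface MirrorMakerMessageHandler {
    // Return zero or more records to produce for each consumed record.
    List<ProducerRecord<byte[], byte[]>> handle(ConsumerRecord<byte[], byte[]> record);
}

// Example handler: drop records whose key carries a "private." prefix and
// mirror everything else unchanged.
class PrivateMessageFilter implements MirrorMakerMessageHandler {
    private static final byte[] PRIVATE_PREFIX = "private.".getBytes(StandardCharsets.UTF_8);

    @Override
    public List<ProducerRecord<byte[], byte[]>> handle(ConsumerRecord<byte[], byte[]> record) {
        if (record.key() != null && hasPrefix(record.key(), PRIVATE_PREFIX))
            return Collections.emptyList();   // filtered out, never reaches the target cluster
        return Collections.singletonList(
                new ProducerRecord<>(record.topic(), record.key(), record.value()));
    }

    private static boolean hasPrefix(byte[] bytes, byte[] prefix) {
        if (bytes.length < prefix.length)
            return false;
        for (int i = 0; i < prefix.length; i++)
            if (bytes[i] != prefix[i])
                return false;
        return true;
    }
}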


>2. Why keep both designs? We should prefer the simpler design unless it is
>not feasible due to the performance issue that we previously had. Did you
>get a chance to run some tests to see if that is really still a problem or
>not? It will be easier to think about the design and also make the KIP
>complete if we make a call on the design first.
I kept both designs because I think the current design has its merits, so I
want to have both the simplified and the current design on the table for
discussion.
Because this KIP is completely based on the new consumer, I haven’t had a
chance to test the performance yet. My argument for keeping the flexibility
of having different numbers of producers and consumers rests on the
assumption that we have a message handler in the mirror maker. If we finally
conclude not to have a message handler, then I would also prefer the simple
mirror maker design, as long as consumer and producer performance matches.
I have some numbers for the old consumer with the new producer. Though I’m
not 100% sure, it seems the consumer still consumes faster than the producer
produces. When acks=-1 is turned on, the latency for producing is at least
an order of magnitude higher than for consuming, with clients and the server
in the same datacenter.

>3. Can you explain the need for keeping a list of unacked offsets per
>partition? Consider adding a section on retries and how you plan to handle
>the case when the producer runs out of all retries.
I’ve just updated the KIP to explain why we use a list of unacked offsets
per partition and what no data loss means for the mirror maker, including
the behavior on retries.
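
Roughly, the per-partition tracking works like the sketch below. This is only
an illustration with made-up names, not the KIP implementation; the point is
that we only ever commit up to the smallest offset the producer has not yet
acked, so a retried send can never be skipped over by an offset commit.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.kafka.common.TopicPartition;

public class UnackedOffsetTracker {
    // Offsets handed to the producer but not yet acknowledged, per source partition.
    private final Map<TopicPartition, ConcurrentSkipListSet<Long>> unacked =
            new ConcurrentHashMap<>();

    // Record an offset when its message is passed to producer.send().
    public void sent(TopicPartition tp, long offset) {
        unacked.computeIfAbsent(tp, k -> new ConcurrentSkipListSet<Long>()).add(offset);
    }

    // Called from the producer callback once the send is acknowledged.
    public void acked(TopicPartition tp, long offset) {
        ConcurrentSkipListSet<Long> offsets = unacked.get(tp);
        if (offsets != null)
            offsets.remove(offset);
    }

    // Safe offset to commit: one past the last consumed offset if nothing is in
    // flight, otherwise the smallest offset that is still unacked.
    public long safeCommitOffset(TopicPartition tp, long nextConsumedOffset) {
        ConcurrentSkipListSet<Long> offsets = unacked.get(tp);
        return (offsets == null || offsets.isEmpty()) ? nextConsumedOffset : offsets.first();
    }
}
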
>
>Thanks,
>Neha
>
>On Sun, Feb 8, 2015 at 2:06 PM, Jiangjie Qin 
>wrote:
>
>> Hi Neha,
>>
>> Yes, I’ve updated the KIP so the entire KIP is based on new consumer
>>now.
>> I’ve put both designs with and without data channel in the KIP as I
>>still
>> feel we might need the data channel to provide more flexibility,
>> especially after message handler is introduced. I’ve put my thinking of
>> the pros and cons of the two designs in the KIP as well. It’ll be great
>>if
>> you can give a review and comment.
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin
>>
>> On 2/6/15, 7:30 PM, "Neha Narkhede"  wrote:
>>
>> >Hey Becket,
>> >
>> >What are the next steps on this KIP. As per your comment earlier on the
>> >thread -
>> >
>> >I do agree it makes more sense
>> >> to avoid duplicate effort and plan based on new consumer. I’ll modify
>> >>the
>> >> KIP.
>> >
>> >
>> >Did you get a chance to think about the simplified design that we
>>proposed
>> >earlier? Do you plan to update the KIP with that proposal?
>> >
>> >Thanks,
>> >Neha
>> >
>> >On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin
>>
>> >wrote:
>> >
>> >> In mirror maker we do not do de-serialization on the messages. Mirror
>> >> maker use source TopicPartition hash to chose a producer to send
>> >>messages
>> >> from the same source partition. The partition those messages end up
>>with
>> >> are decided by Partitioner class in KafkaProducer (assuming you are
>> >>using
>> >> the new producer), which uses hash code of bytes[].
>> >>
>> >> If deserialization is needed, it has to be done in message handler.
>> >>
>> >> Thanks.
>> >>
>>

[jira] [Commented] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.

2015-02-08 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311699#comment-14311699
 ] 

Jay Kreps commented on KAFKA-1660:
--

This is very similar to KAFKA-1659 and KAFKA-1934.

We should do the discussion on the new API as a KIP.

I think there are two possible APIs that could be added.

The first variant attempts to close the producer but will only wait for up to 
the given period of time. It returns whether the close succeeded or not.
{code}
  public boolean tryClose(long timeout, TimeUnit unit);
{code}
One challenge here is how to stop sends on other threads. The current 
implementation will actually throw an exception if you call send after you call 
close. But in this variant I guess the idea is that if tryClose fails you would 
want to have a non-closed producer instance so we would have to rethink that. 
I'm not sure actually how solvable that is.

The second variant always closes the producer and attempts to do this 
gracefully, waiting for all sends to complete, but if graceful sending doesn't 
work by the time the timeout expires it just shuts down the sender thread and 
exits:
{code}
  public void close(long timeout, TimeUnit unit);
{code}

I actually think the second case is more usable: in most cases what you want is 
to try to send out the messages you have for a good period of time but not block 
forever. I think users of the first API would generally not have any good 
recourse when tryClose failed and would just end up leaking the connections and 
I/O thread.
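
As a usage sketch of the second variant (the method does not exist yet; the 
producer instance and the 30-second bound below are just illustrative):
{code}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerShutdown {
    // Attempt a graceful close for up to 30 seconds; after that the proposed
    // close(timeout, unit) would stop the sender thread and drop what is left.
    public static void shutdown(KafkaProducer<byte[], byte[]> producer) {
        producer.close(30, TimeUnit.SECONDS);
    }
}
{code}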

> Ability to call close() with a timeout on the Java Kafka Producer. 
> ---
>
> Key: KAFKA-1660
> URL: https://issues.apache.org/jira/browse/KAFKA-1660
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, producer 
>Affects Versions: 0.8.2
>Reporter: Andrew Stein
>Assignee: Jun Rao
> Fix For: 0.8.3
>
> Attachments: KAFKA-1660.patch
>
>
> I would like the ability to call {{close}} with a timeout on the Java 
> Client's KafkaProducer.
> h6. Workaround
> Currently, it is possible to ensure that {{close}} will return quickly by 
> first doing a {{future.get(timeout)}} on the last future produced on each 
> partition, but this means that the user has to define the partitions up front 
> at the time of {{send}} and track the returned {{future}}'s





Re: Review Request 29467: Patch for KAFKA-1660

2015-02-08 Thread Jay Kreps


> On Feb. 9, 2015, 1:27 a.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
> > line 371
> > 
> >
> > This approach will actually leak the sender thread if there are still 
> > unsent requests. I think this is not what people want. I think what they 
> > want is for the sender thread to attempt to send their messages for N ms 
> > and then shutdown if it still hasn't succeeded. Leaking the thread seems 
> > like a bug.

Oh, I think I understand the interpretation: the idea is that this is meant to 
attempt to close but then give up if the close doesn't complete in time. The 
problem is that this does actually close the producer but doesn't necessarily 
stop the thread, and doesn't return any indication of what happened.


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29467/#review71595
---


On Dec. 29, 2014, 10:52 p.m., Parth Brahmbhatt wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29467/
> ---
> 
> (Updated Dec. 29, 2014, 10:52 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
> https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1660: Adding tryClose(timeoutMillis) to producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> f61efb35db7e0de590556e6a94a7b5cb850cdae9 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 34624c3b7a1f28735ab6c63cc9e18a410e87e63c 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 5baa6062bd9ba8a7d38058856ed2d831fae491f0 
> 
> Diff: https://reviews.apache.org/r/29467/diff/
> 
> 
> Testing
> ---
> 
> existing unit tests passed.
> 
> 
> Thanks,
> 
> Parth Brahmbhatt
> 
>



[jira] [Resolved] (KAFKA-1934) Add a shutdownNow() call to new producer

2015-02-08 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1934.
--
Resolution: Duplicate

> Add a shutdownNow() call to new producer
> 
>
> Key: KAFKA-1934
> URL: https://issues.apache.org/jira/browse/KAFKA-1934
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>
> We have a use case where user want to stop send any more messages if an error 
> occurred on a previous send. Otherwise the message order might be broken. The 
> shutdownNow() call will stop the producer right away without draining the 
> messages in accumulator.





Re: Review Request 29467: Patch for KAFKA-1660

2015-02-08 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29467/#review71595
---


I would vote for the name
   close(long timeout, TimeUnit unit)
I think the params make it clear that it is an attempt and we can clarify that 
in the docs too.


clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java


This approach will actually leak the sender thread if there are still 
unsent requests. I think this is not what people want. I think what they want 
is for the sender thread to attempt to send their messages for N ms and then 
shutdown if it still hasn't succeeded. Leaking the thread seems like a bug.


- Jay Kreps


On Dec. 29, 2014, 10:52 p.m., Parth Brahmbhatt wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29467/
> ---
> 
> (Updated Dec. 29, 2014, 10:52 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
> https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1660: Adding tryClose(timeoutMillis) to producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> f61efb35db7e0de590556e6a94a7b5cb850cdae9 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 34624c3b7a1f28735ab6c63cc9e18a410e87e63c 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 5baa6062bd9ba8a7d38058856ed2d831fae491f0 
> 
> Diff: https://reviews.apache.org/r/29467/diff/
> 
> 
> Testing
> ---
> 
> existing unit tests passed.
> 
> 
> Thanks,
> 
> Parth Brahmbhatt
> 
>



[jira] [Commented] (KAFKA-1933) Fine-grained locking in log append

2015-02-08 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311690#comment-14311690
 ] 

Jay Kreps commented on KAFKA-1933:
--

This is very interesting but also kind of scary. My observation is that this 
kind of sophisticated locking mixed in with regular code tends to work at first 
but inevitably gets subtly broken over time as people make changes who don't 
understand the magic.

A couple of thoughts:
1. Using the semaphore array to synchronize is correct but confusing since this 
is a common error pattern
2. What is the impact on the non-compressed case?
3. There is a ton of low-hanging fruit in the compression code itself. I 
suspect just optimizing that could yield a comparable 2x improvement and that 
would pay off both on the clients and on the server.
4. There has been some discussion of changing the message format to avoid the 
need for recompressing messages. That is, it might be possible to leave the 
compressed messages with offsets 0, 1, 2, etc. and have the interpretation be 
relative to some base offset. We would still need to decompress to validate, but 
this would avoid the recompression entirely.

> Fine-grained locking in log append
> --
>
> Key: KAFKA-1933
> URL: https://issues.apache.org/jira/browse/KAFKA-1933
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Maxim Ivanov
>Assignee: Jay Kreps
>Priority: Minor
> Fix For: 0.8.2
>
> Attachments: KAFKA-1933.patch
>
>
> This patch adds finer locking when appending to log. It breaks
> global append lock into 2 sequential and 1 parallel phase.
> Basic idea is to allow every thread to "reserve" offsets in non
> overlapping ranges, then do compression in parallel and then
> "commit" write to log in the same order offsets where reserved.
> On my Core i3 M370 @2.4Ghz (2 cores + HT) it resulted in following 
> performance boost:
> LZ4: 7.2 sec -> 3.9 sec
> Gzip: 62.3 sec -> 24.8 sec
> Kafka was configured to run 4 IO threads, data was pushed using 5 netcat 
> instances pushing in parallel batches of 200 msg 6.2 kb each (510 MB in 
> total, 82180 messages in total)





[jira] [Commented] (KAFKA-1934) Add a shutdownNow() call to new producer

2015-02-08 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311689#comment-14311689
 ] 

Ewen Cheslack-Postava commented on KAFKA-1934:
--

There was previous discussion of abort() and tryClose(timeout) in 
https://issues.apache.org/jira/browse/KAFKA-1659 and 
https://issues.apache.org/jira/browse/KAFKA-1660, including a patch.

> Add a shutdownNow() call to new producer
> 
>
> Key: KAFKA-1934
> URL: https://issues.apache.org/jira/browse/KAFKA-1934
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>
> We have a use case where user want to stop send any more messages if an error 
> occurred on a previous send. Otherwise the message order might be broken. The 
> shutdownNow() call will stop the producer right away without draining the 
> messages in accumulator.





[jira] [Commented] (KAFKA-1934) Add a shutdownNow() call to new producer

2015-02-08 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311684#comment-14311684
 ] 

Jay Kreps commented on KAFKA-1934:
--

This is a public interface change so we should do a quick KIP and discuss.

I don't think it makes sense to add a shutdownNow() method given that the 
existing method we have is close(); if anything, it would be closeNow(). I would 
propose instead adding close(long timeout, TimeUnit unit), where passing in 0 is 
the equivalent of an immediate shutdown. This is more general and actually, I 
think, the better thing to use, since generally you want to allow some time for 
a graceful shutdown before dropping data.
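
For the use case in this ticket, something like the sketch below could work 
(illustrative only; whether close() may safely be called from the producer 
callback thread is something that would need to be verified):
{code}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AbortOnError {
    // Stop producing as soon as any send fails: a zero timeout on the proposed
    // close(timeout, unit) would act as an immediate shutdown that drops whatever
    // is still sitting in the accumulator.
    public static Callback callback(final KafkaProducer<byte[], byte[]> producer) {
        return new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null)
                    producer.close(0, TimeUnit.MILLISECONDS);
            }
        };
    }
}
{code}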

> Add a shutdownNow() call to new producer
> 
>
> Key: KAFKA-1934
> URL: https://issues.apache.org/jira/browse/KAFKA-1934
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>
> We have a use case where user want to stop send any more messages if an error 
> occurred on a previous send. Otherwise the message order might be broken. The 
> shutdownNow() call will stop the producer right away without draining the 
> messages in accumulator.





[jira] [Updated] (KAFKA-1934) Add a shutdownNow() call to new producer

2015-02-08 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1934:
-
Issue Type: New Feature  (was: Bug)

> Add a shutdownNow() call to new producer
> 
>
> Key: KAFKA-1934
> URL: https://issues.apache.org/jira/browse/KAFKA-1934
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>
> We have a use case where user want to stop send any more messages if an error 
> occurred on a previous send. Otherwise the message order might be broken. The 
> shutdownNow() call will stop the producer right away without draining the 
> messages in accumulator.





[jira] [Commented] (KAFKA-1919) Metadata request issued with no backoff in new producer if there are no topics

2015-02-08 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311683#comment-14311683
 ] 

Jay Kreps commented on KAFKA-1919:
--

That's reasonable. Patch here:
https://reviews.apache.org/r/30777/

This is actually a more serious issue if the bug exists even against 0.8.2. Not 
sure that we should consider it serious enough for a 0.8.2.1 though.

> Metadata request issued with no backoff in new producer if there are no topics
> --
>
> Key: KAFKA-1919
> URL: https://issues.apache.org/jira/browse/KAFKA-1919
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Jay Kreps
> Attachments: KAFKA-1919-v1.patch
>
>
> Original report:
> We have observed high cpu and high network traffic problem when
> 1) cluster (0.8.1.1) has no topic
> 2) KafkaProducer (0.8.2-beta) object is created without sending any traffic
> We have observed such problem twice. In both cases, problem went away
> immediately after one/any topic is created.
> Is this a known issue? Just want to check with the community first before I
> spend much time to reproduce it.
> I couldn't reproduce the issue with similar setup with unit test code in
> IDE. start two brokers with no topic locally on my laptop. create a
> KafkaProducer object without sending any msgs. but I only tested with
> 0.8.2-beta for both broker and producer.
> Issue exists in 0.8.2 as well:
> I have re-run my unit test with 0.8.2.0. same tight-loop problem happened
> after a few mins.





Review Request 30777: Patch for KAFKA-1919

2015-02-08 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30777/
---

Review request for kafka.


Bugs: KAFKA-1919
https://issues.apache.org/jira/browse/KAFKA-1919


Repository: kafka


Description
---

KAFKA-1919: Always update the metadata when a metadata response is received, to 
ensure we back off.


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/Metadata.java 
b8cdd145bfcc6633763b25fc9812c49627c8df92 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
fef90a03ed04d2ded1971f4a7b69b730494aacf8 

Diff: https://reviews.apache.org/r/30777/diff/


Testing
---


Thanks,

Jay Kreps



Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-08 Thread Jay Kreps
Yeah, I second Neha's comments. The current mm code has taken something
pretty simple and made it pretty scary with callbacks and wait/notify
stuff. Do we believe this works? I can't tell by looking at it which is
kind of bad for something important like this. I don't mean this as
criticism, I know the history: we added in-memory queues to help with other
performance problems without thinking about correctness, then we added
stuff to work around the in-memory queues losing data, and so on.

Can we instead do the opposite exercise and start with the basics of what
mm should do and think about what deficiencies prevent this approach from
working? Then let's make sure the currently in-flight work will remove
these deficiencies. After all mm is kind of the prototypical kafka use case
so if we can't make our clients do this, probably no one else can.

I think mm should just be N independent threads each of which has their own
consumer but share a producer and each of which looks like this:

while(true) {
val recs = consumer.poll(Long.MaxValue);
for (rec <- recs)
producer.send(rec, logErrorCallback)
if(System.currentTimeMillis - lastCommit > commitInterval) {
producer.flush()
consumer.commit()
lastCommit = System.currentTimeMillis
}
}

This will depend on setting the retry count in the producer to something
high with a largish backoff so that a failed send attempt doesn't drop data.

We will need to use the callback to force a flush and offset commit on
rebalance.
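
Something like the sketch below is what that callback could look like. It
assumes the proposed new consumer exposes a rebalance listener and a
commitSync() call, and that the producer gets the flush() being discussed in
KIP-8; the exact names may differ in the final APIs.

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class FlushAndCommitOnRebalance implements ConsumerRebalanceListener {
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final KafkaProducer<byte[], byte[]> producer;

    public FlushAndCommitOnRebalance(KafkaConsumer<byte[], byte[]> consumer,
                                     KafkaProducer<byte[], byte[]> producer) {
        this.consumer = consumer;
        this.producer = producer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        producer.flush();      // make sure everything consumed so far is actually sent
        consumer.commitSync(); // then record the offsets before giving up the partitions
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // nothing to do; consumption resumes from the committed offsets
    }
}

The listener would be registered when the thread subscribes, e.g. something
like consumer.subscribe(topics, new FlushAndCommitOnRebalance(consumer,
producer)), depending on what the final consumer API looks like.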

This approach may have a few more TCP connections due to using multiple
consumers but I think it is a lot easier to reason about and the total
number of mm instances is always going to be small.

Let's talk about where this simple approach falls short, I think that will
help us understand your motivations for additional elements.

Another advantage of this is that it is so simple I don't think we really
even need to bother making mm extensible because writing your own code that
does custom processing or transformation is just ten lines and no plug-in
system is going to make it simpler.

-Jay


On Sun, Feb 8, 2015 at 2:40 PM, Neha Narkhede  wrote:

> Few comments -
>
> 1. Why do we need the message handler? Do you have concrete use cases in
> mind? If not, we should consider adding it in the future when/if we do have
> use cases for it. The purpose of the mirror maker is a simple tool for
> setting up Kafka cluster replicas. I don't see why we need to include a
> message handler for doing stream transformations or filtering. You can
> always write a simple process for doing that once the data is copied as is
> in the target cluster
> 2. Why keep both designs? We should prefer the simpler design unless it is
> not feasible due to the performance issue that we previously had. Did you
> get a chance to run some tests to see if that is really still a problem or
> not? It will be easier to think about the design and also make the KIP
> complete if we make a call on the design first.
> 3. Can you explain the need for keeping a list of unacked offsets per
> partition? Consider adding a section on retries and how you plan to handle
> the case when the producer runs out of all retries.
>
> Thanks,
> Neha
>
> On Sun, Feb 8, 2015 at 2:06 PM, Jiangjie Qin 
> wrote:
>
> > Hi Neha,
> >
> > Yes, I’ve updated the KIP so the entire KIP is based on new consumer now.
> > I’ve put both designs with and without data channel in the KIP as I still
> > feel we might need the data channel to provide more flexibility,
> > especially after message handler is introduced. I’ve put my thinking of
> > the pros and cons of the two designs in the KIP as well. It’ll be great
> if
> > you can give a review and comment.
> >
> > Thanks.
> >
> > Jiangjie (Becket) Qin
> >
> > On 2/6/15, 7:30 PM, "Neha Narkhede"  wrote:
> >
> > >Hey Becket,
> > >
> > >What are the next steps on this KIP. As per your comment earlier on the
> > >thread -
> > >
> > >I do agree it makes more sense
> > >> to avoid duplicate effort and plan based on new consumer. I’ll modify
> > >>the
> > >> KIP.
> > >
> > >
> > >Did you get a chance to think about the simplified design that we
> proposed
> > >earlier? Do you plan to update the KIP with that proposal?
> > >
> > >Thanks,
> > >Neha
> > >
> > >On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin  >
> > >wrote:
> > >
> > >> In mirror maker we do not do de-serialization on the messages. Mirror
> > >> maker use source TopicPartition hash to chose a producer to send
> > >>messages
> > >> from the same source partition. The partition those messages end up
> with
> > >> are decided by Partitioner class in KafkaProducer (assuming you are
> > >>using
> > >> the new producer), which uses hash code of bytes[].
> > >>
> > >> If deserialization is needed, it has to be done in message handler.
> > >>
> > >> Thanks.
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On 2/4/15, 11:33 AM, "Bhavesh Mistry" 
> > >>wrote:
> > >>
> > >> >Hi Jiangj

[jira] [Commented] (KAFKA-1884) New Producer blocks forever for Invalid topic names

2015-02-08 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311632#comment-14311632
 ] 

Guozhang Wang commented on KAFKA-1884:
--

[~omkreddy] I think this is a valid point; we should handle this exception 
better at the server side and return a corresponding error code.
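
Until the server returns a proper error code, a client-side guard is easy to 
add. Below is a sketch that mirrors the rule quoted in the broker log below 
(ASCII alphanumerics, '.', '_' and '-'); it is not part of the producer API, 
just an illustration:
{code}
import java.util.regex.Pattern;

public final class TopicNames {
    // Mirrors the rule enforced in kafka.common.Topic.validate(); the broker's
    // length checks are omitted in this sketch.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    public static void validate(String topic) {
        if (topic == null || topic.isEmpty() || !LEGAL.matcher(topic).matches())
            throw new IllegalArgumentException("Illegal topic name: " + topic);
    }
}
{code}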

> New Producer blocks forever for Invalid topic names
> ---
>
> Key: KAFKA-1884
> URL: https://issues.apache.org/jira/browse/KAFKA-1884
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Manikumar Reddy
> Fix For: 0.8.3
>
>
> New producer blocks forever for invalid topics names
> producer logs:
> DEBUG [2015-01-20 12:46:13,406] NetworkClient: maybeUpdateMetadata(): Trying 
> to send metadata request to node -1
> DEBUG [2015-01-20 12:46:13,406] NetworkClient: maybeUpdateMetadata(): Sending 
> metadata request ClientRequest(expectResponse=true, payload=null, 
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=50845,client_id=my-producer},
>  body={topics=[TOPIC=]})) to node -1
> TRACE [2015-01-20 12:46:13,416] NetworkClient: handleMetadataResponse(): 
> Ignoring empty metadata response with correlation id 50845.
> DEBUG [2015-01-20 12:46:13,417] NetworkClient: maybeUpdateMetadata(): Trying 
> to send metadata request to node -1
> DEBUG [2015-01-20 12:46:13,417] NetworkClient: maybeUpdateMetadata(): Sending 
> metadata request ClientRequest(expectResponse=true, payload=null, 
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=50846,client_id=my-producer},
>  body={topics=[TOPIC=]})) to node -1
> TRACE [2015-01-20 12:46:13,417] NetworkClient: handleMetadataResponse(): 
> Ignoring empty metadata response with correlation id 50846.
> DEBUG [2015-01-20 12:46:13,417] NetworkClient: maybeUpdateMetadata(): Trying 
> to send metadata request to node -1
> DEBUG [2015-01-20 12:46:13,418] NetworkClient: maybeUpdateMetadata(): Sending 
> metadata request ClientRequest(expectResponse=true, payload=null, 
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=50847,client_id=my-producer},
>  body={topics=[TOPIC=]})) to node -1
> TRACE [2015-01-20 12:46:13,418] NetworkClient: handleMetadataResponse(): 
> Ignoring empty metadata response with correlation id 50847.
> Broker logs:
> [2015-01-20 12:46:14,074] ERROR [KafkaApi-0] error when handling request 
> Name: TopicMetadataRequest; Version: 0; CorrelationId: 51020; ClientId: 
> my-producer; Topics: TOPIC= (kafka.server.KafkaApis)
> kafka.common.InvalidTopicException: topic name TOPIC= is illegal, contains a 
> character other than ASCII alphanumerics, '.', '_' and '-'
>   at kafka.common.Topic$.validate(Topic.scala:42)
>   at 
> kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:186)
>   at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:177)
>   at kafka.server.KafkaApis$$anonfun$5.apply(KafkaApis.scala:367)
>   at kafka.server.KafkaApis$$anonfun$5.apply(KafkaApis.scala:350)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at 
> scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>   at scala.collection.SetLike$class.map(SetLike.scala:93)
>   at scala.collection.AbstractSet.map(Set.scala:47)
>   at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:350)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:389)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:57)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>   at java.lang.Thread.run(Thread.java:722)





[jira] [Commented] (KAFKA-1919) Metadata request issued with no backoff in new producer if there are no topics

2015-02-08 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311630#comment-14311630
 ] 

Guozhang Wang commented on KAFKA-1919:
--

Comments for the patch: instead of calling metadata.update(), shall we just 
update its lastRefreshMs?

> Metadata request issued with no backoff in new producer if there are no topics
> --
>
> Key: KAFKA-1919
> URL: https://issues.apache.org/jira/browse/KAFKA-1919
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Jay Kreps
> Attachments: KAFKA-1919-v1.patch
>
>
> Original report:
> We have observed high cpu and high network traffic problem when
> 1) cluster (0.8.1.1) has no topic
> 2) KafkaProducer (0.8.2-beta) object is created without sending any traffic
> We have observed such problem twice. In both cases, problem went away
> immediately after one/any topic is created.
> Is this a known issue? Just want to check with the community first before I
> spend much time to reproduce it.
> I couldn't reproduce the issue with similar setup with unit test code in
> IDE. start two brokers with no topic locally on my laptop. create a
> KafkaProducer object without sending any msgs. but I only tested with
> 0.8.2-beta for both broker and producer.
> Issue exists in 0.8.2 as well:
> I have re-run my unit test with 0.8.2.0. same tight-loop problem happened
> after a few mins.





[jira] [Commented] (KAFKA-1908) Split brain

2015-02-08 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311617#comment-14311617
 ] 

Jiangjie Qin commented on KAFKA-1908:
-

[~aozeritsky] It looks like this scenario should not happen by design. Ideally, 
what should happen is as below:
1. Initially every broker works well. The leader is on broker 1.
2. After port 9092 is disabled, no further connections can be established to 
broker 1, but existing connections are not affected, so broker 3 (the 
controller) and broker 1 are still connected.
3. A preferred leader election runs successfully.
4. Given what is mentioned in 2), broker 3 should be able to send an 
UpdateMetadataRequest to broker 1, so broker 1 should become a follower in that 
case.

One possibility I can think of which might cause the result you saw is that the 
broker 3 to broker 1 connection was somehow lost after you disabled port 9092 
on broker 1. In that case, broker 3 cannot connect to broker 1 again, so broker 
1 will miss the UpdateMetadataRequest and thus keep thinking of itself as the 
leader. I think this is what Gwen mentioned as "multi-LAN" or a network 
partition.

Could you verify from the controller log whether the broker 3 to broker 1 
connection was ever broken?

> Split brain
> ---
>
> Key: KAFKA-1908
> URL: https://issues.apache.org/jira/browse/KAFKA-1908
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Alexey Ozeritskiy
>
> In some cases, there may be two leaders for one partition.
> Steps to reproduce:
> # We have 3 brokers, 1 partition with 3 replicas:
> {code}
> TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
> ISR: [1,2,3]
> {code} 
> # controller works on broker 3
> # let the kafka port be 9092. We execute on broker 1:
> {code}
> iptables -A INPUT -p tcp --dport 9092 -j REJECT
> {code}
> # Initiate replica election
> # As a result:
> Broker 1:
> {code}
> TopicAndPartition: [partition,0]Leader: 1   Replicas: [2,1,3]   
> ISR: [1,2,3]
> {code}
> Broker 2:
> {code}
> TopicAndPartition: [partition,0]Leader: 2   Replicas: [2,1,3]   
> ISR: [1,2,3]
> {code}
> # Flush the iptables rules on broker 1
> Now we can produce messages to {code}[partition,0]{code}. Replica-1 will not 
> receive new data. A consumer can read data from replica-1 or replica-2. When 
> it reads from replica-1 it resets the offsets and then can read duplicates 
> from replica-2.
> We saw this situation in our production cluster when it had network problems.





Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-08 Thread Neha Narkhede
Few comments -

1. Why do we need the message handler? Do you have concrete use cases in
mind? If not, we should consider adding it in the future when/if we do have
use cases for it. The purpose of the mirror maker is a simple tool for
setting up Kafka cluster replicas. I don't see why we need to include a
message handler for doing stream transformations or filtering. You can
always write a simple process for doing that once the data is copied as is
in the target cluster
2. Why keep both designs? We should prefer the simpler design unless it is
not feasible due to the performance issue that we previously had. Did you
get a chance to run some tests to see if that is really still a problem or
not? It will be easier to think about the design and also make the KIP
complete if we make a call on the design first.
3. Can you explain the need for keeping a list of unacked offsets per
partition? Consider adding a section on retries and how you plan to handle
the case when the producer runs out of all retries.

Thanks,
Neha

On Sun, Feb 8, 2015 at 2:06 PM, Jiangjie Qin 
wrote:

> Hi Neha,
>
> Yes, I’ve updated the KIP so the entire KIP is based on new consumer now.
> I’ve put both designs with and without data channel in the KIP as I still
> feel we might need the data channel to provide more flexibility,
> especially after message handler is introduced. I’ve put my thinking of
> the pros and cons of the two designs in the KIP as well. It’ll be great if
> you can give a review and comment.
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 2/6/15, 7:30 PM, "Neha Narkhede"  wrote:
>
> >Hey Becket,
> >
> >What are the next steps on this KIP. As per your comment earlier on the
> >thread -
> >
> >I do agree it makes more sense
> >> to avoid duplicate effort and plan based on new consumer. I’ll modify
> >>the
> >> KIP.
> >
> >
> >Did you get a chance to think about the simplified design that we proposed
> >earlier? Do you plan to update the KIP with that proposal?
> >
> >Thanks,
> >Neha
> >
> >On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin 
> >wrote:
> >
> >> In mirror maker we do not do de-serialization on the messages. Mirror
> >> maker use source TopicPartition hash to chose a producer to send
> >>messages
> >> from the same source partition. The partition those messages end up with
> >> are decided by Partitioner class in KafkaProducer (assuming you are
> >>using
> >> the new producer), which uses hash code of bytes[].
> >>
> >> If deserialization is needed, it has to be done in message handler.
> >>
> >> Thanks.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 2/4/15, 11:33 AM, "Bhavesh Mistry" 
> >>wrote:
> >>
> >> >Hi Jiangjie,
> >> >
> >> >Thanks for entertaining my question so far.  Last question, I have is
> >> >about
> >> >serialization of message key.  If the key de-serialization (Class) is
> >>not
> >> >present at the MM instance, then does it use raw byte hashcode to
> >> >determine
> >> >the partition ?  How are you going to address the situation where key
> >> >needs
> >> >to be de-serialization and get actual hashcode needs to be computed  ?.
> >> >
> >> >
> >> >Thanks,
> >> >
> >> >Bhavesh
> >> >
> >> >On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin
> >>
> >> >wrote:
> >> >
> >> >> Hi Bhavesh,
> >> >>
> >> >> Please see inline comments.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry" 
> >> >>wrote:
> >> >>
> >> >> >Hi Jiangjie,
> >> >> >
> >> >> >Thanks for the input.
> >> >> >
> >> >> >a) Is MM will  producer ack will be attach to Producer Instance or
> >>per
> >> >> >topic.  Use case is that one instance of MM
> >> >> >needs to handle both strong ack and also ack=0 for some topic.  Or
> >>it
> >> >> >would
> >> >> >be better to set-up another instance of MM.
> >> >> The acks setting is producer level setting instead of topic level
> >> >>setting.
> >> >> In this case you probably need to set up another instance.
> >> >> >
> >> >> >b) Regarding TCP connections, Why does #producer instance attach to
> >>TCP
> >> >> >connection.  Is it possible to use Broker Connection TCP Pool,
> >>producer
> >> >> >will just checkout TCP connection  to Broker.  So, # of Producer
> >> >>Instance
> >> >> >does not correlation to Brokers Connection.  Is this possible ?
> >> >> In new producer, each producer maintains a connection to each broker
> >> >> within the producer instance. Making producer instances to share the
> >>TCP
> >> >> connections is a very big change to the current design, so I suppose
> >>we
> >> >> won’t be able to do that.
> >> >> >
> >> >> >
> >> >> >Thanks,
> >> >> >
> >> >> >Bhavesh
> >> >> >
> >> >> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin
> >> >> >> >> >
> >> >> >wrote:
> >> >> >
> >> >> >> Hi Bhavesh,
> >> >> >>
> >> >> >> I think it is the right discussion to have when we are talking
> >>about
> >> >>the
> >> >> >> new new design for MM.
> >> >> >> Please see the inline comments.
> >> >> >>
> >> >> >> Jiangjie (Becket) Qin
> >> >> >>
> >> >> >> On 1/28/15, 

[jira] [Commented] (KAFKA-1447) Controlled shutdown deadlock when trying to send state updates

2015-02-08 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311599#comment-14311599
 ] 

Jiangjie Qin commented on KAFKA-1447:
-

I think KAFKA-1305 solved this issue.

> Controlled shutdown deadlock when trying to send state updates
> --
>
> Key: KAFKA-1447
> URL: https://issues.apache.org/jira/browse/KAFKA-1447
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.0
>Reporter: Sam Meder
>Priority: Critical
>  Labels: newbie++
>
> We're seeing controlled shutdown indefinitely stuck on trying to send out 
> state change messages to the other brokers:
> [2014-05-03 04:01:30,580] INFO [Socket Server on Broker 4], Shutdown 
> completed (kafka.network.SocketServer)
> [2014-05-03 04:01:30,581] INFO [Kafka Request Handler on Broker 4], shutting 
> down (kafka.server.KafkaRequestHandlerPool)
> and stuck on:
> "kafka-request-handler-12" daemon prio=10 tid=0x7f1f04a66800 nid=0x6e79 
> waiting on condition [0x7f1ad5767000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> parking to wait for <0x00078e91dc20> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> at 
> kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
> locked <0x00078e91dc38> (a java.lang.Object)
> at kafka.controller.KafkaController.sendRequest(KafkaController.scala:655)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:298)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> at 
> kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:290)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:97)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:269)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:253)
> at scala.Option.foreach(Option.scala:197)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply$mcV$sp(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:252)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:249)
> at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:249)
> locked <0x00078b495af0> (a java.lang.Object)
> at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:264)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:192)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:722)





[jira] [Created] (KAFKA-1934) Add a shutdownNow() call to new producer

2015-02-08 Thread Jiangjie Qin (JIRA)
Jiangjie Qin created KAFKA-1934:
---

 Summary: Add a shutdownNow() call to new producer
 Key: KAFKA-1934
 URL: https://issues.apache.org/jira/browse/KAFKA-1934
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin


We have a use case where the user wants to stop sending any more messages if an 
error occurred on a previous send. Otherwise the message order might be broken. 
The shutdownNow() call will stop the producer right away without draining the 
messages in the accumulator.





Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-08 Thread Jiangjie Qin
Hi Neha,

Yes, I’ve updated the KIP so the entire KIP is based on the new consumer now.
I’ve put both designs, with and without the data channel, in the KIP, as I
still feel we might need the data channel to provide more flexibility,
especially after the message handler is introduced. I’ve put my thinking on
the pros and cons of the two designs in the KIP as well. It’ll be great if
you can review and comment.

Thanks.

Jiangjie (Becket) Qin

On 2/6/15, 7:30 PM, "Neha Narkhede"  wrote:

>Hey Becket,
>
>What are the next steps on this KIP. As per your comment earlier on the
>thread -
>
>I do agree it makes more sense
>> to avoid duplicate effort and plan based on new consumer. I’ll modify
>>the
>> KIP.
>
>
>Did you get a chance to think about the simplified design that we proposed
>earlier? Do you plan to update the KIP with that proposal?
>
>Thanks,
>Neha
>
>On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin 
>wrote:
>
>> In mirror maker we do not do de-serialization on the messages. Mirror
>> maker use source TopicPartition hash to chose a producer to send
>>messages
>> from the same source partition. The partition those messages end up with
>> are decided by Partitioner class in KafkaProducer (assuming you are
>>using
>> the new producer), which uses hash code of bytes[].
>>
>> If deserialization is needed, it has to be done in message handler.
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin
>>
>> On 2/4/15, 11:33 AM, "Bhavesh Mistry" 
>>wrote:
>>
>> >Hi Jiangjie,
>> >
>> >Thanks for entertaining my question so far.  Last question, I have is
>> >about
>> >serialization of message key.  If the key de-serialization (Class) is
>>not
>> >present at the MM instance, then does it use raw byte hashcode to
>> >determine
>> >the partition ?  How are you going to address the situation where key
>> >needs
>> >to be de-serialization and get actual hashcode needs to be computed  ?.
>> >
>> >
>> >Thanks,
>> >
>> >Bhavesh
>> >
>> >On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin
>>
>> >wrote:
>> >
>> >> Hi Bhavesh,
>> >>
>> >> Please see inline comments.
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry" 
>> >>wrote:
>> >>
>> >> >Hi Jiangjie,
>> >> >
>> >> >Thanks for the input.
>> >> >
>> >> >a) Is MM will  producer ack will be attach to Producer Instance or
>>per
>> >> >topic.  Use case is that one instance of MM
>> >> >needs to handle both strong ack and also ack=0 for some topic.  Or
>>it
>> >> >would
>> >> >be better to set-up another instance of MM.
>> >> The acks setting is producer level setting instead of topic level
>> >>setting.
>> >> In this case you probably need to set up another instance.
>> >> >
>> >> >b) Regarding TCP connections, Why does #producer instance attach to
>>TCP
>> >> >connection.  Is it possible to use Broker Connection TCP Pool,
>>producer
>> >> >will just checkout TCP connection  to Broker.  So, # of Producer
>> >>Instance
>> >> >does not correlation to Brokers Connection.  Is this possible ?
>> >> In new producer, each producer maintains a connection to each broker
>> >> within the producer instance. Making producer instances to share the
>>TCP
>> >> connections is a very big change to the current design, so I suppose
>>we
>> >> won’t be able to do that.
>> >> >
>> >> >
>> >> >Thanks,
>> >> >
>> >> >Bhavesh
>> >> >
>> >> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin
>> >>> >> >
>> >> >wrote:
>> >> >
>> >> >> Hi Bhavesh,
>> >> >>
>> >> >> I think it is the right discussion to have when we are talking
>>about
>> >>the
>> >> >> new new design for MM.
>> >> >> Please see the inline comments.
>> >> >>
>> >> >> Jiangjie (Becket) Qin
>> >> >>
>> >> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry"
>>
>> >> >>wrote:
>> >> >>
>> >> >> >Hi Jiangjie,
>> >> >> >
>> >> >> >I just wanted to let you know about our use case and stress the
>> >>point
>> >> >>that
>> >> >> >local data center broker cluster have fewer partitions than the
>> >> >> >destination
>> >> >> >offline broker cluster. Just because we do the batch pull from
>>CAMUS
>> >> >>and
>> >> >> >in
>> >> >> >order to drain data faster than the injection rate (from four DCs
>> >>for
>> >> >>same
>> >> >> >topic).
>> >> >> Keeping the same partition number in source and target cluster
>>will
>> >>be
>> >> >>an
>> >> >> option but will not be enforced by default.
>> >> >> >
>> >> >> >We are facing following issues (probably due to configuration):
>> >> >> >
>> >> >> >1)  We occasionally loose data due to message batch size is
>>too
>> >> >>large
>> >> >> >(2MB) on target data (we are using old producer but I think new
>> >> >>producer
>> >> >> >will solve this problem to some extend).
>> >> >> We do see this issue in LinkedIn as well. New producer also might
>> >>have
>> >> >> this issue. There are some proposal of solutions, but no real work
>> >> >>started
>> >> >> yet. For now, as a workaround, setting a more aggressive batch
>>size
>> >>on
>> >> >> producer side should work.
>> >> >> >2)  Since only one instance is set to MM da

[GitHub] kafka pull request: Finer locking in log append

2015-02-08 Thread redbaron
Github user redbaron closed the pull request at:

https://github.com/apache/kafka/pull/43




Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

2015-02-08 Thread Jay Kreps
Well, actually, in the case of linger.ms = 0 the send is still asynchronous,
so calling flush() blocks until all the previously sent records have
completed. It doesn't speed anything up in that case, though, since they
are already available to send.
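
For reference, the usage pattern being proposed is roughly the sketch below
(flush() does not exist in the released producer yet; it is the method
proposed in KIP-8):

import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchSend {
    // Send the whole batch asynchronously, then block until every record sent so
    // far has completed (successfully or not).
    public static void sendAll(KafkaProducer<byte[], byte[]> producer,
                               List<ProducerRecord<byte[], byte[]>> batch) {
        for (ProducerRecord<byte[], byte[]> record : batch)
            producer.send(record); // returns immediately with a Future
        producer.flush();          // proposed in KIP-8: waits for the sends above
    }
}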

-Jay

On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira  wrote:

> Looks good to me.
>
> I like the idea of not blocking additional sends but not guaranteeing that
> flush() will deliver them.
>
> I assume that with linger.ms = 0, flush will just be a noop (since the
> queue will be empty). Is that correct?
>
> Gwen
>
> On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps  wrote:
>
> > Following up on our previous thread on making batch send a little easier,
> > here is a concrete proposal to add a flush() method to the producer:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> >
> > A proposed implementation is here:
> > https://issues.apache.org/jira/browse/KAFKA-1865
> >
> > Thoughts?
> >
> > -Jay
> >
>


[jira] [Commented] (KAFKA-1933) Fine-grained locking in log append

2015-02-08 Thread Maxim Ivanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311527#comment-14311527
 ] 

Maxim Ivanov commented on KAFKA-1933:
-

Created reviewboard https://reviews.apache.org/r/30775/diff/
 against branch origin/0.8.2

> Fine-grained locking in log append
> --
>
> Key: KAFKA-1933
> URL: https://issues.apache.org/jira/browse/KAFKA-1933
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Maxim Ivanov
>Assignee: Jay Kreps
>Priority: Minor
> Fix For: 0.8.2
>
> Attachments: KAFKA-1933.patch
>
>
> This patch adds finer locking when appending to the log. It breaks the
> global append lock into two sequential phases and one parallel phase.
> The basic idea is to allow every thread to "reserve" offsets in
> non-overlapping ranges, then do compression in parallel, and then
> "commit" the write to the log in the same order the offsets were reserved.
> On my Core i3 M370 @2.4GHz (2 cores + HT) it resulted in the following
> performance boost:
> LZ4: 7.2 sec -> 3.9 sec
> Gzip: 62.3 sec -> 24.8 sec
> Kafka was configured to run 4 I/O threads; data was pushed using 5 netcat
> instances pushing, in parallel, batches of 200 messages of 6.2 KB each
> (510 MB and 82180 messages in total)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1933) Fine-grained locking in log append

2015-02-08 Thread Maxim Ivanov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Ivanov updated KAFKA-1933:

Attachment: KAFKA-1933.patch

> Fine-grained locking in log append
> --
>
> Key: KAFKA-1933
> URL: https://issues.apache.org/jira/browse/KAFKA-1933
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Maxim Ivanov
>Assignee: Jay Kreps
>Priority: Minor
> Fix For: 0.8.2
>
> Attachments: KAFKA-1933.patch
>
>
> This patch adds finer locking when appending to the log. It breaks the
> global append lock into two sequential phases and one parallel phase.
> The basic idea is to allow every thread to "reserve" offsets in
> non-overlapping ranges, then do compression in parallel, and then
> "commit" the write to the log in the same order the offsets were reserved.
> On my Core i3 M370 @2.4GHz (2 cores + HT) it resulted in the following
> performance boost:
> LZ4: 7.2 sec -> 3.9 sec
> Gzip: 62.3 sec -> 24.8 sec
> Kafka was configured to run 4 I/O threads; data was pushed using 5 netcat
> instances pushing, in parallel, batches of 200 messages of 6.2 KB each
> (510 MB and 82180 messages in total)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 30775: Fine-grained locking in log.append

2015-02-08 Thread Maxim Ivanov

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30775/
---

Review request for kafka.


Bugs: KAFKA-1933
https://issues.apache.org/jira/browse/KAFKA-1933


Repository: kafka


Description
---

This patch adds finer locking when appending to the log. It breaks the
global append lock into two sequential phases and one parallel phase.

The basic idea is to allow every thread to "reserve" offsets in
non-overlapping ranges, then do compression in parallel, and then
"commit" the write to the log in the same order the offsets were reserved.


Diffs
-

  core/src/main/scala/kafka/log/Log.scala 
ec192155bec7b643025f8044b0b6565c7b9977d1 

Diff: https://reviews.apache.org/r/30775/diff/


Testing
---


Thanks,

Maxim Ivanov



[jira] [Created] (KAFKA-1933) Fine-grained locking in log append

2015-02-08 Thread Maxim Ivanov (JIRA)
Maxim Ivanov created KAFKA-1933:
---

 Summary: Fine-grained locking in log append
 Key: KAFKA-1933
 URL: https://issues.apache.org/jira/browse/KAFKA-1933
 Project: Kafka
  Issue Type: Improvement
  Components: log
Reporter: Maxim Ivanov
Assignee: Jay Kreps
Priority: Minor
 Fix For: 0.8.2


This patch adds finer locking when appending to the log. It breaks the
global append lock into two sequential phases and one parallel phase.

The basic idea is to allow every thread to "reserve" offsets in
non-overlapping ranges, then do compression in parallel, and then
"commit" the write to the log in the same order the offsets were reserved.

On my Core i3 M370 @2.4GHz (2 cores + HT) it resulted in the following
performance boost:

LZ4: 7.2 sec -> 3.9 sec
Gzip: 62.3 sec -> 24.8 sec

Kafka was configured to run 4 I/O threads; data was pushed using 5 netcat
instances pushing, in parallel, batches of 200 messages of 6.2 KB each
(510 MB and 82180 messages in total)




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30763: Patch for KAFKA-1865

2015-02-08 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review71573
---


Minor issue with cleanup when an InterruptedException is thrown, but otherwise looks good to
me.


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java


This isn't properly reset in the case of an InterruptedException. This 
should be in a finally block.


- Ewen Cheslack-Postava


On Feb. 7, 2015, 8:59 p.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> ---
> 
> (Updated Feb. 7, 2015, 8:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
> https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 17fe541588d462c68c33f6209717cc4015e9b62f 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 75513b0bdd439329c5771d87436ef83fda853bfb 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 54755e8dd3f23ced313067566cd4ea867f8a496e 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>



[GitHub] kafka pull request: Finer locking in log append

2015-02-08 Thread redbaron
GitHub user redbaron opened a pull request:

https://github.com/apache/kafka/pull/43

Finer locking in log append

This patch adds finer locking when appending to the log. It breaks the
global append lock into two sequential phases and one parallel phase.

The basic idea is to allow every thread to "reserve" offsets in
non-overlapping ranges, then do compression in parallel, and then
"commit" the write to the log in the same order the offsets were reserved.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/redbaron/kafka finer-lock-in-log-append

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/43.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #43


commit e1e4cf6685e3b5ca7900635a11a8a0be80a569a5
Author: Maxim Ivanov 
Date:   2015-02-08T18:41:06Z

Finer locking in log append

This patch adds finer locking when appending to log. It breaks
global append lock into 2 sequential and 1 parallel phase.

Basic idea is to allow every thread to "reserve" offsets in non
overlapping ranges, then do compression in parallel and then
"commit" write to log in the same order offsets where reserved.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

2015-02-08 Thread Gwen Shapira
Looks good to me.

I like the idea of not blocking additional sends but not guaranteeing that
flush() will deliver them.

I assume that with linger.ms = 0, flush will just be a noop (since the
queue will be empty). Is that correct?

Gwen

On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps  wrote:

> Following up on our previous thread on making batch send a little easier,
> here is a concrete proposal to add a flush() method to the producer:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
>
> A proposed implementation is here:
> https://issues.apache.org/jira/browse/KAFKA-1865
>
> Thoughts?
>
> -Jay
>


[DISCUSS] KIP-8 Add a flush method to the new Java producer

2015-02-08 Thread Jay Kreps
Following up on our previous thread on making batch send a little easier,
here is a concrete proposal to add a flush() method to the producer:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API

A proposed implementation is here:
https://issues.apache.org/jira/browse/KAFKA-1865

Thoughts?

-Jay


[jira] [Updated] (KAFKA-1856) Add PreCommit Patch Testing

2015-02-08 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1856:
-
Attachment: KAFKA-1845.result.txt

really cool, just tried this out 

{code}

python dev-utils/test-patch.py --defect KAFKA-1845 --output patch-process 
--run-tests

{code}

which I think once this is in the jenkins build would have shown up on the 
KAFKA-1845 ticket as 

Testing file 
[KAFKA-1845_2015-02-08_17%3A05%3A22.patch|https://issues.apache.org/jira/secure/attachment/12697336/KAFKA-1845_2015-02-08_17%3A05%3A22.patch]
 against branch trunk took 0:31:28.393900.

{color:green}Overall:{color} +1 all checks pass

{color:green}SUCCESS:{color} Gradle bootstrap was successful
{color:green}SUCCESS:{color} Clean was successful
{color:green}SUCCESS:{color} Patch applied, but there has been warnings:
{code}:233: space before tab in indent.
if (trimmed.equalsIgnoreCase("true"))
:234: space before tab in indent.
return true;
:235: space before tab in indent.
else if (trimmed.equalsIgnoreCase("false"))
:236: space before tab in indent.
return false;
:237: space before tab in indent.
else
warning: squelched 1 whitespace error
warning: 6 lines add whitespace errors.
{code}

{color:green}SUCCESS:{color} Patch add/modify test case
{color:green}SUCCESS:{color} Gradle bootstrap was successful
{color:green}SUCCESS:{color} Patch compiled
{color:green}SUCCESS:{color} Checked style for Main
{color:green}SUCCESS:{color} Checked style for Test
{color:green}SUCCESS:{color} All unit tests passed

This message is automatically generated.


> Add PreCommit Patch Testing
> ---
>
> Key: KAFKA-1856
> URL: https://issues.apache.org/jira/browse/KAFKA-1856
> Project: Kafka
>  Issue Type: Task
>Reporter: Ashish Kumar Singh
>Assignee: Ashish Kumar Singh
> Attachments: KAFKA-1845.result.txt, KAFKA-1856.patch, 
> KAFKA-1856_2015-01-18_21:43:56.patch, KAFKA-1856_2015-02-04_14:57:05.patch, 
> KAFKA-1856_2015-02-04_15:44:47.patch
>
>
> h1. Kafka PreCommit Patch Testing - *Don't wait for it to break*
> h2. Motivation
> *With great power comes great responsibility* - Uncle Ben. As the Kafka user 
> list is growing, a mechanism to ensure the quality of the product is required. 
> Quality becomes hard to measure and maintain in an open source project because 
> of a wide community of contributors. Luckily, Kafka is not the first open 
> source project and can benefit from the learnings of prior projects.
> PreCommit tests are the tests that are run for each patch that gets attached 
> to an open JIRA. Based on the test results, the test execution framework (a 
> test bot) gives the patch a +1 or -1. Having PreCommit tests takes the load 
> off committers having to look at or test each patch.
> h2. Tests in Kafka
> h3. Unit and Integration Tests
> [Unit and Integration 
> tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Unit+and+Integration+Tests]
>  are cardinal in helping contributors avoid breaking existing functionality 
> while adding new functionality or fixing older ones. These tests, at least 
> the ones relevant to the changes, must be run by contributors before 
> attaching a patch to a JIRA.
> h3. System Tests
> [System 
> tests|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests] 
> are much wider tests that, unlike unit tests, focus on end-to-end scenarios 
> and not some specific method or class.
> h2. Apache PreCommit tests
> Apache provides a mechanism to automatically build a project and run a series 
> of tests whenever a patch is uploaded to a JIRA. Based on test execution, the 
> test framework will comment with a +1 or -1 on the JIRA.
> You can read more about the framework here:
> http://wiki.apache.org/general/PreCommitBuilds
> h2. Plan
> # Create a test-patch.py script (similar to the one used in Flume, Sqoop and 
> other projects) that will take a JIRA as a parameter, apply the patch on the 
> appropriate branch, build the project, run tests and report results. This 
> script should be committed into the Kafka code-base. To begin with, this will 
> only run unit tests. We can add code sanity checks, system tests, etc. in the 
> future.
> # Create a jenkins job for running the test (as described in 
> http://wiki.apache.org/general/PreCommitBuilds) and validate that it works 
> manually. This must be done by a committer with Jenkins access.
> # Ask someone with access to https://builds.apache.org/job/PreCommit-Admin/ 
> to add Kafka to the list of projects PreCommit-Admin triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: org.apache.common migration

2015-02-08 Thread Joe Stein
Argh, I just realized that the producer and consumer have already almost
removed that, so it wouldn't be in common but just something for the
broker. Maybe a 0.9/1.0 item to crack into later this year.

On Sun, Feb 8, 2015 at 11:34 AM, Joe Stein  wrote:

> Jay,
>
> Can we add another package (or two) to org.apache.kafka.common for metadata
> and consensus? We can call them something else, but the idea would be to
> have one common layer for metadata information (right now we put the JSON
> into ZooKeeper) and one common layer for asynchronous watches (where we wait
> for ZooKeeper to call us). It would be great to have that be code we can
> wrap around zkclient (or Curator) that can insulate us from the different
> options growing in both of those areas.
>
> For both the metadata code and the async watches, we would be able to run
> any class we load in that supports the expected interface. The async watch
> interface can take a callback as input to pass to the loaded class, and when
> the watcher fires (regardless of whether it is from etcd or ZooKeeper) the
> code gets the response it expected and needed. We should also expose a
> function that returns a future from the watcher.
>
> This may also cause a little more work if we wanted to take the JSON and
> turn it into a byte structure ... or do we just keep to the JSON and keep
> making it describable and self-documenting?
>
> For the metadata information, I think that is separate because that data
> right now (outside of Kafka) already resides in other systems like
> databases and/or caches. Folks may opt just to switch the metadata out to
> reduce the burden on ZooKeeper to just doing the asynchronous watches.
> Some folks may want to swap both out.
>
> These two layers could also just be 2-3 more files in utils.
>
> - Joestein
>
> On Sun, Feb 8, 2015 at 11:04 AM, Gwen Shapira 
> wrote:
>
>> Thanks for the background.
>>
>> I picked the Network classes portion of it, since I was already looking at
>> how to refactor send/receive and friends to support extending with TLS and
>> SASL. Having to do this in just one place will be really nice :)
>>
>> Gwen
>>
>> On Sun, Feb 8, 2015 at 7:26 AM, Jay Kreps  wrote:
>>
>> > Hey all,
>> >
>> > Someone asked about why there is code duplication between
>> org.apache.common
>> > and core. The answer seemed like it might be useful to others, so
>> including
>> > it here:
>> >
>> > Originally Kafka was more of a proof of concept and we didn't separate
>> the
>> > clients from the server. LinkedIn was much smaller and it wasn't open
>> > source, and keeping those separate always adds a lot of overhead. So we
>> > ended up with just one big jar.
>> >
>> > Next thing we know the kafka jar is embedded everywhere. Lots of
>> > fallout from that
>> > - It has to be really sensitive to dependencies
>> > - Scala causes all kinds of pain for users. Ironically it causes the
>> most
>> > pain for people using scala because of compatibility. I think the single
>> > biggest Kafka complaint was the scala clients and resulting scary
>> > exceptions, lack of javadoc, etc.
>> > - Many of the client interfaces weren't well thought out as permanent
>> > long-term commitments.
>> > - We knew we had to rewrite both clients due to technical deficiencies
>> > anyway. The clients really needed to move to non-blocking I/O which is
>> > basically a rewrite on its own.
>> >
>> > So how to go about that?
>> >
>> > Well we felt we needed to maintain the old client interfaces for a good
>> > period of time. Any kind of breaking cut-over was kind of a non-starter.
>> > But a major refactoring in place was really hard since so many classes
>> were
>> > public and so little attention had been paid to the difference between
>> > public and private classes.
>> >
>> > Naturally since the client and server do the inverse of each other
>> there is
>> > a ton of shared logic. So we thought we needed to break it up into three
>> > independent chunks:
>> > 1. common - shared helper code used by both clients and server
>> > 2. clients - the producer, consumer, and eventually admin java
>> interfaces.
>> > This depends on common.
>> > 3. server - the server (and legacy clients). This is currently called
>> core.
>> > This will depend on common and clients (because sometimes the server
>> needs
>> > to make client requests)
>> >
>> > Common and clients were left as a single jar and just logically
>> separate so
>> > that people wouldn't have to deal with two jars (and hence the
>> possibility
>> > of getting different versions of each).
>> >
>> > The dependency is actually a little counter-intuitive to people--they
>> > usually think of the client as depending on the server since the client
>> > calls the server. But in terms of code dependencies it is the other
>> way--if
>> > you depend on the client you obviously don't want to drag in the server.
>> >
>> > So to get all this done we decided to just go big and do a rewrite of
>> the
>> > clients in Java. A result of this is that any

[jira] [Commented] (KAFKA-313) Add JSON/CSV output and looping options to ConsumerOffsetChecker

2015-02-08 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311445#comment-14311445
 ] 

Neha Narkhede commented on KAFKA-313:
-

bq. If KAFKA-1476 will be committed soon, it makes sense to re-implement this 
in the new ConsumerCommand and deprecate the existing ConsumerOffsetChecker.

+1. I'm helping us commit KAFKA-1476

> Add JSON/CSV output and looping options to ConsumerOffsetChecker
> 
>
> Key: KAFKA-313
> URL: https://issues.apache.org/jira/browse/KAFKA-313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dave DeMaagd
>Assignee: Ashish Kumar Singh
>Priority: Minor
>  Labels: newbie, patch
> Fix For: 0.8.3
>
> Attachments: KAFKA-313-2012032200.diff, KAFKA-313.1.patch, 
> KAFKA-313.patch
>
>
> Adds:
> * '--loop N' - causes the program to loop forever, sleeping for up to N 
> seconds between loops (loop time minus collection time, unless that's less 
> than 0, at which point it will just run again immediately)
> * '--asjson' - display as a JSON string instead of the more human readable 
> output format.
> Neither of the above depends on the other (you can loop in the human-readable 
> output, or do a single-shot execution with JSON output). Existing 
> behavior/output is maintained if neither of the above is used. Diff attached.
> Impacted files:
> core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1758) corrupt recovery file prevents startup

2015-02-08 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1758:
-
Component/s: log

> corrupt recovery file prevents startup
> --
>
> Key: KAFKA-1758
> URL: https://issues.apache.org/jira/browse/KAFKA-1758
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Jason Rosenberg
>  Labels: newbie
> Fix For: 0.9.0
>
>
> Hi,
> We recently had a kafka node go down suddenly. When it came back up, it 
> apparently had a corrupt recovery file, and refused to startup:
> {code}
> 2014-11-06 08:17:19,299  WARN [main] server.KafkaServer - Error starting up 
> KafkaServer
> java.lang.NumberFormatException: For input string: 
> "^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
> ^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@"
> at 
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Integer.parseInt(Integer.java:481)
> at java.lang.Integer.parseInt(Integer.java:527)
> at 
> scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
> at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
> at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:76)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:106)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at 
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at kafka.log.LogManager.loadLogs(LogManager.scala:105)
> at kafka.log.LogManager.(LogManager.scala:57)
> at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
> {code}
> And the app is under a monitor (so it was repeatedly restarting and failing 
> with this error for several minutes before we got to it)…
> We moved the ‘recovery-point-offset-checkpoint’ file out of the way, and it 
> then restarted cleanly (but of course re-synced all its data from replicas, 
> so we had no data loss).
> Anyway, I’m wondering if that’s the expected behavior? Or should it not 
> declare it corrupt and then proceed automatically to an unclean restart?
> Should this NumberFormatException be handled a bit more gracefully?
> We saved the corrupt file if it’s worth inspecting (although I doubt it will 
> be useful!)….
> The corrupt files appeared to be all zeroes.
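As a sketch of the more graceful behavior the reporter is asking about: the checkpoint read could treat an unparseable file as absent and fall back to a full recovery instead of failing startup. The file layout assumed below (a version line, a count line, then "topic partition offset" lines) and the class name are illustrative; this is not the actual kafka.server.OffsetCheckpoint code, nor any committed fix.

{code}
import java.io.*;
import java.util.*;

class LenientCheckpointReader {
    // Returns the recovery points, or an empty map if the file is corrupt,
    // so the broker can recover from scratch instead of refusing to start.
    Map<String, Long> read(File file) {
        Map<String, Long> offsets = new HashMap<String, Long>();
        try {
            BufferedReader reader = new BufferedReader(new FileReader(file));
            try {
                int version = Integer.parseInt(reader.readLine().trim());   // assumed version line
                int expected = Integer.parseInt(reader.readLine().trim());  // assumed entry count
                String line;
                while ((line = reader.readLine()) != null) {
                    String[] parts = line.trim().split("\\s+");             // "topic partition offset"
                    offsets.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
                }
                if (version != 0 || offsets.size() != expected)
                    throw new IOException("malformed checkpoint");
                return offsets;
            } finally {
                reader.close();
            }
        } catch (Exception e) {
            // Covers the all-zero file case (NumberFormatException): warn and move on.
            System.err.println("WARN corrupt recovery checkpoint " + file + " (" + e + "); ignoring it");
            return Collections.<String, Long>emptyMap();
        }
    }
}
{code}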



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: org.apache.common migration

2015-02-08 Thread Joe Stein
Jay,

Can we add another package (or two) to org.apache.kafka.common for metadata
and consensus? We can call them something else, but the idea would be to
have one common layer for metadata information (right now we put the JSON
into ZooKeeper) and one common layer for asynchronous watches (where we wait
for ZooKeeper to call us). It would be great to have that be code we can
wrap around zkclient (or Curator) that can insulate us from the different
options growing in both of those areas.

For both the metadata code and the async watches, we would be able to run
any class we load in that supports the expected interface. The async watch
interface can take a callback as input to pass to the loaded class, and when
the watcher fires (regardless of whether it is from etcd or ZooKeeper) the
code gets the response it expected and needed. We should also expose a
function that returns a future from the watcher.

This may also cause a little more work if we wanted to take the JSON and
turn it into a byte structure ... or do we just keep to the JSON and keep
making it describable and self-documenting?

For the metadata information, I think that is separate because that data
right now (outside of Kafka) already resides in other systems like
databases and/or caches. Folks may opt just to switch the metadata out to
reduce the burden on ZooKeeper to just doing the asynchronous watches.
Some folks may want to swap both out.

These two layers could also just be 2-3 more files in utils.

- Joestein

On Sun, Feb 8, 2015 at 11:04 AM, Gwen Shapira  wrote:

> Thanks for the background.
>
> I picked the Network classes portion of it, since I was already looking at
> how to refactor send/receive and friends to support extending with TLS and
> SASL. Having to do this in just one place will be really nice :)
>
> Gwen
>
> On Sun, Feb 8, 2015 at 7:26 AM, Jay Kreps  wrote:
>
> > Hey all,
> >
> > Someone asked about why there is code duplication between
> org.apache.common
> > and core. The answer seemed like it might be useful to others, so
> including
> > it here:
> >
> > Originally Kafka was more of a proof of concept and we didn't separate
> the
> > clients from the server. LinkedIn was much smaller and it wasn't open
> > source, and keeping those separate always adds a lot of overhead. So we
> > ended up with just one big jar.
> >
> > Next thing we know the kafka jar is embedded everywhere. Lots of fallout
> > from that
> > - It has to be really sensitive to dependencies
> > - Scala causes all kinds of pain for users. Ironically it causes the most
> > pain for people using scala because of compatibility. I think the single
> > biggest Kafka complaint was the scala clients and resulting scary
> > exceptions, lack of javadoc, etc.
> > - Many of the client interfaces weren't well thought out as permanent
> > long-term commitments.
> > - We knew we had to rewrite both clients due to technical deficiencies
> > anyway. The clients really needed to move to non-blocking I/O which is
> > basically a rewrite on its own.
> >
> > So how to go about that?
> >
> > Well we felt we needed to maintain the old client interfaces for a good
> > period of time. Any kind of breaking cut-over was kind of a non-starter.
> > But a major refactoring in place was really hard since so many classes
> were
> > public and so little attention had been paid to the difference between
> > public and private classes.
> >
> > Naturally since the client and server do the inverse of each other there
> is
> > a ton of shared logic. So we thought we needed to break it up into three
> > independent chunks:
> > 1. common - shared helper code used by both clients and server
> > 2. clients - the producer, consumer, and eventually admin java
> interfaces.
> > This depends on common.
> > 3. server - the server (and legacy clients). This is currently called
> core.
> > This will depend on common and clients (because sometimes the server
> needs
> > to make client requests)
> >
> > Common and clients were left as a single jar and just logically separate
> so
> > that people wouldn't have to deal with two jars (and hence the
> possibility
> > of getting different versions of each).
> >
> > The dependency is actually a little counter-intuitive to people--they
> > usually think of the client as depending on the server since the client
> > calls the server. But in terms of code dependencies it is the other
> way--if
> > you depend on the client you obviously don't want to drag in the server.
> >
> > So to get all this done we decided to just go big and do a rewrite of the
> > clients in Java. A result of this is that any shared code would have to
> > move to Java (so the clients don't pull in Scala). We felt this was
> > probably a good thing in its own right as it gave a chance to improve a
> few
> > of these utility libraries like config parsing, etc.
> >
> > So the plan was and is:
> > 1. Rewrite producer, release and roll out
> > 2a. Rewrite consumer, release and roll out
> > 2b. Migrate server from scala

[jira] [Assigned] (KAFKA-1929) Convert core kafka module to use the errors in org.apache.kafka.common.errors

2015-02-08 Thread Jeff Holoman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Holoman reassigned KAFKA-1929:
---

Assignee: Jeff Holoman

> Convert core kafka module to use the errors in org.apache.kafka.common.errors
> -
>
> Key: KAFKA-1929
> URL: https://issues.apache.org/jira/browse/KAFKA-1929
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Jeff Holoman
>
> With the introduction of the common package there are now a lot of errors 
> duplicated in both the common package and in the server. We should refactor 
> the server code (but not the scala clients) to switch over to the exceptions 
> in common.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1718) "Message Size Too Large" error when only small messages produced with Snappy

2015-02-08 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311387#comment-14311387
 ] 

Evan Huus commented on KAFKA-1718:
--

[~guozhang], [~jkreps] my understanding is that while this is a known 
limitation of the current design (and the wiki now reflects that limitation), 
this ticket is still open to track support for multiple compressed message-sets 
in a single produce request. The points I made in my comment on Oct 21st still 
stand.

I'm not sure if there's been any progress in the actual implementation of that 
support.

> "Message Size Too Large" error when only small messages produced with Snappy
> 
>
> Key: KAFKA-1718
> URL: https://issues.apache.org/jira/browse/KAFKA-1718
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1.1
>Reporter: Evan Huus
>Priority: Critical
>
> I'm the primary author of the Go bindings, and while I originally received 
> this as a bug against my bindings, I'm coming to the conclusion that it's a 
> bug in the broker somehow.
> Specifically, take a look at the last two kafka packets in the following 
> packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
> will need a trunk build of Wireshark to fully decode the kafka part of the 
> packets).
> The produce request contains two partitions on one topic. Each partition has 
> one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
> message set is a sequential collection of snappy-compressed messages, each 
> message of size 46899. When uncompressed, each message contains a message set 
> of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
> However, the broker responds to this with a MessageSizeTooLarge error, full 
> stacktrace from the broker logs being:
> kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
> which exceeds the maximum configured message size of 112.
>   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
>   at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>   at kafka.log.Log.append(Log.scala:265)
>   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
>   at 
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
>   at 
> kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
>   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>   at java.lang.Thread.run(Thread.java:695)
> Since as far as I can tell none of the sizes in the actual produced packet 
> exceed the defined maximum, I can only assume that the broker is 
> miscalculating something somewhere and throwing the exception improperly.
> ---
> This issue can be reliably reproduced using an out-of-the-box binary download 
> of 0.8.1.1 and the following gist: 
> https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
> the `producer-ng` branch of the Sarama library).
> ---
> I am happy to provide any more information you might need, or to do relevant 
> experiments etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29893: Patch for KAFKA-1856

2015-02-08 Thread Joe Stein

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29893/#review71574
---



dev-utils/test-patch.py


What is this structure that comes back? When it sorts, does it put the latest 
time first and use that patch? I am running this right now so I don't have much 
feedback yet, but I would like to understand more about which patches we can 
flag for it, or do we always go with the latest one? If we ever have a patch 
that has to go into 2 branches, then we just have 2 JIRAs, which is OK by me.


- Joe Stein


On Feb. 4, 2015, 11:44 p.m., Ashish Singh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29893/
> ---
> 
> (Updated Feb. 4, 2015, 11:44 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1856
> https://issues.apache.org/jira/browse/KAFKA-1856
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1856: Add PreCommit Patch Testing
> 
> 
> Diffs
> -
> 
>   dev-utils/test-patch.py PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/29893/diff/
> 
> 
> Testing
> ---
> 
> Tested on KAFKA-1664, 
> https://issues.apache.org/jira/browse/KAFKA-1664?focusedCommentId=14277439&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14277439
> 
> How to run:
> python dev-utils/test-patch.py --defect KAFKA-1664 --username  
> --password  --run-tests --post-results
> 
> 
> Thanks,
> 
> Ashish Singh
> 
>



[jira] [Resolved] (KAFKA-1486) Move all request/responses to use schema-utils

2015-02-08 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira resolved KAFKA-1486.
-
Resolution: Duplicate

I think this duplicated KAFKA-1927. Re-open and explain if I got it wrong :)

> Move all request/responses to use schema-utils
> --
>
> Key: KAFKA-1486
> URL: https://issues.apache.org/jira/browse/KAFKA-1486
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joel Koshy
>Assignee: Gwen Shapira
>  Labels: newbie
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We started doing this for the new producer.
> We should do the same on the server-side as well. It will make it more 
> convenient to evolve the wire protocol over time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: org.apache.common migration

2015-02-08 Thread Gwen Shapira
Thanks for the background.

I picked the Network classes portion of it, since I was already looking at
how to refactor send/receive and friends to support extending with TLS and
SASL. Having to do this in just one place will be really nice :)

Gwen

On Sun, Feb 8, 2015 at 7:26 AM, Jay Kreps  wrote:

> Hey all,
>
> Someone asked about why there is code duplication between org.apache.common
> and core. The answer seemed like it might be useful to others, so including
> it here:
>
> Originally Kafka was more of a proof of concept and we didn't separate the
> clients from the server. LinkedIn was much smaller and it wasn't open
> source, and keeping those separate always adds a lot of overhead. So we
> ended up with just one big jar.
>
> Next thing we know the kafka jar is embedded everywhere. Lots of fallout
> from that
> - It has to be really sensitive to dependencies
> - Scala causes all kinds of pain for users. Ironically it causes the most
> pain for people using scala because of compatibility. I think the single
> biggest Kafka complaint was the scala clients and resulting scary
> exceptions, lack of javadoc, etc.
> - Many of the client interfaces weren't well thought out as permanent
> long-term commitments.
> - We knew we had to rewrite both clients due to technical deficiencies
> anyway. The clients really needed to move to non-blocking I/O which is
> basically a rewrite on its own.
>
> So how to go about that?
>
> Well we felt we needed to maintain the old client interfaces for a good
> period of time. Any kind of breaking cut-over was kind of a non-starter.
> But a major refactoring in place was really hard since so many classes were
> public and so little attention had been paid to the difference between
> public and private classes.
>
> Naturally since the client and server do the inverse of each other there is
> a ton of shared logic. So we thought we needed to break it up into three
> independent chunks:
> 1. common - shared helper code used by both clients and server
> 2. clients - the producer, consumer, and eventually admin java interfaces.
> This depends on common.
> 3. server - the server (and legacy clients). This is currently called core.
> This will depend on common and clients (because sometimes the server needs
> to make client requests)
>
> Common and clients were left as a single jar and just logically separate so
> that people wouldn't have to deal with two jars (and hence the possibility
> of getting different versions of each).
>
> The dependency is actually a little counter-intuitive to people--they
> usually think of the client as depending on the server since the client
> calls the server. But in terms of code dependencies it is the other way--if
> you depend on the client you obviously don't want to drag in the server.
>
> So to get all this done we decided to just go big and do a rewrite of the
> clients in Java. A result of this is that any shared code would have to
> move to Java (so the clients don't pull in Scala). We felt this was
> probably a good thing in its own right as it gave a chance to improve a few
> of these utility libraries like config parsing, etc.
>
> So the plan was and is:
> 1. Rewrite producer, release and roll out
> 2a. Rewrite consumer, release and roll out
> 2b. Migrate server from scala code to org.apache.common classes
> 3. Deprecate scala clients
>
> (2a) is in flight now, and that means (2b) is totally up for grabs. Of
> these the request conversion is definitely the most pressing since having
> those defined twice duplicates a ton of work. We will have to be
> hyper-conscientious during the conversion about making the shared code in
> common really solve the problem well and conveniently on the server as well
> (so we don't end up just shoe-horning it in). My hope is that we can treat
> this common code really well--it isn't as permanent as the public classes
> but ends up heavily used so we should take good care of it. Most of the shared
> code is private so we can refactor the stuff in common to meet the needs of
> the server if we find mismatches or missing functionality. I tried to keep
> in mind the eventual server usage while writing it, but I doubt it will be
> as trivial as just deleting the old and adding the new.
>
> In terms of the simplicity:
> - Converting exceptions should be trivial
> - Converting utils is straight-forward but we should evaluate the
> individual utilities and see if they actually make sense, have tests, are
> used, etc.
> - Converting the requests may not be too complex but touches a huge hunk of
> code and may require some effort to decouple the network layer.
> - Converting the network code will be delicate and may require some changes
> in org.apache.common.network to meet the server's needs
>
> This is all a lot of work, but if we stick to it at the end we will have
> really nice clients and a nice modular code base. :-)
>
> Cheers,
>
> -Jay
>


[jira] [Assigned] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-02-08 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira reassigned KAFKA-1928:
---

Assignee: Gwen Shapira

> Move kafka.network over to using the network classes in 
> org.apache.kafka.common.network
> ---
>
> Key: KAFKA-1928
> URL: https://issues.apache.org/jira/browse/KAFKA-1928
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
>
> As part of the common package we introduced a bunch of network related code 
> and abstractions.
> We should look into replacing a lot of what is in kafka.network with this 
> code. Duplicate classes include things like Receive, Send, etc. It is likely 
> possible to also refactor the SocketServer to make use of Selector which 
> should significantly simplify its code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-02-08 Thread Jay Kreps
Hey Jiangjie,

Rerouting support doesn't force clients to use it. Java and all existing
clients would work as now, where requests are intelligently routed by the
client, but this would lower the bar for new clients. That said, I agree the
case for rerouting admin commands is much stronger than for data.

The idea of separating admin/metadata traffic from data would definitely
solve some problems, but it would also add a lot of complexity--new ports,
thread pools, etc. This is an interesting idea to think over but I'm not
sure if it's worth it. Probably a separate effort in any case.

-jay

On Friday, February 6, 2015, Jiangjie Qin  wrote:

> I'm a little bit concerned about the request routing among brokers.
> Typically we have a dominant percentage of produce and fetch
> requests/responses. Routing them from one broker to another seems unwanted.
> Also, I think we generally have two types of requests/responses: data
> related and admin related. It is typically a good practice to separate the
> data plane from the control plane. That suggests we should have another admin
> port to serve those admin requests and probably have different
> authentication/authorization from the data port.
>
> Jiangjie (Becket) Qin
>
> On 2/6/15, 11:18 AM, "Joe Stein"  wrote:
>
> >I updated the installation and sample usage for the existing patches on
> >the
> >KIP site
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and
> >+centralized+administrative+operations
> >
> >There are still a few pending items here.
> >
> >1) There was already some discussion about using the Broker that is the
> >Controller here https://issues.apache.org/jira/browse/KAFKA-1772 and we
> >should elaborate on that more in the thread or agree we are ok with admin
> >asking for the controller to talk to and then just sending that broker the
> >admin tasks.
> >
> >2) I like this idea https://issues.apache.org/jira/browse/KAFKA-1912 but
> >we can refactor after KAFKA-1694 is committed, no? I know folks just want
> >to talk to the broker that is the controller. It may even become useful to
> >have the controller run on a broker that isn't even a topic broker anymore
> >(a small can of worms I am opening here, but it elaborates on Guozhang's
> >hot spot point).
> >
> >3) anymore feedback?
> >
> >- Joe Stein
> >
> >On Fri, Jan 23, 2015 at 3:15 PM, Guozhang Wang 
> wrote:
> >
> >> A centralized admin operation protocol would be very useful.
> >>
> >> One more general comment here is that the controller is originally designed
> >> to only talk to other brokers through ControllerChannel, while the broker
> >> instance which carries the current controller is agnostic of its existence
> >> and uses KafkaApis to handle general Kafka requests. Having all admin
> >> requests redirected to the controller instance will force the broker to
> >>be
> >> aware of its carried controller, and access its internal data for
> >>handling
> >> these requests. Plus with the number of clients out of Kafka's control,
> >> this may easily cause the controller to be a hot spot in terms of
> >>request
> >> load.
> >>
> >>
> >> On Thu, Jan 22, 2015 at 10:09 PM, Joe Stein 
> >>wrote:
> >>
> >> > inline
> >> >
> >> > On Thu, Jan 22, 2015 at 11:59 PM, Jay Kreps 
> >>wrote:
> >> >
> >> > > Hey Joe,
> >> > >
> >> > > This is great. A few comments on KIP-4
> >> > >
> >> > > 1. This is much needed functionality, but there are a lot of these, so
> >> > > let's really think these protocols through. We really want to end up
> >> > > with a set of well thought-out, orthogonal APIs. For this reason I
> >> > > think it is really important to think through the end state even if
> >> > > that includes APIs we won't implement in the first phase.
> >> > >
> >> >
> >> > ok
> >> >
> >> >
> >> > >
> >> > > 2. Let's please please please wait until we have switched the server
> >> > > over to the new Java protocol definitions. If we add umpteen more ad
> >> > > hoc Scala objects, that is just generating more work for the
> >> > > conversion we know we have to do.
> >> > >
> >> >
> >> > ok :)
> >> >
> >> >
> >> > >
> >> > > 3. This proposal introduces a new type of optional parameter. This is
> >> > > inconsistent with everything else in the protocol where we use -1 or
> >> > > some other marker value. You could argue either way but let's stick
> >> > > with that for consistency. For clients that implemented the protocol
> >> > > in a better way than our scala code these basic primitives are hard
> >> > > to change.
> >> > >
> >> >
> >> > yes, less confusing, ok.
> >> >
> >> >
> >> > >
> >> > > 4. ClusterMetadata: This seems to duplicate TopicMetadataRequest
> >> > > which has brokers, topics, and partitions. I think we should rename
> >> > > that request ClusterMetadataRequest (or just MetadataRequest) and
> >> > > include the id of the controller. Or are there other things we could
> >> > > add here?
> >> > >
> >> >
> >> > We could add br

Re: [DISCUSS] ConfigDec Broker Changes on Trunk

2015-02-08 Thread Jay Kreps
Yeah totally, all the cleanups should be independent, this thread just
reminded me to file tickets for them.

-jay

On Sunday, February 8, 2015, Gwen Shapira  wrote:

> I think the new tickets can be done in parallel, and are not an actual
> dependency for KAFKA-1845. Is that correct?
>
> On Sat, Feb 7, 2015 at 1:44 PM, Jay Kreps  > wrote:
>
> > I don't think we need a KIP/vote here, this is just an internal
> > refactoring. We had said previously and noted in the document that the
> KIPs
> > were just for big new features or public api changes.
> >
> > I am a big +1 on the idea. We'll have to be careful in the code review
> > since it would be really easy to cause subtle issues and it is hard to
> > review this kind of change.
> >
> > For what it is worth the high-level idea of adding a bunch of helper code
> > in org.apache.kafka.common is to start to incorporate this on the server
> > and replace the utilities there. This will just help reduce the total
> code
> > size.
> >
> > A few of the highlights there are:
> > 1. Replace kafka.utils.Utils with o.a.k.common.utils.Utils. This will
> > likely involve some thought and refactoring. Anything non-general purpose
> > should move out of Utils entirely and anything that remains should be
> high
> > quality, general purpose, and have some tests. We may want to keep a
> > ScalaUtils with a couple of things that aren't really doable/convenient
> in
> > Java. This should be straight-forward.
> > 2. Refactor the network server to make use of the classes in
> > o.a.k.common.network (receive, send, etc). It might be doable to make use
> > of Selector as well.
> > 3. Replace the request classes in kafka.api with the ones in
> > o.a.k.common.requests. This is one of the more valuable things we can do
> as
> > that will get us to having a single definition of the protocol.
> > 4. Make use of the exceptions in o.a.k.common.errors
> > 5. Switch over to the new metrics library.
> >
> > I'll file tickets for these.
> >
> > -Jay
> >
> > On Fri, Feb 6, 2015 at 11:16 AM, Joe Stein  > wrote:
> >
> > > I created KIP-12
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-12+change+broker+configuration+properties+to+be+consistent+with+the+rest+of+the+code
> > > and linked it to this thread and the JIRA with the v1 patch. The
> rebased
> > > version with updates for the current review should be ready to review
> in
> > > the next few days.
> > >
> > > On Fri, Feb 6, 2015 at 1:31 PM, Jeff Holoman  >
> > > wrote:
> > >
> > > > I think this is a good change. Is there general agreement that we are
> > > > moving forward with this approach? It would be nice to start using
> this
> > > for
> > > > future work.
> > > >
> > > > Thanks
> > > >
> > > > Jeff
> > > >
> > > > On Tue, Feb 3, 2015 at 9:34 AM, Joe Stein  >
> > wrote:
> > > >
> > > > > I updated the RB changing some of the HIGH to MEDIUM and LOW.
> > > > >
> > > > > There might be other or different opinions and they may change over
> > > time
> > > > so
> > > > > I don't really see h/m/l as a blocker to the patch going in.
> > > > >
> > > > > It would be great to take all the rb feedback from today and then
> > > > tomorrow
> > > > > rebase and include changes for a new patch.
> > > > >
> > > > > Then over the next day or two review, test and commit to trunk (or
> > > > re-work
> > > > > if necessary).
> > > > >
> > > > > /***
> > > > >  Joe Stein
> > > > >  Founder, Principal Consultant
> > > > >  Big Data Open Source Security LLC
> > > > >  http://www.stealth.ly
> > > > >  Twitter: @allthingshadoop  >
> > > > > /
> > > > >
> > > > > On Tue, Feb 3, 2015 at 4:56 AM, Andrii Biletskyi <
> > > > > andrii.bilets...@stealth.ly > wrote:
> > > > >
> > > > > > It'd be great to have it on trunk.
> > > > > > As I mentioned under jira ticket (KAFKA-1845) current
> > implementation
> > > > > lacks
> > > > > > correct Importance settings.
> > > > > > I'd be grateful if somebody could help me with it (a simple
> mapping
> > > > > between
> > > > > > config setting and importance or comments right in the review
> board
> > > > would
> > > > > > suffice).
> > > > > >
> > > > > > Thanks,
> > > > > > Andrii Biletskyi
> > > > > >
> > > > > > On Mon, Feb 2, 2015 at 11:38 PM, Gwen Shapira <
> > gshap...@cloudera.com 
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Strong +1 from me (obviously). Lots of good reasons to do it:
> > > > > > > consistency, code reuse, better validations, etc, etc.
> > > > > > >
> > > > > > > I had one comment on the patch in RB, but it can also be
> > refactored
> > > > as
> > > > > > > follow up JIRA to avoid blocking everyone who is waiting on
> this.
> > > > > > >
> > > > > > > Gwen
> > > > > > >
> > > > > > > On Mon, Feb 2, 2015 at 1:31 PM, Joe Stein <
> joe.st...@stealth.ly >
> > > > > wrote:
> > > > > > > > Hey, I wanted to start a quick convo around some changes on
>

org.apache.common migration

2015-02-08 Thread Jay Kreps
Hey all,

Someone asked about why there is code duplication between org.apache.common
and core. The answer seemed like it might be useful to others, so including
it here:

Originally Kafka was more of a proof of concept and we didn't separate the
clients from the server. LinkedIn was much smaller and it wasn't open
source, and keeping those separate always adds a lot of overhead. So we
ended up with just one big jar.

Next thing we know the kafka jar is embedded everywhere. Lots of fallout
from that
- It has to be really sensitive to dependencies
- Scala causes all kinds of pain for users. Ironically it causes the most
pain for people using scala because of compatibility. I think the single
biggest Kafka complaint was the scala clients and resulting scary
exceptions, lack of javadoc, etc.
- Many of the client interfaces weren't well thought out as permanent
long-term commitments.
- We knew we had to rewrite both clients due to technical deficiencies
anyway. The clients really needed to move to non-blocking I/O which is
basically a rewrite on its own.

So how to go about that?

Well we felt we needed to maintain the old client interfaces for a good
period of time. Any kind of breaking cut-over was kind of a non-starter.
But a major refactoring in place was really hard since so many classes were
public and so little attention had been paid to the difference between
public and private classes.

Naturally since the client and server do the inverse of each other there is
a ton of shared logic. So we thought we needed to break it up into three
independent chunks:
1. common - shared helper code used by both clients and server
2. clients - the producer, consumer, and eventually admin java interfaces.
This depends on common.
3. server - the server (and legacy clients). This is currently called core.
This will depend on common and clients (because sometimes the server needs
to make client requests)

Common and clients were left as a single jar and just logically separate so
that people wouldn't have to deal with two jars (and hence the possibility
of getting different versions of each).

The dependency is actually a little counter-intuitive to people--they
usually think of the client as depending on the server since the client
calls the server. But in terms of code dependencies it is the other way--if
you depend on the client you obviously don't want to drag in the server.

So to get all this done we decided to just go big and do a rewrite of the
clients in Java. A result of this is that any shared code would have to
move to Java (so the clients don't pull in Scala). We felt this was
probably a good thing in its own right as it gave a chance to improve a few
of these utility libraries like config parsing, etc.

So the plan was and is:
1. Rewrite producer, release and roll out
2a. Rewrite consumer, release and roll out
2b. Migrate server from scala code to org.apache.common classes
3. Deprecate scala clients

(2a) is in flight now, and that means (2b) is totally up for grabs. Of
these the request conversion is definitely the most pressing since having
those defined twice duplicates a ton of work. We will have to be
hyper-conscientious during the conversion about making the shared code in
common really solve the problem well and conveniently on the server as well
(so we don't end up just shoe-horning it in). My hope is that we can treat
this common code really well--it isn't as permanent as the public classes
but ends up heavily used so we should take good care of it. Most of the shared
code is private so we can refactor the stuff in common to meet the needs of
the server if we find mismatches or missing functionality. I tried to keep
in mind the eventual server usage while writing it, but I doubt it will be
as trivial as just deleting the old and adding the new.

In terms of the simplicity:
- Converting exceptions should be trivial
- Converting utils is straight-forward but we should evaluate the
individual utilities and see if they actually make sense, have tests, are
used, etc.
- Converting the requests may not be too complex but touches a huge hunk of
code and may require some effort to decouple the network layer.
- Converting the network code will be delicate and may require some changes
in org.apache.common.network to meet the server's needs

This is all a lot of work, but if we stick to it at the end we will have
really nice clients and a nice modular code base. :-)

Cheers,

-Jay


[jira] [Commented] (KAFKA-1845) KafkaConfig should use ConfigDef

2015-02-08 Thread Andrii Biletskyi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311353#comment-14311353
 ] 

Andrii Biletskyi commented on KAFKA-1845:
-

Updated reviewboard https://reviews.apache.org/r/30126/diff/
 against branch origin/trunk

> KafkaConfig should use ConfigDef 
> -
>
> Key: KAFKA-1845
> URL: https://issues.apache.org/jira/browse/KAFKA-1845
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: Andrii Biletskyi
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1845.patch, KAFKA-1845_2015-02-08_17:05:22.patch
>
>
> ConfigDef is already used for the new producer and for TopicConfig. 
> Will be nice to standardize and use one configuration and validation library 
> across the board.
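
For context, the ConfigDef pattern being standardized on looks roughly like this (the keys, defaults and importance levels below are illustrative placeholders, not the actual KafkaConfig definitions in the patch):

{code}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ConfigDefSketch {
    // "broker.id" has no default, so it is required; "num.io.threads" gets a
    // default value and a range validator.
    private static final ConfigDef DEF = new ConfigDef()
        .define("broker.id", Type.INT, Importance.HIGH, "Unique id of this broker.")
        .define("num.io.threads", Type.INT, 8, Range.atLeast(1), Importance.MEDIUM,
                "Number of threads handling I/O.");

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("broker.id", "0"); // string values are parsed and validated per Type
        Map<String, Object> parsed = DEF.parse(props);
        System.out.println(parsed.get("broker.id"));      // 0
        System.out.println(parsed.get("num.io.threads")); // 8, the default is filled in
    }
}
{code}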



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1845) KafkaConfig should use ConfigDef

2015-02-08 Thread Andrii Biletskyi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrii Biletskyi updated KAFKA-1845:

Attachment: KAFKA-1845_2015-02-08_17:05:22.patch

> KafkaConfig should use ConfigDef 
> -
>
> Key: KAFKA-1845
> URL: https://issues.apache.org/jira/browse/KAFKA-1845
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: Andrii Biletskyi
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1845.patch, KAFKA-1845_2015-02-08_17:05:22.patch
>
>
> ConfigDef is already used for the new producer and for TopicConfig. 
> Will be nice to standardize and use one configuration and validation library 
> across the board.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30126: Patch for KAFKA-1845

2015-02-08 Thread Andrii Biletskyi

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30126/
---

(Updated Feb. 8, 2015, 3:05 p.m.)


Review request for kafka.


Bugs: KAFKA-1845
https://issues.apache.org/jira/browse/KAFKA-1845


Repository: kafka


Description (updated)
---

KAFKA-1845 - Fixed merge conflicts, ported added configs to KafkaConfig


KAFKA-1845 - KafkaConfig to ConfigDef: moved validateValues so it's called on 
instantiating KafkaConfig


KAFKA-1845 - KafkaConfig to ConfigDef: MaxConnectionsPerIpOverrides refactored


KAFKA-1845 - code review fixes, merge conflicts after rebase


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
852a9b39a2f9cd71d176943be86531a43ede 
  core/src/main/scala/kafka/Kafka.scala 
77a49e12af6f869e63230162e9f87a7b0b12b610 
  core/src/main/scala/kafka/controller/KafkaController.scala 
66df6d2fbdbdd556da6bea0df84f93e0472c8fbf 
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
4a31c7271c2d0a4b9e8b28be729340ecfa0696e5 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
6d74983472249eac808d361344c58cc2858ec971 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
6879e730282185bda3d6bc3659cb15af0672cecf 
  core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
5650b4a7b950b48af3e272947bfb5e271c4238c9 
  core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
e63558889272bc76551accdfd554bdafde2e0dd6 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
90c0b7a19c7af8e5416e4bdba62b9824f1abd5ab 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
1bf2667f47853585bc33ffb3e28256ec5f24ae84 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 
e28979827110dfbbb92fe5b152e7f1cc973de400 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
33c27678bf8ae8feebcbcdaa4b90a1963157b4a5 
  core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 
c0355cc0135c6af2e346b4715659353a31723b86 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
a17e8532c44aadf84b8da3a57bcc797a848b5020 
  core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala 
95303e098d40cd790fb370e9b5a47d20860a6da3 
  core/src/test/scala/unit/kafka/integration/FetcherTest.scala 
25845abbcad2e79f56f729e59239b738d3ddbc9d 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
aeb7a19acaefabcc161c2ee6144a56d9a8999a81 
  core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala 
eab4b5f619015af42e4554660eafb5208e72ea33 
  core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 
35dc071b1056e775326981573c9618d8046e601d 
  core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
ba3bcdcd1de9843e75e5395dff2fc31b39a5a9d5 
  
core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
 d6248b09bb0f86ee7d3bd0ebce5b99135491453b 
  core/src/test/scala/unit/kafka/log/LogTest.scala 
c2dd8eb69da8c0982a0dd20231c6f8bd58eb623e 
  core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala 
4ea0489c9fd36983fe190491a086b39413f3a9cd 
  core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
3cf23b3d6d4460535b90cfb36281714788fc681c 
  core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 
1db6ac329f7b54e600802c8a623f80d159d4e69b 
  core/src/test/scala/unit/kafka/producer/ProducerTest.scala 
ce65dab4910d9182e6774f6ef1a7f45561ec0c23 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
d60d8e0f49443f4dc8bc2cad6e2f951eda28f5cb 
  core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala 
f0c4a56b61b4f081cf4bee799c6e9c523ff45e19 
  core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
ad121169a5e80ebe1d311b95b219841ed69388e2 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
8913fc1d59f717c6b3ed12c8362080fb5698986b 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
a703d2715048c5602635127451593903f8d20576 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
82dce80d553957d8b5776a9e140c346d4e07f766 
  core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e 
  core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
c06ee756bf0fe07e5d3c92823a476c960b37afd6 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
d5d351c4f25933da0ba776a6a89a989f1ca6a902 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
  core/src/test/scala/unit/kafka/serv

Re: [DISCUSS] ConfigDec Broker Changes on Trunk

2015-02-08 Thread Gwen Shapira
I think the new tickets can be done in parallel, and are not an actual
dependency for KAFKA-1845. Is that correct?

On Sat, Feb 7, 2015 at 1:44 PM, Jay Kreps  wrote:

> I don't think we need a KIP/vote here, this is just an internal
> refactoring. We had said previously and noted in the document that the KIPs
> were just for big new features or public api changes.
>
> I am a big +1 on the idea. We'll have to be careful in the code review
> since it would really easy to cause subtle issues and it is hard to review
> this kind of change.
>
> For what it is worth the high-level idea of adding a bunch of helper code
> in org.apache.kafka.common is to start to incorporate this on the server
> and replace the utilities there. This will just help reduce the total code
> size.
>
> A few of the highlights there are:
> 1. Replace kafka.utils.Utils with o.a.k.common.utils.Utils. This will
> likely involve some thought and refactoring. Anything non-general purpose
> should move out of Utils entirely and anything that remains should be high
> quality, general purpose, and have some tests. We may want to keep a
> ScalaUtils with a couple of things that aren't really doable/convenient in
> Java. This should be straight-forward.
> 2. Refactor the network server to make use of the classes in
> o.a.k.common.network (receive, send, etc). It might be doable to make use
> of Selector as well.
> 3. Replace the request classes in kafka.api with the ones in
> o.a.k.common.requests. This is one of the more valuable things we can do as
> that will get us to having a single definition of the protocol.
> 4. Make use of the exceptions in o.a.k.common.errors
> 5. Switch over to the new metrics library.
>
> I'll file tickets for these.
>
> -Jay
>
> On Fri, Feb 6, 2015 at 11:16 AM, Joe Stein  wrote:
>
> > I created KIP-12
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-12+change+broker+configuration+properties+to+be+consistent+with+the+rest+of+the+code
> > and linked it to this thread and the JIRA with the v1 patch. The rebased
> > version with updates for the current review should be ready to review in
> > the next few days.
> >
> > On Fri, Feb 6, 2015 at 1:31 PM, Jeff Holoman 
> > wrote:
> >
> > > I think this is a good change. Is there general agreement that we are
> > > moving forward with this approach? It would be nice to start using this
> > for
> > > future work.
> > >
> > > Thanks
> > >
> > > Jeff
> > >
> > > On Tue, Feb 3, 2015 at 9:34 AM, Joe Stein 
> wrote:
> > >
> > > > I updated the RB changing some of the HIGH to MEDIUM and LOW.
> > > >
> > > > There might be other or different opinions and they may change over
> > time
> > > so
> > > > I don't really see h/m/l as a blocker to the patch going in.
> > > >
> > > > It would be great to take all the rb feedback from today and then
> > > tomorrow
> > > > rebase and include changes for a new patch.
> > > >
> > > > Then over the next day or two review, test and commit to trunk (or
> > > re-work
> > > > if necessary).
> > > >
> > > > /***
> > > >  Joe Stein
> > > >  Founder, Principal Consultant
> > > >  Big Data Open Source Security LLC
> > > >  http://www.stealth.ly
> > > >  Twitter: @allthingshadoop 
> > > > /
> > > >
> > > > On Tue, Feb 3, 2015 at 4:56 AM, Andrii Biletskyi <
> > > > andrii.bilets...@stealth.ly> wrote:
> > > >
> > > > > It'd be great to have it on trunk.
> > > > > As I mentioned under jira ticket (KAFKA-1845) current
> implementation
> > > > lacks
> > > > > correct Importance settings.
> > > > > I'd be grateful if somebody could help me with it (a simple mapping
> > > > between
> > > > > config setting and importance or comments right in the review board
> > > would
> > > > > suffice).
> > > > >
> > > > > Thanks,
> > > > > Andrii Biletskyi
> > > > >
> > > > > On Mon, Feb 2, 2015 at 11:38 PM, Gwen Shapira <
> gshap...@cloudera.com
> > >
> > > > > wrote:
> > > > >
> > > > > > Strong +1 from me (obviously). Lots of good reasons to do it:
> > > > > > consistency, code reuse, better validations, etc, etc.
> > > > > >
> > > > > > I had one comment on the patch in RB, but it can also be
> refactored
> > > as
> > > > > > follow up JIRA to avoid blocking everyone who is waiting on this.
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > > On Mon, Feb 2, 2015 at 1:31 PM, Joe Stein 
> > > > wrote:
> > > > > > > Hey, I wanted to start a quick convo around some changes on
> > trunk.
> > > > Not
> > > > > > sure
> > > > > > > this requires a KIP since it is kind of internal and shouldn't
> > > affect
> > > > > > users
> > > > > > > but we can decide if so and link this thread to that KIP if so
> > (and
> > > > > keep
> > > > > > > the discussion going on the thread if makes sense).
> > > > > > >
> > > > > > > Before making any other broker changes I wanted to see what
> folks
> > > > > thought
> > > > > > > about https://issues.apache.or

Re: JIRA attack!

2015-02-08 Thread Gwen Shapira
This was awesome :)

Peak rate of 3 per minute was reported around 3:30pm PST ;)

On Sat, Feb 7, 2015 at 4:40 PM, Jay Kreps  wrote:

> I closed about 350 redundant or obsolete issues. If I closed an issue you
> think is not obsolete, my apologies, just reopen.
>
> -Jay
>


[jira] [Updated] (KAFKA-1932) kafka topic (creation) templates

2015-02-08 Thread Ahmet AKYOL (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmet AKYOL updated KAFKA-1932:
---
Description: 
AFAIK, the only way to create a Kafka topic (without using the default 
settings) is using the provided bash script.

Even though client support could be nice, I would prefer to see a template 
mechanism similar to [Elasticsearch Index 
Templates|http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-templates.html]
 . 

What I have in my mind is very simple and adding something like this into 
server properties :

template.name=pattern,numOfReplica,NumberOfPartition

and pattern can only contain "*" meaning starts with, ends with or contains.

example:
template.logtopics=*_log,2,20
template.loaders=*_loader,1,5

so, when some producer sends a message for the first time to a topic which ends 
with "_logs", Kafka can use the above settings.

thanks in advance

update:

On second thought, maybe a command like kafka-create-template.sh could be more 
practical for cluster deployments, rather than adding to server.properties. 
Kafka internally registers this to ZK.

About use cases, I can understand an opposing argument like creating many 
topics is not a good design decision. Besides, my point is not to create so 
many topics, just to automate an important process by giving the responsibility 
to Kafka.
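
As a rough illustration of the wildcard lookup sketched above (class and method names here are made up for the example, this is not a proposed Kafka API):

{code}
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicTemplates {
    static class Settings {
        final int replicas, partitions;
        Settings(int replicas, int partitions) { this.replicas = replicas; this.partitions = partitions; }
    }

    private final Map<String, Settings> templates = new LinkedHashMap<String, Settings>();

    void add(String pattern, int replicas, int partitions) {
        templates.put(pattern, new Settings(replicas, partitions));
    }

    // First matching template wins; otherwise fall back to the cluster default.
    Settings resolve(String topic, Settings clusterDefault) {
        for (Map.Entry<String, Settings> e : templates.entrySet()) {
            // '*' in a pattern such as "*_log" means starts with / ends with / contains.
            String regex = e.getKey().replace(".", "\\.").replace("*", ".*");
            if (topic.matches(regex))
                return e.getValue();
        }
        return clusterDefault;
    }

    public static void main(String[] args) {
        TopicTemplates t = new TopicTemplates();
        t.add("*_log", 2, 20);    // template.logtopics=*_log,2,20
        t.add("*_loader", 1, 5);  // template.loaders=*_loader,1,5
        Settings s = t.resolve("clicks_log", new Settings(1, 1));
        System.out.println(s.replicas + "," + s.partitions); // prints 2,20
    }
}
{code}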

  was:
AFAIK, the only way to create a Kafka topic (without using the default 
settings) is using the provided bash script.

Even though, a client support could be nice, I would prefer to see a template 
mechanism similar to [Elasticsearch Index 
Templates|http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-templates.html]
 . 

What I have in my mind is very simple and adding something like this into 
server properties :

template.name=pattern,numOfReplica,NumberOfPartition

and pattern can only contain "*" meaning starts with, ends with or contains.

example:
template.logtopics=*_log,2,20
template.loaders=*_loader,1,5

so when the first time some producer sends a message to a topic that ends with 
"_logs" , kafka can use above settings.

thanks in advance


> kafka topic (creation) templates
> 
>
> Key: KAFKA-1932
> URL: https://issues.apache.org/jira/browse/KAFKA-1932
> Project: Kafka
>  Issue Type: Wish
>Reporter: Ahmet AKYOL
>
> AFAIK, the only way to create a Kafka topic (without using the default 
> settings) is using the provided bash script.
> Even though client support could be nice, I would prefer to see a template 
> mechanism similar to [Elasticsearch Index 
> Templates|http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-templates.html]
>  . 
> What I have in my mind is very simple and adding something like this into 
> server properties :
> template.name=pattern,numOfReplica,NumberOfPartition
> and pattern can only contain "*" meaning starts with, ends with or contains.
> example:
> template.logtopics=*_log,2,20
> template.loaders=*_loader,1,5
> so, when some producer sends a message for the first time to a topic which 
> ends with "_logs", Kafka can use the above settings.
> thanks in advance
> update:
> On second thought, maybe a command like kafka-create-template.sh could be 
> more practical for cluster deployments, rather than adding to 
> server.properties. Kafka internally registers this to ZK.
> About use cases, I can understand an opposing argument like creating many 
> topics is not a good design decision. Besides, my point is not to create so 
> many topics, just to automate an important process by giving the 
> responsibility to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1831) Producer does not provide any information about which host the data was sent to

2015-02-08 Thread Mark Payne (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311314#comment-14311314
 ] 

Mark Payne commented on KAFKA-1831:
---

[~jkreps]: that sounds perfect! I appreciate you guys following up.

> Producer does not provide any information about which host the data was sent 
> to
> ---
>
> Key: KAFKA-1831
> URL: https://issues.apache.org/jira/browse/KAFKA-1831
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.8.1.1
>Reporter: Mark Payne
>Assignee: Jun Rao
>
> For traceability purposes and for troubleshooting, when sending data to 
> Kafka, the Producer should provide information about which host the data was 
> sent to. This works well already in the SimpleConsumer, which provides host() 
> and port() methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1877) Expose version via JMX for 'new' producer

2015-02-08 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311274#comment-14311274
 ] 

Manikumar Reddy commented on KAFKA-1877:


[~jkreps] [~junrao]  Currently Kafka metrics can only return a double value 
(Metric.java). 
How should we handle future metrics which may return other data types (int, long, 
string)?
We may need to introduce generic types to metrics.  This will be a sizable 
change and will change some public APIs.

I encountered this while trying to include version info in the producer 
metrics. 
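
To make the question concrete, a generified metric could look roughly like the sketch below (hypothetical names only, not the current Metric.java API):

{code}
public class TypedMetricSketch {
    // Hypothetical: a metric parameterized on its value type instead of a fixed double.
    interface TypedMetric<T> {
        String name();
        T value();
    }

    // A string-valued metric carrying version info, the use case mentioned above.
    static class VersionMetric implements TypedMetric<String> {
        private final String version;
        VersionMetric(String version) { this.version = version; }
        public String name() { return "version"; }
        public String value() { return version; }
    }

    public static void main(String[] args) {
        TypedMetric<String> m = new VersionMetric("0.8.2-beta");
        System.out.println(m.name() + " = " + m.value());
    }
}
{code}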

> Expose version via JMX for 'new' producer 
> --
>
> Key: KAFKA-1877
> URL: https://issues.apache.org/jira/browse/KAFKA-1877
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.8.2
>Reporter: Vladimir Tretyakov
>Assignee: Manikumar Reddy
> Fix For: 0.8.3
>
>
> Add version of Kafka to jmx (monitoring tool can use this info).
> Something like that
> {code}
> kafka.common:type=AppInfo,name=Version
>   Value java.lang.Object = 0.8.2-beta
> {code}
> we already have this in "core" Kafka module (see kafka.common.AppInfo object).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 30482: Add the coordinator to server

2015-02-08 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/
---

(Updated Feb. 6, 2015, 11:02 p.m.)


Review request for kafka.


Bugs: KAFKA-1333
https://issues.apache.org/jira/browse/KAFKA-1333


Repository: kafka


Description (updated)
---

dummy 2


minor


three new file


Address Jay and Onur's comments


Diffs (updated)
-

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/DelayedRebalance.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
  core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
  core/src/main/scala/kafka/network/SocketServer.scala 
39b1651b680b2995cedfde95d74c086d9c6219ef 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala 
fb7e9ed5c16dd15b71e1b1ac12948641185871db 
  core/src/main/scala/kafka/server/KafkaApis.scala 
f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/MetadataCache.scala 
bf81a1ab88c14be8697b441eedbeb28fa0112643 
  core/src/main/scala/kafka/server/OffsetManager.scala 
0bdd42fea931cddd072c0fff765b10526db6840a 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
  core/src/test/resources/log4j.properties 
1b7d5d8f7d5fae7d272849715714781cad05d77b 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
90c0b7a19c7af8e5416e4bdba62b9824f1abd5ab 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
54755e8dd3f23ced313067566cd4ea867f8a496e 

Diff: https://reviews.apache.org/r/30482/diff/


Testing
---


Thanks,

Guozhang Wang



Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-02-08 Thread Jiangjie Qin
I'm a little bit concerned about routing requests among brokers.
Typically produce and fetch requests/responses make up a dominant
percentage of the traffic, so routing them from one broker to another seems
undesirable. Also, I think we generally have two types of requests/responses:
data related and admin related. It is typically a good practice to separate
the data plane from the control plane. That suggests we should have a separate
admin port to serve those admin requests, probably with different
authentication/authorization from the data port.

Jiangjie (Becket) Qin

On 2/6/15, 11:18 AM, "Joe Stein"  wrote:

>I updated the installation and sample usage for the existing patches on
>the
>KIP site
>https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and
>+centralized+administrative+operations
>
>There are still a few pending items here.
>
>1) There was already some discussion about using the Broker that is the
>Controller here https://issues.apache.org/jira/browse/KAFKA-1772 and we
>should elaborate on that more in the thread or agree we are ok with admin
>asking for the controller to talk to and then just sending that broker the
>admin tasks.
>
>2) I like this idea https://issues.apache.org/jira/browse/KAFKA-1912 but
>we
>can refactor after KAFKA-1694 is committed, no? I know folks just want to talk
>to the broker that is the controller. It may even become useful to have
>the
>controller run on a broker that isn't even a topic broker anymore (small
>can of worms I am opening here but it elaborates on Guozhang's hot spot
>point).
>
>3) any more feedback?
>
>- Joe Stein
>
>On Fri, Jan 23, 2015 at 3:15 PM, Guozhang Wang  wrote:
>
>> A centralized admin operation protocol would be very useful.
>>
>> One more general comment here is that controller is originally designed
>>to
>> only talk to other brokers through ControllerChannel, while the broker
>> instance which carries the current controller is agnostic of its
>>existence,
>> and use KafkaApis to handle general Kafka requests. Having all admin
>> requests redirected to the controller instance will force the broker to
>>be
>> aware of its carried controller, and access its internal data for
>>handling
>> these requests. Plus with the number of clients out of Kafka's control,
>> this may easily cause the controller to be a hot spot in terms of
>>request
>> load.
>>
>>
>> On Thu, Jan 22, 2015 at 10:09 PM, Joe Stein 
>>wrote:
>>
>> > inline
>> >
>> > On Thu, Jan 22, 2015 at 11:59 PM, Jay Kreps 
>>wrote:
>> >
>> > > Hey Joe,
>> > >
>> > > This is great. A few comments on KIP-4
>> > >
>> > > 1. This is much needed functionality, but there are a lot of these, so
>> let's
>> > > really think these protocols through. We really want to end up with
>>a
>> set
>> > > of well thought-out, orthogonal apis. For this reason I think it is
>> > really
>> > > important to think through the end state even if that includes APIs
>>we
>> > > won't implement in the first phase.
>> > >
>> >
>> > ok
>> >
>> >
>> > >
>> > > 2. Let's please please please wait until we have switched the server
>> over
>> > > to the new java protocol definitions. If we add umpteen more ad hoc
>> scala
>> > > objects that is just generating more work for the conversion we
>>know we
>> > > have to do.
>> > >
>> >
>> > ok :)
>> >
>> >
>> > >
>> > > 3. This proposal introduces a new type of optional parameter. This
>>is
>> > > inconsistent with everything else in the protocol where we use -1 or
>> some
>> > > other marker value. You could argue either way but let's stick with
>> that
>> > > for consistency. For clients that implemented the protocol in a
>>better
>> > way
>> > > than our scala code these basic primitives are hard to change.
>> > >
>> >
>> > yes, less confusing, ok.
>> >
>> >
>> > >
>> > > 4. ClusterMetadata: This seems to duplicate TopicMetadataRequest
>>which
>> > has
>> > > brokers, topics, and partitions. I think we should rename that
>>request
>> > > ClusterMetadataRequest (or just MetadataRequest) and include the id
>>of
>> > the
>> > > controller. Or are there other things we could add here?
>> > >
>> >
>> > We could add broker version to it.
>> >
>> >
>> > >
>> > > 5. We have a tendency to try to make a lot of requests that can
>>only go
>> > to
>> > > particular nodes. This adds a lot of burden for client
>>implementations
>> > (it
>> > > sounds easy but each discovery can fail in many parts so it ends up
>> > being a
>> > > full state machine to do right). I think we should consider making
>> admin
>> > > commands and ideally as many of the other apis as possible
>>available on
>> > all
>> > > brokers and just redirect to the controller on the broker side.
>>Perhaps
>> > > there would be a general way to encapsulate this re-routing
>>behavior.
>> > >
>> >
>> > If we do that then we should also preserve what we have and do both.
>>The
>> > client can then decide "do I want to go to any broker and proxy" or
>>just
>> > "go to controller and run admin task". Lots of folks have seen
>> controllers

Re: [DISCUSS] KIPs

2015-02-08 Thread Jay Kreps
A problem I am having is actually understanding which KIPs are intended to
be complete proposals and which are works in progress. Joe you seem to have
a bunch of these. Can you move them elsewhere until they are really fully
done and ready for review and discussion?

-Jay

On Fri, Feb 6, 2015 at 12:09 PM, Jay Kreps  wrote:

> I think we are focused on making committing new changes easier, but what
> we have seen is actually that isn't the bulk of the work (especially with
> this kind of "public interface" change where it generally has a big user
> impact). I think we actually really need the core committers and any other
> interested parties to stop and fully read each KIP and think about it. If
> we don't have time to do that we usually just end up spending a lot more
> time after the fact trying to rework things later when it is a lot harder.
> So I really think we should have every active committer read, comment, and
> vote on each KIP. I think this may require a little bit of work to
> co-ordinate/bug people but will end up being worth it because each person
> on the project will have a holistic picture of what is going on.
>
> -Jay
>
> On Thu, Feb 5, 2015 at 11:24 PM, Joel Koshy  wrote:
>
>> Just wanted to add a few more comments on this: KIPs were suggested as
>> a process to help reach early consensus on a major change or not so
>> major (but tricky or backward incompatible) change in order to reduce
>> the likelihood of multiple iterations and complete rewrites during
>> code reviews (which is time-intensive for both the contributor and
>> reviewers); as well as to reduce the likelihood of surprises (say, if
>> a patch inadvertently changes a public API).  So KIPs are intended to
>> speed up development since a clear path is charted out and there is
>> prior consensus on whether a feature and its design/implementation
>> make sense or not.
>>
>> Obviously this breaks down if KIPs are not being actively discussed -
>> again I think we can do much better here. I think we ended up with a
>> backlog because as soon as the KIP wiki was started, a number of
>> pre-existing jiras and discussions were moved there - all within a few
>> days. Now that there are quite a few outstanding KIPs I think we just
>> need to methodically work through those - preferably a couple at a
>> time. I looked through the list and I think we should be able to
>> resolve all of them relatively quickly if everyone is on board with
>> this.
>>
>> > > It's probably more helpful for contributors if it's "lazy" as in "no
>> > > strong objections" .
>>
>> Gwen also suggested this and this also sounds ok to me as I wrote
>> earlier - what do others think? This is important especially if
>> majority in the community think if this less restrictive policy would
>> spur and not hinder development - I'm not sure that it does. I
>> completely agree that KIPs fail to a large degree as far as the
>> original motivation goes if they require a lazy majority but the
>> DISCUSS threads are stalled. IOW regardless of that discussion, I
>> think we should rejuvenate some of those threads especially now that
>> 0.8.2 is out of the way.
>>
>> Thanks,
>>
>> Joel
>>
>> On Thu, Feb 05, 2015 at 08:56:13PM -0800, Joel Koshy wrote:
>> > I'm just thinking aloud - I don't know what a good number would be, and
>> it
>> > is just one possibility to streamline how KIPs are processed. It largely
>> > depends on how complex the proposals are. What would be concerning is if
>> > there are 10 different threads all dealing with large KIPs and no one
>> has
>> > the time to give due diligence to each one and all those threads grind
>> to a
>> > halt due to confusion, incomplete context and misunderstandings.
>> >
>> > On Thursday, February 5, 2015, Harsha  wrote:
>> >
>> > > Joel,
>> > >Having only 2 or 3 KIPS under active discussion is concerning.
>> > >This will slow down development process as well.
>> > > Having a turn-around time for a KIP is a good idea but what will
>> happen
>> > > if it didn't received required votes within that time frame.
>> > > It's probably more helpful for contributors if it's "lazy" as in "no
>> > > strong objections" .
>> > > Just to make sure this is only for KIPs not for regular bug fixes
>> right?
>> > > Thanks,
>> > > Harsha
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > On Thu, Feb 5, 2015, at 05:59 PM, Jiangjie Qin wrote:
>> > > > I'm having an impression that KIP is mostly for new features but
>> not for
>> > > > bug fixes. But I agree with Joel that it might make sense to have
>> some
>> > > > big
>> > > > patches, even if they are bug fixes, to follow the KIP like process
>> which
>> > > > is more strict.
>> > > >
>> > > > Jiangjie (Becket) Qin
>> > > >
>> > > > On 2/5/15, 4:57 PM, "Gwen Shapira" > >
>> > > wrote:
>> > > >
>> > > > >>
>> > > > >>
>> > > > >> Yes there are KIPs that are currently blocked on feedback/votes,
>> but I
>> > > > >> don't think it is an issue of not caring to comment vs having so
>> many
>> 

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-02-08 Thread Jay Kreps
Hey Joe,

I think this is proposing several things:
1. A new command line utility. This isn't really fully specified here.
There is sample usage but I actually don't really understand what all the
commands will be. Also, presumably this will replace the existing shell
scripts, right? We obviously don't want to be in a state where we have
both...
2. A new set of language agnostic administrative protocols.
3. A new Java API for issuing administrative requests using the protocol. I
don't see any discussion on what this will look like.

It might be easiest to tackle these one at a time, no? If not we really do
need to get a complete description at each layer as these are pretty core
public apis.

-Jay

On Fri, Feb 6, 2015 at 11:18 AM, Joe Stein  wrote:

> I updated the installation and sample usage for the existing patches on the
> KIP site
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations
>
> There are still a few pending items here.
>
> 1) There was already some discussion about using the Broker that is the
> Controller here https://issues.apache.org/jira/browse/KAFKA-1772 and we
> should elaborate on that more in the thread or agree we are ok with admin
> asking for the controller to talk to and then just sending that broker the
> admin tasks.
>
> 2) I like this idea https://issues.apache.org/jira/browse/KAFKA-1912 but
> we
> can refactor after KAFKA-1694 is committed, no? I know folks just want to talk
> to the broker that is the controller. It may even become useful to have the
> controller run on a broker that isn't even a topic broker anymore (small
> can of worms I am opening here but it elaborates on Guozhang's hot spot
> point).
>
> 3) any more feedback?
>
> - Joe Stein
>
> On Fri, Jan 23, 2015 at 3:15 PM, Guozhang Wang  wrote:
>
> > A centralized admin operation protocol would be very useful.
> >
> > One more general comment here is that controller is originally designed
> to
> > only talk to other brokers through ControllerChannel, while the broker
> > instance which carries the current controller is agnostic of its
> existence,
> > and use KafkaApis to handle general Kafka requests. Having all admin
> > requests redirected to the controller instance will force the broker to
> be
> > aware of its carried controller, and access its internal data for
> handling
> > these requests. Plus with the number of clients out of Kafka's control,
> > this may easily cause the controller to be a hot spot in terms of request
> > load.
> >
> >
> > On Thu, Jan 22, 2015 at 10:09 PM, Joe Stein 
> wrote:
> >
> > > inline
> > >
> > > On Thu, Jan 22, 2015 at 11:59 PM, Jay Kreps 
> wrote:
> > >
> > > > Hey Joe,
> > > >
> > > > This is great. A few comments on KIP-4
> > > >
> > > > 1. This is much needed functionality, but there are a lot of these, so
> > let's
> > > > really think these protocols through. We really want to end up with a
> > set
> > > > of well thought-out, orthogonal apis. For this reason I think it is
> > > really
> > > > important to think through the end state even if that includes APIs
> we
> > > > won't implement in the first phase.
> > > >
> > >
> > > ok
> > >
> > >
> > > >
> > > > 2. Let's please please please wait until we have switched the server
> > over
> > > > to the new java protocol definitions. If we add umpteen more ad hoc
> > scala
> > > > objects that is just generating more work for the conversion we know
> we
> > > > have to do.
> > > >
> > >
> > > ok :)
> > >
> > >
> > > >
> > > > 3. This proposal introduces a new type of optional parameter. This is
> > > > inconsistent with everything else in the protocol where we use -1 or
> > some
> > > > other marker value. You could argue either way but let's stick with
> > that
> > > > for consistency. For clients that implemented the protocol in a
> better
> > > way
> > > > than our scala code these basic primitives are hard to change.
> > > >
> > >
> > > yes, less confusing, ok.
> > >
> > >
> > > >
> > > > 4. ClusterMetadata: This seems to duplicate TopicMetadataRequest
> which
> > > has
> > > > brokers, topics, and partitions. I think we should rename that
> request
> > > > ClusterMetadataRequest (or just MetadataRequest) and include the id
> of
> > > the
> > > > controller. Or are there other things we could add here?
> > > >
> > >
> > > We could add broker version to it.
> > >
> > >
> > > >
> > > > 5. We have a tendency to try to make a lot of requests that can only
> go
> > > to
> > > > particular nodes. This adds a lot of burden for client
> implementations
> > > (it
> > > > sounds easy but each discovery can fail in many parts so it ends up
> > > being a
> > > > full state machine to do right). I think we should consider making
> > admin
> > > > commands and ideally as many of the other apis as possible available
> on
> > > all
> > > > brokers and just redirect to the controller on the broker side.
> Perhaps
> > > > there would be a general way to encapsulate this re-routing behav

Re: Review Request 29831: Patch for KAFKA-1476

2015-02-08 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29831/#review71555
---

Ship it!


Thanks for attaching the latest run of the tool. I observed that we should get 
rid of the zookeeper error message: 
Could not fetch offset from zookeeper for group g1 partition [t1,0] due to 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /consumers/g1/offsets/t1/0.

It is somewhat unreadable and not very useful for the user. The best way to 
handle this in a user facing tool is to explain the cause, rather than use the 
error message directly.

Other than that, the patch looks good. Once you fix the above problem, I will 
check it in.

- Neha Narkhede


On Feb. 5, 2015, 11:01 a.m., Onur Karaman wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29831/
> ---
> 
> (Updated Feb. 5, 2015, 11:01 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1476
> https://issues.apache.org/jira/browse/KAFKA-1476
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Merged in work for KAFKA-1476 and sub-task KAFKA-1826
> 
> 
> Diffs
> -
> 
>   bin/kafka-consumer-groups.sh PRE-CREATION 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> 28b12c7b89a56c113b665fbde1b95f873f8624a3 
>   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> c14bd455b6642f5e6eb254670bef9f57ae41d6cb 
>   core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
> 33c27678bf8ae8feebcbcdaa4b90a1963157b4a5 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 54755e8dd3f23ced313067566cd4ea867f8a496e 
> 
> Diff: https://reviews.apache.org/r/29831/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Onur Karaman
> 
>



Re: JIRA attack!

2015-02-08 Thread Pradeep Gollakota
Apparently I joined this list at the right time :P

On Sat, Feb 7, 2015 at 4:40 PM, Jay Kreps  wrote:

> I closed about 350 redundant or obsolete issues. If I closed an issue you
> think is not obsolete, my apologies, just reopen.
>
> -Jay
>


Re: Review Request 30403: Patch for KAFKA-1906

2015-02-08 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30403/#review71557
---



config/server.properties


Why leave the value here to point at /tmp/kafka-logs? The way I see it is that 
we default the log directory to data/ in the kafka installation directory and 
possibly encourage storing kafka data under /var/ if it must be overridden for 
production, by changing this commented-out value to /var/kafka-logs?



core/src/main/scala/kafka/server/KafkaConfig.scala


If KAFKA_HOME is not set (since somehow they are starting the kafka server 
using different scripts), then we should just point the log directory to the 
value of log.dirs and not force a default to /tmp.
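
As a rough sketch of the default-resolution order described in the patch below (a hypothetical Java helper, not the actual Scala change under review):

public class LogDirDefault {
    // Explicit log.dirs wins, then $KAFKA_HOME/data/kafka-logs, then the legacy /tmp fallback.
    static String resolve(String configuredLogDirs, String kafkaHome) {
        if (configuredLogDirs != null && !configuredLogDirs.isEmpty())
            return configuredLogDirs;              // user set log.dirs (or log.dir) explicitly
        if (kafkaHome != null && !kafkaHome.isEmpty())
            return kafkaHome + "/data/kafka-logs"; // default under the installation directory
        return "/tmp/kafka-logs";                  // current default when nothing else is known
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, System.getenv("KAFKA_HOME")));
    }
}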


- Neha Narkhede


On Jan. 29, 2015, 6:24 a.m., Jaikiran Pai wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30403/
> ---
> 
> (Updated Jan. 29, 2015, 6:24 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1906
> https://issues.apache.org/jira/browse/KAFKA-1906
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1906 Default the Kafka data log directory to 
> $KAFKA_HOME/data/kafka-logs directory, where KAFKA_HOME is the Kafka 
> installation directory
> 
> 
> Diffs
> -
> 
>   bin/kafka-run-class.sh 881f578a8f5c796fe23415b978c1ad35869af76e 
>   bin/windows/kafka-run-class.bat 9df3d2b45236b4f06d55a89c84afcf0ab9f5d0f2 
>   config/server.properties 1614260b71a658b405bb24157c8f12b1f1031aa5 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 6d74983472249eac808d361344c58cc2858ec971 
>   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
> 82dce80d553957d8b5776a9e140c346d4e07f766 
> 
> Diff: https://reviews.apache.org/r/30403/diff/
> 
> 
> Testing
> ---
> 
> The change here involves updating the Kafka scripts (for Windows and *nix) 
> to infer and setup KAFKA_HOME environment variable. This value is then used 
> by the KafkaConfig to decide what path to default to for the Kafka data logs, 
> in the absence of any explicitly set log.dirs (or log.dir) properties.
> 
> Care has been taken to ensure that other mechanism which might presently be 
> bypassing the Kafka scripts, will still continue to function, since in the 
> absence of KAFKA_HOME environment property value, we fall back to 
> /tmp/kafka-logs (the present default) as the default data log directory
> 
> Existing tests have been run to ensure that this change maintains backward 
> compatibility (i.e. doesn't fail when KAFKA_HOME isn't available/set) and 2 
> new test methods have been added to the KafkaConfigTest to ensure that this 
> change works.
> 
> Although the change has been made to both .sh and .bat files, to support 
> this, I haven't actually tested this change on a Windows OS and would 
> appreciate if someone can test this there and let me know if they run into 
> any issues.
> 
> 
> Thanks,
> 
> Jaikiran Pai
> 
>



Re: [DISCUSS] ConfigDec Broker Changes on Trunk

2015-02-08 Thread Jay Kreps
I don't think we need a KIP/vote here, this is just an internal
refactoring. We had said previously and noted in the document that the KIPs
were just for big new features or public api changes.

I am a big +1 on the idea. We'll have to be careful in the code review
since it would really easy to cause subtle issues and it is hard to review
this kind of change.

For what it is worth the high-level idea of adding a bunch of helper code
in org.apache.kafka.common is to start to incorporate this on the server
and replace the utilities there. This will just help reduce the total code
size.

A few of the highlights there are:
1. Replace kafka.utils.Utils with o.a.k.common.utils.Utils. This will
likely involve some thought and refactoring. Anything non-general purpose
should move out of Utils entirely and anything that remains should be high
quality, general purpose, and have some tests. We may want to keep a
ScalaUtils with a couple of things that aren't really doable/convenient in
Java. This should be straight-forward.
2. Refactor the network server to make use of the classes in
o.a.k.common.network (receive, send, etc). It might be doable to make use
of Selector as well.
3. Replace the request classes in kafka.api with the ones in
o.a.k.common.requests. This is one of the more valuable things we can do as
that will get us to having a single definition of the protocol.
4. Make use of the exceptions in o.a.k.common.errors
5. Switch over to the new metrics library.

I'll file tickets for these.

-Jay

On Fri, Feb 6, 2015 at 11:16 AM, Joe Stein  wrote:

> I created KIP-12
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-12+change+broker+configuration+properties+to+be+consistent+with+the+rest+of+the+code
> and linked it to this thread and the JIRA with the v1 patch. The rebased
> version with updates for the current review should be ready to review in
> the next few days.
>
> On Fri, Feb 6, 2015 at 1:31 PM, Jeff Holoman 
> wrote:
>
> > I think this is a good change. Is there general agreement that we are
> > moving forward with this approach? It would be nice to start using this
> for
> > future work.
> >
> > Thanks
> >
> > Jeff
> >
> > On Tue, Feb 3, 2015 at 9:34 AM, Joe Stein  wrote:
> >
> > > I updated the RB changing some of the HIGH to MEDIUM and LOW.
> > >
> > > There might be other or different opinions and they may change over
> time
> > so
> > > I don't really see h/m/l as a blocker to the patch going in.
> > >
> > > It would be great to take all the rb feedback from today and then
> > tomorrow
> > > rebase and include changes for a new patch.
> > >
> > > Then over the next day or two review, test and commit to trunk (or
> > re-work
> > > if necessary).
> > >
> > > /***
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop 
> > > /
> > >
> > > On Tue, Feb 3, 2015 at 4:56 AM, Andrii Biletskyi <
> > > andrii.bilets...@stealth.ly> wrote:
> > >
> > > > It'd be great to have it on trunk.
> > > > As I mentioned under jira ticket (KAFKA-1845) current implementation
> > > lacks
> > > > correct Importance settings.
> > > > I'd be grateful if somebody could help me with it (a simple mapping
> > > between
> > > > config setting and importance or comments right in the review board
> > would
> > > > suffice).
> > > >
> > > > Thanks,
> > > > Andrii Biletskyi
> > > >
> > > > On Mon, Feb 2, 2015 at 11:38 PM, Gwen Shapira  >
> > > > wrote:
> > > >
> > > > > Strong +1 from me (obviously). Lots of good reasons to do it:
> > > > > consistency, code reuse, better validations, etc, etc.
> > > > >
> > > > > I had one comment on the patch in RB, but it can also be refactored
> > as
> > > > > follow up JIRA to avoid blocking everyone who is waiting on this.
> > > > >
> > > > > Gwen
> > > > >
> > > > > On Mon, Feb 2, 2015 at 1:31 PM, Joe Stein 
> > > wrote:
> > > > > > Hey, I wanted to start a quick convo around some changes on
> trunk.
> > > Not
> > > > > sure
> > > > > > this requires a KIP since it is kind of internal and shouldn't
> > affect
> > > > > users
> > > > > > but we can decide if so and link this thread to that KIP if so
> (and
> > > > keep
> > > > > > the discussion going on the thread if makes sense).
> > > > > >
> > > > > > Before making any other broker changes I wanted to see what folks
> > > > thought
> > > > > > about https://issues.apache.org/jira/browse/KAFKA-1845 ConfigDec
> > > > patch.
> > > > > >
> > > > > > I agree it will be nice to standardize and use one configuration
> > and
> > > > > > validation library across the board. It helps in a lot of
> different
> > > > > changes
> > > > > > we have been discussing also in 0.8.3 and think we should make
> sure
> > > it
> > > > is
> > > > > > what we want if so then: review, commit and keep going.
> > > > > >
> > > > > > Though

[jira] [Commented] (KAFKA-1884) New Producer blocks forever for Invalid topic names

2015-02-08 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311252#comment-14311252
 ] 

Manikumar Reddy commented on KAFKA-1884:


[~guozhang] [~jkreps] 
KAFKA-1919 solves the retry problem.  Currently we will get a continuous empty 
metadata response for invalid topics.  I was thinking: can clients get the 
InvalidTopicException/error code, or can we add topic validation on the client 
side itself? How will non-java clients handle it?
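
For illustration, a client-side pre-check along those lines could look roughly like this (a hypothetical helper, not an existing producer API; it mirrors the broker rule quoted in the logs below, ASCII alphanumerics plus '.', '_' and '-', but ignores length limits and names like "." or ".."):

{code}
import java.util.regex.Pattern;

public class TopicNameCheck {
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    static boolean isLegal(String topic) {
        return topic != null && !topic.isEmpty() && LEGAL.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(isLegal("my-topic.v1")); // true
        System.out.println(isLegal("TOPIC="));      // false, '=' is not allowed
    }
}
{code}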

> New Producer blocks forever for Invalid topic names
> ---
>
> Key: KAFKA-1884
> URL: https://issues.apache.org/jira/browse/KAFKA-1884
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Manikumar Reddy
> Fix For: 0.8.3
>
>
> New producer blocks forever for invalid topics names
> producer logs:
> DEBUG [2015-01-20 12:46:13,406] NetworkClient: maybeUpdateMetadata(): Trying 
> to send metadata request to node -1
> DEBUG [2015-01-20 12:46:13,406] NetworkClient: maybeUpdateMetadata(): Sending 
> metadata request ClientRequest(expectResponse=true, payload=null, 
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=50845,client_id=my-producer},
>  body={topics=[TOPIC=]})) to node -1
> TRACE [2015-01-20 12:46:13,416] NetworkClient: handleMetadataResponse(): 
> Ignoring empty metadata response with correlation id 50845.
> DEBUG [2015-01-20 12:46:13,417] NetworkClient: maybeUpdateMetadata(): Trying 
> to send metadata request to node -1
> DEBUG [2015-01-20 12:46:13,417] NetworkClient: maybeUpdateMetadata(): Sending 
> metadata request ClientRequest(expectResponse=true, payload=null, 
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=50846,client_id=my-producer},
>  body={topics=[TOPIC=]})) to node -1
> TRACE [2015-01-20 12:46:13,417] NetworkClient: handleMetadataResponse(): 
> Ignoring empty metadata response with correlation id 50846.
> DEBUG [2015-01-20 12:46:13,417] NetworkClient: maybeUpdateMetadata(): Trying 
> to send metadata request to node -1
> DEBUG [2015-01-20 12:46:13,418] NetworkClient: maybeUpdateMetadata(): Sending 
> metadata request ClientRequest(expectResponse=true, payload=null, 
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=50847,client_id=my-producer},
>  body={topics=[TOPIC=]})) to node -1
> TRACE [2015-01-20 12:46:13,418] NetworkClient: handleMetadataResponse(): 
> Ignoring empty metadata response with correlation id 50847.
> Broker logs:
> [2015-01-20 12:46:14,074] ERROR [KafkaApi-0] error when handling request 
> Name: TopicMetadataRequest; Version: 0; CorrelationId: 51020; ClientId: 
> my-producer; Topics: TOPIC= (kafka.server.KafkaApis)
> kafka.common.InvalidTopicException: topic name TOPIC= is illegal, contains a 
> character other than ASCII alphanumerics, '.', '_' and '-'
>   at kafka.common.Topic$.validate(Topic.scala:42)
>   at 
> kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:186)
>   at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:177)
>   at kafka.server.KafkaApis$$anonfun$5.apply(KafkaApis.scala:367)
>   at kafka.server.KafkaApis$$anonfun$5.apply(KafkaApis.scala:350)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at 
> scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
>   at scala.collection.SetLike$class.map(SetLike.scala:93)
>   at scala.collection.AbstractSet.map(Set.scala:47)
>   at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:350)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:389)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:57)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>   at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-08 Thread Neha Narkhede
Hey Becket,

What are the next steps on this KIP? As per your comment earlier on the
thread -

I do agree it makes more sense
> to avoid duplicate effort and plan based on new consumer. I’ll modify the
> KIP.


Did you get a chance to think about the simplified design that we proposed
earlier? Do you plan to update the KIP with that proposal?

Thanks,
Neha

On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin 
wrote:

> In mirror maker we do not do de-serialization on the messages. Mirror
> maker uses the source TopicPartition hash to choose a producer to send messages
> from the same source partition. The partition those messages end up with
> is decided by the Partitioner class in KafkaProducer (assuming you are using
> the new producer), which uses hash code of bytes[].
>
> If deserialization is needed, it has to be done in message handler.
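
For illustration, the routing described above amounts to roughly the following sketch (not the actual MirrorMaker code):

import org.apache.kafka.common.TopicPartition;

public class ProducerPicker {
    // Hash the source TopicPartition so all messages from one source partition
    // flow through the same producer and keep their relative order.
    static int pick(TopicPartition source, int numProducers) {
        return Math.abs(source.hashCode() % numProducers);
    }

    public static void main(String[] args) {
        System.out.println(pick(new TopicPartition("clicks", 3), 4));
    }
}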
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 2/4/15, 11:33 AM, "Bhavesh Mistry"  wrote:
>
> >Hi Jiangjie,
> >
> >Thanks for entertaining my question so far.  The last question I have is
> >about
> >serialization of message key.  If the key de-serialization (Class) is not
> >present at the MM instance, then does it use raw byte hashcode to
> >determine
> >the partition?  How are you going to address the situation where the key
> >needs
> >to be de-serialized and the actual hashcode needs to be computed?
> >
> >
> >Thanks,
> >
> >Bhavesh
> >
> >On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin 
> >wrote:
> >
> >> Hi Bhavesh,
> >>
> >> Please see inline comments.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry" 
> >>wrote:
> >>
> >> >Hi Jiangjie,
> >> >
> >> >Thanks for the input.
> >> >
> >> >a) Will the MM producer ack be attached to the Producer Instance or be per
> >> >topic?  The use case is that one instance of MM
> >> >needs to handle both strong ack and also ack=0 for some topic.  Or it
> >> >would
> >> >be better to set-up another instance of MM.
> >> The acks setting is producer level setting instead of topic level
> >>setting.
> >> In this case you probably need to set up another instance.
> >> >
> >> >b) Regarding TCP connections, Why does #producer instance attach to TCP
> >> >connection.  Is it possible to use Broker Connection TCP Pool, producer
> >> >will just checkout TCP connection  to Broker.  So, # of Producer
> >>Instance
> >> >does not correlation to Brokers Connection.  Is this possible ?
> >> In new producer, each producer maintains a connection to each broker
> >> within the producer instance. Making producer instances to share the TCP
> >> connections is a very big change to the current design, so I suppose we
> >> won’t be able to do that.
> >> >
> >> >
> >> >Thanks,
> >> >
> >> >Bhavesh
> >> >
> >> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin
> >> >> >
> >> >wrote:
> >> >
> >> >> Hi Bhavesh,
> >> >>
> >> >> I think it is the right discussion to have when we are talking about
> >>the
> >> >> new new design for MM.
> >> >> Please see the inline comments.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry" 
> >> >>wrote:
> >> >>
> >> >> >Hi Jiangjie,
> >> >> >
> >> >> >I just wanted to let you know about our use case and stress the
> >>point
> >> >>that
> >> >> >local data center broker cluster have fewer partitions than the
> >> >> >destination
> >> >> >offline broker cluster. Just because we do the batch pull from CAMUS
> >> >>and
> >> >> >in
> >> >> >order to drain data faster than the injection rate (from four DCs
> >>for
> >> >>same
> >> >> >topic).
> >> >> Keeping the same partition number in source and target cluster will
> >>be
> >> >>an
> >> >> option but will not be enforced by default.
> >> >> >
> >> >> >We are facing following issues (probably due to configuration):
> >> >> >
> >> >> >1)  We occasionally lose data because the message batch size is too
> >> >>large
> >> >> >(2MB) on target data (we are using old producer but I think new
> >> >>producer
> >> >> >will solve this problem to some extend).
> >> >> We do see this issue in LinkedIn as well. New producer also might
> >>have
> >> >> this issue. There are some proposal of solutions, but no real work
> >> >>started
> >> >> yet. For now, as a workaround, setting a more aggressive batch size
> >>on
> >> >> producer side should work.
> >> >> >2)  Since only one instance is set to MM data,  we are not able
> >>to
> >> >> >set-up ack per topic instead ack is attached to producer instance.
> >> >> I don’t quite get the question here.
> >> >> >3)  How are you going to address two phase commit problem if
> >>ack is
> >> >> >set
> >> >> >to strongest, but auto commit is on for consumer (meaning producer
> >>does
> >> >> >not
> >> >> >get ack,  but consumer auto committed offset that message).  Is
> >>there
> >> >> >transactional (Kafka transaction is in process) based ack and commit
> >> >> >offset
> >> >> >?
> >> >> Auto offset commit should be turned off in this case. The offset will
> >> >>only
> >> >> be committed once by the off

Re: Review Request 30482: Add the coordinator to server

2015-02-08 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/#review71562
---

Ship it!


Ship It!

- Jay Kreps


On Feb. 6, 2015, 11:02 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30482/
> ---
> 
> (Updated Feb. 6, 2015, 11:02 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1333
> https://issues.apache.org/jira/browse/KAFKA-1333
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> dummy 2
> 
> 
> minor
> 
> 
> three new file
> 
> 
> Address Jay and Onur's comments
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/DelayedRebalance.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 39b1651b680b2995cedfde95d74c086d9c6219ef 
>   core/src/main/scala/kafka/server/DelayedOperationKey.scala 
> fb7e9ed5c16dd15b71e1b1ac12948641185871db 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> f2b027bf944e735fd52cc282690ec1b8395f9290 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> 89200da30a04943f0b9befe84ab17e62b747c8c4 
>   core/src/main/scala/kafka/server/MetadataCache.scala 
> bf81a1ab88c14be8697b441eedbeb28fa0112643 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 0bdd42fea931cddd072c0fff765b10526db6840a 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 81ae205ef7b2050d0152f29f8da7dd91b17b8b00 
>   core/src/test/resources/log4j.properties 
> 1b7d5d8f7d5fae7d272849715714781cad05d77b 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> 90c0b7a19c7af8e5416e4bdba62b9824f1abd5ab 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
> ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 54755e8dd3f23ced313067566cd4ea867f8a496e 
> 
> Diff: https://reviews.apache.org/r/30482/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



Re: Review Request 30482: Add the coordinator to server

2015-02-08 Thread Guozhang Wang


On Feb. 1, 2015, 8:46 p.m., Guozhang Wang wrote:
> > Not sure this stuff is actually here for review...may still be a work in 
> > progress. Overall this structure of code makes a ton of sense to me. Left 
> > some minor comments.

Yes, this is more of a WIP patch, but the scope of this JIRA does not include 
fully implemented failure detection / rebalance logic either. It just adds the 
coordinator module with simple start-up / shut-down functions, which can 
unblock further development in parallel.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30482/#review70533
---


On Feb. 1, 2015, 2:45 a.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30482/
> ---
> 
> (Updated Feb. 1, 2015, 2:45 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1333
> https://issues.apache.org/jira/browse/KAFKA-1333
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> 1. Add ConsumerCoordinator with GroupRegistry and ConsumerRegistry metadata, 
> and ZK listeners.
> 2. Add a delayed heartbeat purgatory based on HeartbeatBucket to expire 
> heartbeat requests.
> 3. Add a delayed rebalance purgatory for preparing rebalance.
> 4. Add a join-group purgatory for sending back responses with assigned 
> partitions.
> 5. Add TimeMsKey / ConsumerKey and ConsumerGroupKey for delayed heartbeat / 
> join-group / rebalance purgatories.
> 6. Refactor KafkaApis for handling JoinGroup / Heartbeat requests with 
> coordinator, and sending responses via callbacks.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/GroupRegistry.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedOperationKey.scala 
> fb7e9ed5c16dd15b71e1b1ac12948641185871db 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> f2b027bf944e735fd52cc282690ec1b8395f9290 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> 89200da30a04943f0b9befe84ab17e62b747c8c4 
> 
> Diff: https://reviews.apache.org/r/30482/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



Re: Review Request 30196: Patch for KAFKA-1886

2015-02-08 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30196/#review71556
---



core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala


Why do you need the sleep here? We try to avoid blindly sleeping in Kafka 
tests since it almost always leads to transient test failures. Consider using 
TestUtils.waitUntilTrue().
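For illustration, a minimal sketch of that pattern; the condition and names
below are made up, not taken from the patch:

import kafka.utils.TestUtils

// Instead of: Thread.sleep(1000)
TestUtils.waitUntilTrue(
  () => fetchedMessages.size >= expectedMessages.size,  // placeholder condition
  "Timed out waiting for the consumer to fetch all produced messages")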


- Neha Narkhede


On Feb. 2, 2015, 9:57 p.m., Aditya Auradkar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30196/
> ---
> 
> (Updated Feb. 2, 2015, 9:57 p.m.)
> 
> 
> Review request for kafka and Joel Koshy.
> 
> 
> Bugs: KAFKA-1886
> https://issues.apache.org/jira/browse/KAFKA-1886
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Fixing KAFKA-1886. SimpleConsumer should not swallow 
> ClosedByInterruptException
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> cbef84ac76e62768981f74e71d451f2bda995275 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
> aeb7a19acaefabcc161c2ee6144a56d9a8999a81 
> 
> Diff: https://reviews.apache.org/r/30196/diff/
> 
> 
> Testing
> ---
> 
> Added an integration test to PrimitiveAPITest.scala.
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>



Re: Review Request 29468: Patch for KAFKA-1805

2015-02-08 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29468/#review71572
---



clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java


Is this intentional?



clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java


Theoretically partitionId can be null also.
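To make the point concrete, a hedged sketch (the real class is Java and the
accessor names are assumed): every field comparison has to be null-safe, since
partition, key, and value may all be null. This is a hypothetical helper, not
the patch itself.

import java.util.Objects
import org.apache.kafka.clients.producer.ProducerRecord

def recordsEqual(a: ProducerRecord[AnyRef, AnyRef], b: ProducerRecord[AnyRef, AnyRef]): Boolean =
  Objects.equals(a.topic, b.topic) &&
  Objects.equals(a.partition, b.partition) &&  // partition is a nullable Integer
  Objects.equals(a.key, b.key) &&
  Objects.equals(a.value, b.value)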


- Guozhang Wang


On Dec. 30, 2014, 12:37 a.m., Parth Brahmbhatt wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29468/
> ---
> 
> (Updated Dec. 30, 2014, 12:37 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42
> https://issues.apache.org/jira/browse/KAFKA-1805
> https://issues.apache.org/jira/browse/KAFKA-1905
> https://issues.apache.org/jira/browse/KAFKA-42
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1805: adding equals and hashcode methods to ProducerRecord.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
> 065d4e6c6a4966ac216e98696782e2714044df29 
> 
> Diff: https://reviews.apache.org/r/29468/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Parth Brahmbhatt
> 
>



JIRA attack!

2015-02-08 Thread Jay Kreps
I closed about 350 redundant or obsolete issues. If I closed an issue you
think is not obsolete, my apologies, just reopen.

-Jay


Review Request 30763: Patch for KAFKA-1865

2015-02-08 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
---

Review request for kafka.


Bugs: KAFKA-1865
https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
---

KAFKA-1865 Add a flush() method to the producer.
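A hedged usage sketch of the proposed API (props and records are placeholders,
not part of the patch): send() stays asynchronous, and flush() blocks until
every record sent so far has completed or failed, e.g. right before committing
consumer offsets.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// "props" and "records" are assumed to exist already.
val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
for (record <- records)
  producer.send(record)   // asynchronous; each call returns a future
producer.flush()          // proposed call: blocks until all sends above complete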


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
17fe541588d462c68c33f6209717cc4015e9b62f 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
75513b0bdd439329c5771d87436ef83fda853bfb 
  
clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
54755e8dd3f23ced313067566cd4ea867f8a496e 

Diff: https://reviews.apache.org/r/30763/diff/


Testing
---


Thanks,

Jay Kreps



[jira] [Created] (KAFKA-1932) kafka topic (creation) templates

2015-02-08 Thread Ahmet AKYOL (JIRA)
Ahmet AKYOL created KAFKA-1932:
--

 Summary: kafka topic (creation) templates
 Key: KAFKA-1932
 URL: https://issues.apache.org/jira/browse/KAFKA-1932
 Project: Kafka
  Issue Type: Wish
Reporter: Ahmet AKYOL


AFAIK, the only way to create a Kafka topic with non-default settings is to use 
the provided bash script.

Even though client support could be nice, I would prefer to see a template 
mechanism similar to [Elasticsearch Index 
Templates|http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-templates.html].

What I have in mind is very simple: adding something like this to the server 
properties:

template.<name>=<pattern>,<numReplicas>,<numPartitions>

where the pattern can only contain "*", meaning starts with, ends with, or contains.

example:
template.logtopics=*_log,2,20
template.loaders=*_loader,1,5

so the first time some producer sends a message to a topic that ends with 
"_log", Kafka can use the settings above.

thanks in advance



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)