Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Jiangjie Qin


> On July 22, 2015, 5:13 p.m., Guozhang Wang wrote:
> > LGTM overall. Could you address Ismael's comments as well before check-in?

Thanks, Guozhang. I updated the patch to address Ismael's comments.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92607
---


On July 23, 2015, 12:51 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36664/
> ---
> 
> (Updated July 23, 2015, 12:51 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2353
> https://issues.apache.org/jira/browse/KAFKA-2353
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Gwen's comments
> 
> 
> Addressed Gwen's comments.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 91319fa010b140cca632e5fa8050509bd2295fc9 
> 
> Diff: https://reviews.apache.org/r/36664/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638182#comment-14638182
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~yasuhiro.matsuda] [~jkreps] [~hachikuji] [~gwenshap] If we are using 
pause/unpause, does that mean that when users are not using the consumer 
coordinator, these calls are equivalent to 
subscribe(partitions)/unsubscribe(partitions)?

I still don't understand why this is an "overloading" of the current 
subscribe/unsubscribe API. The way I see it, the KafkaConsumer API works by 
providing different methods to set different fields of a fetch request 
(offsets, topics, partitions), and then we do a poll() using those settings.

To me the definition of all the subscribe/unsubscribe methods stays unchanged:
* Subscribe/Unsubscribe to a TOPIC means
** Involve the consumer coordinator to do partition assignment
** A consumer rebalance will be triggered
* Subscribe/Unsubscribe to a PARTITION means
** Do not involve the consumer coordinator
** A consumer rebalance will not be triggered.

The only change is that instead of naively rejecting a PARTITION sub/unsub when 
the consumer coordinator is involved, we allow users to decide whether they want 
to change the settings for their next poll() to exclude some topic partitions 
that have been assigned to this consumer.

Therefore I don't see why using subscribe(partitions)/unsubscribe(partitions) 
for pausing and unpausing consumption is a behavior re-definition. It looks to me 
that pause/unpause does the exact same thing as partition-level 
subscribe/unsubscribe, but we are adding them simply because we think users are 
using them for a different use case. If so, does it mean we need to add yet 
another pair of interfaces if people subscribe/unsubscribe partitions for 
some other use case? Then we are going to end up with a bunch of interfaces 
doing very similar or even the exact same thing, but with different names based 
on the use case. 

If the reason we don't like sub/unsub is that their names sound 
purpose-oriented and indicate a particular use case, we can change the names to 
something like addTopicPartition()/addTopic() (I know I am a terrible name 
picker, but hopefully you get what I wanted to say).
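
For illustration, a minimal sketch of the partition-level pause/resume usage 
described above, using the subscribe(TopicPartition...) and 
unsubscribe(TopicPartition...) signatures on trunk at the time (a sketch, not 
a reference implementation):

{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

TopicPartition tp = new TopicPartition("topic1", 0);
consumer.subscribe(tp);   // partition-level: no coordinator, no rebalance

consumer.unsubscribe(tp); // "pause": tp is excluded from the next poll()
ConsumerRecords<byte[], byte[]> rest = consumer.poll(100);

consumer.subscribe(tp);   // "unpause": tp is fetched again
{code}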

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638139#comment-14638139
 ] 

Jay Kreps commented on KAFKA-2350:
--

[~hachikuji] yeah, I vote for pause/resume (or unpause).

I think the challenge with making group management explicit is that it 
will conflict with the API usage. I.e. the user will say 
client.subscribe(mytopic), something terrible and bad will happen, they 
won't know why, and we will say uh uh uh, you forgot to set the magic 
enable.consumer.coordinator=true flag. This was kind of what I liked about the 
implicit approach--the natural usage of just subscribing to a partition or 
topic does what you would expect. I would anticipate that making this a flag 
would turn into a source of confusion, because in the early stages of usage 
most people won't know what a consumer coordinator is.
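
To make the concern concrete, this is the kind of setup being described; the 
enable.consumer.coordinator key is hypothetical (it is the flag under 
discussion, not an actual Kafka config):

{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
// Hypothetical flag under discussion -- not a real config key. Forgetting it
// would silently change what subscribe(topic) means:
props.put("enable.consumer.coordinator", "true");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe("mytopic"); // group-managed only if the magic flag was set
{code}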

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92711
---



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java (line 
47)


It seems that we will need to add similar logic in other Sends: MultiSend, 
FetchResponseSend, TopicDataSend and PartitionDataSend.


- Jun Rao


On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33620/
> ---
> 
> (Updated July 20, 2015, 7 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1690
> https://issues.apache.org/jira/browse/KAFKA-1690
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Added 
> PrincipalBuilder.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Broker side ssl changes.
> 
> 
> KAFKA-1684. SSL for socketServer.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Post merge fixes.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest.
> 
> 
> KAFKA-1690. Minor fixes based on patch review comments.
> 
> 
> Merge commit
> 
> 
> KAFKA-1690. Added SSL Consumer Test.
> 
> 
> KAFKA-1690. SSL Support.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Diffs
> -
> 
>   build.gradle fb9084307ae41bbb62e32720ccd7b94f26e910c8 
>   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
>   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
> 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 70377ae2fa46deb381139d28590ce6d4115e1adc 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> aa264202f2724907924985a5ecbe74afc4c6c04b 
>   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
> bae528d31516679bed88ee61b408f209f185a8cc 
>   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
>   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
> 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
>   
> clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java 
> PRE-CREATION

Re: Round Robin Among Consumers

2015-07-22 Thread J A
Why have partitions at all if I don't need to scale the topic? Coupling topic
scalability with consumer scalability just goes against the messaging-system
core principle of decoupling consumers and producers.

On Wednesday, July 22, 2015, Aditya Auradkar 
wrote:

> Hi,
>
> Why not simply have as many partitions as the set of consumers you want to
> round robin across?
>
> Aditya
>
> On Wed, Jul 22, 2015 at 2:37 PM, Ashish Singh wrote:
>
> > Hey, don't you think that would be against the basic ordering guarantees
> > Kafka provides?
> >
> > On Wed, Jul 22, 2015 at 2:14 PM, J A wrote:
> >
> > > Hi, This is reference to stackoverflow question "
> > >
> > >
> >
> http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers
> > > "
> > > Since Kafka 0.8 already maintains a client offset, i would like to
> > request
> > > a feature, where a single partition consumption can be round robin
> > across a
> > > set of consumers. The message delivery strategy should be an option
> > chosen
> > > by the consumer.
> > >
> >
> >
> >
> > --
> >
> > Regards,
> > Ashish
> >
>


Re: Round Robin Among Consumers

2015-07-22 Thread J A
The ordering guarantee should be optional. If someone needs ordering, they can
always fall back on an exclusive-consumer strategy.

On Wednesday, July 22, 2015, Ashish Singh  wrote:

> Hey, don't you think that would be against the basic ordering guarantees
> Kafka provides?
>
> On Wed, Jul 22, 2015 at 2:14 PM, J A wrote:
>
> > Hi, This is reference to stackoverflow question "
> >
> >
> http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers
> > "
> > Since Kafka 0.8 already maintains a client offset, i would like to
> request
> > a feature, where a single partition consumption can be round robin
> across a
> > set of consumers. The message delivery strategy should be an option
> chosen
> > by the consumer.
> >
>
>
>
> --
>
> Regards,
> Ashish
>


[jira] [Commented] (KAFKA-2351) Brokers are having a problem shutting down correctly

2015-07-22 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638076#comment-14638076
 ] 

Mayuresh Gharat commented on KAFKA-2351:


[~junrao], [~guozhang] can you take a look at this? 

> Brokers are having a problem shutting down correctly
> 
>
> Key: KAFKA-2351
> URL: https://issues.apache.org/jira/browse/KAFKA-2351
> Project: Kafka
>  Issue Type: Bug
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
> Attachments: KAFKA-2351.patch, KAFKA-2351_2015-07-21_14:58:13.patch
>
>
> The run() method in Acceptor might throw an uncaught exception during 
> shutdown and never reach shutdownComplete(), so the latch is not counted 
> down and the broker will not be able to shut down.
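
For illustration, a minimal Java sketch of the fix shape implied by the 
description (the real Acceptor is Scala, and the names below are made up):

{code}
import java.util.concurrent.CountDownLatch;

// If run() throws before counting the latch down, awaitShutdown() blocks
// forever; the finally block guarantees the count-down happens.
class AcceptorSketch implements Runnable {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    public void run() {
        try {
            // accept loop: may throw an uncaught exception during shutdown
        } finally {
            shutdownLatch.countDown(); // always reached, even on exceptions
        }
    }

    void awaitShutdown() throws InterruptedException {
        shutdownLatch.await();
    }
}
{code}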



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638070#comment-14638070
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

I think an explicit configuration makes sense. The different subscribe/unsubscribe 
methods confuse people from time to time.

So if enable.consumer.coordinator==false, users are on their own, and all the 
subscribe/unsubscribe methods will work.

But what is not clear to me is: if enable.consumer.coordinator==true, will 
users be able to call subscribe/unsubscribe(partitions)?

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638063#comment-14638063
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

Yes, that is what I meant.

Isn't it the case that subscribe(partition) always checks whether 
subscribe(topic) has been called?

If a user wants to subscribe/unsubscribe to an illegal partition, I kind of think 
it is easy to understand that we just throw an exception saying "Consumer is 
using Kafka-based offset assignment and topic partition tp is not an assigned 
partition."

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Jiangjie Qin


> On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 400
> > 
> >
> > So in the case of an unexpected exception, we log an error and keep running?
> > 
> > Isn't it better to kill the processor, since we don't know the state of 
> > the system? If the acceptor keeps placing messages in the queue for a dead 
> > processor, isn't that a separate issue?
> 
> Jiangjie Qin wrote:
> This part I'm not quite sure about. I am not very experienced with error 
> handling in such cases, so please correct me if I missed something.
> Here is what I thought.
> 
> The way it currently works is that the acceptor will 
> 1. accept new connection request and create new socket channel
> 2. choose a processor and put the socket channel into the processor's new 
> connection queue
> 
> The processor will just take the socket channels from the queue and 
> register them with the selector.
> If the processor runs and gets an uncaught exception, there are several 
> possibilities. 
> Case 1: The exception came from one socket channel. 
> Case 2: The exception was associated with a bad request. 
> In case 1, ideally we should just disconnect that socket channel without 
> affecting the other socket channels.
> In case 2, I think we should log the error and skip the message - 
> assuming the client will retry sending data if no response is received 
> within a given period of time.
> 
> I am not sure letting the processor exit is a good idea, because it would 
> let a badly behaving client screw the entire cluster - it might kill the 
> processors one by one. Compared with that, I am leaning towards keeping 
> the processor running and serving other normal TCP connections if 
> possible, but logging the error so a monitoring system can detect it and 
> see if human intervention is needed.
> 
> Also, I don't know what to do here to prevent the thread from exiting 
> other than catching all the throwables.
> According to this blog 
> http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
> I guess I can rethrow all the ControlThrowables, but intercept the rest?
> 
> Guozhang Wang wrote:
> I would also prefer not to close the Processor thread upon exceptions, 
> mainly to avoid one bad client killing a shared Kafka cluster. In the past 
> we have seen issues like a DDoS of MetadataRequests killing the cluster, 
> with all other clients getting affected, etc., and the quota work is aimed 
> at preventing that. Since Processor threads are shared (8 by default on a 
> broker), a processor should not be closed by a single socket / bad client 
> request.
> 
> Gwen Shapira wrote:
> I like your thinking around cases #1 and #2. I think this should go in a 
> code comment somewhere, so that when people improve / extend SocketServer 
> they will keep this logic in mind. Maybe even specify in specific catch 
> clauses whether they are handling possible errors at the request level or 
> the channel level.
> 
> My concern is with a possible case #3: each processor has an 
> o.a.k.common.network.Selector. I'm concerned about the possibility of 
> something going wrong in the state of the selector, which would be an 
> issue for all channels. For example, a failure to register could be an issue 
> with the channel.register call, but also perhaps an issue with keys.put (just 
> an example - I'm not sure something can actually break the keys table). 
> 
> I'd like to be able to identify cases where the Selector state may have 
> gone wrong and close the processor in that case. Does that make any sense? Or 
> am I being too paranoid?
> 
> Jiangjie Qin wrote:
> Hi Gwen, I think what you said makes sense. Maybe I see this more from a 
> failure-boundary point of view. 
> 
> Actually we might need to do more if we let a processor exit. We need to 
> stop the acceptor from handing more channels to that processor, and 
> potentially stop the entire broker to prevent further issues. This looks 
> like a larger failure scenario from what I can see.
> 
> If we limit the impact to a processor, then even if the processor has 
> malfunctioned, letting it keep running confines the impact to that 
> processor without affecting the other processors. The connections to this 
> processor will eventually disconnect and retry against other processors, so 
> the broker won't stop serving when one processor stops working.
> 
> I completely agree we would prefer to stop the service if something 
> really bad and unknown occurred. But honestly I'm not sure how to exhaust all 
> the possible known throwables. In practice, I haven't seen any error of case 
> #3, so pragmatically I assume they are very rare and prefer to just catch all 
> the exceptions.
> 
> Gwen Shapira wrote:
> As mentioned on the mailing list - I'm convinced that #3 is an edge case
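
For reference, the catch structure this thread converges on, sketched in Java 
(the real Processor is Scala, where the equivalent is rethrowing 
ControlThrowable and catching the rest; every name below is illustrative):

{code}
// Keep a shared processor thread alive on per-channel/per-request errors
// (cases #1 and #2), but never swallow fatal errors (case #3).
class ProcessorSketch {
    private volatile boolean running = true;

    void runLoop() {
        while (running) {
            try {
                handleRequests();          // cases #1 and #2 can throw here
            } catch (Error fatal) {
                throw fatal;               // case #3: let the thread die loudly
            } catch (Exception e) {
                System.err.println("Uncaught exception, closing channel: " + e);
                closeCurrentChannel();     // limit the damage to one connection
            }
        }
    }

    void handleRequests() {}
    void closeCurrentChannel() {}
}
{code}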

Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Jiangjie Qin


> On July 22, 2015, 10:40 a.m., Ismael Juma wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 401
> > 
> >
> > Is it intentional to ignore `java.lang.Error` too?

I think java.lang.Error is a subclass of Throwable, so it is caught as well.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92574
---


On July 23, 2015, 12:51 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36664/
> ---
> 
> (Updated July 23, 2015, 12:51 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2353
> https://issues.apache.org/jira/browse/KAFKA-2353
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Gwen's comments
> 
> 
> Addressed Gwen's comments.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 91319fa010b140cca632e5fa8050509bd2295fc9 
> 
> Diff: https://reviews.apache.org/r/36664/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/#review92700
---

Ship it!


Just one question on the unit test. Otherwise, LGTM.


clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 (line 173)


Why do we need to do this in a separate thread?


- Jason Gustafson


On July 22, 2015, 11:09 p.m., Ashish Singh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36681/
> ---
> 
> (Updated July 22, 2015, 11:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2275
> https://issues.apache.org/jira/browse/KAFKA-2275
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for 
> Fetcher.getAllTopics. Do not ignore the __consumer_offsets topic while getting 
> all topics. Remove braces for single-line if.
> 
> 
> KAFKA-2275: Add a ListTopics() API to the new consumer
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> c14eed1e95f2e682a235159a366046f00d1d90d6 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  d595c1cb07909e21946532501f648be3033603d9 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
> 
> Diff: https://reviews.apache.org/r/36681/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ashish Singh
> 
>



[jira] [Commented] (KAFKA-2355) Add a unit test to validate the deletion of a partition marked as deleted

2015-07-22 Thread Edward Ribeiro (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638036#comment-14638036
 ] 

Edward Ribeiro commented on KAFKA-2355:
---

Updated reviewboard https://reviews.apache.org/r/36670/diff/
 against branch origin/trunk

> Add a unit test to validate the deletion of a partition marked as deleted
> --
>
> Key: KAFKA-2355
> URL: https://issues.apache.org/jira/browse/KAFKA-2355
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.8.2.1
>Reporter: Edward Ribeiro
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch, 
> KAFKA-2355_2015-07-22_22:23:13.patch
>
>
> Trying to delete a partition marked as deleted throws 
> {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to 
> validate this behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2355) Add a unit test to validate the deletion of a partition marked as deleted

2015-07-22 Thread Edward Ribeiro (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edward Ribeiro updated KAFKA-2355:
--
Attachment: KAFKA-2355_2015-07-22_22:23:13.patch

> Add a unit test to validate the deletion of a partition marked as deleted
> --
>
> Key: KAFKA-2355
> URL: https://issues.apache.org/jira/browse/KAFKA-2355
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.8.2.1
>Reporter: Edward Ribeiro
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch, 
> KAFKA-2355_2015-07-22_22:23:13.patch
>
>
> Trying to delete a partition marked as deleted throws 
> {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to 
> validate this behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36670: Patch for KAFKA-2355

2015-07-22 Thread Edward Ribeiro

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36670/
---

(Updated July 23, 2015, 1:23 a.m.)


Review request for kafka.


Bugs: KAFKA-2355
https://issues.apache.org/jira/browse/KAFKA-2355


Repository: kafka


Description
---

KAFKA-2355 Add a unit test to validate the deletion of a partition marked as 
deleted


Diffs (updated)
-

  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 

Diff: https://reviews.apache.org/r/36670/diff/


Testing
---


Thanks,

Edward Ribeiro



[jira] [Commented] (KAFKA-1893) Allow regex subscriptions in the new consumer

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638028#comment-14638028
 ] 

Jason Gustafson commented on KAFKA-1893:


[~singhashish] Sure, no problem.

> Allow regex subscriptions in the new consumer
> -
>
> Key: KAFKA-1893
> URL: https://issues.apache.org/jira/browse/KAFKA-1893
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.8.3
>
>
> The consumer needs to handle subscribing to regular expressions. Presumably 
> this would be done as a new api,
> {code}
>   void subscribe(java.util.regex.Pattern pattern);
> {code}
> Some questions/thoughts to work out:
>  - It should not be possible to mix pattern subscription with partition 
> subscription.
>  - Is it allowable to mix this with normal topic subscriptions? Logically 
> this is okay but a bit complex to implement.
>  - We need to ensure we regularly update the metadata and recheck our regexes 
> against the metadata to update subscriptions for new topics that are created 
> or old topics that are deleted.
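
For illustration, what the proposed pattern subscription would look like from 
the caller's side (hypothetical API matching the signature above; `consumer` 
is an existing KafkaConsumer):

{code}
import java.util.regex.Pattern;

// Hypothetical call per the proposal: matches topic0, topic1, ..., and picks
// up newly created matching topics once metadata is refreshed.
consumer.subscribe(Pattern.compile("topic[0-9]+"));
{code}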



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Edward Ribeiro

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/#review92696
---



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
(line 167)


The code looks pretty good. Congrats!



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
(line 201)


Just wondering to myself: if ``retryBackoffMs`` is greater than or equal to 
``timeout``, then the caller will have only one shot to retrieve the 
topics. Not that this is a problem, as ``retryBackoffMs`` is a small number of 
milliseconds as far as I saw, but it is nevertheless something to be aware of.
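
(A sketch of the loop shape I mean -- not the actual Fetcher code; 
``tryGetTopics`` is a made-up stand-in for one metadata attempt:)

{code}
// If retryBackoffMs >= timeout, elapsed passes timeout after one backoff,
// so the loop body runs exactly once.
boolean getTopicsWithRetry(long timeout, long retryBackoffMs) throws InterruptedException {
    long start = System.currentTimeMillis();
    long elapsed = 0;
    while (elapsed < timeout) {
        if (tryGetTopics())
            return true;
        Thread.sleep(retryBackoffMs); // back off before the next attempt
        elapsed = System.currentTimeMillis() - start;
    }
    return false;
}
{code}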



core/src/test/scala/integration/kafka/api/ConsumerTest.scala (line 198)


nit: really *really* really cosmetic stuff, but ``topicPartsMap`` makes 
more sense, no? :)


- Edward Ribeiro


On July 22, 2015, 11:09 p.m., Ashish Singh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36681/
> ---
> 
> (Updated July 22, 2015, 11:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2275
> https://issues.apache.org/jira/browse/KAFKA-2275
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for 
> Fetcher.getAllTopics. Do not ignore the __consumer_offsets topic while getting 
> all topics. Remove braces for single-line if.
> 
> 
> KAFKA-2275: Add a ListTopics() API to the new consumer
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> c14eed1e95f2e682a235159a366046f00d1d90d6 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  d595c1cb07909e21946532501f648be3033603d9 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
> 
> Diff: https://reviews.apache.org/r/36681/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ashish Singh
> 
>



[jira] [Updated] (KAFKA-2353) SocketServer.Processor should catch exception and close the socket properly in configureNewConnections.

2015-07-22 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-2353:

Attachment: KAFKA-2353_2015-07-22_17:51:42.patch

> SocketServer.Processor should catch exception and close the socket properly 
> in configureNewConnections.
> ---
>
> Key: KAFKA-2353
> URL: https://issues.apache.org/jira/browse/KAFKA-2353
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2353.patch, KAFKA-2353_2015-07-21_22:02:24.patch, 
> KAFKA-2353_2015-07-22_17:51:42.patch
>
>
> We have seen an increasing number of sockets in CLOSE_WAIT status in our 
> production environment over the recent couple of days. From the thread dump it 
> seems one of the Processor threads has died but the acceptor was still putting 
> many new connections into its new-connection queue.
> The Processor thread died because we are not catching all the exceptions in 
> the Processor thread. For example, in our case it seems to be an exception 
> thrown in configureNewConnections().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/
---

(Updated July 23, 2015, 12:51 a.m.)


Review request for kafka.


Bugs: KAFKA-2353
https://issues.apache.org/jira/browse/KAFKA-2353


Repository: kafka


Description (updated)
---

Addressed Gwen's comments


Addressed Gwen's comments.


Diffs (updated)
-

  core/src/main/scala/kafka/network/SocketServer.scala 
91319fa010b140cca632e5fa8050509bd2295fc9 

Diff: https://reviews.apache.org/r/36664/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Commented] (KAFKA-2353) SocketServer.Processor should catch exception and close the socket properly in configureNewConnections.

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638008#comment-14638008
 ] 

Jiangjie Qin commented on KAFKA-2353:
-

Updated reviewboard https://reviews.apache.org/r/36664/diff/
 against branch origin/trunk

> SocketServer.Processor should catch exception and close the socket properly 
> in configureNewConnections.
> ---
>
> Key: KAFKA-2353
> URL: https://issues.apache.org/jira/browse/KAFKA-2353
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-2353.patch, KAFKA-2353_2015-07-21_22:02:24.patch, 
> KAFKA-2353_2015-07-22_17:51:42.patch
>
>
> We have seen an increasing number of sockets in CLOSE_WAIT status in our 
> production environment over the recent couple of days. From the thread dump it 
> seems one of the Processor threads has died but the acceptor was still putting 
> many new connections into its new-connection queue.
> The Processor thread died because we are not catching all the exceptions in 
> the Processor thread. For example, in our case it seems to be an exception 
> thrown in configureNewConnections().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2355) Add a unit test to validate the deletion of a partition marked as deleted

2015-07-22 Thread Edward Ribeiro (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637996#comment-14637996
 ] 

Edward Ribeiro commented on KAFKA-2355:
---

Hi [~ijuma] and [~granthenke], I have updated the patch as requested in the 
latest review. Let me know if something is missing or I misinterpreted anything, 
please. :)

ps: could one of you assign this little ticket to me? I asked yesterday for 
inclusion in the collaborators' list so that I could assign issues to myself, 
but it has not been granted yet. :-P Cheers!

> Add a unit test to validate the deletion of a partition marked as deleted
> --
>
> Key: KAFKA-2355
> URL: https://issues.apache.org/jira/browse/KAFKA-2355
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.8.2.1
>Reporter: Edward Ribeiro
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch
>
>
> Trying to delete a partition marked as deleted throws 
> {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to 
> validate this behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2355) Add a unit test to validate the deletion of a partition marked as deleted

2015-07-22 Thread Edward Ribeiro (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edward Ribeiro updated KAFKA-2355:
--
Attachment: KAFKA-2355_2015-07-22_21:37:51.patch

> Add a unit test to validate the deletion of a partition marked as deleted
> --
>
> Key: KAFKA-2355
> URL: https://issues.apache.org/jira/browse/KAFKA-2355
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.8.2.1
>Reporter: Edward Ribeiro
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch
>
>
> Trying to delete a partition marked as deleted throws 
> {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to 
> validate this behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2355) Add a unit test to validate the deletion of a partition marked as deleted

2015-07-22 Thread Edward Ribeiro (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637984#comment-14637984
 ] 

Edward Ribeiro commented on KAFKA-2355:
---

Updated reviewboard https://reviews.apache.org/r/36670/diff/
 against branch origin/trunk

> Add a unit test to validate the deletion of a partition marked as deleted
> --
>
> Key: KAFKA-2355
> URL: https://issues.apache.org/jira/browse/KAFKA-2355
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.8.2.1
>Reporter: Edward Ribeiro
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-2355.patch, KAFKA-2355_2015-07-22_21:37:51.patch
>
>
> Trying to delete a partition marked as deleted throws 
> {{TopicAlreadyMarkedForDeletionException}}, so this ticket adds a unit test to 
> validate this behaviour. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36670: Patch for KAFKA-2355

2015-07-22 Thread Edward Ribeiro

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36670/
---

(Updated July 23, 2015, 12:38 a.m.)


Review request for kafka.


Bugs: KAFKA-2355
https://issues.apache.org/jira/browse/KAFKA-2355


Repository: kafka


Description
---

KAFKA-2355 Add a unit test to validate the deletion of a partition marked as 
deleted


Diffs (updated)
-

  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 

Diff: https://reviews.apache.org/r/36670/diff/


Testing
---


Thanks,

Edward Ribeiro



Re: Review Request 36670: Patch for KAFKA-2355

2015-07-22 Thread Edward Ribeiro

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36670/#review92688
---



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 267)


I am ambivalent about catching '_', so I decided to let the error propagate.



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 269)


I disagree, because we are issuing two `deleteTopic` operations: the first 
one should succeed and the second one should fail with a specific exception, 
but in spite of this the topic should have been deleted by the first call, even 
if fail() is called. But I am okay with removing the finally block and having 
it after the catch.


- Edward Ribeiro


On July 22, 2015, 1:46 a.m., Edward Ribeiro wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36670/
> ---
> 
> (Updated July 22, 2015, 1:46 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2355
> https://issues.apache.org/jira/browse/KAFKA-2355
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2355 Add a unit test to validate the deletion of a partition marked as 
> deleted
> 
> 
> Diffs
> -
> 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
> fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 
> 
> Diff: https://reviews.apache.org/r/36670/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Edward Ribeiro
> 
>



Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?

2015-07-22 Thread Stevo Slavić
Jiangjie,

Seems I was misunderstood; KafkaConsumer.poll after subscribing to a topic
via "KafkaConsumer.subscribe(String... topics)" does not work ("Unknown api
code 11" error), even with both client and broker being the latest 0.8.3/trunk.
I will try to create a failing test and open a bug report.

Kind regards,
Stevo Slavic.

On Wed, Jul 22, 2015 at 8:36 PM, Jiangjie Qin 
wrote:

> I don't think we have a consumer coordinator in 0.8.2 brokers, so
> KafkaConsumer in 0.8.3 will only be able to subscribe to partitions
> explicitly. Subscribing to a topic won't work with 0.8.2 brokers.
>
> Jiangjie (Becket) Qin
>
> On Wed, Jul 22, 2015 at 4:26 AM, Stevo Slavić  wrote:
>
> > I'm getting "Unknown api code 11" even when both client and server are
> > 0.8.3/trunk, when "KafkaConsumer.subscribe(String... topics)" is used.
> >
> > Bug?
> >
> > Kind regards,
> > Stevo Slavic.
> >
> > On Fri, Apr 24, 2015 at 7:13 PM, Neha Narkhede 
> wrote:
> >
> > > Yes, I was clearly confused :-)
> > >
> > > On Fri, Apr 24, 2015 at 9:37 AM, Sean Lydon 
> > wrote:
> > >
> > > > Thanks for the responses. Ewen is correct that I am referring to the
> > > > *new* consumer (org.apache.kafka.clients.consumer.KafkaConsumer).
> > > >
> > > > I am extending the consumer to allow my applications more control
> over
> > > > committed offsets.  I really want to get away from zookeeper (so
> using
> > > > the offset storage), and re-balancing is something I haven't really
> > > > needed to tackle in an automated/seamless way.  Either way, I'll hold
> > > > off going further down this road until there is more interest.
> > > >
> > > > @Gwen
> > > > I set up a single consumer without partition.assignment.strategy or
> > > > rebalance.callback.class.  I was unable to subscribe to just a topic
> > > > ("Unknown api code 11" on broker), but I could subscribe to a
> > > > topicpartition.  This makes sense as I would need to handle
> re-balance
> > > > outside the consumer.  Things functioned as expected (well  I have an
> > > > additional minor fix to code from KAFKA-2121), and the only
> exceptions
> > > > on broker were due to closing consumers (which I have become
> > > > accustomed to).  My tests are specific to my extended version of the
> > > > consumer, but they basically do a little writing and reading with
> > > > different serde classes with application controlled commits (similar
> > > > to onSuccess and onFailure after each record, but with tolerance for
> > > > out of order acknowledgements).
> > > >
> > > > If you are interested, here is the patch of the hack against trunk.
> > > >
> > > > On Thu, Apr 23, 2015 at 10:27 PM, Ewen Cheslack-Postava
> > > >  wrote:
> > > > > @Neha I think you're mixing up the 0.8.1/0.8.2 updates and the
> > > > 0.8.2/0.8.3
> > > > > that's being discussed here?
> > > > >
> > > > > I think the original question was about using the *new* consumer
> > > > ("clients
> > > > > consumer") with 0.8.2. Gwen's right, it will use features not even
> > > > > implemented in the broker in trunk yet, let alone the 0.8.2.
> > > > >
> > > > > I don't think the "enable.commit.downgrade" type option, or
> > supporting
> > > > the
> > > > > old protocol with the new consumer at all, makes much sense. You'd
> > end
> > > up
> > > > > with some weird hybrid of simple and high-level consumers -- you
> > could
> > > > use
> > > > > offset storage, but you'd have to manage rebalancing yourself since
> > > none
> > > > of
> > > > > the coordinator support would be there.
> > > > >
> > > > >
> > > > > On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede 
> > > > wrote:
> > > > >
> > > > >> My understanding is that ideally the 0.8.3 consumer should work
> with
> > > an
> > > > >> 0.8.2 broker if the offset commit config was set to "zookeeper".
> > > > >>
> > > > >> The only thing that might not work is offset commit to Kafka,
> which
> > > > makes
> > > > >> sense since the 0.8.2 broker does not support Kafka based offset
> > > > >> management.
> > > > >>
> > > > >> If we broke all kinds of offset commits, then it seems like a
> > > > regression,
> > > > >> no?
> > > > >>
> > > > >> On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira <
> > gshap...@cloudera.com>
> > > > >> wrote:
> > > > >>
> > > > >> > I didn't think 0.8.3 consumer will ever be able to talk to 0.8.2
> > > > >> > broker... there are some essential pieces that are missing in
> > 0.8.2
> > > > >> > (Coordinator, Heartbeat, etc).
> > > > >> > Maybe I'm missing something. It will be nice if this will work
> :)
> > > > >> >
> > > > >> > Mind sharing what / how you tested? Were there no errors in
> broker
> > > > >> > logs after your fix?
> > > > >> >
> > > > >> > On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon <
> lydon.s...@gmail.com
> > >
> > > > >> wrote:
> > > > >> > > Currently the clients consumer (trunk) sends offset commit
> > > requests
> > > > of
> > > > >> > > version 2.  The 0.8.2 brokers fail to handle this particular
> > > request
> > > > >> > > with a:
> > > > >> > >
> > > > >> > > jav

[jira] [Commented] (KAFKA-1893) Allow regex subscriptions in the new consumer

2015-07-22 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637894#comment-14637894
 ] 

Ashish K Singh commented on KAFKA-1893:
---

[~hachikuji] I think after working on KAFKA-2275, I got even more interested in 
this one. In case you are not working on it yet and you think it is OK for me 
to take a crack at it, I would like to pick it up.

> Allow regex subscriptions in the new consumer
> -
>
> Key: KAFKA-1893
> URL: https://issues.apache.org/jira/browse/KAFKA-1893
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.8.3
>
>
> The consumer needs to handle subscribing to regular expressions. Presumably 
> this would be done as a new api,
> {code}
>   void subscribe(java.util.regex.Pattern pattern);
> {code}
> Some questions/thoughts to work out:
>  - It should not be possible to mix pattern subscription with partition 
> subscription.
>  - Is it allowable to mix this with normal topic subscriptions? Logically 
> this is okay but a bit complex to implement.
>  - We need to ensure we regularly update the metadata and recheck our regexes 
> against the metadata to update subscriptions for new topics that are created 
> or old topics that are deleted.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Ashish Singh


> On July 22, 2015, 4:43 p.m., Jason Gustafson wrote:
> > Hey Ashish, this looks pretty good to me. Just some minor comments.

Thanks for the review! Addressed your concerns.


> On July 22, 2015, 4:43 p.m., Jason Gustafson wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
> >  line 298
> > 
> >
> > Any reason not to put this method in Fetcher instead of here? I don't 
> > have a strong feeling, but it was kind of nice keeping 
> > ConsumerNetworkClient largely free of application logic.
> > 
> > Also, it might be nice to have a unit test.

Moved and added unit test for Fetcher.getAllTopics.


> On July 22, 2015, 4:43 p.m., Jason Gustafson wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
> >  lines 314-315
> > 
> >
> > I think I asked this before, but is there any harm in returning this 
> > topic to the user? I ask because we don't actually prevent them from 
> > calling partitionsFor with the same topic.

Removed the exclusion.


- Ashish


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/#review92604
---


On July 22, 2015, 11:09 p.m., Ashish Singh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36681/
> ---
> 
> (Updated July 22, 2015, 11:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2275
> https://issues.apache.org/jira/browse/KAFKA-2275
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for 
> Fetcher.getAllTopics. Do not ignore the __consumer_offsets topic while getting 
> all topics. Remove braces for single-line if.
> 
> 
> KAFKA-2275: Add a ListTopics() API to the new consumer
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> c14eed1e95f2e682a235159a366046f00d1d90d6 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  d595c1cb07909e21946532501f648be3033603d9 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
> 
> Diff: https://reviews.apache.org/r/36681/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ashish Singh
> 
>



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637890#comment-14637890
 ] 

Guozhang Wang commented on KAFKA-2350:
--

[~becket_qin] OK, I understand now. The "subscribe(partition)" will check whether 
"subscribe(topic)" has been called before, hence "assignment()" is not null. I 
think this behavior is a bit confusing, since it depends on the call history of 
"subscribe".

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2275) Add a ListTopics() API to the new consumer

2015-07-22 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh updated KAFKA-2275:
--
Attachment: KAFKA-2275_2015-07-22_16:09:34.patch

> Add a ListTopics() API to the new consumer
> --
>
> Key: KAFKA-2275
> URL: https://issues.apache.org/jira/browse/KAFKA-2275
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Guozhang Wang
>Assignee: Ashish K Singh
>Priority: Critical
> Fix For: 0.8.3
>
> Attachments: KAFKA-2275.patch, KAFKA-2275.patch, 
> KAFKA-2275_2015-07-17_21:39:27.patch, KAFKA-2275_2015-07-20_10:44:19.patch, 
> KAFKA-2275_2015-07-22_16:09:34.patch
>
>
> With regex subscription like
> {code}
> consumer.subscribe("topic*")
> {code}
> The partition assignment is automatically done on the Kafka side, while there 
> are some use cases where consumers want regex subscriptions but not 
> Kafka-side partition assignment, rather their own specific partition 
> assignment. With ListTopics() they can periodically check for topic-list 
> changes and specifically subscribe to the partitions of the new topics.
> For implementation, it involves sending a TopicMetadataRequest to a random 
> broker and parsing the response.
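
For illustration, the intended usage pattern (sketch only: the return type of 
listTopics() is assumed to be Map<String, List<PartitionInfo>>, and `consumer` 
is an existing KafkaConsumer):

{code}
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Run periodically: check the full topic list, apply the consumer's own
// regex, and subscribe directly to the partitions of matching topics.
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
for (Map.Entry<String, List<PartitionInfo>> e : topics.entrySet()) {
    if (e.getKey().matches("topic.*")) {
        for (PartitionInfo p : e.getValue())
            consumer.subscribe(new TopicPartition(p.topic(), p.partition()));
    }
}
{code}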



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2275) Add a ListTopics() API to the new consumer

2015-07-22 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637889#comment-14637889
 ] 

Ashish K Singh commented on KAFKA-2275:
---

Updated reviewboard https://reviews.apache.org/r/36681/
 against branch trunk

> Add a ListTopics() API to the new consumer
> --
>
> Key: KAFKA-2275
> URL: https://issues.apache.org/jira/browse/KAFKA-2275
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Guozhang Wang
>Assignee: Ashish K Singh
>Priority: Critical
> Fix For: 0.8.3
>
> Attachments: KAFKA-2275.patch, KAFKA-2275.patch, 
> KAFKA-2275_2015-07-17_21:39:27.patch, KAFKA-2275_2015-07-20_10:44:19.patch, 
> KAFKA-2275_2015-07-22_16:09:34.patch
>
>
> With regex subscription like
> {code}
> consumer.subscribe("topic*")
> {code}
> The partition assignment is automatically done on the Kafka side, while there 
> are some use cases where consumers want regex subscriptions but not 
> Kafka-side partition assignment, rather their own specific partition 
> assignment. With ListTopics() they can periodically check for topic-list 
> changes and specifically subscribe to the partitions of the new topics.
> For implementation, it involves sending a TopicMetadataRequest to a random 
> broker and parsing the response.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/
---

(Updated July 22, 2015, 11:09 p.m.)


Review request for kafka.


Bugs: KAFKA-2275
https://issues.apache.org/jira/browse/KAFKA-2275


Repository: kafka


Description (updated)
---

Move getAllTopics from ConsumerNetworkClient to Fetcher. Add a unit test for 
Fetcher.getAllTopics. Do not ignore the __consumer_offsets topic while getting 
all topics. Remove braces for single-line if.


KAFKA-2275: Add a ListTopics() API to the new consumer


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
48fe7961e2215372d8033ece4af739ea06c6457b 
  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
252b759c0801f392e3526b0f31503b4b8fbf1c8a 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
bea3d737c51be77d5b5293cdd944d33b905422ba 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
c14eed1e95f2e682a235159a366046f00d1d90d6 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
d595c1cb07909e21946532501f648be3033603d9 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 7a4e586c1a0c52931a8ec55b6e1d5b67c67c28ea 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 

Diff: https://reviews.apache.org/r/36681/diff/


Testing
---


Thanks,

Ashish Singh



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637792#comment-14637792
 ] 

Jiangjie Qin commented on KAFKA-2350:
-

[~guozhang] In the last case you mentioned, is topic2-partition1 one of the 
partitions assigned by the consumer coordinator? If it is not, the subscription 
will fail because the partition is not in the assignedTopicpartitions set.

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.
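
A minimal sketch of how the proposed methods could be used for the join case
described above, assuming the signatures from the description are adopted;
rightSideLagging() and process() are hypothetical application code:

{code}
while (running) {
  if (rightSideLagging())          // hypothetical: the other topic is behind
    consumer.pause("left-topic")   // poll() stops fetching this topic
  else
    consumer.unpause("left-topic") // fetching resumes on the next poll()
  process(consumer.poll(100))
}
{code}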



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637753#comment-14637753
 ] 

Jason Gustafson commented on KAFKA-2350:


[~jkreps] This is a little unrelated to this jira, but one nice consequence of 
moving group management into configuration is that it opens the door to using 
subscribe(topic) as a shortcut to subscribing to all of a topic's partitions 
for users who do not want to use group management (which does have a little 
overhead). Currently the user would have to get all the partitions for that 
topic and then subscribe to them individually. Not that bad, but a tad 
annoying, especially if you have to poll for changes in the number of 
partitions. 

Anyway, I tend to agree with [~yasuhiro.matsuda] and [~gwenshap] that trying to 
mix partition and topic subscriptions in order to do pausing seems problematic. 
The explicit pause/unpause methods might not be as elegant, but I think they 
are easier for users to understand and are much easier for us to implement.
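
For reference, the "get all the partitions and subscribe to them individually"
pattern mentioned above looks roughly like this against the trunk consumer API
of the time; treat the method names as approximate:

{code}
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

// Subscribe to every partition of "topic1" without group management.
for (info <- consumer.partitionsFor("topic1").asScala)
  consumer.subscribe(new TopicPartition(info.topic, info.partition))
// Picking up newly added partitions requires re-running the lookup periodically.
{code}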

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2004) Write Kafka messages directly to HDFS

2015-07-22 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637745#comment-14637745
 ] 

Ashish K Singh commented on KAFKA-2004:
---

Spark Streaming supports reading from Kafka. Once you have read from Kafka, it 
is up to you what you want to do with it. Here is an example: 
https://github.com/SinghAsDev/pankh.

> Write Kafka messages directly to HDFS
> -
>
> Key: KAFKA-2004
> URL: https://issues.apache.org/jira/browse/KAFKA-2004
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 0.8.1.1
>Reporter: sutanu das
>Assignee: Neha Narkhede
>Priority: Critical
>
> 1. Is there a way to write Kafka messages directly to HDFS without writing 
> any consumer code? 
> 2. Is there any way to integrate Kafka with Storm or Spark so messages go 
> directly from Kafka consumers to an HDFS sink?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Round Robin Among Consumers

2015-07-22 Thread Aditya Auradkar
Hi,

Why not simply have as many partitions as the set of consumers you want to
round robin across?

Aditya

On Wed, Jul 22, 2015 at 2:37 PM, Ashish Singh  wrote:

> Hey, don't you think that would be against the basic ordering guarantees
> Kafka provides?
>
> On Wed, Jul 22, 2015 at 2:14 PM, J A  wrote:
>
> > Hi, this is in reference to the Stack Overflow question "
> >
> > http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers
> > "
> > Since Kafka 0.8 already maintains a client offset, I would like to request
> > a feature where consumption of a single partition can be round-robined
> > across a set of consumers. The message delivery strategy should be an
> > option chosen by the consumer.
> >
>
>
>
> --
>
> Regards,
> Ashish
>


Jenkins build is back to normal : KafkaPreCommit #160

2015-07-22 Thread Apache Jenkins Server
See 



Re: Round Robin Among Consumers

2015-07-22 Thread Ashish Singh
Hey, don't you think that would be against the basic ordering guarantees
Kafka provides?
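
To make the concern concrete, here is a tiny sketch (illustrative only, not a
real API) of offsets 0-3 from one partition dealt round-robin to two consumers:

{code}
val offsets = Seq(0L, 1L, 2L, 3L)
// Deal alternate offsets to consumer A and consumer B.
val assignment = offsets.map(o => o -> (if (o % 2 == 0) "A" else "B"))
// Nothing synchronizes A and B, so B may process offset 1 before A
// processes offset 0 - the per-partition ordering guarantee is lost.
{code}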

On Wed, Jul 22, 2015 at 2:14 PM, J A  wrote:

> Hi, this is in reference to the Stack Overflow question "
>
> http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers
> "
> Since Kafka 0.8 already maintains a client offset, I would like to request
> a feature where consumption of a single partition can be round-robined across a
> set of consumers. The message delivery strategy should be an option chosen
> by the consumer.
>



-- 

Regards,
Ashish


Round Robin Among Consumers

2015-07-22 Thread J A
Hi, this is in reference to the Stack Overflow question "
http://stackoverflow.com/questions/31547216/kafka-log-deletion-and-load-balancing-across-consumers
"
Since Kafka 0.8 already maintains a client offset, I would like to request
a feature where consumption of a single partition can be round-robined across a
set of consumers. The message delivery strategy should be an option chosen
by the consumer.


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Gwen Shapira


> On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 465
> > 
> >
> > Turns out that catching Throwable is a really bad idea: 
> > https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
> 
> Jiangjie Qin wrote:
> Ah... Didn't know that before. I explicitly listed the exceptions.
> 
> Guozhang Wang wrote:
> Searching ": Throwable" gives me 180+ cases in the code base :P Though many 
> of them are from unit tests (which is arguably OK), there are still a lot 
> in the core package. I agree that we should avoid catching Throwable whenever 
> possible, which will also help push developers to think about 
> possible checked exceptions in the calling trace.
> 
> Gwen Shapira wrote:
> I know :(
> I'm not sure if going over and converting everything is worth the effort. 
> Although it can be a nice newbie jira.
> 
> Jiangjie Qin wrote:
> Maybe we can simply change that to catch Throwables except 
> ControlThrowables? That might be a simple search and replace.

possible. definitely not in this JIRA though :)


> On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 400
> > 
> >
> > So in case of unexpected exception, we log an error and keep running?
> > 
> > Isn't it better to kill the processor, since we don't know what's the 
> > state of the system? If the acceptor keeps placing messages in the queue 
> > for a dead processor, isn't it a separate issue?
> 
> Jiangjie Qin wrote:
> I'm not quite sure about this part. I am not very experienced with error 
> handling in such cases, so please correct me if I missed something.
> Here is what I thought.
> 
> The way it currently works is that the acceptor will 
> 1. accept the new connection request and create a new socket channel
> 2. choose a processor and put the socket channel into that processor's new 
> connection queue
> 
> The processor just takes the socket channels from the queue and 
> registers them with the selector.
> If the processor hits an uncaught exception while running, there are several 
> possibilities. 
> Case 1: The exception came from one socket channel. 
> Case 2: The exception was associated with a bad request. 
> In case 1, ideally we should just disconnect that socket channel without 
> affecting the other socket channels.
> In case 2, I think we should log the error and skip the message - 
> assuming the client will retry sending data if no response is received within 
> a given period of time.
> 
> I am not sure letting the processor exit is a good idea, because that would 
> let a badly behaving client take down the entire cluster - it might kill the 
> processors one by one. Compared with that, I lean towards keeping the 
> processor running and serving other normal TCP connections if possible, but 
> logging the error so a monitoring system can detect it and decide whether 
> human intervention is needed.
> 
> Also, I don't know what to do here to prevent the thread from exiting 
> without catching all the throwables.
> According to this blog, 
> http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
> I guess I can rethrow all the ControlThrowables but intercept the rest?
> 
> Guozhang Wang wrote:
> I would also prefer not to close the Processor thread upon exceptions, 
> mainly to avoid one bad client killing a shared Kafka cluster. In the past 
> we have seen issues like a MetadataRequest DDoS killing the cluster and 
> affecting all other clients, and the quota work is aimed at 
> preventing that. Since Processor threads are shared (8 by default on a broker), 
> one should not be closed because of a single socket / bad client request.
> 
> Gwen Shapira wrote:
> I like your thinking around cases #1 and #2. I think this should go as a 
> code comment somewhere, so when people improve / extend SocketServer they 
> will keep this logic in mind. Maybe even indicate in the specific catch clauses 
> whether they are handling possible errors at the request level or the channel level.
> 
> My concern is with possible case #3: Each processor has an 
> o.a.k.common.network.Selector. I'm concerned about the possibility of 
> something going wrong in the state of the selector, which will possibly be an 
> issue for all channels. For example, a failure to register could be an issue 
> with the channel.register call, but also perhaps an issue with keys.put (just 
> an example - I'm not sure something can actually break the keys table). 
> 
> I'd like to be able to identify cases where the Selector state may have 
> gone wrong and close the processor in that case. Does that make any sense? Or 
> am I being too paranoid?
> 
> Jiangjie Qi

Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/#review92644
---

Ship it!


LGTM. I think we can checkin after Jason's comments get addressed.

- Guozhang Wang


On July 22, 2015, 6:32 a.m., Ashish Singh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36681/
> ---
> 
> (Updated July 22, 2015, 6:32 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2275
> https://issues.apache.org/jira/browse/KAFKA-2275
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2275: Add a ListTopics() API to the new consumer
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> c14eed1e95f2e682a235159a366046f00d1d90d6 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
>  9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
> 
> Diff: https://reviews.apache.org/r/36681/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ashish Singh
> 
>



Re: [VOTE] Switch to GitHub pull requests for new contributions

2015-07-22 Thread Ismael Juma
Hi,

On 22 Jul 2015 19:32, "Jiangjie Qin"  wrote:
>
> +1 (non binding)
>
> Can we have a wiki for procedure and let people verify the steps? After
> that we can update the Apache project page.

Yes, I linked to the page in the original message:

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes

Committers and contributors have provided feedback in previous discussions
in the mailing list. Further improvements are welcome, of course.

Also see KAFKA-2321 and KAFKA-2349 for updates to the website and GitHub's
CONTRIBUTING.md.

Ismael


[jira] [Commented] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position

2015-07-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637539#comment-14637539
 ] 

Guozhang Wang commented on KAFKA-2342:
--

[~sslavic] corrected, thanks.

> KafkaConsumer rebalance with in-flight fetch can cause invalid position
> ---
>
> Key: KAFKA-2342
> URL: https://issues.apache.org/jira/browse/KAFKA-2342
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>Assignee: Jason Gustafson
> Fix For: 0.8.3
>
>
> If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end 
> up updating the fetch position of a partition to an offset which is no longer 
> valid. The consequence is that we may end up either returning to the user 
> messages with an unexpected position or we may fail to give back the right 
> offset in position(). 
> Additionally, this bug causes transient test failures in 
> ConsumerBounceTest.testConsumptionWithBrokerFailures with the following 
> exception:
> kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures FAILED
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949)
> at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86)
> at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position

2015-07-22 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2342:
-
Fix Version/s: (was: 0.9.0)
   0.8.3

> KafkaConsumer rebalance with in-flight fetch can cause invalid position
> ---
>
> Key: KAFKA-2342
> URL: https://issues.apache.org/jira/browse/KAFKA-2342
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>Assignee: Jason Gustafson
> Fix For: 0.8.3
>
>
> If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end 
> up updating the fetch position of a partition to an offset which is no longer 
> valid. The consequence is that we may end up either returning to the user 
> messages with an unexpected position or we may fail to give back the right 
> offset in position(). 
> Additionally, this bug causes transient test failures in 
> ConsumerBounceTest.testConsumptionWithBrokerFailures with the following 
> exception:
> kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures FAILED
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949)
> at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86)
> at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position

2015-07-22 Thread Stevo Slavic (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637533#comment-14637533
 ] 

Stevo Slavic commented on KAFKA-2342:
-

[~guozhang], fix version 0.9.0 or 0.8.3?

> KafkaConsumer rebalance with in-flight fetch can cause invalid position
> ---
>
> Key: KAFKA-2342
> URL: https://issues.apache.org/jira/browse/KAFKA-2342
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>Assignee: Jason Gustafson
> Fix For: 0.9.0
>
>
> If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end 
> up updating the fetch position of a partition to an offset which is no longer 
> valid. The consequence is that we may end up either returning to the user 
> messages with an unexpected position or we may fail to give back the right 
> offset in position(). 
> Additionally, this bug causes transient test failures in 
> ConsumerBounceTest.testConsumptionWithBrokerFailures with the following 
> exception:
> kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures FAILED
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949)
> at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86)
> at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position

2015-07-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637525#comment-14637525
 ] 

ASF GitHub Bot commented on KAFKA-2342:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/88


> KafkaConsumer rebalance with in-flight fetch can cause invalid position
> ---
>
> Key: KAFKA-2342
> URL: https://issues.apache.org/jira/browse/KAFKA-2342
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>Assignee: Jason Gustafson
> Fix For: 0.9.0
>
>
> If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end 
> up updating the fetch position of a partition to an offset which is no longer 
> valid. The consequence is that we may end up either returning to the user 
> messages with an unexpected position or we may fail to give back the right 
> offset in position(). 
> Additionally, this bug causes transient test failures in 
> ConsumerBounceTest.testConsumptionWithBrokerFailures with the following 
> exception:
> kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures FAILED
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949)
> at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86)
> at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2342) KafkaConsumer rebalance with in-flight fetch can cause invalid position

2015-07-22 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-2342.
--
   Resolution: Fixed
Fix Version/s: 0.9.0

Issue resolved by pull request 88
[https://github.com/apache/kafka/pull/88]

> KafkaConsumer rebalance with in-flight fetch can cause invalid position
> ---
>
> Key: KAFKA-2342
> URL: https://issues.apache.org/jira/browse/KAFKA-2342
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>Assignee: Jason Gustafson
> Fix For: 0.9.0
>
>
> If a rebalance occurs with an in-flight fetch, the new KafkaConsumer can end 
> up updating the fetch position of a partition to an offset which is no longer 
> valid. The consequence is that we may end up either returning to the user 
> messages with an unexpected position or we may fail to give back the right 
> offset in position(). 
> Additionally, this bug causes transient test failures in 
> ConsumerBounceTest.testConsumptionWithBrokerFailures with the following 
> exception:
> kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures FAILED
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:949)
> at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:86)
> at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:61)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: KAFKA-2342; KafkaConsumer rebalance with in-fl...

2015-07-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/88


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Zookeeper use cases with Kafka

2015-07-22 Thread Prabhjot Bharaj
Hello Folks,

I wish to contribute to Kafka internals, and one of the things that can
help me do that is understanding how Kafka uses ZooKeeper. I have some
basic doubts:

1. Is ZooKeeper primarily used for locking? If yes, in what cases, and what
kind of nodes does it use - sequential/ephemeral?

2. Does Kafka use ZooKeeper watches for any of its functions?

3. What kind of state is stored in ZooKeeper? (I believe it has to be the
leader information per partition, but is there anything apart from that?)
What is the scale of the data stored in ZooKeeper?

Looking forward to your help.

Thanks,
prabcs


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Jiangjie Qin


> On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 465
> > 
> >
> > Turns out that catching Throwable is a really bad idea: 
> > https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
> 
> Jiangjie Qin wrote:
> Ah... Didn't know that before. I explicitly listed the exceptions.
> 
> Guozhang Wang wrote:
> Searching ": Throwable" gives me 180+ cases in the code base :P Though many 
> of them are from unit tests (which is arguably OK), there are still a lot 
> in the core package. I agree that we should avoid catching Throwable whenever 
> possible, which will also help push developers to think about 
> possible checked exceptions in the calling trace.
> 
> Gwen Shapira wrote:
> I know :(
> I'm not sure if going over and converting everything is worth the effort. 
> Although it can be a nice newbie jira.

Maybe we can simply change that to catch Throwables except ControlThrowables? 
That might be a simple search and replace.
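
For concreteness, a sketch of that pattern in Scala (illustrative, not the
actual SocketServer code; error() and doWork() are stand-ins defined here):

{code}
import scala.util.control.ControlThrowable

def error(msg: String, t: Throwable): Unit = println(s"ERROR $msg $t") // stand-in for the Logging trait
def doWork(): Unit = () // stand-in for the processor's per-iteration work

try doWork()
catch {
  case e: ControlThrowable => throw e // never swallow control-flow throwables
  case e: Throwable => error("Processor got uncaught exception.", e)
}
{code}

scala.util.control.NonFatal would also work here, since it matches everything
except ControlThrowable and fatal errors such as VirtualMachineError.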


> On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 400
> > 
> >
> > So in case of unexpected exception, we log an error and keep running?
> > 
> > Isn't it better to kill the processor, since we don't know what's the 
> > state of the system? If the acceptor keeps placing messages in the queue 
> > for a dead processor, isn't it a separate issue?
> 
> Jiangjie Qin wrote:
> I'm not quite sure about this part. I am not very experienced with error 
> handling in such cases, so please correct me if I missed something.
> Here is what I thought.
> 
> The way it currently works is that the acceptor will 
> 1. accept the new connection request and create a new socket channel
> 2. choose a processor and put the socket channel into that processor's new 
> connection queue
> 
> The processor just takes the socket channels from the queue and 
> registers them with the selector.
> If the processor hits an uncaught exception while running, there are several 
> possibilities. 
> Case 1: The exception came from one socket channel. 
> Case 2: The exception was associated with a bad request. 
> In case 1, ideally we should just disconnect that socket channel without 
> affecting the other socket channels.
> In case 2, I think we should log the error and skip the message - 
> assuming the client will retry sending data if no response is received within 
> a given period of time.
> 
> I am not sure letting the processor exit is a good idea, because that would 
> let a badly behaving client take down the entire cluster - it might kill the 
> processors one by one. Compared with that, I lean towards keeping the 
> processor running and serving other normal TCP connections if possible, but 
> logging the error so a monitoring system can detect it and decide whether 
> human intervention is needed.
> 
> Also, I don't know what to do here to prevent the thread from exiting 
> without catching all the throwables.
> According to this blog, 
> http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
> I guess I can rethrow all the ControlThrowables but intercept the rest?
> 
> Guozhang Wang wrote:
> I would also prefer not to close the Processor thread upon exceptions, 
> mainly to avoid one bad client killing a shared Kafka cluster. In the past 
> we have seen issues like a MetadataRequest DDoS killing the cluster and 
> affecting all other clients, and the quota work is aimed at 
> preventing that. Since Processor threads are shared (8 by default on a broker), 
> one should not be closed because of a single socket / bad client request.
> 
> Gwen Shapira wrote:
> I like your thinking around cases #1 and #2. I think this should go as a 
> code comment somewhere, so when people improve / extend SocketServer they 
> will keep this logic in mind. Maybe even indicate in the specific catch clauses 
> whether they are handling possible errors at the request level or the channel level.
> 
> My concern is with possible case #3: Each processor has an 
> o.a.k.common.network.Selector. I'm concerned about the possibility of 
> something going wrong in the state of the selector, which will possibly be an 
> issue for all channels. For example, a failure to register could be an issue 
> with the channel.register call, but also perhaps an issue with keys.put (just 
> an example - I'm not sure something can actually break the keys table). 
> 
> I'd like to be able to identify cases where the Selector state may have 
> gone wrong and close the processor in that case. Does that make any sense? Or 
> am I being too paranoid?

Hi Gwen, I think what you said makes sense. Maybe I see this more from a 
failure boundary point

[jira] [Commented] (KAFKA-2004) Write Kafka messages directly to HDFS

2015-07-22 Thread sutanu das (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637419#comment-14637419
 ] 

sutanu das commented on KAFKA-2004:
---

We just want to stream each logfile as an event from the Kafka queue via Storm to HDFS.

Is there basic spout or bolt code available in Python to do this?

> Write Kafka messages directly to HDFS
> -
>
> Key: KAFKA-2004
> URL: https://issues.apache.org/jira/browse/KAFKA-2004
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 0.8.1.1
>Reporter: sutanu das
>Assignee: Neha Narkhede
>Priority: Critical
>
> 1. Is there a way to write Kafka messages directly to HDFS without writing 
> any consumer code? 
> 2. Is there any way to integrate Kafka with Storm or Spark so messages go 
> directly from Kafka consumers to an HDFS sink?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Gwen Shapira
OK, I'm convinced :)

I'll commit after Becket addresses other review points and adds some
comments on how our error handling works (to make sure we don't screw
it up later).

Gwen

On Wed, Jul 22, 2015 at 11:34 AM, Todd Palino  wrote:
> Since I've been dealing with the fallout of this particular problem all
> week, I'll add a few thoughts...
>
>
> On Wed, Jul 22, 2015 at 10:51 AM, Gwen Shapira 
> wrote:
>>
>>
>>
>> > On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
>> > > core/src/main/scala/kafka/network/SocketServer.scala, line 465
>> > >
>> > > 
>> > >
>> > > Turns out that catching Throwable is a really bad idea:
>> > > https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
>> >
>> > Jiangjie Qin wrote:
>> > Ah... Didn't know that before. I explicitly listed the exceptions.
>> >
>> > Guozhang Wang wrote:
>> > Searching ": Throwable" gives me 180+ cases in the code base :P Though
>> > many of them are from unit tests (which is arguably OK), there are still
>> > a lot in the core package. I agree that we should avoid catching Throwable
>> > whenever possible, which will also help push developers to think
>> > about possible checked exceptions in the calling trace.
>>
>> I know :(
>> I'm not sure if going over and converting everything is worth the effort.
>> Although it can be a nice newbie jira.
>>
>>
>> > On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
>> > > core/src/main/scala/kafka/network/SocketServer.scala, line 400
>> > >
>> > > 
>> > >
>> > > So in case of unexpected exception, we log an error and keep
>> > > running?
>> > >
>> > > Isn't it better to kill the processor, since we don't know what's
>> > > the state of the system? If the acceptor keeps placing messages in the 
>> > > queue
>> > > for a dead processor, isn't it a separate issue?
>> >
>> > Jiangjie Qin wrote:
>> > I'm not quite sure about this part. I am not very experienced with error
>> > handling in such cases, so please correct me if I missed something.
>> > Here is what I thought.
>> >
>> > The way it currently works is that the acceptor will
>> > 1. accept the new connection request and create a new socket channel
>> > 2. choose a processor and put the socket channel into that
>> > processor's new connection queue
>> >
>> > The processor just takes the socket channels from the queue and
>> > registers them with the selector.
>> > If the processor hits an uncaught exception while running, there are
>> > several possibilities.
>> > Case 1: The exception came from one socket channel.
>> > Case 2: The exception was associated with a bad request.
>> > In case 1, ideally we should just disconnect that socket channel
>> > without affecting the other socket channels.
>> > In case 2, I think we should log the error and skip the message -
>> > assuming the client will retry sending data if no response is received
>> > within a given period of time.
>> >
>> > I am not sure letting the processor exit is a good idea, because that
>> > would let a badly behaving client take down the entire cluster - it might
>> > kill the processors one by one. Compared with that, I lean towards
>> > keeping the processor running and serving other normal TCP
>> > connections if possible, but logging the error so a monitoring system
>> > can detect it and decide whether human intervention is needed.
>> >
>> > Also, I don't know what to do here to prevent the thread from
>> > exiting without catching all the throwables.
>> > According to this blog,
>> > http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
>> > I guess I can rethrow all the ControlThrowables but intercept the rest?
>> >
>> > Guozhang Wang wrote:
>> > I would also prefer not to close the Processor thread upon
>> > exceptions, mainly to avoid one bad client killing a shared Kafka cluster.
>> > In the past we have seen issues like a MetadataRequest DDoS killing the
>> > cluster and affecting all other clients, and the quota work is aimed at
>> > preventing that. Since Processor threads are shared (8 by default on
>> > a broker), one should not be closed because of a single socket / bad
>> > client request.
>>
>> I like your thinking around cases #1 and #2. I think this should go as a
>> code comment somewhere, so when people improve / extend SocketServer they
>> will keep this logic in mind. Maybe even indicate in the specific catch clauses
>> whether they are handling possible errors at the request level or the channel level.
>>
>> My concern is with possible case #3: Each processor has an
>> o.a.k.common.network.Selector. I'm concerned about the possibility of
>> something going wrong in the state of the selector, which will possibly be
>> an issue for all channels. For example, a failure to register could be an issue
>> with the channel.reg

Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?

2015-07-22 Thread Jiangjie Qin
I don't think we have a consumer coordinator in 0.8.2 brokers, so the
KafkaConsumer in 0.8.3 will only be able to subscribe to partitions
explicitly. Subscribing to a topic won't work with 0.8.2 brokers.

Jiangjie (Becket) Qin

On Wed, Jul 22, 2015 at 4:26 AM, Stevo Slavić  wrote:

> I'm getting "Unknown api code 11" even when both client and server are
> 0.8.3/trunk, when "KafkaConsumer.subscribe(String... topics)" is used.
>
> Bug?
>
> Kind regards,
> Stevo Slavic.
>
> On Fri, Apr 24, 2015 at 7:13 PM, Neha Narkhede  wrote:
>
> > Yes, I was clearly confused :-)
> >
> > On Fri, Apr 24, 2015 at 9:37 AM, Sean Lydon 
> wrote:
> >
> > > Thanks for the responses. Ewen is correct that I am referring to the
> > > *new* consumer (org.apache.kafka.clients.consumer.KafkaConsumer).
> > >
> > > I am extending the consumer to allow my applications more control over
> > > committed offsets.  I really want to get away from zookeeper (so using
> > > the offset storage), and re-balancing is something I haven't really
> > > needed to tackle in an automated/seamless way.  Either way, I'll hold
> > > off going further down this road until there is more interest.
> > >
> > > @Gwen
> > > I set up a single consumer without partition.assignment.strategy or
> > > rebalance.callback.class.  I was unable to subscribe to just a topic
> > > ("Unknown api code 11" on broker), but I could subscribe to a
> > > topicpartition.  This makes sense as I would need to handle re-balance
> > > outside the consumer.  Things functioned as expected (well, I have an
> > > additional minor fix to the code from KAFKA-2121), and the only exceptions
> > > on broker were due to closing consumers (which I have become
> > > accustomed to).  My tests are specific to my extended version of the
> > > consumer, but they basically do a little writing and reading with
> > > different serde classes with application controlled commits (similar
> > > to onSuccess and onFailure after each record, but with tolerance for
> > > out of order acknowledgements).
> > >
> > > If you are interested, here is the patch of the hack against trunk.
> > >
> > > On Thu, Apr 23, 2015 at 10:27 PM, Ewen Cheslack-Postava
> > >  wrote:
> > > > @Neha I think you're mixing up the 0.8.1/0.8.2 updates and the
> > > 0.8.2/0.8.3
> > > > that's being discussed here?
> > > >
> > > > I think the original question was about using the *new* consumer
> > > ("clients
> > > > consumer") with 0.8.2. Gwen's right, it will use features not even
> > > > implemented in the broker in trunk yet, let alone the 0.8.2.
> > > >
> > > > I don't think the "enable.commit.downgrade" type option, or
> supporting
> > > the
> > > > old protocol with the new consumer at all, makes much sense. You'd
> end
> > up
> > > > with some weird hybrid of simple and high-level consumers -- you
> could
> > > use
> > > > offset storage, but you'd have to manage rebalancing yourself since
> > none
> > > of
> > > > the coordinator support would be there.
> > > >
> > > >
> > > > On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede 
> > > wrote:
> > > >
> > > >> My understanding is that ideally the 0.8.3 consumer should work with
> > an
> > > >> 0.8.2 broker if the offset commit config was set to "zookeeper".
> > > >>
> > > >> The only thing that might not work is offset commit to Kafka, which
> > > makes
> > > >> sense since the 0.8.2 broker does not support Kafka based offset
> > > >> management.
> > > >>
> > > >> If we broke all kinds of offset commits, then it seems like a
> > > regression,
> > > >> no?
> > > >>
> > > >> On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira <
> gshap...@cloudera.com>
> > > >> wrote:
> > > >>
> > > >> > I didn't think 0.8.3 consumer will ever be able to talk to 0.8.2
> > > >> > broker... there are some essential pieces that are missing in
> 0.8.2
> > > >> > (Coordinator, Heartbeat, etc).
> > > >> > Maybe I'm missing something. It will be nice if this will work :)
> > > >> >
> > > >> > Mind sharing what / how you tested? Were there no errors in broker
> > > >> > logs after your fix?
> > > >> >
> > > >> > On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon  >
> > > >> wrote:
> > > >> > > Currently the clients consumer (trunk) sends offset commit
> > requests
> > > of
> > > >> > > version 2.  The 0.8.2 brokers fail to handle this particular
> > request
> > > >> > > with a:
> > > >> > >
> > > >> > > java.lang.AssertionError: assertion failed: Version 2 is invalid
> > for
> > > >> > > OffsetCommitRequest. Valid versions are 0 or 1.
> > > >> > >
> > > >> > > I was able to make this work via a forceful downgrade of this
> > > >> > > particular request, but I would like some feedback on whether an
> > > >> > > "enable.commit.downgrade" configuration would be a tolerable
> > method
> > > to
> > > >> > > allow 0.8.3 consumers to interact with 0.8.2 brokers.  I'm also
> > > >> > > interested in whether this is even a goal worth pursuing.
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Sean
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> 

Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Todd Palino
Since I've been dealing with the fallout of this particular problem all
week, I'll add a few thoughts...


On Wed, Jul 22, 2015 at 10:51 AM, Gwen Shapira 
wrote:

>
>
> > On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > > core/src/main/scala/kafka/network/SocketServer.scala, line 465
> > > <
> https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line465
> >
> > >
> > > Turns out that catching Throwable is a really bad idea:
> https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
> >
> > Jiangjie Qin wrote:
> > Ah... Didn't know that before. I explicitly listed the exceptions.
> >
> > Guozhang Wang wrote:
> > Searching ": Throwable" gives me 180+ cases in the code base :P Though
> > many of them are from unit tests (which is arguably OK), there are still
> > a lot in the core package. I agree that we should avoid catching Throwable
> > whenever possible, which will also help push developers to think about
> > possible checked exceptions in the calling trace.
>
> I know :(
> I'm not sure if going over and converting everything is worth the effort.
> Although it can be a nice newbie jira.
>
>
> > On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > > core/src/main/scala/kafka/network/SocketServer.scala, line 400
> > > <
> https://reviews.apache.org/r/36664/diff/1/?file=1018238#file1018238line400
> >
> > >
> > > So in case of unexpected exception, we log an error and keep
> running?
> > >
> > > Isn't it better to kill the processor, since we don't know what's
> the state of the system? If the acceptor keeps placing messages in the
> queue for a dead processor, isn't it a separate issue?
> >
> > Jiangjie Qin wrote:
> > I'm not quite sure about this part. I am not very experienced with error
> > handling in such cases, so please correct me if I missed something.
> > Here is what I thought.
> >
> > The way it currently works is that the acceptor will
> > 1. accept the new connection request and create a new socket channel
> > 2. choose a processor and put the socket channel into that processor's
> > new connection queue
> >
> > The processor just takes the socket channels from the queue and
> > registers them with the selector.
> > If the processor hits an uncaught exception while running, there are
> > several possibilities.
> > Case 1: The exception came from one socket channel.
> > Case 2: The exception was associated with a bad request.
> > In case 1, ideally we should just disconnect that socket channel
> > without affecting the other socket channels.
> > In case 2, I think we should log the error and skip the message -
> > assuming the client will retry sending data if no response is received
> > within a given period of time.
> >
> > I am not sure letting the processor exit is a good idea, because that
> > would let a badly behaving client take down the entire cluster - it might
> > kill the processors one by one. Compared with that, I lean towards keeping
> > the processor running and serving other normal TCP connections if
> > possible, but logging the error so a monitoring system can detect it and
> > decide whether human intervention is needed.
> >
> > Also, I don't know what to do here to prevent the thread from exiting
> > without catching all the throwables.
> > According to this blog,
> > http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
> > I guess I can rethrow all the ControlThrowables but intercept the rest?
> >
> > Guozhang Wang wrote:
> > I would also prefer not to close the Processor thread upon exceptions,
> > mainly to avoid one bad client killing a shared Kafka cluster. In the past
> > we have seen issues like a MetadataRequest DDoS killing the cluster and
> > affecting all other clients, and the quota work is aimed at preventing
> > that. Since Processor threads are shared (8 by default on a broker), one
> > should not be closed because of a single socket / bad client request.
>
> I like your thinking around cases #1 and #2. I think this should go as a
> code comment somewhere, so when people improve / extend SocketServer they
> will keep this logic in mind. Maybe even indicate in the specific catch
> clauses whether they are handling possible errors at the request level or
> the channel level.
>
> My concern is with possible case #3: Each processor has an
> o.a.k.common.network.Selector. I'm concerned about the possibility of
> something going wrong in the state of the selector, which will possibly be
> an issue for all channels. For example, a failure to register could be an
> issue with the channel.register call, but also perhaps an issue with
> keys.put (just an example - I'm not sure something can actually break the
> keys table).
>
> I'd like to be able to identify cases where the Selector state may have
> gone wrong and close the processor in that case. Does that make any sense?
> Or am I being too paranoid?
>

If there are error cases that are not associated with a specific connection
or request, then I agree 

Re: [VOTE] Switch to GitHub pull requests for new contributions

2015-07-22 Thread Jiangjie Qin
+1 (non binding)

Can we have a wiki for procedure and let people verify the steps? After
that we can update the Apache project page.

On Tue, Jul 21, 2015 at 5:38 PM, Edward Ribeiro 
wrote:

> +1 (non binding)
>
> On Tue, Jul 21, 2015 at 7:36 PM, Jay Kreps  wrote:
>
> > +1
> >
> > -Jay
> >
> > On Tue, Jul 21, 2015 at 4:28 AM, Ismael Juma  wrote:
> >
> > > Hi all,
> > >
> > > I would like to start a vote on switching to GitHub pull requests for
> new
> > > contributions. To be precise, the vote is on whether we should:
> > >
> > > * Update the documentation to tell users to use pull requests instead
> of
> > > patches and Review Board (i.e. merge KAFKA-2321 and KAFKA-2349)
> > > * Use pull requests for new contributions
> > >
> > > In a previous discussion[1], everyone that participated was in favour.
> > It's
> > > also worth reading the "Contributing Code Changes" wiki page[2] (if you
> > > haven't already) to understand the flow.
> > >
> > > A number of pull requests have been merged in the last few weeks to
> test
> > > this flow and I believe it's working well enough. As usual, there is
> > always
> > > room for improvement and I expect us to tweak things as time goes on.
> > >
> > > The main downside of using GitHub pull requests is that we don't have
> > write
> > > access to https://github.com/apache/kafka. That means that we rely on
> > > commit hooks to close integrated pull requests (the merge script takes
> > care
> > > of formatting the message so that this happens) and the PR creator or
> > > Apache Infra to close pull requests that are not integrated.
> > >
> > > Regarding existing contributions, I think it's up to the contributor to
> > > decide whether they want to resubmit it as a pull request or not. I
> > expect
> > > that there will be a transition period where the old and new way will
> > > co-exist. But that can be discussed separately.
> > >
> > > The vote will run for 72 hours.
> > >
> > > +1 (non-binding) from me.
> > >
> > > Best,
> > > Ismael
> > >
> > > [1] http://search-hadoop.com/m/uyzND1N6CDH1DUc82
> > > [2]
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes
> > >
> >
>


Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-22 Thread Gwen Shapira
Tangent: I think we should complete the move of Produce / Fetch RPC to
the client libraries before we add more revisions to this protocol.

On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
 wrote:
> I missed yesterday's KIP hangout. I'm currently working on another KIP for
> enriched metadata of messages. Guozhang has already created a wiki page
> before (
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata).
> We plan to fill in the relative offset in the offset field of the batch sent
> by the producer, to avoid broker-side re-compression. The message offset would
> become the batch base offset + relative offset. I guess the expected
> offset in KIP-27 could then only be set for the base offset? Would that affect
> certain use cases?
>
> Regarding Jun's comments, I am not sure I completely get them. I think the producer
> only sends one batch per partition in a request, so either that batch is
> appended or not. Why would a batch be partially committed?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin  wrote:
>
>> That's a fair point. I've added some imagined job logic to the KIP, so
>> we can make sure the proposal stays in sync with the usages we're
>> discussing. (The logic is just a quick sketch for now -- I expect I'll
>> need to elaborate it as we get into more detail, or to address other
>> concerns...)
>>
>> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao  wrote:
>> > For 1, yes, when there is a transient leader change, it's guaranteed
>> that a
>> > prefix of the messages in a request will be committed. However, it seems
>> > that the client needs to know what subset of messages are committed in
>> > order to resume the sending. Then the question is how.
>> >
>> > As Flavio indicated, for the use cases that you listed, it would be
>> useful
>> > to figure out the exact logic by using this feature. For example, in the
>> > partition K/V store example, when we fail over to a new writer to the
>> > commit log, the zombie writer can publish new messages to the log after
>> the
>> > new writer takes over, but before it publishes any message. We probably
>> > need to outline how this case can be handled properly.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin  wrote:
>> >
>> >> Hi Jun,
>> >>
>> >> Thanks for the close reading! Responses inline.
>> >>
>> >> > Thanks for the write-up. The single producer use case you mentioned
>> makes
>> >> > sense. It would be useful to include that in the KIP wiki.
>> >>
>> >> Great -- I'll make sure that the wiki is clear about this.
>> >>
>> >> > 1. What happens when the leader of the partition changes in the middle
>> >> of a
>> >> > produce request? In this case, the producer client is not sure whether
>> >> the
>> >> > request succeeds or not. If there is only a single message in the
>> >> request,
>> >> > the producer can just resend the request. If it sees an OffsetMismatch
>> >> > error, it knows that the previous send actually succeeded and can
>> proceed
>> >> > with the next write. This is nice since it not only allows the
>> producer
>> >> to
>> >> > proceed during transient failures in the broker, it also avoids
>> >> duplicates
>> >> > during producer resend. One caveat is when there are multiple
>> messages in
>> >> > the same partition in a produce request. The issue is that in our
>> current
>> >> > replication protocol, it's possible for some, but not all messages in
>> the
>> >> > request to be committed. This makes resend a bit harder to deal with
>> >> since
>> >> > on receiving an OffsetMismatch error, it's not clear which messages
>> have
>> >> > been committed. One possibility is to expect that compression is
>> enabled,
>> >> > in which case multiple messages are compressed into a single message.
>> I
>> >> was
>> >> > thinking that another possibility is for the broker to return the
>> current
>> >> > high watermark when sending an OffsetMismatch error. Based on this
>> info,
>> >> > the producer can resend the subset of messages that have not been
>> >> > committed. However, this may not work in a compacted topic since there
>> >> can
>> >> > be holes in the offset.
>> >>
>> >> This is an excellent question. It's my understanding that at least a
>> >> *prefix* of messages will be committed (right?) -- which seems to be
>> >> enough for many cases. I'll try and come up with a more concrete
>> >> answer here.
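
A sketch of the resend idea above - the broker returns its current high
watermark with the OffsetMismatch error and the producer drops the already
committed prefix - could look like this (names made up, not a real client API):

{code}
def resendAfterMismatch[M](batch: Seq[M], expectedOffset: Long,
                           highWatermark: Long): Seq[M] = {
  // Messages below the high watermark were committed; resend only the suffix.
  val committed = (highWatermark - expectedOffset).max(0L).toInt
  batch.drop(committed)
}
{code}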
>> >>
>> >> > 2. Is this feature only intended to be used with ack = all? The client
>> >> > doesn't get the offset with ack = 0. With ack = 1, it's possible for a
>> >> > previously acked message to be lost during leader transition, which
>> will
>> >> > make the client logic more complicated.
>> >>
>> >> It's true that acks = 0 doesn't seem to be particularly useful; in all
>> >> the cases I've come across, the client eventually wants to know about
>> >> the mismatch error. However, it seems like there are some cases where
>> >> acks = 1 would be fine -- eg. in

[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637338#comment-14637338
 ] 

Gwen Shapira commented on KAFKA-2350:
-

Agree with [~yasuhiro.matsuda]. 

The notion of subscribed vs. assigned is a bit challenging to grasp as is 
([~guozhang] demonstrated that nicely), and adding flow-control mechanics into 
it will be messy and difficult for users to get right. 
I think we all hope that in most cases users will subscribe and unsubscribe to 
entire topics and let the coordinator handle the details (thereby reducing 
user errors, mailing list cries for help, etc.). Adding APIs that will cause 
the opposite to happen is a step in the wrong direction. 

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637317#comment-14637317
 ] 

Jay Kreps commented on KAFKA-2350:
--

[~hachikuji] Yeah I agree that would help this case. The nice thing about that 
proposal is that it would make the group management explicit, which could be 
valuable. I wonder if that might not add more things that can go wrong in the 
common case, though. I.e., right now the common case is just subscribing to a 
topic and letting the group management figure out the assignment, and it is 
kind of hard to mess that up. All the cases where you either subscribe to 
individual partitions or pause partitions are kind of niche uses, so maybe it 
is less important to optimize for those cases?

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-27 - Conditional Publish

2015-07-22 Thread Jiangjie Qin
I missed yesterday's KIP hangout. I'm currently working on another KIP for
enriched metadata of messages. Guozhang has already created a wiki page
before (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata).
We plan to fill in the relative offset in the offset field of the batch sent
by the producer, to avoid broker-side re-compression. The message offset would
become the batch base offset + relative offset. I guess the expected
offset in KIP-27 could then only be set for the base offset? Would that affect
certain use cases?
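
As a tiny worked example of the scheme just described (numbers made up): if the
producer fills in relative offsets 0, 1, 2 for a three-message batch and the
broker assigns base offset 100 on append, the absolute offsets come out as 100,
101, 102 with no broker-side rewrite of the batch:

{code}
val baseOffset = 100L                 // assigned by the broker on append
val relativeOffsets = Seq(0L, 1L, 2L) // filled in by the producer
val absoluteOffsets = relativeOffsets.map(baseOffset + _) // 100, 101, 102
{code}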

Regarding Jun's comments, I am not sure I completely get them. I think the producer
only sends one batch per partition in a request, so either that batch is
appended or not. Why would a batch be partially committed?

Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin  wrote:

> That's a fair point. I've added some imagined job logic to the KIP, so
> we can make sure the proposal stays in sync with the usages we're
> discussing. (The logic is just a quick sketch for now -- I expect I'll
> need to elaborate it as we get into more detail, or to address other
> concerns...)
>
> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao  wrote:
> > For 1, yes, when there is a transient leader change, it's guaranteed
> that a
> > prefix of the messages in a request will be committed. However, it seems
> > that the client needs to know what subset of messages are committed in
> > order to resume sending. Then the question is how.
> >
> > As Flavio indicated, for the use cases that you listed, it would be
> useful
> > to figure out the exact logic by using this feature. For example, in the
> > partition K/V store example, when we fail over to a new writer to the
> > commit log, the zombie writer can publish new messages to the log after
> the
> > new writer takes over, but before it publishes any message. We probably
> > need to outline how this case can be handled properly.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin  wrote:
> >
> >> Hi Jun,
> >>
> >> Thanks for the close reading! Responses inline.
> >>
> >> > Thanks for the write-up. The single producer use case you mentioned
> makes
> >> > sense. It would be useful to include that in the KIP wiki.
> >>
> >> Great -- I'll make sure that the wiki is clear about this.
> >>
> >> > 1. What happens when the leader of the partition changes in the middle
> >> of a
> >> > produce request? In this case, the producer client is not sure whether
> >> the
> >> > request succeeds or not. If there is only a single message in the
> >> request,
> >> > the producer can just resend the request. If it sees an OffsetMismatch
> >> > error, it knows that the previous send actually succeeded and can
> proceed
> >> > with the next write. This is nice since it not only allows the
> producer
> >> to
> >> > proceed during transient failures in the broker, it also avoids
> >> duplicates
> >> > during producer resend. One caveat is when there are multiple
> messages in
> >> > the same partition in a produce request. The issue is that in our
> current
> >> > replication protocol, it's possible for some, but not all messages in
> the
> >> > request to be committed. This makes resend a bit harder to deal with
> >> since
> >> > on receiving an OffsetMismatch error, it's not clear which messages
> have
> >> > been committed. One possibility is to expect that compression is
> enabled,
> >> > in which case multiple messages are compressed into a single message.
> I
> >> was
> >> > thinking that another possibility is for the broker to return the
> current
> >> > high watermark when sending an OffsetMismatch error. Based on this
> info,
> >> > the producer can resend the subset of messages that have not been
> >> > committed. However, this may not work in a compacted topic since there
> >> can
> >> > be holes in the offset.
> >>
> >> This is an excellent question. It's my understanding that at least a
> >> *prefix* of messages will be committed (right?) -- which seems to be
> >> enough for many cases. I'll try and come up with a more concrete
> >> answer here.
> >>
> >> > 2. Is this feature only intended to be used with ack = all? The client
> >> > doesn't get the offset with ack = 0. With ack = 1, it's possible for a
> >> > previously acked message to be lost during leader transition, which
> will
> >> > make the client logic more complicated.
> >>
> >> It's true that acks = 0 doesn't seem to be particularly useful; in all
> >> the cases I've come across, the client eventually wants to know about
> >> the mismatch error. However, it seems like there are some cases where
> >> acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
> >> losing messages during a leader transition just means you need to
> >> rewind / restart the load, which is not especially catastrophic. For
> >> many other interesting cases, acks = all is probably preferable.
> >>
> >> > 3. How does the producer client know the offset to send the f

Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Gwen Shapira


> On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 465
> > 
> >
> > Turns out that catching Throwable is a really bad idea: 
> > https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
> 
> Jiangjie Qin wrote:
> Ah... Didn't know that before. I explicitly listed the exceptions.
> 
> Guozhang Wang wrote:
> Searching ": Throwable" gives me 180+ cases in the code base :P Though many 
> of them are from unit tests (which is arguably OK), there are still a lot 
> in the core package. I agree that we should avoid catching Throwable whenever 
> possible, which will also help force developers to think about the possible 
> checked exceptions in the calling trace.

I know :(
I'm not sure going over and converting everything is worth the effort, 
although it could be a nice newbie jira.


> On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 400
> > 
> >
> > So in case of unexpected exception, we log an error and keep running?
> > 
> > Isn't it better to kill the processor, since we don't know what's the 
> > state of the system? If the acceptor keeps placing messages in the queue 
> > for a dead processor, isn't it a separate issue?
> 
> Jiangjie Qin wrote:
> This part I'm not quite sure about. I am not very experienced with the error 
> handling in such cases, so please correct me if I missed something.
> Here is what I thought.
> 
> The way it currently works is that the acceptor will 
> 1. accept a new connection request and create a new socket channel
> 2. choose a processor and put the socket channel into the processor's new 
> connection queue
> 
> The processor will just take the socket channels from the queue and 
> register them with the selector.
> If the processor hits an uncaught exception while running, there are several 
> possibilities. 
> Case 1: The exception came from one socket channel. 
> Case 2: The exception was associated with a bad request. 
> In case 1, ideally we should just disconnect that socket channel without 
> affecting the other socket channels.
> In case 2, I think we should log the error and skip the message - 
> assuming the client will retry sending data if no response is received 
> within a given period of time.
> 
> I am not sure letting the processor exit is a good idea, because that would 
> let a badly behaving client take down the entire cluster - it might kill 
> the processors one by one. Compared with that, I am leaning towards keeping 
> the processor running and serving the other, normal TCP connections if 
> possible, but logging the error so a monitoring system can detect it and 
> decide whether human intervention is needed.
> 
> Also, I don't know what to do here to prevent the thread from exiting 
> without catching all the throwables.
> According to this blog 
> http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
> I guess I can rethrow all the ControlThrowables, but intercept the rest?
> 
> Guozhang Wang wrote:
> I would also prefer not to close the Processor thread upon exceptions, 
> mainly for avoid one bad client killing a shared Kafka cluster. In the past 
> we have seen such issues like DDoS MetadataRequest killing the cluster and 
> all other clients gets affected, etc, and the quota work is towards 
> preventing it. Since Processor threads are shared (8 by default on a broker), 
> it should not be closed by a single socket / bad client request.

I like your thinking around cases #1 and #2. I think this should go in a code 
comment somewhere, so that when people improve / extend SocketServer they keep 
this logic in mind. Maybe even note in the specific catch clauses whether they 
are handling possible errors at the request level or the channel level.

My concern is with a possible case #3: each processor has an 
o.a.k.common.network.Selector. I'm concerned about the possibility of something 
going wrong in the state of the selector, which would possibly be an issue for 
all channels. For example, a failure to register could be an issue with the 
channel.register call, but also perhaps an issue with keys.put (just an example 
- I'm not sure something can actually break the keys table). 

I'd like to be able to identify cases where the Selector state may have gone 
wrong and close the processor in that case. Does that make any sense? Or am I 
being too paranoid?
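To make the distinction concrete, here is a quick sketch of the idea (Scala, 
with stand-in helper names; this is not the actual SocketServer code):

{code}
import scala.util.control.{ControlThrowable, NonFatal}

object ProcessorSketch {
  // Stand-ins for the real SocketServer internals (hypothetical helpers):
  def configureNewConnections(): Unit = ()
  def processRequestsAndResponses(): Unit = ()
  def error(msg: String, e: Throwable): Unit = System.err.println(s"$msg: $e")

  // One iteration of the processor loop: per-request / per-channel problems
  // (cases #1 and #2) are logged and the loop keeps serving the other
  // connections; control-flow throwables and fatal errors propagate.
  def runOnce(): Unit =
    try {
      configureNewConnections()
      processRequestsAndResponses()
    } catch {
      case e: ControlThrowable => throw e // never swallow Scala control flow
      case NonFatal(e) => error("Uncaught error in processor; continuing", e)
      // VirtualMachineError and friends are not matched by NonFatal, so they
      // still kill the thread, which is what we want for truly fatal errors.
    }
}
{code}

Case #3 (a corrupted Selector) would need its own detection ahead of the 
NonFatal catch, which is exactly the open question here.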


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92496
---


On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote:
> 
> -

Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Gwen Shapira


> On July 22, 2015, 10:40 a.m., Ismael Juma wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 467
> > 
> >
> > As far as I can see `ClosedChannelException`, `IllegalStateException` 
> > and `IllegalArgumentException` are enough? Also, you would it be better to 
> > use `IOException` instead of `ClosedChannelException`?
> > 
> > What happens if other exceptions are thrown? Will we still have a 
> > socket leak?

Yeah, perhaps in addition to listing the expected cases, we should also handle 
nonFatal(e)? (https://tersesystems.com/2012/12/27/error-handling-in-scala/)
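In code, that could look something like this (a sketch only; `register` and 
`closeChannel` are stand-ins for the real accept/register path):

{code}
import java.io.IOException
import scala.util.control.NonFatal

// List the expected exceptions explicitly, but keep a NonFatal fallback so
// an unexpected error still closes the channel instead of leaking the socket.
def registerSafely(register: () => Unit, closeChannel: () => Unit): Unit =
  try register()
  catch {
    case _: IOException              => closeChannel() // incl. ClosedChannelException
    case _: IllegalStateException    => closeChannel()
    case _: IllegalArgumentException => closeChannel()
    case NonFatal(_)                 => closeChannel() // unexpected, but non-fatal
  }
{code}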


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92574
---


On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36664/
> ---
> 
> (Updated July 22, 2015, 5:02 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2353
> https://issues.apache.org/jira/browse/KAFKA-2353
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Gwen's comments
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 91319fa010b140cca632e5fa8050509bd2295fc9 
> 
> Diff: https://reviews.apache.org/r/36664/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637219#comment-14637219
 ] 

Yasuhiro Matsuda commented on KAFKA-2350:
-

I think overloading subscribe/unsubscribe is very confusing. 
Subscribe/unsubscribe and pause/unpause are two very different behaviors, and 
overloading the same method names does not really simplify the API. I want 
pause/unpause to be pure flow control; it shouldn't be mixed up with 
subscription.

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92607
---


LGTM overall. Could you address Ismael's comments as well before check-in?

- Guozhang Wang


On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36664/
> ---
> 
> (Updated July 22, 2015, 5:02 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2353
> https://issues.apache.org/jira/browse/KAFKA-2353
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Gwen's comments
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 91319fa010b140cca632e5fa8050509bd2295fc9 
> 
> Diff: https://reviews.apache.org/r/36664/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Commented] (KAFKA-2352) Possible memory leak in MirrorMaker and/or new Producer

2015-07-22 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637208#comment-14637208
 ] 

Jiangjie Qin commented on KAFKA-2352:
-

Can you verify whether the messages you passed to the producer have been sent 
or not? In mirror maker the default configuration is set to avoid data loss, 
so if the producer cannot send a message it will retry infinitely. In that 
case, you might exhaust the producer's buffer if you keep appending messages 
to it.
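For reference, a sketch of the producer settings involved (Scala; the values 
are illustrative, matching the 134217728-byte pool in the stack trace below):

{code}
import java.util.Properties

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("buffer.memory", "134217728") // the pool size from the log below
props.put("batch.size", "4096")
// false: throw BufferExhaustedException when the pool is exhausted (the
// behavior in the stack trace); true: block the caller until space frees up.
props.put("block.on.buffer.full", "true")
props.put("retries", Int.MaxValue.toString) // retry "infinitely" to avoid data loss
{code}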

> Possible memory leak in MirrorMaker and/or new Producer
> ---
>
> Key: KAFKA-2352
> URL: https://issues.apache.org/jira/browse/KAFKA-2352
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Kostya Golikov
> Attachments: consumer.conf, output.log, producer.conf
>
>
> I've been playing around with Mirror Maker (version from trunk, dated July 
> 7th) and got a few problems, most noticeable of which is that MirrorMaker 
> exhausts it's memory pool, even though it's size set to relatively huge value 
> of 132 MB, and individual messages are around 2 KB. Batch size is set to just 
> 2 messages (full configs are attached). 
> {code}
> [2015-07-21 15:19:52,915] FATAL [mirrormaker-thread-1] Mirror maker thread 
> failure due to  (kafka.tools.MirrorMaker$MirrorMakerThread)
> org.apache.kafka.clients.producer.BufferExhaustedException: You have 
> exhausted the 134217728 bytes of memory you configured for the client and the 
> client is configured to error rather than block when memory is exhausted.
>   at 
> org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:124)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:172)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:388)
>   at 
> kafka.tools.MirrorMaker$MirrorMakerProducer.send(MirrorMaker.scala:380)
>   at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:311)
>   at 
> kafka.tools.MirrorMaker$MirrorMakerThread$$anonfun$run$3.apply(MirrorMaker.scala:311)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:311)
> {code}
> Am I doing something wrong? Any help in further diagnosing of this problem 
> might be handy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Guozhang Wang


> On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 400
> > 
> >
> > So in case of unexpected exception, we log an error and keep running?
> > 
> > Isn't it better to kill the processor, since we don't know what's the 
> > state of the system? If the acceptor keeps placing messages in the queue 
> > for a dead processor, isn't it a separate issue?
> 
> Jiangjie Qin wrote:
> This part I'm not quite sure about. I am not very experienced with the error 
> handling in such cases, so please correct me if I missed something.
> Here is what I thought.
> 
> The way it currently works is that the acceptor will 
> 1. accept a new connection request and create a new socket channel
> 2. choose a processor and put the socket channel into the processor's new 
> connection queue
> 
> The processor will just take the socket channels from the queue and 
> register them with the selector.
> If the processor hits an uncaught exception while running, there are several 
> possibilities. 
> Case 1: The exception came from one socket channel. 
> Case 2: The exception was associated with a bad request. 
> In case 1, ideally we should just disconnect that socket channel without 
> affecting the other socket channels.
> In case 2, I think we should log the error and skip the message - 
> assuming the client will retry sending data if no response is received 
> within a given period of time.
> 
> I am not sure letting the processor exit is a good idea, because that would 
> let a badly behaving client take down the entire cluster - it might kill 
> the processors one by one. Compared with that, I am leaning towards keeping 
> the processor running and serving the other, normal TCP connections if 
> possible, but logging the error so a monitoring system can detect it and 
> decide whether human intervention is needed.
> 
> Also, I don't know what to do here to prevent the thread from exiting 
> without catching all the throwables.
> According to this blog 
> http://www.tzavellas.com/techblog/2010/09/20/catching-throwable-in-scala/
> I guess I can rethrow all the ControlThrowables, but intercept the rest?

I would also prefer not to close the Processor thread upon exceptions, mainly 
for avoid one bad client killing a shared Kafka cluster. In the past we have 
seen such issues like DDoS MetadataRequest killing the cluster and all other 
clients gets affected, etc, and the quota work is towards preventing it. Since 
Processor threads are shared (8 by default on a broker), it should not be 
closed by a single socket / bad client request.


> On July 21, 2015, 11:15 p.m., Gwen Shapira wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 465
> > 
> >
> > Turns out that catching Throwable is a really bad idea: 
> > https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/
> 
> Jiangjie Qin wrote:
> Ah... Didn't know that before. I explicitly listed the exceptions.

Searching ": Throwable" gives me 180+ cases in the code base :P Though many of them 
are from unit tests (which is arguably OK), there are still a lot in the 
core package. I agree that we should avoid catching Throwable whenever 
possible, which will also help force developers to think about the possible 
checked exceptions in the calling trace.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92496
---


On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36664/
> ---
> 
> (Updated July 22, 2015, 5:02 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2353
> https://issues.apache.org/jira/browse/KAFKA-2353
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Gwen's comments
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 91319fa010b140cca632e5fa8050509bd2295fc9 
> 
> Diff: https://reviews.apache.org/r/36664/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Comment Edited] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637187#comment-14637187
 ] 

Jason Gustafson edited comment on KAFKA-2350 at 7/22/15 5:01 PM:
-

[~becket_qin] [~jkreps] Currently automatic assignment is inferred based on 
which subscribe methods are invoked (e.g. if you subscribe to a topic, then we 
assume you want automatic assignment). I wonder if it might help to make that 
instead an explicit configuration parameter? Then that might free us a little 
bit to use subscribe/unsubscribe in the way [~becket_qin] is proposing since we 
don't have to guess whether the user is intending to actually subscribe to a 
partition or just to unpause it. 


was (Author: hachikuji):
[~becket_qin] [~jkreps] Currently automatic assignment is inferred based on 
which subscribe methods are invoked (e.g. if you subscribe to a topic, then we 
assume you want automatic assignment). I wonder if it might help to make that 
instead an explicit configuration parameter? Then that might free us a little 
bit to use subscribe/unsubscribe in the way [~becket_qin] is proposing since we 
don't have to guess whether the user is intending to actually subscribe to a 
partition or just to pause it. 

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637187#comment-14637187
 ] 

Jason Gustafson commented on KAFKA-2350:


[~becket_qin] [~jkreps] Currently automatic assignment is inferred based on 
which subscribe methods are invoked (e.g. if you subscribe to a topic, then we 
assume you want automatic assignment). I wonder if it might help to make that 
instead an explicit configuration parameter? Then that might free us a little 
bit to use subscribe/unsubscribe in the way [~becket_qin] is proposing since we 
don't have to guess whether the user is intending to actually subscribe to a 
partition or just to pause it. 
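Just to make that concrete, it might look something like the following (the 
config key below is purely hypothetical, not an existing Kafka setting):

{code}
val props = new java.util.Properties()
// Hypothetical key: declare the assignment mode up front instead of
// inferring it from which subscribe() overload is called first.
props.put("partition.assignment.mode", "automatic") // or "manual"
{code}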

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637179#comment-14637179
 ] 

Guozhang Wang commented on KAFKA-2350:
--

Currently there is already a function for retrieving the subscribed topic 
partitions today:

{code}
public Set<TopicPartition> subscriptions() {
    acquire();
    try {
        return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
    } finally {
        release();
    }
}
{code}

which will for example remove the partition and hence change the returned 
values if consumer.unsubscribe(partition) is called.

I actually think [~becket_qin]'s approach will not cause much confusion 
regarding the APIs. More explicitly, assuming we add another function 
"assignment()" that returns the assigned partitions, the semantics of the 
other APIs would be:

{code}
consumer.subscribe(topic); // will not throw any exception, but will update the 
assignment as well as subscription in the next poll.

consumer.unsubscribe(topic);  // will throw an exception if the topic is not 
subscribed; otherwise will update the assignment and the subscription in the 
next poll.

consumer.assignment(); // return the assigned partitions

consumer.subscriptions();  // return the subscribed partitions, which are the 
same as the assigned partitions most of the time

consumer.subscribe(partition1);  // will throw an exception if partition is not 
in assignment(), saying "it is not assigned to you"

consumer.unsubscribe(partition2);  // will throw an exception if partition is 
not in subscriptions(), saying "it is not subscribed by yourself"
{code}

What concerns me more about this approach is the client implementation. Since 
it allows a client to both use Kafka partition assignment and not use it 
during its life cycle, this could make the client state more complicated to 
manage. For example:

{code}
consumer.subscribe(topic1); // using kafka for assignment, say we are assigned 
topic1-partition1 and topic1-partition2
consumer.poll();
consumer.subscribe(topic2-partition1); // subscribe to another partition 
explicitly, without making the Kafka coordinator aware of it.
consumer.unsubscribe(topic1-partition1);  // now the subscription is 
topic1-partition2 and topic2-partition1, where the first is from Kafka 
assignment and the second is from explicit subscription.
{code}

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36681: Patch for KAFKA-2275

2015-07-22 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36681/#review92604
---


Hey Ashish, this looks pretty good to me. Just some minor comments.


clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 298)


Any reason not to put this method in Fetcher instead of here? I don't have 
a strong feeling, but it was kind of nice keeping ConsumerNetworkClient largely 
free of application logic.

Also, it might be nice to have a unit test.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (lines 314 - 315)


I think I asked this before, but is there any harm in returning this topic 
to the user? I ask because we don't actually prevent them from calling 
partitionsFor with the same topic.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 320)


I think the convention is to leave off the braces on one-line if statements.


- Jason Gustafson


On July 22, 2015, 6:32 a.m., Ashish Singh wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36681/
> ---
> 
> (Updated July 22, 2015, 6:32 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2275
> https://issues.apache.org/jira/browse/KAFKA-2275
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2275: Add a ListTopics() API to the new consumer
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> ed4c0d98596cc294757f35df8c8cbc8e36ff42de 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 252b759c0801f392e3526b0f31503b4b8fbf1c8a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> c14eed1e95f2e682a235159a366046f00d1d90d6 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
>  9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 3eb5f95731a3f06f662b334ab2b3d0ad7fa9e1ca 
> 
> Diff: https://reviews.apache.org/r/36681/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ashish Singh
> 
>



[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2015-07-22 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14637126#comment-14637126
 ] 

Jay Kreps commented on KAFKA-2350:
--

[~becket_qin] Cool, we're on the same page; that was how I interpreted what you 
said. There is definitely a sense in which this is more elegant, but there is a 
little complexity, since you need to keep a list of suppressed partitions and 
populate it when unsubscribe(partition) is called on a topic that is 
subscribed to.
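A sketch of the bookkeeping that implies (Scala; the names are stand-ins, not 
actual SubscriptionState fields):

{code}
import scala.collection.mutable
import org.apache.kafka.common.TopicPartition

object SuppressionSketch {
  val subscribedTopics = mutable.Set[String]()
  val suppressed       = mutable.Set[TopicPartition]()

  // unsubscribe(partition) on a topic subscribed at the topic level cannot
  // drop the assignment (the coordinator owns it), so the partition has to
  // be recorded as suppressed and filtered out of the next fetch instead.
  def unsubscribe(tp: TopicPartition): Unit =
    if (subscribedTopics.contains(tp.topic)) suppressed += tp

  def fetchable(assigned: Set[TopicPartition]): Set[TopicPartition] =
    assigned.filterNot(suppressed.contains)
}
{code}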

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip poll() or if you unsubscribe, then a 
> rebalance will be triggered and your partitions will be reassigned.
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(String... topics);
> void unpause(String... topics);
> {code}
> When a topic is paused, a call to KafkaConsumer.poll will not initiate any 
> new fetches for that topic. After it is unpaused, fetches will begin again.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92599
---



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 46 - 57)


Some of these fields can be `final`.


- Ismael Juma


On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33620/
> ---
> 
> (Updated July 20, 2015, 7 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1690
> https://issues.apache.org/jira/browse/KAFKA-1690
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Added 
> PrincipalBuilder.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Broker side ssl changes.
> 
> 
> KAFKA-1684. SSL for socketServer.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Post merge fixes.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest.
> 
> 
> KAFKA-1690. Minor fixes based on patch review comments.
> 
> 
> Merge commit
> 
> 
> KAFKA-1690. Added SSL Consumer Test.
> 
> 
> KAFKA-1690. SSL Support.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Diffs
> -
> 
>   build.gradle fb9084307ae41bbb62e32720ccd7b94f26e910c8 
>   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
>   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
> 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 70377ae2fa46deb381139d28590ce6d4115e1adc 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> aa264202f2724907924985a5ecbe74afc4c6c04b 
>   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
> bae528d31516679bed88ee61b408f209f185a8cc 
>   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
>   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
> 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
>   
> clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java 
> PRE

Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92405
---



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java (line 
61)


Is there a JIRA ticket for removing `BlockingChannel`? If so, it may be 
worth mentioning the id here so that it's easy to find when the time comes for 
the removal to be done.



clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java (line 199)


You can use try with resources now that we require Java 7.


- Ismael Juma


On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33620/
> ---
> 
> (Updated July 20, 2015, 7 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1690
> https://issues.apache.org/jira/browse/KAFKA-1690
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Added 
> PrincipalBuilder.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Broker side ssl changes.
> 
> 
> KAFKA-1684. SSL for socketServer.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Post merge fixes.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest.
> 
> 
> KAFKA-1690. Minor fixes based on patch review comments.
> 
> 
> Merge commit
> 
> 
> KAFKA-1690. Added SSL Consumer Test.
> 
> 
> KAFKA-1690. SSL Support.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Diffs
> -
> 
>   build.gradle fb9084307ae41bbb62e32720ccd7b94f26e910c8 
>   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
>   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
> 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 70377ae2fa46deb381139d28590ce6d4115e1adc 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> aa264202f2724907924985a5ecbe74afc4c6c04b 
>   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
> bae528d31516679bed88ee61b408f209f185a8cc 
>   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
>   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
> 3ca0098b8ec8cfdf81158465b2d40afc47eb6f80 
>   
> clients/src/main/java/org/apache/kafka/common/network/Pla

Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Sriharsha Chintalapani


> On July 22, 2015, 3 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java,
> >  lines 231-234
> > 
> >
> > Still some questions on this.
> > 
> > 1. When handshakeStatus is not NEED_UNWRAP (e.g. FINISHED), we could 
> > have flushed all the bytes. In this case, we should turn off the write 
> > interestOps in the socket key, right?
> > 2. When handshakeStatus is NEED_UNWRAP and write is true, we will move 
> > on to the NEED_UNWRAP case. However, in this case, there may still be 
> > unflushed bytes in netWriteBuffer.
> > 3. When handshakeStatus transitions to FINISHED, we return to the 
> > callers. Doesn't that delay the completion of the handshake since this key 
> > may no longer be selected?

1. handshakeStatus cannot go from NEED_WRAP to FINISHED; it can only go to 
NEED_UNWRAP. Either the client or the server needs to get into NEED_UNWRAP 
before it can reach FINISHED status.
2. This is not true. If the handshake status is NEED_UNWRAP, we check whether 
flush returns true; if it doesn't, that means there are unflushed bytes in 
netWriteBuffer, so we set the interestOps to WRITE and break.
3. I am not sure I follow. If the handshake is finished, the read bit is still 
on. So either the client needs to start sending some data, at which point it 
sets the write bit on (for example a metadata update request), or the server 
starts to respond to the client with responses. The handshake happens at 
connection time; in reality the client is done with the handshake and starts 
sending requests, whether it is a producer or a consumer.
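For point 2, the check being described looks roughly like this (a sketch of 
the logic only, not the patch itself):

{code}
import java.nio.channels.SelectionKey

// During NEED_UNWRAP, first try to drain leftover handshake bytes; if
// flush() could not write everything, register interest in OP_WRITE and
// bail out of the handshake loop until the socket is writable again.
def drainBeforeUnwrap(key: SelectionKey, flush: () => Boolean): Boolean =
  if (!flush()) {
    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE)
    false // unflushed bytes remain in netWriteBuffer; try again later
  } else
    true // safe to proceed with the unwrap
{code}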


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92314
---


On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33620/
> ---
> 
> (Updated July 20, 2015, 7 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1690
> https://issues.apache.org/jira/browse/KAFKA-1690
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Added 
> PrincipalBuilder.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Broker side ssl changes.
> 
> 
> KAFKA-1684. SSL for socketServer.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Post merge fixes.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest.
> 
> 
> KAFKA-1690. Minor fixes based on patch review comments.
> 
> 
> Merge commit
> 
> 
> KAFKA-1690. Added SSL Consumer Test.
> 
> 
> KAFKA-1690. SSL Support.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Diffs
> -
> 
>   build.gradle fb9084307ae41bbb62e32720ccd7b94f26e910c8 
>   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
>   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
> 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 70377ae2fa46deb381139d28590ce6d4115e1adc 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/m

Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Sriharsha Chintalapani


> On July 22, 2015, 3 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java,
> >  line 411
> > 
> >
> > Is renegotiation actually supported? It seems that renegotiation can 
> > happen in the middle of a regular send/receive.

Yes, it can happen in the middle of a regular send/receive. But this needs to 
be initiated by the server, and we don't have any criteria for that, i.e. we 
are not checking anything like certificate expiration to trigger 
renegotiation. As we discussed, we are going to skip this for the initial 
version.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92314
---


On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33620/
> ---
> 
> (Updated July 20, 2015, 7 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1690
> https://issues.apache.org/jira/browse/KAFKA-1690
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Added 
> PrincipalBuilder.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Broker side ssl changes.
> 
> 
> KAFKA-1684. SSL for socketServer.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Post merge fixes.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest.
> 
> 
> KAFKA-1690. Minor fixes based on patch review comments.
> 
> 
> Merge commit
> 
> 
> KAFKA-1690. Added SSL Consumer Test.
> 
> 
> KAFKA-1690. SSL Support.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Diffs
> -
> 
>   build.gradle fb9084307ae41bbb62e32720ccd7b94f26e910c8 
>   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
>   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
> 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 70377ae2fa46deb381139d28590ce6d4115e1adc 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> aa264202f2724907924985a5ecbe74afc4c6c04b 
>   clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
> bae528d31516679bed88ee61b408f209f185a8cc 
>   clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/Authenticator.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> df0e6d5105ca97b7e1cb4d334ffb7b443506bd0b 
>   clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.ja

Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Sriharsha Chintalapani


> On July 22, 2015, 3 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java,
> >  lines 361-374
> > 
> >
> > Hmm, if status is ok and handshakeStatus is NEED_UNWRAP, could we get 
> > into infinite loop here?

Sorry. I thought I moved the socketChannel read inside the loop. I missed it.


> On July 22, 2015, 3 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java,
> >  lines 394-397
> > 
> >
> > We talked about that before. It's possible that we read all bytes for 
> > multiple receives into the appReadBuffer. Selector only reads off one 
> > receive at a time. If there are no more bytes from incoming network, this 
> > key may not be selected to read off the next receive in the appReadBuffer. 
> > How do we resolve that issue?

Yes. This is an issue I am trying to address by adding stagedReceives on the 
Selector side: read as many available receives as possible in a single poll, 
push them into the stagedReceives queue, and pop one out during that poll to 
add to completedReceives. In later polls, if the selectionKey is not selected 
and the channel is not muted and there is a stagedReceive waiting, we will 
poll that and add it to completedReceives. But this logic is not working on 
the server side, as I get a selector.send IllegalStateException. I am still 
looking into it and will update the patch once I have a solution. Please let 
me know if the above description sounds right to you.
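In other words, the plan is roughly the following (a sketch of the idea with 
stand-in types, not the eventual patch):

{code}
import java.util.{ArrayDeque, HashMap => JHashMap}

// Drain every receive available on a channel in one poll, stage them, and
// hand exactly one per channel to the application per poll.
class StagedReceives[Channel, Receive] {
  private val staged = new JHashMap[Channel, ArrayDeque[Receive]]()

  def stage(ch: Channel, r: Receive): Unit = {
    var q = staged.get(ch)
    if (q == null) { q = new ArrayDeque[Receive](); staged.put(ch, q) }
    q.add(r)
  }

  // Checked once per poll for each unmuted channel, selected or not, so a
  // receive already buffered in memory is not stranded waiting on the socket.
  def pollOne(ch: Channel): Option[Receive] = {
    val q = staged.get(ch)
    if (q == null || q.isEmpty) None
    else {
      val r = q.poll()
      if (q.isEmpty) staged.remove(ch)
      Some(r)
    }
  }
}
{code}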


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92314
---


On July 20, 2015, 7 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33620/
> ---
> 
> (Updated July 20, 2015, 7 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1690
> https://issues.apache.org/jira/browse/KAFKA-1690
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Added 
> PrincipalBuilder.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Addressing 
> reviews.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client. Fixed minor 
> issues with the patch.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> KAFKA-1690. new java producer needs ssl support as a client.
> 
> 
> Merge remote-tracking branch 'refs/remotes/origin/trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Broker side ssl changes.
> 
> 
> KAFKA-1684. SSL for socketServer.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest and fixes to get right port for SSL.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Post merge fixes.
> 
> 
> KAFKA-1690. Added SSLProducerSendTest.
> 
> 
> KAFKA-1690. Minor fixes based on patch review comments.
> 
> 
> Merge commit
> 
> 
> KAFKA-1690. Added SSL Consumer Test.
> 
> 
> KAFKA-1690. SSL Support.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> Merge branch 'trunk' into KAFKA-1690-V1
> 
> 
> KAFKA-1690. Addressing reviews. Removed interestOps from SSLTransportLayer.
> 
> 
> KAFKA-1690. Addressing reviews.
> 
> 
> Diffs
> -
> 
>   build.gradle fb9084307ae41bbb62e32720ccd7b94f26e910c8 
>   checkstyle/import-control.xml 19e0659ef9385433d9f94dee43cd70a52b18c9e5 
>   clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
> 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 48fe7961e2215372d8033ece4af739ea06c6457b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 70377ae2fa46deb381139d28590ce6d4115e1adc 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> bea3d737c51be77d5b5293cdd944d33b905422ba 
>   clients/src/main/

Re: Review Request 33620: Patch for KAFKA-1690

2015-07-22 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33620/#review92314
---


Thanks for the patch. Looks good overall. A few comments below.

There are a few places where we still wrap single-line statements with {}.


build.gradle (lines 247 - 255)


It seems that you reverted the changes in kafka-2323.



checkstyle/import-control.xml (line 56)


Is this needed? It doesn't seem that network needs to use any protocol.



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 47)


Could we add @Override annotation?



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 96)


Could we add @Override annotation? There are a few other methods in this 
class like that.



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 150)


src => srcs



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 183)


Typo Rerturns



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 191)


param is not SelectionKey.



clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java
 (line 200)


param is not SelectionKey.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 87)


Could we add the @Override annotation to this and a few other methods in 
this class?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 231 - 234)


Still some questions on this.

1. When handshakeStatus is not NEED_UNWRAP (e.g. FINISHED), we could have 
flushed all the bytes. In this case, we should turn off the write interestOps 
in the socket key, right?
2. When handshakeStatus is NEED_UNWRAP and write is true, we will move on 
to the NEED_UNWRAP case. However, in this case, there may still be unflushed 
bytes in netWriteBuffer.
3. When handshakeStatus transitions to FINISHED, we return to the callers. 
Doesn't that delay the completion of the handshake since this key may no longer 
be selected?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 245 - 246)


Since we are not making use of appReadBuffer during handshake, could 
OVERFLOW ever happen? If not, perhaps we can add a comment.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 361 - 374)


Hmm, if status is OK and handshakeStatus is NEED_UNWRAP, could we get into 
an infinite loop here?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 394 - 397)


We talked about that before. It's possible that we read all bytes for 
multiple receives into the appReadBuffer. Selector only reads off one receive 
at a time. If there are no more bytes from incoming network, this key may not 
be selected to read off the next receive in the appReadBuffer. How do we 
resolve that issue?



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(line 411)


Is renegotiation actually supported? It seems that renegotiation can happen 
in the middle of a regular send/receive.



clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java 
(lines 601 - 606)


It seems that in general, we expect addInterestOps to be called only if 
handshake has completed. Perhaps we can just throw an IllegalStateException if 
handshake hasn't completed? Ditto for removeInterestOps.
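
For illustration, a minimal sketch of that guard (the handshakeComplete
field and key variable are hypothetical names, not taken from the patch):

    public void addInterestOps(int ops) {
        if (!handshakeComplete)
            throw new IllegalStateException("handshake is not completed");
        key.interestOps(key.interestOps() | ops);
    }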



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 218 
- 220)


Coding convention: no need to wrap a single-line statement with {}.
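
For example (hypothetical statement):

    if (!key.isValid())
        close(channel);

rather than:

    if (!key.isValid()) {
        close(channel);
    }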



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 274 
- 276)


Coding convention: no need to wrap a single-line statement with {}.



clients/src/main/java/org/apache/kafka/common/network/Selector.java (lines 434 
- 435)


[jira] [Updated] (KAFKA-2345) Attempt to delete a topic already marked for deletion throws ZkNodeExistsException

2015-07-22 Thread Stevo Slavic (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stevo Slavic updated KAFKA-2345:

Affects Version/s: 0.8.2.0

> Attempt to delete a topic already marked for deletion throws 
> ZkNodeExistsException
> --
>
> Key: KAFKA-2345
> URL: https://issues.apache.org/jira/browse/KAFKA-2345
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
> Fix For: 0.8.3
>
> Attachments: KAFKA-2345.patch, KAFKA-2345_2015-07-17_10:20:55.patch
>
>
> Throwing a TopicAlreadyMarkedForDeletionException would make much more sense. 
> A user should not need to know that ZooKeeper is involved in the 
> process.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: ConsumerRecords are organized per topic partit...

2015-07-22 Thread sslavic
GitHub user sslavic opened a pull request:

https://github.com/apache/kafka/pull/92

ConsumerRecords are organized per topic partition

ConsumerRecords has records organized per topic partition, not per topic as the 
ConsumerRecords javadoc suggests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sslavic/kafka patch-6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/92.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #92


commit 8dacfda7c71b7acaf49a19db0563975b8a96f7a5
Author: Stevo Slavić 
Date:   2015-07-22T13:41:00Z

ConsumerRecords are organized per topic partition

ConsumerRecords has records organized per topic partition, not per topic as the 
ConsumerRecords javadoc suggests.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-2356) Support retrieving partitions of ConsumerRecords

2015-07-22 Thread Stevo Slavic (JIRA)
Stevo Slavic created KAFKA-2356:
---

 Summary: Support retrieving partitions of ConsumerRecords
 Key: KAFKA-2356
 URL: https://issues.apache.org/jira/browse/KAFKA-2356
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.3
Reporter: Stevo Slavic
Priority: Trivial


In the new consumer on trunk, ConsumerRecords has a method to retrieve records for 
a given TopicPartition, but there is no method to retrieve the TopicPartitions 
included in a ConsumerRecords instance. Please add support for this.

The method could be something like:
{noformat}
/**
 * Get partitions of records returned by a {@link Consumer#poll(long)} operation
 */
public Set<TopicPartition> partitions() {
    return Collections.unmodifiableSet(this.records.keySet());
}
{noformat}
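
A hypothetical usage sketch, assuming the method above is added to
ConsumerRecords (process() is a placeholder):
{noformat}
ConsumerRecords<String, String> records = consumer.poll(100);
for (TopicPartition partition : records.partitions()) {
    for (ConsumerRecord<String, String> record : records.records(partition)) {
        process(record); // e.g. per-partition, in-order handling
    }
}
{noformat}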



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 36670: Patch for KAFKA-2355

2015-07-22 Thread Grant Henke


> On July 22, 2015, 10:29 a.m., Ismael Juma wrote:
> > core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, line 269
> > 
> >
> > Should `verifyTopicDeletion` be inside `finally`? I don't think so 
> > because you don't want to run it if `fail` is called above. You could move 
> > it to be in the `e: TopicAlreadyMarkedForDeletionException` clause maybe?

Just removing the finally block and having it after the catch should work fine 
too.


- Grant


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36670/#review92571
---


On July 22, 2015, 1:46 a.m., Edward Ribeiro wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36670/
> ---
> 
> (Updated July 22, 2015, 1:46 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2355
> https://issues.apache.org/jira/browse/KAFKA-2355
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2355 Add a unit test to validate the deletion of a partition marked as 
> deleted
> 
> 
> Diffs
> -
> 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
> fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 
> 
> Diff: https://reviews.apache.org/r/36670/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Edward Ribeiro
> 
>



Re: Should 0.8.3 consumers correctly function with 0.8.2 brokers?

2015-07-22 Thread Stevo Slavić
I'm getting "Unknown api code 11" even when both client and server are
0.8.3/trunk, when "KafkaConsumer.subscribe(String... topics)" is used.

Bug?
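
For reference, these are the two subscription paths compared in this thread,
sketched against the trunk API at the time (topic name and partition number
are placeholders; the two calls are alternatives, not meant to be combined):

    // Topic subscription: group management, sends the request that fails above.
    consumer.subscribe("my-topic");
    // Partition subscription: no coordinator involved.
    consumer.subscribe(new TopicPartition("my-topic", 0));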

Kind regards,
Stevo Slavic.

On Fri, Apr 24, 2015 at 7:13 PM, Neha Narkhede  wrote:

> Yes, I was clearly confused :-)
>
> On Fri, Apr 24, 2015 at 9:37 AM, Sean Lydon  wrote:
>
> > Thanks for the responses. Ewen is correct that I am referring to the
> > *new* consumer (org.apache.kafka.clients.consumer.KafkaConsumer).
> >
> > I am extending the consumer to allow my applications more control over
> > committed offsets.  I really want to get away from zookeeper (so using
> > the offset storage), and re-balancing is something I haven't really
> > needed to tackle in an automated/seamless way.  Either way, I'll hold
> > off going further down this road until there is more interest.
> >
> > @Gwen
> > I set up a single consumer without partition.assignment.strategy or
> > rebalance.callback.class.  I was unable to subscribe to just a topic
> > ("Unknown api code 11" on broker), but I could subscribe to a
> > topicpartition.  This makes sense as I would need to handle re-balance
> > outside the consumer.  Things functioned as expected (well  I have an
> > additional minor fix to code from KAFKA-2121), and the only exceptions
> > on broker were due to closing consumers (which I have become
> > accustomed to).  My tests are specific to my extended version of the
> > consumer, but they basically do a little writing and reading with
> > different serde classes with application controlled commits (similar
> > to onSuccess and onFailure after each record, but with tolerance for
> > out of order acknowledgements).
> >
> > If you are interested, here is the patch of the hack against trunk.
> >
> > On Thu, Apr 23, 2015 at 10:27 PM, Ewen Cheslack-Postava
> >  wrote:
> > > @Neha I think you're mixing up the 0.8.1/0.8.2 updates and the 0.8.2/0.8.3
> > > that's being discussed here?
> > >
> > > I think the original question was about using the *new* consumer ("clients
> > > consumer") with 0.8.2. Gwen's right, it will use features not even
> > > implemented in the broker in trunk yet, let alone the 0.8.2.
> > >
> > > I don't think the "enable.commit.downgrade" type option, or supporting the
> > > old protocol with the new consumer at all, makes much sense. You'd end up
> > > with some weird hybrid of simple and high-level consumers -- you could use
> > > offset storage, but you'd have to manage rebalancing yourself since none of
> > > the coordinator support would be there.
> > >
> > >
> > > On Thu, Apr 23, 2015 at 9:22 PM, Neha Narkhede 
> > > wrote:
> > >
> > >> My understanding is that ideally the 0.8.3 consumer should work with an
> > >> 0.8.2 broker if the offset commit config was set to "zookeeper".
> > >>
> > >> The only thing that might not work is offset commit to Kafka, which makes
> > >> sense since the 0.8.2 broker does not support Kafka-based offset
> > >> management.
> > >>
> > >> If we broke all kinds of offset commits, then it seems like a regression,
> > >> no?
> > >>
> > >> On Thu, Apr 23, 2015 at 7:26 PM, Gwen Shapira 
> > >> wrote:
> > >>
> > >> > I didn't think the 0.8.3 consumer would ever be able to talk to an 0.8.2
> > >> > broker... there are some essential pieces that are missing in 0.8.2
> > >> > (Coordinator, Heartbeat, etc).
> > >> > Maybe I'm missing something. It would be nice if this works :)
> > >> >
> > >> > Mind sharing what / how you tested? Were there no errors in broker
> > >> > logs after your fix?
> > >> >
> > >> > On Thu, Apr 23, 2015 at 5:37 PM, Sean Lydon 
> > >> > wrote:
> > >> > > Currently the clients consumer (trunk) sends offset commit requests of
> > >> > > version 2.  The 0.8.2 brokers fail to handle this particular request
> > >> > > with a:
> > >> > >
> > >> > > java.lang.AssertionError: assertion failed: Version 2 is invalid for
> > >> > > OffsetCommitRequest. Valid versions are 0 or 1.
> > >> > >
> > >> > > I was able to make this work via a forceful downgrade of this
> > >> > > particular request, but I would like some feedback on whether an
> > >> > > "enable.commit.downgrade" configuration would be a tolerable method
> > >> > > to allow 0.8.3 consumers to interact with 0.8.2 brokers.  I'm also
> > >> > > interested in whether this is even a goal worth pursuing.
> > >> > >
> > >> > > Thanks,
> > >> > > Sean
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> Thanks,
> > >> Neha
> > >>
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> >
>
>
>
> --
> Thanks,
> Neha
>


Re: Kafka Unit Test Failures on a Mac

2015-07-22 Thread Ismael Juma
On Mon, Jul 20, 2015 at 10:45 PM, Grant Henke  wrote:

> In one run of the core tests I found the following:
>
>- 4584 regular files (REG)
>- 376 .jar files
>  - Not much one can/should do here. Many are from gradle itself.
>- 2392 kafka .log files
>  - why are these being leaked?
>  - after a single test no file handles should remain
>- 1162 kafka .log.deleted files
>  - why are these being leaked?
>

Some of those are probably due to:

https://issues.apache.org/jira/browse/KAFKA-1782

Ismael


Re: Review Request 36664: Patch for KAFKA-2353

2015-07-22 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36664/#review92574
---



core/src/main/scala/kafka/network/SocketServer.scala (line 401)


Is it intentional to ignore `java.lang.Error` too?



core/src/main/scala/kafka/network/SocketServer.scala (line 463)


As far as I can see, `ClosedChannelException`, `IllegalStateException` and 
`IllegalArgumentException` are enough? Also, would it be better to use 
`IOException` instead of `ClosedChannelException`?

What happens if other exceptions are thrown? Will we still have a socket 
leak?


- Ismael Juma


On July 22, 2015, 5:02 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36664/
> ---
> 
> (Updated July 22, 2015, 5:02 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2353
> https://issues.apache.org/jira/browse/KAFKA-2353
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Gwen's comments
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 91319fa010b140cca632e5fa8050509bd2295fc9 
> 
> Diff: https://reviews.apache.org/r/36664/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



Re: Review Request 36670: Patch for KAFKA-2355

2015-07-22 Thread Ismael Juma

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36670/#review92571
---



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 267)


You should either include information about the exception that was actually 
thrown or let the original exception propagate. Also, if you choose the former, 
you should do `case _: Throwable` to avoid a warning in Scala 2.11.



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala (line 269)


Should `verifyTopicDeletion` be inside `finally`? I don't think so because 
you don't want to run it if `fail` is called above. You could move it to be in 
the `e: TopicAlreadyMarkedForDeletionException` clause maybe?


- Ismael Juma


On July 22, 2015, 1:46 a.m., Edward Ribeiro wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36670/
> ---
> 
> (Updated July 22, 2015, 1:46 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2355
> https://issues.apache.org/jira/browse/KAFKA-2355
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-2355 Add a unit test to validate the deletion of a partition marked as 
> deleted
> 
> 
> Diffs
> -
> 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
> fa8ce259a2832ab86f9dda8c1d409b2c42d43ae9 
> 
> Diff: https://reviews.apache.org/r/36670/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Edward Ribeiro
> 
>



Re: Official Kafka Gitter Room?

2015-07-22 Thread Stevo Slavić
On the Apache Mahout project we're using Slack as well - for release
coordination. It was found that an extra Slack channel does not really fit
the Apache way - it was overused, and too many design discussions went on
there that the community at large had no access to, could not be involved
in, and cannot even see the history of. This is not the case with the
user/dev mailing lists, which have searchable archives.

Kind regards,
Stevo Slavic.

On Wed, Jul 22, 2015 at 11:07 AM, Ismael Juma  wrote:

> Hi Gwen,
>
> On Sun, Jul 19, 2015 at 2:26 AM, Gwen Shapira 
> wrote:
>
> > So, as an experiment, I created:
> > https://apachekafka.slack.com
> >
> > I figured we'll give it a whirl for a week or two for dev discussions,
> > see how it goes and if we have activity we can add this to the website
> > and announce on the lists.
> >
>
> Are people using this? If so, please send me an invite.
>
> Ismael
>


Re: Official Kafka Gitter Room?

2015-07-22 Thread Ismael Juma
Hi Gwen,

On Sun, Jul 19, 2015 at 2:26 AM, Gwen Shapira  wrote:

> So, as an experiment, I created:
> https://apachekafka.slack.com
>
> I figured we'll give it a whirl for a week or two for dev discussions,
> see how it goes and if we have activity we can add this to the website
> and announce on the lists.
>

Are people using this? If so, please send me an invite.

Ismael


[DISCUSS] Partitioning in Kafka

2015-07-22 Thread Gianmarco De Francisci Morales
Hello folks,

I'd like to ask the community about its opinion on the partitioning
functions in Kafka.

With KAFKA-2091 (https://issues.apache.org/jira/browse/KAFKA-2091)
integrated, we are now able to have custom partitioners in the producer.
The question now becomes *which* partitioners should ship with Kafka?
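
For concreteness, a custom partitioner under the KAFKA-2091 interface looks
roughly like the sketch below (a minimal key-hash partitioner written against
my reading of the committed interface; it is not one of the partitioners
under discussion, and the class name is made up):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class KeyHashPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) { /* no configuration needed */ }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0; // simplistic fallback for unkeyed records
        // Hash the serialized key onto the partition space.
        return Utils.abs(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}
}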
This issue arose in the context of KAFKA-2092
(https://issues.apache.org/jira/browse/KAFKA-2092), which implements a
specific load-balanced partitioning. This partitioner, however, assumes some
stages of processing on top of it to make proper use of the data, i.e., it
envisions Kafka as a substrate for stream processing, and not only as the
I/O component.
Is this a direction that Kafka wants to go towards? Or is this a role
better left to the internal communication systems of other stream
processing engines (e.g., Storm)?
And if the answer is the latter, how would something such as Samza (which
relies mostly on Kafka as its communication substrate) be able to implement
advanced partitioning schemes?

Cheers,
--
Gianmarco