Fwd: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-02-11 Thread Harsha
Hi,
Here is the initial proposal for sasl/kerberos implementation for
kafka https://cwiki.apache.org/confluence/x/YI4WAw
and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am
currently working on a prototype which will add more details to the KIP.
Just opening the thread to say the work is in progress. I'll update the
thread with an initial prototype patch.
Thanks,
Harsha


[jira] [Updated] (KAFKA-1866) LogStartOffset gauge throws exceptions after log.delete()

2015-02-11 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1866:
--
Attachment: KAFKA-1866_2015-02-11_09:25:33.patch

 LogStartOffset gauge throws exceptions after log.delete()
 -

 Key: KAFKA-1866
 URL: https://issues.apache.org/jira/browse/KAFKA-1866
 Project: Kafka
  Issue Type: Bug
Reporter: Gian Merlino
Assignee: Sriharsha Chintalapani
 Attachments: KAFKA-1866.patch, KAFKA-1866_2015-02-10_22:50:09.patch, 
 KAFKA-1866_2015-02-11_09:25:33.patch


 The LogStartOffset gauge does logSegments.head.baseOffset, which throws 
 NoSuchElementException on an empty list, which can occur after a delete() of 
 the log. This makes life harder for custom MetricsReporters, since they have 
 to deal with .value() possibly throwing an exception.
 Locally we're dealing with this by having Log.delete() also call removeMetric 
 on all the gauges. That also has the benefit of not having a bunch of metrics 
 floating around for logs that the broker is not actually handling.
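 As a rough, self-contained sketch (not the actual Log.scala code; the registry and 
 names below are stand-ins for illustration), the two mitigations look like this: guard 
 the gauge value with headOption, and drop the gauge when the log is deleted.
 {code}
 import scala.collection.mutable

 object LogStartOffsetGaugeSketch {
   // Stand-in for the real metrics registry (hypothetical, for illustration only).
   private val gauges = mutable.Map[String, () => Long]()
   def newGauge(name: String)(value: => Long): Unit = gauges(name) = () => value
   def removeGauge(name: String): Unit = gauges -= name

   def main(args: Array[String]): Unit = {
     var segments = List(0L, 100L, 200L)              // base offsets of the log segments

     // Option 1: a defensive gauge. headOption avoids the NoSuchElementException
     // that logSegments.head.baseOffset throws once the segment list is empty.
     newGauge("LogStartOffset") { segments.headOption.getOrElse(-1L) }

     segments = Nil                                   // simulate log.delete()
     println(gauges("LogStartOffset")())              // prints -1 instead of throwing

     // Option 2 (the approach described above): remove the gauge in delete() so
     // stale metrics are not left around for logs the broker no longer handles.
     removeGauge("LogStartOffset")
   }
 }
 {code}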



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1757) Can not delete Topic index on Windows

2015-02-11 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316454#comment-14316454
 ] 

Sriharsha Chintalapani commented on KAFKA-1757:
---

[~junrao] Can you please review the patch. Thanks.

 Can not delete Topic index on Windows
 -

 Key: KAFKA-1757
 URL: https://issues.apache.org/jira/browse/KAFKA-1757
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.8.2.0
Reporter: Lukáš Vyhlídka
Assignee: Sriharsha Chintalapani
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-1757.patch, lucky-v.patch


 When running the Kafka 0.8.2-Beta (Scala 2.10) on Windows, an attempt to 
 delete the Topic threw an error:
 ERROR [KafkaApi-1] error when handling request Name: StopReplicaRequest; 
 Version: 0; CorrelationId: 38; ClientId: ; DeletePartitions: true; 
 ControllerId: 0; ControllerEpoch: 3; Partitions: [test,0] 
 (kafka.server.KafkaApis)
 kafka.common.KafkaStorageException: Delete of index 
 .index failed.
 at kafka.log.LogSegment.delete(LogSegment.scala:283)
 at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608)
 at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at kafka.log.Log.delete(Log.scala:608)
 at kafka.log.LogManager.deleteLog(LogManager.scala:375)
 at 
 kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:144)
 at 
 kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139)
 at 
 kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
 at kafka.cluster.Partition.delete(Partition.scala:139)
 at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:158)
 at 
 kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:191)
 at 
 kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:190)
 at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
 at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:190)
 at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:96)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
 at java.lang.Thread.run(Thread.java:744)
 When I investigated the issue, I figured out that the index file (in my 
 environment it was 
 C:\tmp\kafka-logs\----0014-0\.index)
  was locked by the kafka process and the OS did not allow that file to be deleted.
 I tried to fix the problem in the source code, and when I added a close() method 
 call into LogSegment.delete(), the Topic deletion started to work.
 I will add here (not sure how to upload the file during issue creation) a 
 diff with the changes I have made so you can take a look at whether it 
 is reasonable or not. It would be perfect if it could make it into the 
 product...
 In the end I would like to say that on Linux the deletion works just fine...
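 For illustration, a tiny self-contained sketch (not the actual LogSegment code) of the 
 behaviour described above: on Windows an open file handle keeps the file locked, so the 
 handle has to be closed before delete() can succeed.
 {code}
 import java.io.{File, RandomAccessFile}

 object CloseBeforeDeleteSketch {
   def main(args: Array[String]): Unit = {
     val file = File.createTempFile("topic-0", ".index")
     val raf  = new RandomAccessFile(file, "rw")
     raf.writeBytes("index data")

     // Without this close(), file.delete() returns false on Windows because the OS
     // still holds a lock for the open handle; on Linux the delete succeeds anyway.
     raf.close()

     println(s"deleted: ${file.delete()}")
   }
 }
 {code}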



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1866) LogStartOffset gauge throws exceptions after log.delete()

2015-02-11 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316572#comment-14316572
 ] 

Sriharsha Chintalapani commented on KAFKA-1866:
---

Updated reviewboard https://reviews.apache.org/r/30084/diff/
 against branch origin/trunk

 LogStartOffset gauge throws exceptions after log.delete()
 -

 Key: KAFKA-1866
 URL: https://issues.apache.org/jira/browse/KAFKA-1866
 Project: Kafka
  Issue Type: Bug
Reporter: Gian Merlino
Assignee: Sriharsha Chintalapani
 Attachments: KAFKA-1866.patch, KAFKA-1866_2015-02-10_22:50:09.patch, 
 KAFKA-1866_2015-02-11_09:25:33.patch


 The LogStartOffset gauge does logSegments.head.baseOffset, which throws 
 NoSuchElementException on an empty list, which can occur after a delete() of 
 the log. This makes life harder for custom MetricsReporters, since they have 
 to deal with .value() possibly throwing an exception.
 Locally we're dealing with this by having Log.delete() also call removeMetric 
 on all the gauges. That also has the benefit of not having a bunch of metrics 
 floating around for logs that the broker is not actually handling.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-11 Thread Joel Koshy
Hi Jay,

 The data channels are actually a big part of the complexity of the zero
 data loss design, though, right? Because then you need some reverse channel
 to flow the acks back to the consumer based on where you are versus just
 acking what you have read and written (as in the code snippet I put up).

I'm not sure if we are on the same page. Even if the data channel was
not there the current handling for zero data loss would remain very
similar - you would need to maintain lists of unacked source offsets.
I'm wondering if the KIP needs more detail on how it is currently
implemented; or are you suggesting a different approach (in which case I
have not fully understood it). I'm not sure what you mean by flowing acks
back to the consumer - the MM commits offsets after the producer ack
has been received. There is some additional complexity introduced in
reducing duplicates on a rebalance - this is actually optional (since
duplicates are currently a given). The reason that was done anyway is
that with the auto-commit turned off duplicates are almost guaranteed
on a rebalance.

 I think the point that Neha and I were trying to make was that the
 motivation to embed stuff into MM kind of is related to how complex a
 simple consume and produce with good throughput will be. If it is simple
 to write such a thing in a few lines, the pain of embedding a bunch of
 stuff won't be worth it, if it has to be as complex as the current mm then
 of course we will need all kinds of plug ins because no one will be able to
 write such a thing. I don't have a huge concern with a simple plug-in but I
 think if it turns into something more complex with filtering and
 aggregation or whatever we really need to stop and think a bit about the
 design.

I agree - I don't think there is a use-case for any complex plug-in.
It is pretty much what Becket has described currently for the message
handler - i.e., take an incoming record and return a list of outgoing
records (which could be empty if you filter).
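
(To make that shape concrete, here is a rough sketch with hypothetical names,
not the actual MirrorMaker plug-in API: one record in, a list of records out,
where an empty list means the record is filtered out.)

case class Record(key: Array[Byte], value: Array[Byte])

trait MessageHandler {
  // One incoming record maps to zero or more outgoing records.
  def handle(in: Record): List[Record]
}

// Example: drop records private to the source cluster, flagged here by a
// hypothetical leading marker byte in the key.
object FilterPrivateRecords extends MessageHandler {
  def handle(in: Record): List[Record] =
    if (in.key != null && in.key.nonEmpty && in.key(0) == 0x01.toByte) Nil
    else List(in)
}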

So here is my take on the MM:
- Bare bones: simple consumer - producer pairs (0.7 style). This is
  ideal, but does not address the no-data-loss requirement.
- Above, plus support for no data loss. This actually adds quite a bit of
  complexity.
- Above, plus the message handler. This is a trivial addition, I think,
  that makes the MM usable in a few other mirroring-like applications.

Joel

 On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
 
 
  On Tue, Feb 10, 2015 at 12:13:46PM -0800, Neha Narkhede wrote:
   I think all of us agree that we want to design MirrorMaker for 0 data
  loss.
   With the absence of the data channel, 0 data loss will be much simpler to
   implement.
 
  The data channel is irrelevant to the implementation of zero data
  loss. The complexity in the implementation of no data loss that you
  are seeing in mirror-maker affects all consume-then-produce patterns
  whether or not there is a data channel.  You still need to maintain a
  list of unacked offsets. What I meant earlier is that we can
  brainstorm completely different approaches to supporting no data loss,
  but the current implementation is the only solution we are aware of.
 
  
   My arguments for adding a message handler are that:
1. It is more efficient to do something in common for all the clients
  in
pipeline than letting each client do the same thing for many times. And
there are concrete use cases for the message handler already.
   
  
   What are the concrete use cases?
 
  I think Becket already described a couple of use cases earlier in the
  thread.
 
  quote
 
  1. Format conversion. We have a use case where clients of source
  cluster
  use an internal schema and clients of target cluster use a different
  public schema.
  2. Message filtering: For the messages published to source cluster,
  there
  are some messages private to source cluster clients and should not
  exposed
  to target cluster clients. It would be difficult to publish those
  messages
  into different partitions because they need to be ordered.
  I agree that we can always filter/convert messages after they are
  copied
  to the target cluster, but that costs network bandwidth unnecessarily,
  especially if that is a cross colo mirror. With the handler, we can
  co-locate the mirror maker with source cluster and save that cost.
  Also,
  imagine there are many downstream consumers consuming from the target
  cluster, filtering/reformatting the messages before the messages reach
  the
  target cluster is much more efficient than having each of the
  consumers do
  this individually on their own.
 
  /quote
 
  
   Also the KIP still refers to the data channel in a few places (Motivation
   and On consumer rebalance sections). Can you update the wiki so it is
   easier to review the new design, especially the data loss part.
  
  
   On Tue, Feb 10, 2015 at 10:36 AM, Joel Koshy jjkosh...@gmail.com
  wrote:
  
I think the message handler adds little to no 

[DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-02-11 Thread Harsha
Hi,
Here is the initial proposal for sasl/kerberos implementation for
kafka https://cwiki.apache.org/confluence/x/YI4WAw
and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am
currently working on a prototype which will add more details to the KIP.
Just opening the thread to say the work is in progress. I'll update the
thread with an initial prototype patch.
Thanks,
Harsha


[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)

2015-02-11 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316455#comment-14316455
 ] 

Sriharsha Chintalapani commented on KAFKA-1566:
---

[~jkreps] [~nehanarkhede] Can you please review this? kafka-env.sh will give users 
the flexibility of defining a custom java_home.

 Kafka environment configuration (kafka-env.sh)
 --

 Key: KAFKA-1566
 URL: https://issues.apache.org/jira/browse/KAFKA-1566
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: Cosmin Lehene
Assignee: Sriharsha Chintalapani
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-1566.patch


 It would be useful (especially for automated deployments) to have an 
 environment configuration file that could be sourced from the launcher files 
 (e.g. kafka-run-server.sh). 
 This is how kafka-env.sh could look: 
 {code}
 export KAFKA_JVM_PERFORMANCE_OPTS='-XX:+UseCompressedOops -XX:+DisableExplicitGC -Djava.awt.headless=true -XX:+UseG1GC -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35'
 export KAFKA_HEAP_OPTS='-Xmx1G -Xms1G'
 export KAFKA_LOG4J_OPTS='-Dkafka.logs.dir=/var/log/kafka'
 {code} 
 kafka-server-start.sh 
 {code} 
 ... 
 source $base_dir/config/kafka-env.sh 
 ... 
 {code} 
 This approach is consistent with Hadoop and HBase. However the idea here is 
 to be able to set these values in a single place without having to edit 
 startup scripts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1852) OffsetCommitRequest can commit offset on unknown topic

2015-02-11 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316447#comment-14316447
 ] 

Sriharsha Chintalapani commented on KAFKA-1852:
---

[~jjkoshy] pinging for a review. Thanks.

 OffsetCommitRequest can commit offset on unknown topic
 --

 Key: KAFKA-1852
 URL: https://issues.apache.org/jira/browse/KAFKA-1852
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.3
Reporter: Jun Rao
Assignee: Sriharsha Chintalapani
 Attachments: KAFKA-1852.patch, KAFKA-1852_2015-01-19_10:44:01.patch


 Currently, we allow an offset to be committed to Kafka, even when the 
 topic/partition for the offset doesn't exist. We probably should disallow 
 that and send an error back in that case.
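 A rough sketch of that check (hypothetical helper, not the actual KafkaApis code): look 
 each requested topic/partition up in the broker's metadata before storing the offset, and 
 return the UnknownTopicOrPartition error code (3) for anything unknown.
 {code}
 case class TopicAndPartition(topic: String, partition: Int)

 object OffsetCommitValidationSketch {
   val NoError: Short = 0
   val UnknownTopicOrPartitionCode: Short = 3

   // Return an error code per partition instead of silently accepting the commit.
   def validate(requested: Set[TopicAndPartition],
                known: Set[TopicAndPartition]): Map[TopicAndPartition, Short] =
     requested.map { tp =>
       tp -> (if (known.contains(tp)) NoError else UnknownTopicOrPartitionCode)
     }.toMap
 }
 {code}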



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics

2015-02-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316526#comment-14316526
 ] 

Jay Kreps commented on KAFKA-1374:
--

Hey guys, the test kafka.tools.TestLogCleaning is a very aggressive test that 
runs against a kafka cluster configured for log compaction. It produces a bunch 
of messages, compacts them continuously, and then does an out-of-band 
comparison of the two. It would be good to ensure that this still works with 
this patch on really large cleaner runs with deletes.

 LogCleaner (compaction) does not support compressed topics
 --

 Key: KAFKA-1374
 URL: https://issues.apache.org/jira/browse/KAFKA-1374
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, 
 KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, 
 KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, 
 KAFKA-1374_2015-01-18_00:19:21.patch


 This is a known issue, but opening a ticket to track.
 If you try to compact a topic that has compressed messages you will run into
 various exceptions - typically because during iteration we advance the
 position based on the decompressed size of the message. I have a bunch of
 stack traces, but it should be straightforward to reproduce.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-11 Thread Jay Kreps
Guozhang, I agree with 1-3. I do think what I was proposing was simpler, but
perhaps there are gaps in it?

Hey Joel--Here was a sketch of what I was proposing. I do think this gets
rid of manual offset tracking, especially doing so across threads with
dedicated commit threads, which I think is pretty complex.

while (true) {
  val recs = consumer.poll(Long.MaxValue)
  for (rec <- recs)
    producer.send(rec, logErrorCallback)
  if (System.currentTimeMillis - lastCommit > commitInterval) {
    producer.flush()
    consumer.commit()
    lastCommit = System.currentTimeMillis
  }
}

(See the previous email for details). I think the question is: is there any
reason--performance, correctness, etc--that this won't work? Basically I
think you guys have thought about this more so I may be missing something.
If so let's flag it while we still have leeway on the consumer.

If we think that will work, well I do think it is conceptually a lot
simpler than the current code, though I suppose one could disagree on that.

-Jay

On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy jjkosh...@gmail.com wrote:

 Hi Jay,

  The data channels are actually a big part of the complexity of the zero
  data loss design, though, right? Because then you need some reverse
 channel
  to flow the acks back to the consumer based on where you are versus just
  acking what you have read and written (as in the code snippet I put up).

 I'm not sure if we are on the same page. Even if the data channel was
 not there the current handling for zero data loss would remain very
 similar - you would need to maintain lists of unacked source offsets.
 I'm wondering if the KIP needs more detail on how it is currently
 implemented; or are you suggesting a different approach (in which case I
 have not fully understood it). I'm not sure what you mean by flowing acks
 back to the consumer - the MM commits offsets after the producer ack
 has been received. There is some additional complexity introduced in
 reducing duplicates on a rebalance - this is actually optional (since
 duplicates are currently a given). The reason that was done anyway is
 that with the auto-commit turned off duplicates are almost guaranteed
 on a rebalance.

  I think the point that Neha and I were trying to make was that the
  motivation to embed stuff into MM kind of is related to how complex a
  simple consume and produce with good throughput will be. If it is
 simple
  to write such a thing in a few lines, the pain of embedding a bunch of
  stuff won't be worth it, if it has to be as complex as the current mm
 then
  of course we will need all kinds of plug ins because no one will be able
 to
  write such a thing. I don't have a huge concern with a simple plug-in
 but I
  think if it turns into something more complex with filtering and
  aggregation or whatever we really need to stop and think a bit about the
  design.

 I agree - I don't think there is a use-case for any complex plug-in.
 It is pretty much what Becket has described currently for the message
 handler - i.e., take an incoming record and return a list of outgoing
 records (which could be empty if you filter).

 So here is my take on the MM:
 - Bare bones: simple consumer - producer pairs (0.7 style). This is
   ideal, but does not address the no-data-loss requirement.
 - Above, plus support for no data loss. This actually adds quite a bit of
   complexity.
 - Above, plus the message handler. This is a trivial addition, I think,
   that makes the MM usable in a few other mirroring-like applications.

 Joel

  On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy jjkosh...@gmail.com
 wrote:
 
  
  
   On Tue, Feb 10, 2015 at 12:13:46PM -0800, Neha Narkhede wrote:
I think all of us agree that we want to design MirrorMaker for 0 data
   loss.
With the absence of the data channel, 0 data loss will be much
 simpler to
implement.
  
   The data channel is irrelevant to the implementation of zero data
   loss. The complexity in the implementation of no data loss that you
   are seeing in mirror-maker affects all consume-then-produce patterns
   whether or not there is a data channel.  You still need to maintain a
   list of unacked offsets. What I meant earlier is that we can
   brainstorm completely different approaches to supporting no data loss,
   but the current implementation is the only solution we are aware of.
  
   
My arguments for adding a message handler are that:
 1. It is more efficient to do something in common for all the
 clients
   in
 pipeline than letting each client do the same thing for many
 times. And
 there are concrete use cases for the message handler already.

   
What are the concrete use cases?
  
   I think Becket already described a couple of use cases earlier in the
   thread.
  
   quote
  
   1. Format conversion. We have a use case where clients of source
   cluster
   use an internal schema and clients of target cluster use a different
   public schema.
   2. Message 

Re: Review Request 30084: Patch for KAFKA-1866

2015-02-11 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30084/
---

(Updated Feb. 11, 2015, 5:25 p.m.)


Review request for kafka.


Bugs: KAFKA-1866
https://issues.apache.org/jira/browse/KAFKA-1866


Repository: kafka


Description
---

KAFKA-1866. LogStartOffset gauge throws exceptions after log.delete().


Diffs (updated)
-

  core/src/main/scala/kafka/cluster/Partition.scala 
e6ad8be5e33b6fb31c078ad78f8de709869ddc04 
  core/src/main/scala/kafka/log/Log.scala 
846023bb98d0fa0603016466360c97071ac935ea 
  core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
3cf23b3d6d4460535b90cfb36281714788fc681c 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30084/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Commented] (KAFKA-1852) OffsetCommitRequest can commit offset on unknown topic

2015-02-11 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316739#comment-14316739
 ] 

Joel Koshy commented on KAFKA-1852:
---

Thanks for the ping - will take a look.

 OffsetCommitRequest can commit offset on unknown topic
 --

 Key: KAFKA-1852
 URL: https://issues.apache.org/jira/browse/KAFKA-1852
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.3
Reporter: Jun Rao
Assignee: Sriharsha Chintalapani
 Attachments: KAFKA-1852.patch, KAFKA-1852_2015-01-19_10:44:01.patch


 Currently, we allow an offset to be committed to Kafka, even when the 
 topic/partition for the offset doesn't exist. We probably should disallow 
 that and send an error back in that case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1943) Producer request failure rate should not include MessageSetSizeTooLarge and MessageSizeTooLargeException

2015-02-11 Thread Aditya Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya Auradkar updated KAFKA-1943:
---
Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-1946

 Producer request failure rate should not include MessageSetSizeTooLarge and 
 MessageSizeTooLargeException
 

 Key: KAFKA-1943
 URL: https://issues.apache.org/jira/browse/KAFKA-1943
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya A Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-1943.patch


 If MessageSetSizeTooLargeException or MessageSizeTooLargeException is thrown 
 from Log, then ReplicaManager counts it as a failed produce request. My 
 understanding is that this metric should only count failures as a result of 
 broker issues and not bad requests sent by the clients.
 If the message or message set is too large, then it is a client side error 
 and should not be reported. (similar to NotLeaderForPartitionException, 
 UnknownTopicOrPartitionException).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-02-11 Thread Chi Hoang
For the Sample usage section, please consider
https://github.com/airbnb/kafkat.  We find that tool to be very easy to
use, and extremely useful for our administration tasks.

Chi

On Mon, Feb 9, 2015 at 9:03 AM, Guozhang Wang wangg...@gmail.com wrote:

 I feel the benefits of lowering the development bar for new clients are
 not worth the complexity we would need to introduce on the server side, as today
 the clients just need one more request type (the metadata request) to send
 produce / fetch requests to the right brokers, whereas a re-routing mechanism will
 result in complicated broker-to-broker communication patterns that
 potentially impact Kafka performance and make debugging / troubleshooting
 much harder.

 An alternative way to ease the development of the clients is to use a proxy
 in front of the kafka servers, like the rest proxy we have built before,
 which we use for non-java clients primarily but also can be treated as
 handling cluster metadata discovery for clients. Compared to the
 re-routing idea, the proxy also introduces two hops, but its layered
 architecture is simpler.

 Guozhang


 On Sun, Feb 8, 2015 at 8:00 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Jiangjie,
 
  Re-routing support doesn't force clients to use it. Java and all existing
  clients would work as they do now, where requests are intelligently routed by
  the client, but this would lower the bar for new clients. That said, I agree
  the case for re-routing admin commands is much stronger than for data.
 
  The idea of separating admin/metadata from data requests would definitely solve
  some problems, but it would also add a lot of complexity--new ports, thread
  pools, etc. This is an interesting idea to think over, but I'm not sure if
  it's worth it. Probably a separate effort in any case.
 
  -jay
 
  On Friday, February 6, 2015, Jiangjie Qin j...@linkedin.com.invalid
  wrote:
 
   I'm a little bit concerned about the request routing among brokers.
   Typically we have a dominant percentage of produce and fetch
   requests/responses, and routing them from one broker to another seems
   undesirable.
   Also, I think we generally have two types of requests/responses: data
   related and admin related. It is typically a good practice to separate
   the data plane from the control plane. That suggests we should have a
   separate admin port to serve those admin requests, probably with different
   authentication/authorization from the data port.
  
   Jiangjie (Becket) Qin
  
   On 2/6/15, 11:18 AM, Joe Stein joe.st...@stealth.ly wrote:
  
   I updated the installation and sample usage for the existing patches
 on
   the
   KIP site
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and
   +centralized+administrative+operations
   
   There are still a few pending items here.
   
   1) There was already some discussion about using the Broker that is
 the
   Controller here https://issues.apache.org/jira/browse/KAFKA-1772 and
 we
   should elaborate on that more in the thread or agree we are ok with
  admin
   asking for the controller to talk to and then just sending that broker
  the
   admin tasks.
   
   2) I like this idea https://issues.apache.org/jira/browse/KAFKA-1912
  but
   we
   can refactor after KAFKA-1694 is committed, no? I know folks just want to
  talk
   to the broker that is the controller. It may even become useful to
 have
   the
   controller run on a broker that isn't even a topic broker anymore
 (small
   can of worms I am opening here but it elaborates on Guozhang's hot
 spot
   point).
   
   3) anymore feedback?
   
   - Joe Stein
   
   On Fri, Jan 23, 2015 at 3:15 PM, Guozhang Wang wangg...@gmail.com
   wrote:
   
A centralized admin operation protocol would be very useful.
   
One more general comment here is that controller is originally
  designed
   to
only talk to other brokers through ControllerChannel, while the
 broker
instance which carries the current controller is agnostic of its
   existence,
and use KafkaApis to handle general Kafka requests. Having all admin
requests redirected to the controller instance will force the broker
  to
   be
aware of its carried controller, and access its internal data for
   handling
these requests. Plus with the number of clients out of Kafka's
  control,
this may easily cause the controller to be a hot spot in terms of
   request
load.
   
   
On Thu, Jan 22, 2015 at 10:09 PM, Joe Stein joe.st...@stealth.ly
   wrote:
   
 inline

 On Thu, Jan 22, 2015 at 11:59 PM, Jay Kreps jay.kr...@gmail.com
   wrote:

  Hey Joe,
 
  This is great. A few comments on KIP-4
 
  1. This is much needed functionality, but there is a lot here, so
let's
  really think these protocols through. We really want to end up
  with
   a
set
  of well thought-out, orthogonal apis. For this reason I think it
  is
 really
  important to think through the end state even if that includes
  APIs
   we

[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2015-02-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316747#comment-14316747
 ] 

Jay Kreps commented on KAFKA-1646:
--

Hey [~waldenchen], yeah, I think what you are saying is that the offset in the 
recovery point checkpoint should always be the last sync'd offset, irrespective 
of segment file boundaries, and we should optimize recovery to just recover 
from that offset rather than always recovering the full last segment. That 
would work, and actually makes more sense than the current approach, but is a 
fairly involved and correctness-critical change. One of the challenges is that the 
offset index needs to be reconstructed from that point on as well, but there is 
a bit of a chicken-and-egg problem: how do you search into the log at 
all if the index itself is corrupt?

Another approach could be to truncate off the preallocated file extent on clean 
shutdown. This is effectively what we do for the offset indexes 
anyway. This would also avoid Windows-specific code since we could do this in 
all cases.
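
For illustration, a self-contained sketch (not the actual shutdown code) of that second 
idea: preallocate the segment file, track how many bytes were actually written, and 
truncate off the unused extent on clean shutdown.
{code}
import java.io.{File, RandomAccessFile}

object TruncateOnCleanShutdownSketch {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("segment", ".log")
    val raf  = new RandomAccessFile(file, "rw")

    raf.setLength(10L * 1024 * 1024)          // preallocate 10 MB up front (the Windows optimization)
    raf.write("some log data".getBytes)       // only a handful of bytes are actually used
    val bytesWritten = raf.getFilePointer

    raf.setLength(bytesWritten)               // clean shutdown: truncate off the unused extent
    raf.close()
    println(s"final size on disk: ${file.length()} bytes")
  }
}
{code}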

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646_20141216_163008.patch


 This patch is for the Windows platform only. On Windows, if there is 
 more than one replica writing to disk, the segment log files will not be 
 consistent on disk and consumer read performance will drop 
 greatly. This fix allocates more disk space when rolling a new segment, 
 which improves consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, as it only 
 adds statements like 'if(Os.iswindow)' or adds methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1946) Improve BrokerTopicMetrics reporting

2015-02-11 Thread Aditya Auradkar (JIRA)
Aditya Auradkar created KAFKA-1946:
--

 Summary: Improve BrokerTopicMetrics reporting
 Key: KAFKA-1946
 URL: https://issues.apache.org/jira/browse/KAFKA-1946
 Project: Kafka
  Issue Type: Improvement
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar


Creating an umbrella ticket to track improvement of BrokerTopicMetrics 
reporting. 
Some of the tasks are:
- Add a metric for total fetch/produce requests as opposed to simply failure 
counts
- Tracking offset commit requests separately from produce requests
- Adding a metric to track bad requests from clients. (HTTP 4XX vs 5XX as an 
example).
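
A small self-contained sketch (hypothetical names, not the actual BrokerTopicMetrics 
code) of the first item, using the Yammer metrics library the broker metrics are built 
on: mark a total-request meter on every request and the failure meter only on failures.
{code}
import java.util.concurrent.TimeUnit
import com.yammer.metrics.Metrics

object BrokerTopicMetricsSketch {
  private val totalProduceRequestRate =
    Metrics.newMeter(getClass, "TotalProduceRequestsPerSec", "requests", TimeUnit.SECONDS)
  private val failedProduceRequestRate =
    Metrics.newMeter(getClass, "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS)

  def recordProduceRequest(failed: Boolean): Unit = {
    totalProduceRequestRate.mark()                   // every produce request is counted here...
    if (failed) failedProduceRequestRate.mark()      // ...and only failures are counted here
  }
}
{code}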



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-02-11 Thread Joe Stein
Thanks Harsha, looks good so far. How were you thinking of running
the KerberosTicketManager: as a standalone process, like the controller, or as
a layer of code that does the plumbing pieces everywhere?

~ Joestein

On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:

 Hi,
 Here is the initial proposal for sasl/kerberos implementation for
 kafka https://cwiki.apache.org/confluence/x/YI4WAw
 and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am
 currently working on a prototype which will add more details to the KIP.
 Just opening the thread to say the work is in progress. I'll update the
 thread with an initial prototype patch.
 Thanks,
 Harsha



[jira] [Commented] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics

2015-02-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316735#comment-14316735
 ] 

Jay Kreps commented on KAFKA-1374:
--

Great!

 LogCleaner (compaction) does not support compressed topics
 --

 Key: KAFKA-1374
 URL: https://issues.apache.org/jira/browse/KAFKA-1374
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, 
 KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, 
 KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, 
 KAFKA-1374_2015-01-18_00:19:21.patch


 This is a known issue, but opening a ticket to track.
 If you try to compact a topic that has compressed messages you will run into
 various exceptions - typically because during iteration we advance the
 position based on the decompressed size of the message. I have a bunch of
 stack traces, but it should be straightforward to reproduce.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1914) Count TotalProduceRequestRate and TotalFetchRequestRate in BrokerTopicMetrics

2015-02-11 Thread Aditya Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya Auradkar updated KAFKA-1914:
---
Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-1946

 Count TotalProduceRequestRate and TotalFetchRequestRate in BrokerTopicMetrics
 -

 Key: KAFKA-1914
 URL: https://issues.apache.org/jira/browse/KAFKA-1914
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Aditya A Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-1914.patch


 Currently the BrokerTopicMetrics only counts the failedProduceRequestRate and 
 the failedFetchRequestRate. We should add 2 metrics to count the overall 
 produce/fetch request rates.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1419) cross build for scala 2.11

2015-02-11 Thread Helena Edelson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316760#comment-14316760
 ] 

Helena Edelson commented on KAFKA-1419:
---

This ticket says the cross build is available for kafka v 0.8.1.2 with Scala 
2.11 but I don't see that artifact anywhere?

 cross build for scala 2.11
 --

 Key: KAFKA-1419
 URL: https://issues.apache.org/jira/browse/KAFKA-1419
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.8.1
Reporter: Scott Clasen
Assignee: Ivan Lyutov
Priority: Blocker
 Fix For: 0.8.1.2, 0.8.2.0

 Attachments: KAFKA-1419-scalaBinaryVersion.patch, 
 KAFKA-1419-scalaBinaryVersion.patch, KAFKA-1419.patch, KAFKA-1419.patch, 
 KAFKA-1419_2014-07-28_15:05:16.patch, KAFKA-1419_2014-07-29_15:13:43.patch, 
 KAFKA-1419_2014-08-04_14:43:26.patch, KAFKA-1419_2014-08-05_12:51:16.patch, 
 KAFKA-1419_2014-08-07_10:17:34.patch, KAFKA-1419_2014-08-07_10:52:18.patch, 
 KAFKA-1419_cross_build_for_scala_2_11_for_0_8_1_branch.patch


 Please publish builds for scala 2.11, hopefully just needs a small tweak to 
 the gradle conf?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-02-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316759#comment-14316759
 ] 

Jay Kreps commented on KAFKA-1926:
--

At a high level this looks good. The main thrust of the ticket had actually 
been to clean up the class Utils.scala and migrate a lot of that to Utils.java, 
but this patch mostly cleans up other utils packages, which is also good and 
needed.

A few minor comments:
1. The time constants should probably move into Time.java as static final 
variables, right?
2. If Time moves we may need to move Scheduler, KafkaScheduler, and 
MockScheduler as they are intertwined.
3. SystemTime.java should probably deprecate SystemTime.scala, right?
4. Time.scala should be deleted?


 Replace kafka.utils.Utils with o.a.k.common.utils.Utils
 ---

 Key: KAFKA-1926
 URL: https://issues.apache.org/jira/browse/KAFKA-1926
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.0
Reporter: Jay Kreps
  Labels: newbie, patch
 Attachments: KAFKA-1926.patch


 There is currently a lot of duplication between the Utils class in common and 
 the one in core.
 Our plan has been to deprecate duplicate code in the server and replace it 
 with the new common code.
 As such we should evaluate each method in the scala Utils and do one of the 
 following:
 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose 
 utility in active use that is not Kafka-specific. If we migrate it we should 
 really think about the API and make sure there is some test coverage. A few 
 things in there are kind of funky and we shouldn't just blindly copy them 
 over.
 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold 
 any utilities that really need to make use of Scala features to be convenient.
 3. Delete it if it is not used, or has a bad api.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-02-11 Thread Tong Li (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316830#comment-14316830
 ] 

Tong Li commented on KAFKA-1926:


[~jkreps] Jay, I really appreciate your quick review comments. I will see what I 
can do and will submit another patch based on the trunk branch. A new patch set 
will come up soon. Thanks so much.

 Replace kafka.utils.Utils with o.a.k.common.utils.Utils
 ---

 Key: KAFKA-1926
 URL: https://issues.apache.org/jira/browse/KAFKA-1926
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.0
Reporter: Jay Kreps
  Labels: newbie, patch
 Attachments: KAFKA-1926.patch


 There is currently a lot of duplication between the Utils class in common and 
 the one in core.
 Our plan has been to deprecate duplicate code in the server and replace it 
 with the new common code.
 As such we should evaluate each method in the scala Utils and do one of the 
 following:
 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose 
 utility in active use that is not Kafka-specific. If we migrate it we should 
 really think about the API and make sure there is some test coverage. A few 
 things in there are kind of funky and we shouldn't just blindly copy them 
 over.
 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold 
 any utilities that really need to make use of Scala features to be convenient.
 3. Delete it if it is not used, or has a bad api.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints

2015-02-11 Thread Jun Rao
+1 for proposed changes in 1 and 2.

1. The impact is that if someone uses SimpleConsumer and references Broker
explicitly, the application needs code change to compile with 0.8.3. Since
SimpleConsumer is not widely used, breaking the API in SimpleConsumer but
maintaining overall code cleanness seems to be a better tradeoff.

2. For clarification, the issue is the following. In 0.8.3, we will be
evolving the wire protocol of UpdateMedataRequest (to send info about
endpoints for different security protocols). Since this is used in
intra-cluster communication, we need to do the upgrade in two steps. The
idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2. When
upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After step
1, all brokers will be capable of processing the new protocol in 0.8.3,
but without actually using it. In step 2, we
configure wire.protocol.version to 0.8.3 in each broker and do another
rolling restart. After step 2, all brokers will start using the new
protocol in 0.8.3. Let's say that in the next release 0.9, we are changing
the intra-cluster wire protocol again. We will do a similar thing:
defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can upgrade
from 0.8.3 to 0.9 in two steps. For people who want to upgrade from 0.8.2
to 0.9 directly, they will have to configure wire.protocol.version to 0.8.2
first and then do the two-step upgrade to 0.9.
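
As a sketch of the two-step sequence (using the wire.protocol.version name as currently
proposed; the exact property name is still under discussion), the broker config would
change across the two rolling restarts roughly like this:

  # Step 1: upgrade every broker to the 0.8.3 binaries, keeping the old wire protocol
  wire.protocol.version=0.8.2

  # Step 2: once all brokers are on 0.8.3, switch the protocol and roll again
  wire.protocol.version=0.8.3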

Gwen,

In KIP2, there is still a reference to use.new.protocol. This needs to be
removed. Also, would it be better to use intra.cluster.wire.protocol.version
since this only applies to the wire protocol among brokers?

Others,

The patch in KAFKA-1809 is almost ready. It would be good to wrap up the
discussion on KIP2 soon. So, if you haven't looked at this KIP, please take
a look and send your comments.

Thanks,

Jun


On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi Kafka Devs,

 While reviewing the patch for KAFKA-1809, we came across two questions
 that we are interested in hearing the community out on.

 1. This patch changes the Broker class and adds a new class
 BrokerEndPoint that behaves like the previous broker.

 While technically kafka.cluster.Broker is not part of the public API,
 it is returned by javaapi, used with the SimpleConsumer.

 Getting replicas from PartitionMetadata will now return BrokerEndPoint
 instead of Broker. All method calls remain the same, but since we
 return a new type, we break the API.

 Note that this breakage does not prevent upgrades - existing
 SimpleConsumers will continue working (because we are
 wire-compatible).
 The only thing that won't work is building SimpleConsumers with
 dependency on Kafka versions higher than 0.8.2. Arguably, we don't
 want anyone to do it anyway :)

 So:
 Do we state that the highest release on which SimpleConsumers can
 depend is 0.8.2? Or shall we keep Broker as is and create an
 UberBroker which will contain multiple brokers as its endpoints?

 2.
 The KIP suggests use.new.wire.protocol configuration to decide which
 protocols the brokers will use to talk to each other. The problem is
 that after the next upgrade, the wire protocol is no longer new, so
 we'll have to reset it to false for the following upgrade, then change
 to true again... and upgrading more than a single version will be
 impossible.
 Bad idea :)

 As an alternative, we can have a property for each version and set one
 of them to true. Or (simple, I think) have wire.protocol.version
 property and accept version numbers (0.8.2, 0.8.3, 0.9) as values.

 Please share your thoughts :)

 Gwen



[jira] [Updated] (KAFKA-1938) [doc] Quick start example should reference appropriate Kafka version

2015-02-11 Thread Manikumar Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar Reddy updated KAFKA-1938:
---
Attachment: remove-081-references.patch

Attaching a patch which removes 0.8.1 references from the 0.8.2 docs.

 [doc] Quick start example should reference appropriate Kafka version
 

 Key: KAFKA-1938
 URL: https://issues.apache.org/jira/browse/KAFKA-1938
 Project: Kafka
  Issue Type: Improvement
  Components: website
Affects Versions: 0.8.2.0
Reporter: Stevo Slavic
Priority: Trivial
 Attachments: remove-081-references.patch


 Kafka 0.8.2.0 documentation, quick start example on 
 https://kafka.apache.org/documentation.html#quickstart in step 1 links and 
 instructs reader to download Kafka 0.8.1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics

2015-02-11 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316691#comment-14316691
 ] 

Manikumar Reddy commented on KAFKA-1374:


Yes, we are using the TestLogCleaning tool to test the changes.

TestLogCleaning stress test output for compressed messages

{code}
Producing 10 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-6014466306002699464.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-177538909590644701.txt
10 rows of data produced, 13165 rows of data consumed (86.8% reduction).
De-duplicating and validating output files...
Validated 9005 values, 0 mismatches.

Producing 100 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-3298578695475992991.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
100 rows of data produced, 119926 rows of data consumed (88.0% reduction).
De-duplicating and validating output files...
Validated 89947 values, 0 mismatches.

Producing 1000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-3336255463347572934.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
1000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
De-duplicating and validating output files...
Validated 899853 values, 0 mismatches.
{code}

 LogCleaner (compaction) does not support compressed topics
 --

 Key: KAFKA-1374
 URL: https://issues.apache.org/jira/browse/KAFKA-1374
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, 
 KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, 
 KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, 
 KAFKA-1374_2015-01-18_00:19:21.patch


 This is a known issue, but opening a ticket to track.
 If you try to compact a topic that has compressed messages you will run into
 various exceptions - typically because during iteration we advance the
 position based on the decompressed size of the message. I have a bunch of
 stack traces, but it should be straightforward to reproduce.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1887) controller error message on shutting the last broker

2015-02-11 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani reassigned KAFKA-1887:
-

Assignee: Sriharsha Chintalapani

 controller error message on shutting the last broker
 

 Key: KAFKA-1887
 URL: https://issues.apache.org/jira/browse/KAFKA-1887
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jun Rao
Assignee: Sriharsha Chintalapani
Priority: Minor
 Fix For: 0.8.3


 We always see the following error in state-change log on shutting down the 
 last broker.
 [2015-01-20 13:21:04,036] ERROR Controller 0 epoch 3 initiated state change 
 for partition [test,0] from OfflinePartition to OnlinePartition failed 
 (state.change.logger)
 kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is 
 alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
 at 
 kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75)
 at 
 kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:357)
 at 
 kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:206)
 at 
 kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
 at 
 kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
 at 
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at 
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
 at 
 kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:446)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:373)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
 at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
 at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1920) Add a metric to count client side errors in BrokerTopicMetrics

2015-02-11 Thread Aditya Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya Auradkar updated KAFKA-1920:
---
Issue Type: Sub-task  (was: Improvement)
Parent: KAFKA-1946

 Add a metric to count client side errors in BrokerTopicMetrics
 --

 Key: KAFKA-1920
 URL: https://issues.apache.org/jira/browse/KAFKA-1920
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar

 Currently the BrokerTopicMetrics count only failures across all topics and 
 for individual topics. Should we consider adding a metric to count the number 
 of client side errors?
 This essentially counts the number of bad requests per topic.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics

2015-02-11 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316714#comment-14316714
 ] 

Joel Koshy commented on KAFKA-1374:
---

I can review this next week. However, as far as checking in is concerned I 
would strongly prefer to get KAFKA-1755 done first (for which I have a patch 
almost ready). The reason for that is that this patch is a significant change 
to the log cleaner and I would rather get some defensive code in first since 
the log cleaner health is critical for offset management as well as for Samza 
use-cases.

 LogCleaner (compaction) does not support compressed topics
 --

 Key: KAFKA-1374
 URL: https://issues.apache.org/jira/browse/KAFKA-1374
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, 
 KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, 
 KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, 
 KAFKA-1374_2015-01-18_00:19:21.patch


 This is a known issue, but opening a ticket to track.
 If you try to compact a topic that has compressed messages you will run into
 various exceptions - typically because during iteration we advance the
 position based on the decompressed size of the message. I have a bunch of
 stack traces, but it should be straightforward to reproduce.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1936) Track offset commit requests separately from produce requests

2015-02-11 Thread Aditya Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya Auradkar updated KAFKA-1936:
---
Issue Type: Sub-task  (was: Bug)
Parent: KAFKA-1946

 Track offset commit requests separately from produce requests
 -

 Key: KAFKA-1936
 URL: https://issues.apache.org/jira/browse/KAFKA-1936
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar

 In ReplicaManager, failed and total produce requests are updated from 
 appendToLocalLog. Since offset commit requests also follow the same path, 
 they are counted along with produce requests. Add a metric and count them 
 separately.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1945) MetaData Response - Broker hostname is wrong

2015-02-11 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-1945.
---
Resolution: Invalid

 MetaData Response - Broker hostname is wrong
 

 Key: KAFKA-1945
 URL: https://issues.apache.org/jira/browse/KAFKA-1945
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: saravana kumar

 I use python-kafka's SimpleConsumer to listen to a topic on a kafka broker. 
 The Kafka broker is running on a machine whose hostname is BROKER_HOST. Now, 
 a SimpleConsumer on another machine requests topic metadata from the 
 broker BROKER_HOST for a topic TOPIC and gets a python tuple
(Broker metadata, Topic metadata)
 Broker metadata comes back as
  {0: BrokerMetadata(nodeId=0, host='localhost', port=9092)}
 Ideally, the host value should be BROKER_HOST (the hostname command on the broker 
 confirms it), but it comes back as localhost...
 How does the wrong broker metadata for a topic get into the kafka system? And 
 obviously, this breaks the system, since my consumer tries to connect to port 9092 
 on its own localhost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Build failed in Jenkins: Kafka-trunk #389

2015-02-11 Thread Joe Stein
I ran into this issue locally too. I left it running overnight and it was
stuck when I got back to it.

This is where I had to kill the terminal this morning.

kafka.admin.TopicCommandTest 
testConfigPreservationAcrossPartitionAlteration PASSED
kafka.api.ApiUtilsTest  testShortStringASCII PASSED
kafka.api.ApiUtilsTest  testShortStringNonASCII PASSED
 Building 56%  :core:test  36 tests completed

I ran it again just now and the tests passed fine.

~ Joestein

On Mon, Feb 9, 2015 at 8:08 PM, Apache Jenkins Server 
jenk...@builds.apache.org wrote:

 See https://builds.apache.org/job/Kafka-trunk/389/changes

 Changes:

 [wangguoz] KAFKA-1333; Add the consumer coordinator to server; reviewed by
 Onur Karaman and Jay Kreps

 [wangguoz] KAFKA-1333 follow-up; Add missing files for the coordinator
 folder

 --
 [...truncated 1789 lines...]
 kafka.producer.AsyncProducerTest  testProducerQueueSize PASSED

 kafka.producer.AsyncProducerTest  testProduceAfterClosed PASSED

 kafka.producer.AsyncProducerTest  testBatchSize PASSED

 kafka.producer.AsyncProducerTest  testQueueTimeExpired PASSED

 kafka.producer.AsyncProducerTest  testPartitionAndCollateEvents PASSED

 kafka.producer.AsyncProducerTest  testSerializeEvents PASSED

 kafka.producer.AsyncProducerTest  testInvalidPartition PASSED

 kafka.producer.AsyncProducerTest  testNoBroker PASSED

 kafka.producer.AsyncProducerTest  testIncompatibleEncoder PASSED

 kafka.producer.AsyncProducerTest  testRandomPartitioner PASSED

 kafka.producer.AsyncProducerTest  testFailedSendRetryLogic PASSED

 kafka.producer.AsyncProducerTest  testJavaProducer PASSED

 kafka.producer.AsyncProducerTest  testInvalidConfiguration PASSED

 kafka.log.CleanerTest  testCleanSegments PASSED

 kafka.log.CleanerTest  testCleaningWithDeletes PASSED

 kafka.log.CleanerTest  testCleanSegmentsWithAbort PASSED

 kafka.log.CleanerTest  testSegmentGrouping PASSED

 kafka.log.CleanerTest  testBuildOffsetMap PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[0] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[1] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[2] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[3] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[4] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[5] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[6] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[7] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[8] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[9] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[10] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[11] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[12] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[13] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[14] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[15] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[16] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[17] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[18] PASSED

 kafka.log.BrokerCompressionTest  testBrokerSideCompression[19] PASSED

 kafka.log.LogManagerTest  testCreateLog PASSED

 kafka.log.LogManagerTest  testGetNonExistentLog PASSED

 kafka.log.LogManagerTest  testCleanupExpiredSegments PASSED

 kafka.log.LogManagerTest  testCleanupSegmentsToMaintainSize PASSED

 kafka.log.LogManagerTest  testTimeBasedFlush PASSED

 kafka.log.LogManagerTest  testLeastLoadedAssignment PASSED

 kafka.log.LogManagerTest  testTwoLogManagersUsingSameDirFails PASSED

 kafka.log.LogManagerTest  testCheckpointRecoveryPoints PASSED

 kafka.log.LogManagerTest  testRecoveryDirectoryMappingWithTrailingSlash
 PASSED

 kafka.log.LogManagerTest 
 testRecoveryDirectoryMappingWithRelativeDirectory PASSED

 kafka.log.LogConfigTest  testFromPropsDefaults PASSED

 kafka.log.LogConfigTest  testFromPropsEmpty PASSED

 kafka.log.LogConfigTest  testFromPropsToProps PASSED

 kafka.log.LogConfigTest  testFromPropsInvalid PASSED

 kafka.log.OffsetIndexTest  truncate PASSED

 kafka.log.OffsetIndexTest  randomLookupTest PASSED

 kafka.log.OffsetIndexTest  lookupExtremeCases PASSED

 kafka.log.OffsetIndexTest  appendTooMany PASSED

 kafka.log.OffsetIndexTest  appendOutOfOrder PASSED

 kafka.log.OffsetIndexTest  testReopen PASSED

 kafka.log.FileMessageSetTest  testWrittenEqualsRead PASSED

 kafka.log.FileMessageSetTest  testIteratorIsConsistent PASSED

 kafka.log.FileMessageSetTest  testSizeInBytes PASSED

 kafka.log.FileMessageSetTest  testWriteTo PASSED

 kafka.log.FileMessageSetTest  testFileSize PASSED

 kafka.log.FileMessageSetTest  testIterationOverPartialAndTruncation PASSED

 kafka.log.FileMessageSetTest  

[jira] [Commented] (KAFKA-1944) Rename LogCleaner and related classes to LogCompactor

2015-02-11 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316222#comment-14316222
 ] 

Joel Koshy commented on KAFKA-1944:
---

Sure - I think this can wait until that is in.

 Rename LogCleaner and related classes to LogCompactor
 -

 Key: KAFKA-1944
 URL: https://issues.apache.org/jira/browse/KAFKA-1944
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Ashish Kumar Singh
  Labels: newbie

 Following a mailing list discussion:
 the name LogCleaner is seriously misleading. It's more of a log compactor. 
 Deleting old logs happens elsewhere from what I've seen.
 Note that this may require renaming related classes, objects, configs and 
 metrics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-02-11 Thread Tong Li (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316380#comment-14316380
 ] 

Tong Li commented on KAFKA-1926:


[~harsha_ch] Yeah, will do that. Thanks.

 Replace kafka.utils.Utils with o.a.k.common.utils.Utils
 ---

 Key: KAFKA-1926
 URL: https://issues.apache.org/jira/browse/KAFKA-1926
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.0
Reporter: Jay Kreps
  Labels: newbie, patch
 Attachments: KAFKA-1926.patch


 There is currently a lot of duplication between the Utils class in common and 
 the one in core.
 Our plan has been to deprecate duplicate code in the server and replace it 
 with the new common code.
 As such we should evaluate each method in the scala Utils and do one of the 
 following:
 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose 
 utility in active use that is not Kafka-specific. If we migrate it we should 
 really think about the API and make sure there is some test coverage. A few 
 things in there are kind of funky and we shouldn't just blindly copy them 
 over.
 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold 
 any utilities that really need to make use of Scala features to be convenient.
 3. Delete it if it is not used, or has a bad api.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-02-11 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316319#comment-14316319
 ] 

Sriharsha Chintalapani commented on KAFKA-1926:
---

[~tongli] you should make the patch against trunk.

 Replace kafka.utils.Utils with o.a.k.common.utils.Utils
 ---

 Key: KAFKA-1926
 URL: https://issues.apache.org/jira/browse/KAFKA-1926
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.0
Reporter: Jay Kreps
  Labels: newbie, patch
 Attachments: KAFKA-1926.patch


 There is currently a lot of duplication between the Utils class in common and 
 the one in core.
 Our plan has been to deprecate duplicate code in the server and replace it 
 with the new common code.
 As such we should evaluate each method in the scala Utils and do one of the 
 following:
 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose 
 utility in active use that is not Kafka-specific. If we migrate it we should 
 really think about the API and make sure there is some test coverage. A few 
 things in there are kind of funky and we shouldn't just blindly copy them 
 over.
 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold 
 any utilities that really need to make use of Scala features to be convenient.
 3. Delete it if it is not used, or has a bad api.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1944) Rename LogCleaner and related classes to LogCompactor

2015-02-11 Thread Ashish Kumar Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14316951#comment-14316951
 ] 

Ashish Kumar Singh commented on KAFKA-1944:
---

Sure.

 Rename LogCleaner and related classes to LogCompactor
 -

 Key: KAFKA-1944
 URL: https://issues.apache.org/jira/browse/KAFKA-1944
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Ashish Kumar Singh
  Labels: newbie

 Following a mailing list discussion:
 the name LogCleaner is seriously misleading. It's more of a log compactor. 
 Deleting old logs happens elsewhere from what I've seen.
 Note that this may require renaming related classes, objects, configs and 
 metrics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-11 Thread Jay Kreps
Cool, I agree with all that.

I agree about the need for a rebalancing callback.

Totally agree about record handler.

It would be great to see if a prototype of this is workable.

Thanks guys!

-Jay
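For reference, the record handler agreed on above is a small hook along these lines, sketched with assumed names and types (the KIP's final interface may differ): take one consumed record and return zero or more records to produce, where an empty list filters the record out.

  import java.util.List;
  import org.apache.kafka.clients.producer.ProducerRecord;

  // Sketch of a mirror-maker record handler (names are placeholders).
  public interface RecordHandler {
      // Return the records to mirror for one consumed message; an empty list drops it.
      List<ProducerRecord<byte[], byte[]>> handle(String sourceTopic, byte[] key, byte[] value);
  }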

On Wed, Feb 11, 2015 at 12:36 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Hey Jay,

 Guozhang, Becket and I got together to discuss this and we think:

 - It seems that your proposal based on the new consumer and flush call
   should work.
 - We would likely need to call the poll with a timeout that matches
   the offset commit interval in order to deal with low volume
   mirroring pipelines.
 - We will still need a rebalance callback to reduce duplicates - the
   rebalance callback would need to flush and commit offsets.
 - The only remaining question is if the overall throughput is
   sufficient. I think someone at LinkedIn (I don't remember who) did
   some experiments with data channel size == 1 and ran into issues.
   That was not thoroughly investigated though.
 - The addition of flush may actually make this solution viable for the
   current mirror-maker (with the old consumer). We can prototype that
   offline and if it works out well we can redo KAFKA-1650 (i.e.,
   refactor the current mirror maker). The flush call and the new
   consumer didn't exist at the time we did KAFKA-1650 so this did not
   occur to us.
 - We think the RecordHandler is still a useful small addition for the
   use-cases mentioned earlier in this thread.

 Thanks,

 Joel

 On Wed, Feb 11, 2015 at 09:05:39AM -0800, Jay Kreps wrote:
  Guozhang, I agree with 1-3, I do think what I was proposing was simpler
 but
  perhaps there are gaps in that?
 
  Hey Joel--Here was a sketch of what I was proposing. I do think this
 gets
  rid of manual offset tracking, especially doing so across threads with
  dedicated commit threads, which I think is pretty complex.
 
  while (true) {
    val recs = consumer.poll(Long.MaxValue)
    for (rec <- recs)
      producer.send(rec, logErrorCallback)
    if (System.currentTimeMillis - lastCommit > commitInterval) {
      producer.flush()
      consumer.commit()
      lastCommit = System.currentTimeMillis
    }
  }
 
  (See the previous email for details). I think the question is: is there
 any
  reason--performance, correctness, etc--that this won't work? Basically I
  think you guys have thought about this more so I may be missing
 something.
  If so let's flag it while we still have leeway on the consumer.
 
  If we think that will work, well I do think it is conceptually a lot
  simpler than the current code, though I suppose one could disagree on
 that.
 
  -Jay
 
  On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy jjkosh...@gmail.com wrote:
 
   Hi Jay,
  
The data channels are actually a big part of the complexity of the
 zero
data loss design, though, right? Because then you need some reverse
   channel
to flow the acks back to the consumer based on where you are versus
 just
acking what you have read and written (as in the code snippet I put
 up).
  
   I'm not sure if we are on the same page. Even if the data channel was
   not there the current handling for zero data loss would remain very
   similar - you would need to maintain lists of unacked source offsets.
   I'm wondering if the KIP needs more detail on how it is currently
   implemented; or are you suggesting a different approach (in which case I
   have not fully understood). I'm not sure what you mean by flowing acks
   back to the consumer - the MM commits offsets after the producer ack
   has been received. There is some additional complexity introduced in
   reducing duplicates on a rebalance - this is actually optional (since
   duplicates are currently a given). The reason that was done anyway is
   that with the auto-commit turned off duplicates are almost guaranteed
   on a rebalance.
  
I think the point that Neha and I were trying to make was that the
motivation to embed stuff into MM kind of is related to how complex a
simple consume and produce with good throughput will be. If it is
   simple
to write such a thing in a few lines, the pain of embedding a bunch
 of
stuff won't be worth it, if it has to be as complex as the current mm
   then
of course we will need all kinds of plug ins because no one will be
 able
   to
write such a thing. I don't have a huge concern with a simple plug-in
   but I
think if it turns into something more complex with filtering and
aggregation or whatever we really need to stop and think a bit about
 the
design.
  
   I agree - I don't think there is a use-case for any complex plug-in.
   It is pretty much what Becket has described currently for the message
   handler - i.e., take an incoming record and return a list of outgoing
   records (which could be empty if you filter).
  
   So here is my take on the MM:
   - Bare bones: simple consumer - producer pairs (0.7 style). This is
 ideal, but does not handle no data loss

Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-11 Thread Joel Koshy
Hey Jay,

Guozhang, Becket and I got together to discuss this and we think:

- It seems that your proposal based on the new consumer and flush call
  should work.
- We would likely need to call the poll with a timeout that matches
  the offset commit interval in order to deal with low volume
  mirroring pipelines.
- We will still need a rebalance callback to reduce duplicates - the
  rebalance callback would need to flush and commit offsets.
- The only remaining question is if the overall throughput is
  sufficient. I think someone at LinkedIn (I don't remember who) did
  some experiments with data channel size == 1 and ran into issues.
  That was not thoroughly investigated though.
- The addition of flush may actually make this solution viable for the
  current mirror-maker (with the old consumer). We can prototype that
  offline and if it works out well we can redo KAFKA-1650 (i.e.,
  refactor the current mirror maker). The flush call and the new
  consumer didn't exist at the time we did KAFKA-1650 so this did not
  occur to us.
- We think the RecordHandler is still a useful small addition for the
  use-cases mentioned earlier in this thread.

Thanks,

Joel
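
A rough sketch of the rebalance callback mentioned in the list above, with placeholder types and method names rather than the actual client APIs:

  // Sketch only: on a rebalance, flush so every in-flight send is acked, then
  // commit the consumed offsets, so the partitions' new owner does not
  // re-mirror those messages.
  public class FlushAndCommitOnRebalance {
      interface FlushableProducer { void flush(); }         // stands in for the producer's flush()
      interface OffsetCommitter { void commitOffsets(); }   // stands in for the consumer's offset commit

      private final FlushableProducer producer;
      private final OffsetCommitter consumer;

      public FlushAndCommitOnRebalance(FlushableProducer producer, OffsetCommitter consumer) {
          this.producer = producer;
          this.consumer = consumer;
      }

      // Invoked just before partitions are revoked.
      public void onPartitionsRevoked() {
          producer.flush();
          consumer.commitOffsets();
      }
  }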

On Wed, Feb 11, 2015 at 09:05:39AM -0800, Jay Kreps wrote:
 Guozhang, I agree with 1-3, I do think what I was proposing was simpler but
 perhaps there are gaps in that?
 
 Hey Joel--Here was a sketch of what I was proposing. I do think this gets
 rid of manual offset tracking, especially doing so across threads with
 dedicated commit threads, which I think is pretty complex.
 
 while (true) {
   val recs = consumer.poll(Long.MaxValue)
   for (rec <- recs)
     producer.send(rec, logErrorCallback)
   if (System.currentTimeMillis - lastCommit > commitInterval) {
     producer.flush()
     consumer.commit()
     lastCommit = System.currentTimeMillis
   }
 }
 
 (See the previous email for details). I think the question is: is there any
 reason--performance, correctness, etc--that this won't work? Basically I
 think you guys have thought about this more so I may be missing something.
 If so let's flag it while we still have leeway on the consumer.
 
 If we think that will work, well I do think it is conceptually a lot
 simpler than the current code, though I suppose one could disagree on that.
 
 -Jay
 
 On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy jjkosh...@gmail.com wrote:
 
  Hi Jay,
 
   The data channels are actually a big part of the complexity of the zero
   data loss design, though, right? Because then you need some reverse
  channel
   to flow the acks back to the consumer based on where you are versus just
   acking what you have read and written (as in the code snippet I put up).
 
  I'm not sure if we are on the same page. Even if the data channel was
  not there the current handling for zero data loss would remain very
  similar - you would need to maintain lists of unacked source offsets.
  I'm wondering if the KIP needs more detail on how it is currently
  implemented; or are you suggesting a different approach (in which case I
  have not fully understood). I'm not sure what you mean by flowing acks
  back to the consumer - the MM commits offsets after the producer ack
  has been received. There is some additional complexity introduced in
  reducing duplicates on a rebalance - this is actually optional (since
  duplicates are currently a given). The reason that was done anyway is
  that with the auto-commit turned off duplicates are almost guaranteed
  on a rebalance.
 
   I think the point that Neha and I were trying to make was that the
   motivation to embed stuff into MM kind of is related to how complex a
   simple consume and produce with good throughput will be. If it is
  simple
   to write such a thing in a few lines, the pain of embedding a bunch of
   stuff won't be worth it, if it has to be as complex as the current mm
  then
   of course we will need all kinds of plug ins because no one will be able
  to
   write such a thing. I don't have a huge concern with a simple plug-in
  but I
   think if it turns into something more complex with filtering and
   aggregation or whatever we really need to stop and think a bit about the
   design.
 
  I agree - I don't think there is a use-case for any complex plug-in.
  It is pretty much what Becket has described currently for the message
  handler - i.e., take an incoming record and return a list of outgoing
  records (which could be empty if you filter).
 
  So here is my take on the MM:
  - Bare bones: simple consumer - producer pairs (0.7 style). This is
ideal, but does not handle no data loss
  - Above plus support no data loss. This actually adds quite a bit of
complexity.
  - Above plus the message handler. This is a trivial addition I think
that makes the MM usable in a few other mirroring-like applications.
 
  Joel
 
   On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy jjkosh...@gmail.com
  wrote:
  
   
   
On Tue, Feb 10, 2015 at 12:13:46PM -0800, 

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-02-11 Thread Jay Kreps
Hey Andrii,

To answer your earlier question we just really can't be adding any more
scala protocol objects. These things are super hard to maintain because
they hand code the byte parsing and don't have good versioning support.
Since we are already planning on converting we definitely don't want to add
a ton more of these--they are total tech debt.

What does it mean that the changes are isolated from the current code base?

I actually didn't understand the remaining comments, which of the points
are you responding to?

Maybe one sticking point here is that it seems like you want to make some
kind of tool, and you have made a 1-1 mapping between commands you imagine
in the tool and protocol additions. I want to make sure we don't do that.
The protocol needs to be really really well thought out against many use
cases so it should make perfect logical sense in the absence of knowing the
command line tool, right?

-Jay

On Wed, Feb 11, 2015 at 11:57 AM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Hey Jay,

 I would like to continue this discussion as it seems there is no progress
 here.

 First of all, could you please explain what you meant in 2? How exactly
 are we going to migrate to the new java protocol definitions? And why is it
 a blocker for the centralized CLI?

 I agree with you, this feature includes lots of stuff, but thankfully
 almost all changes are isolated from the current code base,
 so the main thing, I think, we need to agree is RQ/RP format.
 So how can we start discussion about the concrete messages format?
 Can we take (

 https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-ProposedRQ/RPFormat
 )
 as starting point?

 We had some doubts earlier whether it worth introducing one generic Admin
 Request for all commands (https://issues.apache.org/jira/browse/KAFKA-1694
 )
 but then everybody agreed it would be better to have a separate message for
 each admin command. The Request part is really dictated by the command
 (e.g. TopicCommand) arguments themselves, so the proposed version should be
 fine (let's put aside for now remarks about Optional type, batching,
 configs normalization - I agree with all of them).
 So the second part is Response. I see there are two cases here.
 a) Mutate requests - Create/Alter/... ; b) Get requests -
 List/Describe...

 a) should only hold request result (regardless what we decide about
 blocking/non-blocking commands execution).
 Usually we provide error code in response but since we will use this in
 interactive shell we need some human readable error description - so I
 added an errorDescription field where you can at least leave
 exception.getMessage.

 b) in addition to previous item message should hold command specific
 response data. We can discuss in detail each of them but let's for now
 agree about the overall pattern.

 Thanks,
 Andrii Biletskyi

 On Fri, Jan 23, 2015 at 6:59 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Joe,
 
  This is great. A few comments on KIP-4
 
  1. This is much needed functionality, but there are a lot of these, so let's
  really think these protocols through. We really want to end up with a set
  of well thought-out, orthogonal apis. For this reason I think it is
 really
  important to think through the end state even if that includes APIs we
  won't implement in the first phase.
 
  2. Let's please please please wait until we have switched the server over
  to the new java protocol definitions. If we add umpteen more ad hoc scala
  objects that is just generating more work for the conversion we know we
  have to do.
 
  3. This proposal introduces a new type of optional parameter. This is
  inconsistent with everything else in the protocol where we use -1 or some
  other marker value. You could argue either way but let's stick with that
  for consistency. For clients that implemented the protocol in a better
 way
  than our scala code these basic primitives are hard to change.
 
  4. ClusterMetadata: This seems to duplicate TopicMetadataRequest which
 has
  brokers, topics, and partitions. I think we should rename that request
  ClusterMetadataRequest (or just MetadataRequest) and include the id of
 the
  controller. Or are there other things we could add here?
 
  5. We have a tendency to try to make a lot of requests that can only go
 to
  particular nodes. This adds a lot of burden for client implementations
 (it
  sounds easy but each discovery can fail in many parts so it ends up
 being a
  full state machine to do right). I think we should consider making admin
  commands and ideally as many of the other apis as possible available on
 all
  brokers and just redirect to the controller on the broker side. Perhaps
  there would be a general way to encapsulate this re-routing behavior.
 
  6. We should probably normalize the key value pairs used for configs
 rather
  than embedding a new formatting. 

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-02-11 Thread Andrii Biletskyi
Hey Jay,

I would like to continue this discussion as it seems there is no progress
here.

First of all, could you please explain what you meant in 2? How exactly
are we going to migrate to the new java protocol definitions? And why is it
a blocker for the centralized CLI?

I agree with you, this feature includes lots of stuff, but thankfully
almost all changes are isolated from the current code base,
so the main thing, I think, we need to agree is RQ/RP format.
So how can we start discussion about the concrete messages format?
Can we take (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-ProposedRQ/RPFormat)
as starting point?

We had some doubts earlier whether it worth introducing one generic Admin
Request for all commands (https://issues.apache.org/jira/browse/KAFKA-1694)
but then everybody agreed it would be better to have a separate message for
each admin command. The Request part is really dictated by the command
(e.g. TopicCommand) arguments themselves, so the proposed version should be
fine (let's put aside for now remarks about Optional type, batching,
configs normalization - I agree with all of them).
So the second part is Response. I see there are two cases here.
a) Mutate requests - Create/Alter/... ; b) Get requests -
List/Describe...

a) should only hold request result (regardless what we decide about
blocking/non-blocking commands execution).
Usually we provide error code in response but since we will use this in
interactive shell we need some human readable error description - so I
added an errorDescription field where you can at least leave
exception.getMessage.

b) in addition to previous item message should hold command specific
response data. We can discuss in detail each of them but let's for now
agree about the overall pattern.

Thanks,
Andrii Biletskyi

On Fri, Jan 23, 2015 at 6:59 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Joe,

 This is great. A few comments on KIP-4

 1. This is much needed functionality, but there are a lot of these, so let's
 really think these protocols through. We really want to end up with a set
 of well thought-out, orthogonal apis. For this reason I think it is really
 important to think through the end state even if that includes APIs we
 won't implement in the first phase.

 2. Let's please please please wait until we have switched the server over
 to the new java protocol definitions. If we add umpteen more ad hoc scala
 objects that is just generating more work for the conversion we know we
 have to do.

 3. This proposal introduces a new type of optional parameter. This is
 inconsistent with everything else in the protocol where we use -1 or some
 other marker value. You could argue either way but let's stick with that
 for consistency. For clients that implemented the protocol in a better way
 than our scala code these basic primitives are hard to change.

 4. ClusterMetadata: This seems to duplicate TopicMetadataRequest which has
 brokers, topics, and partitions. I think we should rename that request
 ClusterMetadataRequest (or just MetadataRequest) and include the id of the
 controller. Or are there other things we could add here?

 5. We have a tendency to try to make a lot of requests that can only go to
 particular nodes. This adds a lot of burden for client implementations (it
 sounds easy but each discovery can fail in many parts so it ends up being a
 full state machine to do right). I think we should consider making admin
 commands and ideally as many of the other apis as possible available on all
 brokers and just redirect to the controller on the broker side. Perhaps
 there would be a general way to encapsulate this re-routing behavior.

 6. We should probably normalize the key value pairs used for configs rather
 than embedding a new formatting. So two strings rather than one with an
 internal equals sign.

 7. Is the postcondition of these APIs that the command has begun or that
 the command has been completed? It is a lot more usable if the command has
 been completed so you know that if you create a topic and then publish to
 it you won't get an exception about there being no such topic.

 8. Describe topic and list topics duplicate a lot of stuff in the metadata
 request. Is there a reason to give back topics marked for deletion? I feel
 like if we just make the post-condition of the delete command be that the
 topic is deleted that will get rid of the need for this right? And it will
 be much more intuitive.

 9. Should we consider batching these requests? We have generally tried to
 allow multiple operations to be batched. My suspicion is that without this
 we will get a lot of code that does something like
for(topic: adminClient.listTopics())
   adminClient.describeTopic(topic)
 this code will work great when you test on 5 topics but not do as well if
 you have 50k.

 10. I think we should also discuss how we want 

Re: Review Request 29468: Patch for KAFKA-1805

2015-02-11 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29468/#review72032
---


Can you add unit tests?

- Gwen Shapira


On Feb. 11, 2015, 10:34 p.m., Parth Brahmbhatt wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29468/
 ---
 
 (Updated Feb. 11, 2015, 10:34 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42
 https://issues.apache.org/jira/browse/KAFKA-1805
 https://issues.apache.org/jira/browse/KAFKA-1905
 https://issues.apache.org/jira/browse/KAFKA-42
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Merge remote-tracking branch 'origin/trunk' into KAFKA-1805
 
 
 Handling the case where all the fields in ProducerRecord can be null.
 
 
 Adding toString back.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
 065d4e6c6a4966ac216e98696782e2714044df29 
 
 Diff: https://reviews.apache.org/r/29468/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Parth Brahmbhatt
 




Re: Review Request 29468: Patch for KAFKA-1805

2015-02-11 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29468/
---

(Updated Feb. 11, 2015, 10:34 p.m.)


Review request for kafka.


Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42
https://issues.apache.org/jira/browse/KAFKA-1805
https://issues.apache.org/jira/browse/KAFKA-1905
https://issues.apache.org/jira/browse/KAFKA-42


Repository: kafka


Description (updated)
---

Merge remote-tracking branch 'origin/trunk' into KAFKA-1805


Handling the case where all the fields in ProducerRecord can be null.


Adding toString back.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
065d4e6c6a4966ac216e98696782e2714044df29 

Diff: https://reviews.apache.org/r/29468/diff/


Testing
---


Thanks,

Parth Brahmbhatt



[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-11 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317118#comment-14317118
 ] 

Parth Brahmbhatt commented on KAFKA-1805:
-

Updated reviewboard https://reviews.apache.org/r/29468/diff/
 against branch origin/trunk

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, 
 KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests integrating kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans
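 A null-safe equals/hashCode for such a value object would look roughly like this (an illustrative stand-in with the same fields as ProducerRecord, not the actual patch):

   import java.util.Objects;

   // Value object with the same shape as ProducerRecord: topic, partition, key, value.
   public final class RecordValue<K, V> {
       private final String topic;
       private final Integer partition;
       private final K key;
       private final V value;

       public RecordValue(String topic, Integer partition, K key, V value) {
           this.topic = topic;
           this.partition = partition;
           this.key = key;
           this.value = value;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) return true;
           if (!(o instanceof RecordValue)) return false;
           RecordValue<?, ?> that = (RecordValue<?, ?>) o;
           return Objects.equals(topic, that.topic)
               && Objects.equals(partition, that.partition)
               && Objects.equals(key, that.key)
               && Objects.equals(value, that.value);
       }

       @Override
       public int hashCode() {
           return Objects.hash(topic, partition, key, value);
       }
   }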



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-11 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt updated KAFKA-1805:

Attachment: KAFKA-1805_2015-02-11_14:34:28.patch

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, 
 KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests integrating kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-11 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317108#comment-14317108
 ] 

Parth Brahmbhatt commented on KAFKA-1805:
-

Updated reviewboard https://reviews.apache.org/r/29468/diff/
 against branch origin/trunk

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, 
 KAFKA-1805_2015-02-11_14:30:14.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests integrating kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29468: Patch for KAFKA-1805

2015-02-11 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29468/
---

(Updated Feb. 11, 2015, 10:30 p.m.)


Review request for kafka.


Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42
https://issues.apache.org/jira/browse/KAFKA-1805
https://issues.apache.org/jira/browse/KAFKA-1905
https://issues.apache.org/jira/browse/KAFKA-42


Repository: kafka


Description (updated)
---

Merge remote-tracking branch 'origin/trunk' into KAFKA-1805


Handling the case where all the fields in ProducerRecord can be null.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
065d4e6c6a4966ac216e98696782e2714044df29 

Diff: https://reviews.apache.org/r/29468/diff/


Testing
---


Thanks,

Parth Brahmbhatt



[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-11 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt updated KAFKA-1805:

Attachment: KAFKA-1805_2015-02-11_14:30:14.patch

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, 
 KAFKA-1805_2015-02-11_14:30:14.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests integrating kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29468: Patch for KAFKA-1805

2015-02-11 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29468/
---

(Updated Feb. 11, 2015, 10:37 p.m.)


Review request for kafka.


Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42
https://issues.apache.org/jira/browse/KAFKA-1805
https://issues.apache.org/jira/browse/KAFKA-1905
https://issues.apache.org/jira/browse/KAFKA-42


Repository: kafka


Description
---

Merge remote-tracking branch 'origin/trunk' into KAFKA-1805


Handling the case where all the fields in ProducerRecord can be null.


Adding toString back.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
065d4e6c6a4966ac216e98696782e2714044df29 

Diff: https://reviews.apache.org/r/29468/diff/


Testing
---


Thanks,

Parth Brahmbhatt



Re: Review Request 29468: Patch for KAFKA-1805

2015-02-11 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29468/#review72031
---



clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
https://reviews.apache.org/r/29468/#comment117965

not intentional, removed.



clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
https://reviews.apache.org/r/29468/#comment117966

handling null for all fields now.



clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
https://reviews.apache.org/r/29468/#comment117967

nulls are handled now.


- Parth Brahmbhatt


On Feb. 11, 2015, 10:53 p.m., Parth Brahmbhatt wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29468/
 ---
 
 (Updated Feb. 11, 2015, 10:53 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42
 https://issues.apache.org/jira/browse/KAFKA-1805
 https://issues.apache.org/jira/browse/KAFKA-1905
 https://issues.apache.org/jira/browse/KAFKA-42
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Merge remote-tracking branch 'origin/trunk' into KAFKA-1805
 
 
 Handling the case where all the fields in ProducerRecord can be null.
 
 
 Adding toString back.
 
 
 Added unit test for equals and hashcode.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
 065d4e6c6a4966ac216e98696782e2714044df29 
   
 clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/29468/diff/
 
 
 Testing
 ---
 
 Unit tests added.
 
 
 Thanks,
 
 Parth Brahmbhatt
 




Re: Review Request 29468: Patch for KAFKA-1805

2015-02-11 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29468/
---

(Updated Feb. 11, 2015, 10:53 p.m.)


Review request for kafka.


Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42
https://issues.apache.org/jira/browse/KAFKA-1805
https://issues.apache.org/jira/browse/KAFKA-1905
https://issues.apache.org/jira/browse/KAFKA-42


Repository: kafka


Description
---

Merge remote-tracking branch 'origin/trunk' into KAFKA-1805


Handling the case where all the fields in ProducerRecord can be null.


Adding toString back.


Added unit test for equals and hashcode.


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
065d4e6c6a4966ac216e98696782e2714044df29 
  
clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/29468/diff/


Testing (updated)
---

Unit tests added.


Thanks,

Parth Brahmbhatt



Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-02-11 Thread Andrii Biletskyi
Jay,

Thanks for answering. You understood correctly, most of my comments were
related to your point 1) - about well thought-out apis. Also, yes, as I
understood we would like to introduce a single unified CLI tool with
centralized server-side request handling for lots of existing ones (incl.
TopicCommand, CommitOffsetChecker, ReassignPartitions, something else if added
in future). In our previous discussion (
https://issues.apache.org/jira/browse/KAFKA-1694) people said they'd rather
have a separate message for each command, so, yes, this way I came to 1-1
mapping between commands in the tool and protocol additions. But I might be
wrong.
In the end I am just trying to start a discussion about how, at least generally, this
protocol should look.

Thanks,
Andrii

On Wed, Feb 11, 2015 at 11:10 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Andrii,

 To answer your earlier question we just really can't be adding any more
 scala protocol objects. These things are super hard to maintain because
 they hand code the byte parsing and don't have good versioning support.
 Since we are already planning on converting we definitely don't want to add
 a ton more of these--they are total tech debt.

 What does it mean that the changes are isolated from the current code base?

 I actually didn't understand the remaining comments, which of the points
 are you responding to?

 Maybe one sticking point here is that it seems like you want to make some
 kind of tool, and you have made a 1-1 mapping between commands you imagine
 in the tool and protocol additions. I want to make sure we don't do that.
 The protocol needs to be really really well thought out against many use
 cases so it should make perfect logical sense in the absence of knowing the
 command line tool, right?

 -Jay

 On Wed, Feb 11, 2015 at 11:57 AM, Andrii Biletskyi 
 andrii.bilets...@stealth.ly wrote:

  Hey Jay,
 
  I would like to continue this discussion as it seems there is no progress
  here.
 
  First of all, could you please explain what you meant in 2? How exactly
  are we going to migrate to the new java protocol definitions? And why is it
  a blocker for the centralized CLI?
 
  I agree with you, this feature includes lots of stuff, but thankfully
  almost all changes are isolated from the current code base,
  so the main thing, I think, we need to agree is RQ/RP format.
  So how can we start discussion about the concrete messages format?
  Can we take (
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-ProposedRQ/RPFormat
  )
  as starting point?
 
  We had some doubts earlier whether it worth introducing one generic Admin
  Request for all commands (
 https://issues.apache.org/jira/browse/KAFKA-1694
  )
  but then everybody agreed it would be better to have a separate message for
  each admin command. The Request part is really dictated by the command
  (e.g. TopicCommand) arguments themselves, so the proposed version should be
  fine (let's put aside for now remarks about Optional type, batching,
  configs normalization - I agree with all of them).
  So the second part is Response. I see there are two cases here.
  a) Mutate requests - Create/Alter/... ; b) Get requests -
  List/Describe...
 
  a) should only hold request result (regardless what we decide about
  blocking/non-blocking commands execution).
  Usually we provide error code in response but since we will use this in
  interactive shell we need some human readable error description - so I
  added an errorDescription field where you can at least leave
  exception.getMessage.
 
  b) in addition to previous item message should hold command specific
  response data. We can discuss in detail each of them but let's for now
  agree about the overall pattern.
 
  Thanks,
  Andrii Biletskyi
 
  On Fri, Jan 23, 2015 at 6:59 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
   Hey Joe,
  
   This is great. A few comments on KIP-4
  
   1. This is much needed functionality, but there are a lot of these, so let's
   really think these protocols through. We really want to end up with a set
   of well thought-out, orthogonal apis. For this reason I think it is really
   important to think through the end state even if that includes APIs we
   won't implement in the first phase.
  
   2. Let's please please please wait until we have switched the server
 over
   to the new java protocol definitions. If we add umpteen more ad hoc scala
   objects that is just generating more work for the conversion we know we
   have to do.
  
   3. This proposal introduces a new type of optional parameter. This is
   inconsistent with everything else in the protocol where we use -1 or
 some
   other marker value. You could argue either way but let's stick with
 that
   for consistency. For clients that implemented the protocol in a better
  way
   than our scala code these basic primitives are hard to change.
  
   4. 

Re: Review Request 29468: Patch for KAFKA-1805

2015-02-11 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29468/
---

(Updated Feb. 11, 2015, 10:49 p.m.)


Review request for kafka.


Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42
https://issues.apache.org/jira/browse/KAFKA-1805
https://issues.apache.org/jira/browse/KAFKA-1905
https://issues.apache.org/jira/browse/KAFKA-42


Repository: kafka


Description (updated)
---

Merge remote-tracking branch 'origin/trunk' into KAFKA-1805


Handling the case where all the fields in ProducerRecord can be null.


Adding toString back.


Added unit test for equals and hashcode.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
065d4e6c6a4966ac216e98696782e2714044df29 
  
clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/29468/diff/


Testing
---


Thanks,

Parth Brahmbhatt



Re: [DISCUSSION] KAFKA-1697 - make NotEnoughReplicasAfterAppend a non-retriable exception

2015-02-11 Thread Guozhang Wang
If people have agreed upon this semantic:

<quote>
if you set retries > 0 you are saying I accept
duplicates but want to ensure my stuff gets written, if you set retries =
0 you are saying I can't abide duplicates and am willing to tolerate
loss. So Retryable for us means retry may succeed.
</quote>

then NotEnoughReplicasAfterAppend should be retriable.

PS: we can probably make it clearer in the new producer config table?

Guozhang
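
To make the retries > 0 vs retries = 0 trade-off concrete from the client side, here is a minimal sketch with the new producer (standard config keys assumed; the callback logic is only illustrative):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.Callback;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;
  import org.apache.kafka.common.errors.RetriableException;

  public class RetrySemanticsSketch {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "broker1:9092");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          // retries > 0: retriable errors are retried, so the write is more likely
          // to land but may be duplicated; retries = 0: no producer-side duplicates,
          // but a retriable error can mean the message is lost.
          props.put("retries", "3");

          Producer<String, String> producer = new KafkaProducer<String, String>(props);
          producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"), new Callback() {
              public void onCompletion(RecordMetadata metadata, Exception e) {
                  if (e instanceof RetriableException)
                      System.err.println("Retriable error, retries exhausted: " + e);
                  else if (e != null)
                      System.err.println("Non-retriable error: " + e);
              }
          });
          producer.close();
      }
  }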

On Wed, Feb 11, 2015 at 5:41 AM, Joel Koshy jjkosh...@gmail.com wrote:

 Thanks for the comments - however, it is not clear to me what your
 preference is on making NotEnoughReplicasAfterAppend retriable vs
 non-retriable.

 As for me, my preference is to leave it as retriable since it is clear
 that the produce may succeed on a retry (and may introduce a
 duplicate). I agree that idempotence will bring full closure to this
 though.

 Anyone else have a preference on this?

 Thanks,

 Joel

 On Tue, Feb 10, 2015 at 08:23:08PM -0800, Jay Kreps wrote:
  Yeah there are really two concepts here as I think you noted:
  1. Retry safe: we know that the write did not occur
  2. Retry fixable: if you send that again it could work
 
  (probably there are better names for these).
 
  Some things we know did not do a write and may be fixed by retrying (no
  leader). Some things we know didn't do a write and are not worth retrying
   (message too large). Some things we don't know and are worth retrying
  (network error), and probably some things we don't know and aren't worth
 it
  (can't think of one though).
 
  (I feel like Donald Rumsfeld with the known unknowns thing).
 
   In the current world if you set retries > 0 you are saying I accept
  duplicates but want to ensure my stuff gets written, if you set retries
 =
  0 you are saying I can't abide duplicates and am willing to tolerate
  loss. So Retryable for us means retry may succeed.
 
  Originally I thought of maybe trying to model both concepts. However the
  two arguments against it are:
  1. Even if you do this the guarantee remains at least once delivery
  because: (1) in the network error case you just don't know, (2) consumer
  failure.
  2. The proper fix for this is to add idempotence support on the server,
  which we should do.
 
  Doing idempotence support on the server will actually fix all duplicate
  problems, including the network error case (because of course the server
  knows whether your write went through even though the client doesn't).
 When
  we have that then the client can always just retry anything marked
  Retriable (i.e. retry may work) without fear of duplicates.
 
  This gives exactly once delivery to the log, and a co-operating consumer
  can use the offset to dedupe and get it end-to-end.
 
  So that was why I had just left one type of Retriable and used it to mean
  retry may work and don't try to flag anything for duplicates.
 
  -Jay
 
 
 
 
  On Tue, Feb 10, 2015 at 4:32 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
   Hi Kafka Devs,
  
   Need your thoughts on retriable exceptions:
  
    If a user configures Kafka with min.isr > 1 and there are not enough
   replicas to safely store the data, there are two possibilities:
  
   1. The lack of replicas was discovered before the message was written.
 We
   throw NotEnoughReplicas.
   2. The lack of replicas was discovered after the message was written to
   leader. In this case, we throw  NotEnoughReplicasAfterAppend.
  
   Currently, both errors are Retriable. Which means that the new producer
   will retry multiple times.
   In case of the second exception, this will cause duplicates.
  
   KAFKA-1697 suggests:
   we probably want to make NotEnoughReplicasAfterAppend a non-retriable
   exception and let the client decide what to do.
  
   I agreed that the client (the one using the Producer) should weight the
   problems duplicates will cause vs. the probability of losing the
 message
   and do something sensible and made the exception non-retriable.
  
   In the RB (https://reviews.apache.org/r/29647/) Joel raised a good
 point:
   (Joel, feel free to correct me if I misrepresented your point)
  
   I think our interpretation of retriable is as follows (but we can
 discuss
   on the list if that needs to change): if the produce request hits an
 error,
   and there is absolutely no point in retrying then that is a
 non-retriable
   error. MessageSizeTooLarge is an example - since unless the producer
   changes the request to make the messages smaller there is no point in
   retrying.
  
   ...
   Duplicates can arise even for other errors (e.g., request timed out).
 So
   that side-effect is not compelling enough to warrant a change to make
 this
   non-retriable. 
  
    *(TL;DR)* Should exceptions where retries can cause duplicates still be
    *retriable?*
  
   Gwen
  




-- 
-- Guozhang


Re: [DISCUSSION] KAFKA-1697 - make NotEnoughReplicasAfterAppend a non-retriable exception

2015-02-11 Thread Gwen Shapira
Makes sense to me.

Thanks for the very detailed clarification, Jay :)

Will leave NotEnoughReplicasAfterAppend as retriable.

Gwen

On Wed, Feb 11, 2015 at 4:18 PM, Guozhang Wang wangg...@gmail.com wrote:

 If people have agreed upon this semantic:

 <quote>
 if you set retries > 0 you are saying I accept
 duplicates but want to ensure my stuff gets written, if you set retries =
 0 you are saying I can't abide duplicates and am willing to tolerate
 loss. So Retryable for us means retry may succeed.
 </quote>

 then NotEnoughReplicasAfterAppend should be retriable.

 PS: we can probably make it clearer in the new producer config table?

 Guozhang

 On Wed, Feb 11, 2015 at 5:41 AM, Joel Koshy jjkosh...@gmail.com wrote:

  Thanks for the comments - however, it is not clear to me what your
  preference is on making NotEnoughReplicasAfterAppend retriable vs
  non-retriable.
 
  As for me, my preference is to leave it as retriable since it is clear
  that the produce may succeed on a retry (and may introduce a
  duplicate). I agree that idempotence will bring full closure to this
  though.
 
  Anyone else have a preference on this?
 
  Thanks,
 
  Joel
 
  On Tue, Feb 10, 2015 at 08:23:08PM -0800, Jay Kreps wrote:
   Yeah there are really two concepts here as I think you noted:
   1. Retry safe: we know that the write did not occur
   2. Retry fixable: if you send that again it could work
  
   (probably there are better names for these).
  
   Some things we know did not do a write and may be fixed by retrying (no
   leader). Some things we know didn't do a write and are not worth
 retrying
    (message too large). Some things we don't know and are worth retrying
   (network error), and probably some things we don't know and aren't
 worth
  it
   (can't think of one though).
  
   (I feel like Donald Rumsfeld with the known unknowns thing).
  
    In the current world if you set retries > 0 you are saying I accept
   duplicates but want to ensure my stuff gets written, if you set
 retries
  =
   0 you are saying I can't abide duplicates and am willing to tolerate
   loss. So Retryable for us means retry may succeed.
  
   Originally I thought of maybe trying to model both concepts. However
 the
   two arguments against it are:
   1. Even if you do this the guarantee remains at least once delivery
   because: (1) in the network error case you just don't know, (2)
 consumer
   failure.
   2. The proper fix for this is to add idempotence support on the server,
   which we should do.
  
   Doing idempotence support on the server will actually fix all duplicate
   problems, including the network error case (because of course the
 server
   knows whether your write went through even though the client doesn't).
  When
   we have that then the client can always just retry anything marked
   Retriable (i.e. retry may work) without fear of duplicates.
  
   This gives exactly once delivery to the log, and a co-operating
 consumer
   can use the offset to dedupe and get it end-to-end.
  
   So that was why I had just left one type of Retriable and used it to
 mean
   retry may work and don't try to flag anything for duplicates.
  
   -Jay
  
  
  
  
   On Tue, Feb 10, 2015 at 4:32 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
  
Hi Kafka Devs,
   
Need your thoughts on retriable exceptions:
   
If a user configures Kafka with min.isr > 1 and there are not enough
replicas to safely store the data, there are two possibilities:
   
1. The lack of replicas was discovered before the message was
 written.
  We
throw NotEnoughReplicas.
2. The lack of replicas was discovered after the message was written
 to
leader. In this case, we throw  NotEnoughReplicasAfterAppend.
   
Currently, both errors are Retriable. Which means that the new
 producer
will retry multiple times.
In case of the second exception, this will cause duplicates.
   
KAFKA-1697 suggests:
we probably want to make NotEnoughReplicasAfterAppend a
 non-retriable
exception and let the client decide what to do.
   
I agreed that the client (the one using the Producer) should weight
 the
problems duplicates will cause vs. the probability of losing the
  message
and do something sensible and made the exception non-retriable.
   
In the RB (https://reviews.apache.org/r/29647/) Joel raised a good point:
(Joel, feel free to correct me if I misrepresented your point)

I think our interpretation of retriable is as follows (but we can discuss
on the list if that needs to change): if the produce request hits an error,
and there is absolutely no point in retrying then that is a non-retriable
error. MessageSizeTooLarge is an example - since unless the producer
changes the request to make the messages smaller there is no point in
retrying.
   
...
Duplicates can arise even for other errors (e.g., request timed out). So
that side-effect is not compelling enough to warrant a change to make this
non-retriable.
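
For reference, a minimal sketch of how this distinction surfaces to a user of
the new producer: the retries setting controls automatic retries, and a send
callback can check whether a failure is an instance of RetriableException. The
bootstrap server, topic name, and retry count below are placeholders.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.RetriableException;

    public class RetriableExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            // retries > 0: the producer retries retriable errors itself (and may
            // introduce duplicates); retries = 0: no retries, possible loss.
            props.put("retries", "3");

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            producer.send(new ProducerRecord<byte[], byte[]>("test", "hello".getBytes()), new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null) {
                        System.out.println("written at offset " + metadata.offset());
                    } else if (e instanceof RetriableException) {
                        // a retry may succeed, but it may also produce a duplicate
                        System.err.println("retriable failure: " + e);
                    } else {
                        // retrying the same request cannot help (e.g. message too large)
                        System.err.println("non-retriable failure: " + e);
                    }
                }
            });
            producer.close();
        }
    }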

Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints

2015-02-11 Thread Joel Koshy
The description that Jun gave for (2) was the detail I was looking for
- Gwen can you update the KIP with that for completeness/clarity?

I'm +1 as well overall. However, I think it would be good if we also
get an ack from someone who is more experienced on the operations side
(say, Todd) to review especially the upgrade plan.

On Wed, Feb 11, 2015 at 09:40:50AM -0800, Jun Rao wrote:
 +1 for proposed changes in 1 and 2.
 
 1. The impact is that if someone uses SimpleConsumer and references Broker
 explicitly, the application needs code change to compile with 0.8.3. Since
 SimpleConsumer is not widely used, breaking the API in SimpleConsumer but
 maintaining overall code cleanness seems to be a better tradeoff.
 
 2. For clarification, the issue is the following. In 0.8.3, we will be
 evolving the wire protocol of UpdateMetadataRequest (to send info about
 endpoints for different security protocols). Since this is used in
 intra-cluster communication, we need to do the upgrade in two steps. The
 idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2. When
 upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After step
 1, all brokers will be capable of processing the new protocol in 0.8.3,
 but without actually using it. In step 2, we
 configure wire.protocol.version to 0.8.3 in each broker and do another
 rolling restart. After step 2, all brokers will start using the new
 protocol in 0.8.3. Let's say that in the next release 0.9, we are changing
 the intra-cluster wire protocol again. We will do a similar thing:
 defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can upgrade
 from 0.8.3 to 0.9 in two steps. For people who want to upgrade from 0.8.2
 to 0.9 directly, they will have to configure wire.protocol.version to 0.8.2
 first and then do the two-step upgrade to 0.9.
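 A sketch of the two server.properties states described above, assuming the
 wire.protocol.version property name used in this thread (the exact name was
 still being discussed):

     # Step 1: upgrade the broker jars to 0.8.3 but keep speaking the old
     # inter-broker protocol (also the proposed default in 0.8.3).
     wire.protocol.version=0.8.2

     # Step 2: once every broker runs 0.8.3, switch the protocol version
     # and do a second rolling restart.
     wire.protocol.version=0.8.3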
 
 Gwen,
 
 In KIP2, there is still a reference to use.new.protocol. This needs to be
 removed. Also, would it be better to use intra.cluster.wire.protocol.version
 since this only applies to the wire protocol among brokers?
 
 Others,
 
 The patch in KAFKA-1809 is almost ready. It would be good to wrap up the
 discussion on KIP2 soon. So, if you haven't looked at this KIP, please take
 a look and send your comments.
 
 Thanks,
 
 Jun
 
 
 On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com wrote:
 
  Hi Kafka Devs,
 
  While reviewing the patch for KAFKA-1809, we came across two questions
  that we are interested in hearing the community out on.
 
  1. This patch changes the Broker class and adds a new class
  BrokerEndPoint that behaves like the previous broker.
 
  While technically kafka.cluster.Broker is not part of the public API,
  it is returned by javaapi, used with the SimpleConsumer.
 
  Getting replicas from PartitionMetadata will now return BrokerEndPoint
  instead of Broker. All method calls remain the same, but since we
  return a new type, we break the API.
 
  Note that this breakage does not prevent upgrades - existing
  SimpleConsumers will continue working (because we are
  wire-compatible).
  The only thing that won't work is building SimpleConsumers with
  dependency on Kafka versions higher than 0.8.2. Arguably, we don't
  want anyone to do it anyway :)
 
  So:
  Do we state that the highest release on which SimpleConsumers can
  depend is 0.8.2? Or shall we keep Broker as is and create an
  UberBroker which will contain multiple brokers as its endpoints?
 
  2.
  The KIP suggests use.new.wire.protocol configuration to decide which
  protocols the brokers will use to talk to each other. The problem is
  that after the next upgrade, the wire protocol is no longer new, so
  we'll have to reset it to false for the following upgrade, then change
  to true again... and upgrading more than a single version will be
  impossible.
  Bad idea :)
 
  As an alternative, we can have a property for each version and set one
  of them to true. Or (simple, I think) have wire.protocol.version
  property and accept version numbers (0.8.2, 0.8.3, 0.9) as values.
 
  Please share your thoughts :)
 
  Gwen
 



Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints

2015-02-11 Thread Gwen Shapira
Added Jun's notes to the KIP (Thanks for explaining so clearly, Jun. I was
clearly struggling with this...) and removed the reference to
use.new.wire.protocol.

On Wed, Feb 11, 2015 at 4:19 PM, Joel Koshy jjkosh...@gmail.com wrote:

 The description that Jun gave for (2) was the detail I was looking for
 - Gwen can you update the KIP with that for completeness/clarity?

 I'm +1 as well overall. However, I think it would be good if we also
 get an ack from someone who is more experienced on the operations side
 (say, Todd) to review especially the upgrade plan.

 On Wed, Feb 11, 2015 at 09:40:50AM -0800, Jun Rao wrote:
  +1 for proposed changes in 1 and 2.
 
  1. The impact is that if someone uses SimpleConsumer and references
 Broker
  explicitly, the application needs code change to compile with 0.8.3.
 Since
  SimpleConsumer is not widely used, breaking the API in SimpleConsumer but
  maintaining overall code cleanness seems to be a better tradeoff.
 
  2. For clarification, the issue is the following. In 0.8.3, we will be
  evolving the wire protocol of UpdateMetadataRequest (to send info about
  endpoints for different security protocols). Since this is used in
  intra-cluster communication, we need to do the upgrade in two steps. The
  idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2.
 When
  upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After
 step
  1, all brokers will be capable for processing the new protocol in 0.8.3,
  but without actually using it. In step 2, we
  configure wire.protocol.version to 0.8.3 in each broker and do another
  rolling restart. After step 2, all brokers will start using the new
  protocol in 0.8.3. Let's say that in the next release 0.9, we are
 changing
  the intra-cluster wire protocol again. We will do the similar thing:
  defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can
 upgrade
  from 0.8.3 to 0.9 in two steps. For people who want to upgrade from 0.8.2
  to 0.9 directly, they will have to configure wire.protocol.version to
 0.8.2
  first and then do the two-step upgrade to 0.9.
 
  Gwen,
 
  In KIP2, there is still a reference to use.new.protocol. This needs to be
  removed. Also, would it be better to use
 intra.cluster.wire.protocol.version
  since this only applies to the wire protocol among brokers?
 
  Others,
 
  The patch in KAFKA-1809 is almost ready. It would be good to wrap up the
  discussion on KIP2 soon. So, if you haven't looked at this KIP, please
 take
  a look and send your comments.
 
  Thanks,
 
  Jun
 
 
  On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
   Hi Kafka Devs,
  
   While reviewing the patch for KAFKA-1809, we came across two questions
   that we are interested in hearing the community out on.
  
   1. This patch changes the Broker class and adds a new class
   BrokerEndPoint that behaves like the previous broker.
  
   While technically kafka.cluster.Broker is not part of the public API,
   it is returned by javaapi, used with the SimpleConsumer.
  
   Getting replicas from PartitionMetadata will now return BrokerEndPoint
   instead of Broker. All method calls remain the same, but since we
   return a new type, we break the API.
  
   Note that this breakage does not prevent upgrades - existing
   SimpleConsumers will continue working (because we are
   wire-compatible).
   The only thing that won't work is building SimpleConsumers with
   dependency on Kafka versions higher than 0.8.2. Arguably, we don't
   want anyone to do it anyway :)
  
   So:
   Do we state that the highest release on which SimpleConsumers can
   depend is 0.8.2? Or shall we keep Broker as is and create an
   UberBroker which will contain multiple brokers as its endpoints?
  
   2.
   The KIP suggests use.new.wire.protocol configuration to decide which
   protocols the brokers will use to talk to each other. The problem is
   that after the next upgrade, the wire protocol is no longer new, so
   we'll have to reset it to false for the following upgrade, then change
   to true again... and upgrading more than a single version will be
   impossible.
   Bad idea :)
  
   As an alternative, we can have a property for each version and set one
   of them to true. Or (simple, I think) have wire.protocol.version
   property and accept version numbers (0.8.2, 0.8.3, 0.9) as values.
  
   Please share your thoughts :)
  
   Gwen
  




Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-02-11 Thread Jay Kreps
Yeah I totally agree that we don't want to just have one "do admin stuff"
command that has the union of all parameters.

What I am saying is that command line tools are one client of the
administrative apis, but these will be used in a number of scenarios so
they should make logical sense even in the absence of the command line
tool. Hence comments like trying to clarify the relationship between
ClusterMetadata and TopicMetadata...these kinds of things really need to be
thought through.

Hope that makes sense.

-Jay

On Wed, Feb 11, 2015 at 1:41 PM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Jay,

 Thanks for answering. You understood correctly, most of my comments were
 related to your point 1) - about well thought-out apis. Also, yes, as I
 understood it, we would like to introduce a single unified CLI tool with
 centralized server-side request handling for lots of existing ones (incl.
 TopicCommand, CommitOffsetChecker, ReassignPartitions, and anything else added
 in the future). In our previous discussion (
 https://issues.apache.org/jira/browse/KAFKA-1694) people said they'd rather
 have a separate message for each command, so, yes, this is how I came to a 1-1
 mapping between commands in the tool and protocol additions. But I might be
 wrong.
 In the end I am just trying to start a discussion about how this protocol
 should generally look.

 Thanks,
 Andrii

 On Wed, Feb 11, 2015 at 11:10 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Andrii,
 
  To answer your earlier question we just really can't be adding any more
  scala protocol objects. These things are super hard to maintain because
  they hand code the byte parsing and don't have good versioning support.
  Since we are already planning on converting we definitely don't want to
 add
  a ton more of these--they are total tech debt.
 
  What does it mean that the changes are isolated from the current code
 base?
 
  I actually didn't understand the remaining comments, which of the points
  are you responding to?
 
  Maybe one sticking point here is that it seems like you want to make some
  kind of tool, and you have made a 1-1 mapping between commands you
 imagine
  in the tool and protocol additions. I want to make sure we don't do that.
  The protocol needs to be really really well thought out against many use
  cases so it should make perfect logical sense in the absence of knowing
 the
  command line tool, right?
 
  -Jay
 
  On Wed, Feb 11, 2015 at 11:57 AM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
   Hey Jay,
  
    I would like to continue this discussion as it seems there is no progress
    here.
   
    First of all, could you please explain what you meant in 2? How exactly
    are we going to migrate to the new java protocol definitions, and why is
    it a blocker for the centralized CLI?
   
    I agree with you, this feature includes lots of stuff, but thankfully
    almost all changes are isolated from the current code base,
    so the main thing, I think, we need to agree on is the RQ/RP format.
    So how can we start a discussion about the concrete message format?
    Can we take (
    https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-ProposedRQ/RPFormat
    )
    as a starting point?
  
    We had some doubts earlier about whether it was worth introducing one
    generic Admin Request for all commands (
    https://issues.apache.org/jira/browse/KAFKA-1694
    )
    but then everybody agreed it would be better to have a separate message for
    each admin command. The Request part is really dictated by the command
    (e.g. TopicCommand) arguments itself, so the proposed version should be
    fine (let's put aside for now remarks about the Optional type, batching,
    configs normalization - I agree with all of them).
    So the second part is the Response. I see there are two cases here:
    a) Mutate requests - Create/Alter/... ; b) Get requests -
    List/Describe...
   
    a) should only hold the request result (regardless of what we decide about
    blocking/non-blocking command execution).
    Usually we provide an error code in the response, but since we will use
    this in an interactive shell we need some human-readable error description
    - so I added an errorDescription field where you can at least leave
    exception.getMessage.
   
    b) in addition to the previous item, the message should hold
    command-specific response data. We can discuss each of them in detail, but
    let's for now agree on the overall pattern.
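    As a rough sketch of the response shape described in a) and b) above --
    the class and field names here are illustrative placeholders, not the
    actual KIP-4 schema:

    // Illustrative only -- not the actual KIP-4 wire format.
    public class CreateTopicResponse {
        private final short errorCode;          // 0 means no error
        private final String errorDescription;  // human-readable, e.g. exception.getMessage()

        public CreateTopicResponse(short errorCode, String errorDescription) {
            this.errorCode = errorCode;
            this.errorDescription = errorDescription;
        }

        public short errorCode() { return errorCode; }
        public String errorDescription() { return errorDescription; }
    }

    // A "get"-style response (List/Describe...) would additionally carry
    // command-specific data, e.g. topic names and partition details.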
  
   Thanks,
   Andrii Biletskyi
  
   On Fri, Jan 23, 2015 at 6:59 AM, Jay Kreps jay.kr...@gmail.com
 wrote:
  
Hey Joe,
   
This is great. A few comments on KIP-4
   
1. This is much needed functionality, but there are a lot of the so let's
really think these protocols through. We really want to end up with a set
of well thought-out, orthogonal apis. For this reason I think it is really
important to think through the 

Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints

2015-02-11 Thread Gwen Shapira
Jun,

I'm not sure we should default wire.protocol.version to the previous
version. This will make fresh installs a bit weird :)
I think we should default to the new version and assume that when I'm
upgrading a broker, I'm re-using an existing configuration file.

This way, if I'm upgrading 0.8.3.0 to 0.9.0.0, the configuration file
already says wire.protocol.version=0.8.3.0 and I need to bump it post
upgrade.
Fresh install will include 0.9.0.0, so I won't need to bump anything.

The only exception is with 0.8.2.0, where I'll need to add
wire.protocol.version=0.8.2.0 before upgrading to 0.8.3.0.

Does that make sense?

Regarding the naming, I agree that this parameter only controls the
protocol between brokers (clients control the version of the protocol when
they are involved, on a per-message basis). However,
inter.broker.wire.protocol.version makes it sound like there may be other
types of wire.protocol.version in the future, and I'm pretty sure we want a
single parameter for controlling protocol versions from broker side.
Not a big deal for me either way.

On Wed, Feb 11, 2015 at 9:40 AM, Jun Rao j...@confluent.io wrote:

 +1 for proposed changes in 1 and 2.

 1. The impact is that if someone uses SimpleConsumer and references Broker
 explicitly, the application needs code change to compile with 0.8.3. Since
 SimpleConsumer is not widely used, breaking the API in SimpleConsumer but
 maintaining overall code cleanness seems to be a better tradeoff.

 2. For clarification, the issue is the following. In 0.8.3, we will be
 evolving the wire protocol of UpdateMetadataRequest (to send info about
 endpoints for different security protocols). Since this is used in
 intra-cluster communication, we need to do the upgrade in two steps. The
 idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2. When
 upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After step
 1, all brokers will be capable for processing the new protocol in 0.8.3,
 but without actually using it. In step 2, we
 configure wire.protocol.version to 0.8.3 in each broker and do another
 rolling restart. After step 2, all brokers will start using the new
 protocol in 0.8.3. Let's say that in the next release 0.9, we are changing
 the intra-cluster wire protocol again. We will do the similar thing:
 defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can upgrade
 from 0.8.3 to 0.9 in two steps. For people who want to upgrade from 0.8.2
 to 0.9 directly, they will have to configure wire.protocol.version to 0.8.2
 first and then do the two-step upgrade to 0.9.

 Gwen,

 In KIP2, there is still a reference to use.new.protocol. This needs to be
 removed. Also, would it be better to use
 intra.cluster.wire.protocol.version
 since this only applies to the wire protocol among brokers?

 Others,

 The patch in KAFKA-1809 is almost ready. It would be good to wrap up the
 discussion on KIP2 soon. So, if you haven't looked at this KIP, please take
 a look and send your comments.

 Thanks,

 Jun


 On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  Hi Kafka Devs,
 
  While reviewing the patch for KAFKA-1809, we came across two questions
  that we are interested in hearing the community out on.
 
  1. This patch changes the Broker class and adds a new class
  BrokerEndPoint that behaves like the previous broker.
 
  While technically kafka.cluster.Broker is not part of the public API,
  it is returned by javaapi, used with the SimpleConsumer.
 
  Getting replicas from PartitionMetadata will now return BrokerEndPoint
  instead of Broker. All method calls remain the same, but since we
  return a new type, we break the API.
 
  Note that this breakage does not prevent upgrades - existing
  SimpleConsumers will continue working (because we are
  wire-compatible).
  The only thing that won't work is building SimpleConsumers with
  dependency on Kafka versions higher than 0.8.2. Arguably, we don't
  want anyone to do it anyway :)
 
  So:
  Do we state that the highest release on which SimpleConsumers can
  depend is 0.8.2? Or shall we keep Broker as is and create an
  UberBroker which will contain multiple brokers as its endpoints?
 
  2.
  The KIP suggests use.new.wire.protocol configuration to decide which
  protocols the brokers will use to talk to each other. The problem is
  that after the next upgrade, the wire protocol is no longer new, so
  we'll have to reset it to false for the following upgrade, then change
  to true again... and upgrading more than a single version will be
  impossible.
  Bad idea :)
 
  As an alternative, we can have a property for each version and set one
  of them to true. Or (simple, I think) have wire.protocol.version
  property and accept version numbers (0.8.2, 0.8.3, 0.9) as values.
 
  Please share your thoughts :)
 
  Gwen
 



[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-11 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt updated KAFKA-1805:

Attachment: KAFKA-1805_2015-02-11_14:37:41.patch

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, 
 KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, 
 KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests integrating kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-11 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317124#comment-14317124
 ] 

Parth Brahmbhatt commented on KAFKA-1805:
-

Updated reviewboard https://reviews.apache.org/r/29468/diff/
 against branch origin/trunk

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, 
 KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, 
 KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests integrating kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-11 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317123#comment-14317123
 ] 

Parth Brahmbhatt commented on KAFKA-1805:
-

Updated reviewboard https://reviews.apache.org/r/29468/diff/
 against branch origin/trunk

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, 
 KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, 
 KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests integrating kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29468: Patch for KAFKA-1805

2015-02-11 Thread Parth Brahmbhatt

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29468/
---

(Updated Feb. 11, 2015, 10:37 p.m.)


Review request for kafka.


Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42
https://issues.apache.org/jira/browse/KAFKA-1805
https://issues.apache.org/jira/browse/KAFKA-1905
https://issues.apache.org/jira/browse/KAFKA-42


Repository: kafka


Description
---

Merge remote-tracking branch 'origin/trunk' into KAFKA-1805


Handling the case where all the fields in ProducerRecord can be null.


Adding toString back.
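
A minimal sketch of the null-safe equality being added here; this is a
simplified stand-in class, not the committed patch (see the diff link below):

    // Simplified stand-in for ProducerRecord (topic, partition, key and value
    // may all be null); illustrative only, not the committed patch.
    public final class SimpleProducerRecord {
        private final String topic;
        private final Integer partition;
        private final Object key;
        private final Object value;

        public SimpleProducerRecord(String topic, Integer partition, Object key, Object value) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SimpleProducerRecord)) return false;
            SimpleProducerRecord that = (SimpleProducerRecord) o;
            // compare each field, treating null == null as equal
            return (topic == null ? that.topic == null : topic.equals(that.topic))
                && (partition == null ? that.partition == null : partition.equals(that.partition))
                && (key == null ? that.key == null : key.equals(that.key))
                && (value == null ? that.value == null : value.equals(that.value));
        }

        @Override
        public int hashCode() {
            int result = topic == null ? 0 : topic.hashCode();
            result = 31 * result + (partition == null ? 0 : partition.hashCode());
            result = 31 * result + (key == null ? 0 : key.hashCode());
            result = 31 * result + (value == null ? 0 : value.hashCode());
            return result;
        }
    }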


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
065d4e6c6a4966ac216e98696782e2714044df29 

Diff: https://reviews.apache.org/r/29468/diff/


Testing
---


Thanks,

Parth Brahmbhatt



[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-11 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt updated KAFKA-1805:

Attachment: KAFKA-1805_2015-02-11_14:37:09.patch

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, 
 KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, 
 KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests integrating kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-11 Thread Parth Brahmbhatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Parth Brahmbhatt updated KAFKA-1805:

Attachment: KAFKA-1805_2015-02-11_14:49:10.patch

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, 
 KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, 
 KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch, 
 KAFKA-1805_2015-02-11_14:49:10.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests integrating kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29468: Patch for KAFKA-1805

2015-02-11 Thread Parth Brahmbhatt


 On Feb. 11, 2015, 10:35 p.m., Gwen Shapira wrote:
  Can you add unit tests?

Added unit tests.


- Parth


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29468/#review72032
---


On Feb. 11, 2015, 10:49 p.m., Parth Brahmbhatt wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29468/
 ---
 
 (Updated Feb. 11, 2015, 10:49 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1805, KAFKA-1905 and KAFKA-42
 https://issues.apache.org/jira/browse/KAFKA-1805
 https://issues.apache.org/jira/browse/KAFKA-1905
 https://issues.apache.org/jira/browse/KAFKA-42
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Merge remote-tracking branch 'origin/trunk' into KAFKA-1805
 
 
 Handling the case where all the fields in ProducerRecord can be null.
 
 
 Adding toString back.
 
 
 Added unit tests for equals and hashCode.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
 065d4e6c6a4966ac216e98696782e2714044df29 
   
 clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/29468/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Parth Brahmbhatt
 




[jira] [Commented] (KAFKA-1805) Kafka ProducerRecord should implement equals

2015-02-11 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317145#comment-14317145
 ] 

Parth Brahmbhatt commented on KAFKA-1805:
-

Updated reviewboard https://reviews.apache.org/r/29468/diff/
 against branch origin/trunk

 Kafka ProducerRecord should implement equals
 

 Key: KAFKA-1805
 URL: https://issues.apache.org/jira/browse/KAFKA-1805
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Thomas Omans
Assignee: Thomas Omans
Priority: Minor
 Attachments: KAFKA-1805.patch, KAFKA-1805_2014-12-29_16:37:11.patch, 
 KAFKA-1805_2015-02-11_14:30:14.patch, KAFKA-1805_2015-02-11_14:34:28.patch, 
 KAFKA-1805_2015-02-11_14:37:09.patch, KAFKA-1805_2015-02-11_14:37:41.patch, 
 KAFKA-1805_2015-02-11_14:49:10.patch


 I was writing some tests to verify that I am calculating my partitions, 
 topics, keys, and values properly in my producer code and discovered that 
 ProducerRecord does not implement equality.
 This makes tests integrating kafka particularly awkward.
 https://github.com/apache/kafka/blob/0.8.2-beta/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
 I can whip up a patch since this is essentially just a value object.
 Thanks,
 Thomas Omans



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.

2015-02-11 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317171#comment-14317171
 ] 

Parth Brahmbhatt commented on KAFKA-1660:
-

I am not aware of any clean way to force-shutdown a running Java thread; how do 
you propose to do that in the second approach?

 Ability to call close() with a timeout on the Java Kafka Producer. 
 ---

 Key: KAFKA-1660
 URL: https://issues.apache.org/jira/browse/KAFKA-1660
 Project: Kafka
  Issue Type: Improvement
  Components: clients, producer 
Affects Versions: 0.8.2.0
Reporter: Andrew Stein
Assignee: Jun Rao
 Fix For: 0.8.3

 Attachments: KAFKA-1660.patch


 I would like the ability to call {{close}} with a timeout on the Java 
 Client's KafkaProducer.
 h6. Workaround
 Currently, it is possible to ensure that {{close}} will return quickly by 
 first doing a {{future.get(timeout)}} on the last future produced on each 
 partition, but this means that the user has to define the partitions up front 
 at the time of {{send}} and track the returned {{future}}'s
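
A sketch of that workaround: bound the wait on the last future per partition
before calling close(). The record types, timeout, and bookkeeping map below
are placeholders maintained by the caller, not part of the producer API.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class BoundedClose {
        // Last future returned by send() for each partition we wrote to
        // (single-threaded usage assumed for this sketch).
        private final Map<Integer, Future<RecordMetadata>> lastFuturePerPartition =
                new HashMap<Integer, Future<RecordMetadata>>();

        public void send(KafkaProducer<byte[], byte[]> producer, String topic,
                         Integer partition, byte[] value) {
            Future<RecordMetadata> f =
                    producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, null, value));
            lastFuturePerPartition.put(partition, f);
        }

        // Wait at most timeoutMs per partition, then close; close() itself
        // should then have little or nothing left to flush.
        public void closeWithTimeout(KafkaProducer<byte[], byte[]> producer, long timeoutMs) {
            for (Future<RecordMetadata> f : lastFuturePerPartition.values()) {
                try {
                    f.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    // give up waiting on this partition
                } catch (Exception e) {
                    // the send failed; nothing more to wait for on this partition
                }
            }
            producer.close();
        }
    }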



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-02-11 Thread Gwen Shapira
Looks good. Thanks for working on this.

One note, the Channel implementation from SSL only works on Java7 and up.
Since we are still supporting Java 6, I'm working on a lighter wrapper that
will be a composite on SocketChannel but will not extend it. Perhaps you'll
want to use that.
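
For illustration, a rough sketch of that composition approach -- the class and
method names here are made up for the example, not the actual patch:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;

    // Wraps a SocketChannel by composition instead of extending it, so a
    // security layer (SSL/SASL) can later intercept reads and writes while
    // still compiling on Java 6. Illustrative only.
    public class TransportChannel {
        private final SocketChannel socketChannel;

        public TransportChannel(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        // A secure implementation would wrap/unwrap data here before delegating.
        public int read(ByteBuffer dst) throws IOException {
            return socketChannel.read(dst);
        }

        public int write(ByteBuffer src) throws IOException {
            return socketChannel.write(src);
        }

        public void close() throws IOException {
            socketChannel.close();
        }

        public SocketChannel underlying() {
            return socketChannel;
        }
    }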

Looking forward to the patch!

Gwen

On Wed, Feb 11, 2015 at 9:17 AM, Harsha m...@harsha.io wrote:

 Hi,
 Here is the initial proposal for sasl/kerberos implementation for
 kafka https://cwiki.apache.org/confluence/x/YI4WAw
 and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am
 currently working on prototype which will add more details to the KIP.
 Just opening the thread to say the work is in progress. I'll update the
 thread with a initial prototype patch.
 Thanks,
 Harsha



Re: Review Request 29647: Patch for KAFKA-1697

2015-02-11 Thread Gwen Shapira


 On Feb. 11, 2015, 1:48 p.m., Joel Koshy wrote:
  clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java,
   line 23
  https://reviews.apache.org/r/29647/diff/3/?file=860600#file860600line23
 
  Pending discussion.
 
 Gwen Shapira wrote:
 changing back to retriable, per discussion in mailing list. 
 I'm leaving this as a separate exception and error code, in case client 
 developers want to do something with the extra information.

Actually, since we are here, I'll remove the error code from kafka.common and 
use the o.a.k.common.errors everywhere.
We are transitioning there anyway.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29647/#review71960
---


On Feb. 11, 2015, 1:06 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29647/
 ---
 
 (Updated Feb. 11, 2015, 1:06 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1697
 https://issues.apache.org/jira/browse/KAFKA-1697
 
 
 Repository: kafka
 
 
 Description
 ---
 
 added early handling of invalid number of acks to handler and a test
 
 
 merging with current trunk
 
 
 moved check for legal requiredAcks to append and fixed the tests accordingly
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
  a6107b818947d6d6818c85cdffcb2b13f69a55c0 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 a8deac4ce5149129d0a6f44c0526af9d55649a36 
   core/src/main/scala/kafka/cluster/Partition.scala 
 e6ad8be5e33b6fb31c078ad78f8de709869ddc04 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 fb948b9ab28c516e81dab14dcbe211dcd99842b6 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 faa907131ed0aa94a7eacb78c1ffb576062be87a 
 
 Diff: https://reviews.apache.org/r/29647/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 




[jira] [Commented] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.

2015-02-11 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317335#comment-14317335
 ] 

Jay Kreps commented on KAFKA-1660:
--

Yeah I'm not proposing calling thread.stop(); we would stop the thread by 
sending it a message somehow to stop processing, and then it exits without 
waiting for all messages to be sent. Basically the same way we implement 
close() without a timeout (which also doesn't call thread.stop).
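
A rough sketch of that kind of signal: a flag the sender loop checks so a
forced close can skip draining the queue. The class below is illustrative
only and is not the actual producer internals.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative only: a sender loop that can be asked to stop immediately
    // (dropping whatever is still queued) instead of draining everything.
    public class SenderLoop implements Runnable {
        private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final AtomicBoolean forceClose = new AtomicBoolean(false);

        public void send(byte[] record) {
            queue.offer(record);
        }

        public void run() {
            while (running.get()) {
                try {
                    byte[] record = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null)
                        deliver(record);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            // On a normal close we drain the queue; a forced close (a close
            // whose timeout expired) simply skips the drain and exits.
            if (!forceClose.get()) {
                byte[] record;
                while ((record = queue.poll()) != null)
                    deliver(record);
            }
        }

        public void initiateClose() { running.set(false); }

        public void forceClose() { forceClose.set(true); running.set(false); }

        private void deliver(byte[] record) { /* network send elided */ }
    }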

 Ability to call close() with a timeout on the Java Kafka Producer. 
 ---

 Key: KAFKA-1660
 URL: https://issues.apache.org/jira/browse/KAFKA-1660
 Project: Kafka
  Issue Type: Improvement
  Components: clients, producer 
Affects Versions: 0.8.2.0
Reporter: Andrew Stein
Assignee: Jun Rao
 Fix For: 0.8.3

 Attachments: KAFKA-1660.patch


 I would like the ability to call {{close}} with a timeout on the Java 
 Client's KafkaProducer.
 h6. Workaround
 Currently, it is possible to ensure that {{close}} will return quickly by 
 first doing a {{future.get(timeout)}} on the last future produced on each 
 partition, but this means that the user has to define the partitions up front 
 at the time of {{send}} and track the returned {{future}}'s



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KAFKA-1697 - make NotEnoughReplicasAfterAppend a non-retriable exception

2015-02-11 Thread Jay Kreps
Yeah, hey Joel, that was a super long-winded way of saying let's leave it
at Retriable. I agree there is another concept, which is non-duplicate
producing, but when we do the idempotence stuff then all things will have
that property, so it may be okay just to leave it for now since network
errors make it impossible to really do this properly without idempotence.

-Jay

On Wed, Feb 11, 2015 at 5:41 AM, Joel Koshy jjkosh...@gmail.com wrote:

 Thanks for the comments - however, it is not clear to me what your
 preference is on making NotEnoughReplicasAfterAppend retriable vs
 non-retriable.

 As for me, my preference is to leave it as retriable since it is clear
 that the produce may succeed on a retry (and may introduce a
 duplicate). I agree that idempotence will bring full closure to this
 though.

 Anyone else have a preference on this?

 Thanks,

 Joel

 On Tue, Feb 10, 2015 at 08:23:08PM -0800, Jay Kreps wrote:
  Yeah there are really two concepts here as I think you noted:
  1. Retry safe: we know that the write did not occur
  2. Retry fixable: if you send that again it could work
 
  (probably there are better names for these).
 
  Some things we know did not do a write and may be fixed by retrying (no
  leader). Some things we know didn't do a write and are not worth retrying
   (message too large). Some things we don't know and are worth retrying
  (network error), and probably some things we don't know and aren't worth
 it
  (can't think of one though).
 
  (I feel like Donald Rumsfeld with the known unknowns thing).
 
  In the current world if you set retries > 0 you are saying I accept
  duplicates but want to ensure my stuff gets written, if you set retries = 0
  you are saying I can't abide duplicates and am willing to tolerate
  loss. So Retryable for us means retry may succeed.
 
  Originally I thought of maybe trying to model both concepts. However the
  two arguments against it are:
  1. Even if you do this the guarantee remains at least once delivery
  because: (1) in the network error case you just don't know, (2) consumer
  failure.
  2. The proper fix for this is to add idempotence support on the server,
  which we should do.
 
  Doing idempotence support on the server will actually fix all duplicate
  problems, including the network error case (because of course the server
  knows whether your write went through even though the client doesn't).
 When
  we have that then the client can always just retry anything marked
  Retriable (i.e. retry may work) without fear of duplicates.
 
  This gives exactly once delivery to the log, and a co-operating consumer
  can use the offset to dedupe and get it end-to-end.
 
  So that was why I had just left one type of Retriable and used it to mean
  retry may work and don't try to flag anything for duplicates.
 
  -Jay
 
 
 
 
  On Tue, Feb 10, 2015 at 4:32 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
   Hi Kafka Devs,
  
   Need your thoughts on retriable exceptions:
  
    If a user configures Kafka with min.isr > 1 and there are not enough
   replicas to safely store the data, there are two possibilities:
  
   1. The lack of replicas was discovered before the message was written.
 We
   throw NotEnoughReplicas.
   2. The lack of replicas was discovered after the message was written to
   leader. In this case, we throw  NotEnoughReplicasAfterAppend.
  
   Currently, both errors are Retriable. Which means that the new producer
   will retry multiple times.
   In case of the second exception, this will cause duplicates.
  
   KAFKA-1697 suggests:
   we probably want to make NotEnoughReplicasAfterAppend a non-retriable
   exception and let the client decide what to do.
  
    I agreed that the client (the one using the Producer) should weigh the
   problems duplicates will cause vs. the probability of losing the
 message
   and do something sensible and made the exception non-retriable.
  
   In the RB (https://reviews.apache.org/r/29647/) Joel raised a good
 point:
   (Joel, feel free to correct me if I misrepresented your point)
  
   I think our interpretation of retriable is as follows (but we can
 discuss
   on the list if that needs to change): if the produce request hits an
 error,
   and there is absolutely no point in retrying then that is a
 non-retriable
   error. MessageSizeTooLarge is an example - since unless the producer
   changes the request to make the messages smaller there is no point in
   retrying.
  
   ...
   Duplicates can arise even for other errors (e.g., request timed out).
 So
   that side-effect is not compelling enough to warrant a change to make
 this
   non-retriable. 
  
   *(TL;DR;  )  Should exceptions where retries can cause duplicates
 should
   still be *
   *retriable?*
  
   Gwen
  




Re: Review Request 29647: Patch for KAFKA-1697

2015-02-11 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29647/
---

(Updated Feb. 12, 2015, 2:45 a.m.)


Review request for kafka.


Bugs: KAFKA-1697
https://issues.apache.org/jira/browse/KAFKA-1697


Repository: kafka


Description (updated)
---

added early handling of invalid number of acks to handler and a test


merging with current trunk


moved check for legal requiredAcks to append and fixed the tests accordingly


changing exception back to retriable


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
 a6107b818947d6d6818c85cdffcb2b13f69a55c0 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
a8deac4ce5149129d0a6f44c0526af9d55649a36 
  core/src/main/scala/kafka/cluster/Partition.scala 
e6ad8be5e33b6fb31c078ad78f8de709869ddc04 
  core/src/main/scala/kafka/server/KafkaApis.scala 
6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
fb948b9ab28c516e81dab14dcbe211dcd99842b6 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
a1f72f8c2042ff2a43af503b2e06f84706dad9db 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
faa907131ed0aa94a7eacb78c1ffb576062be87a 

Diff: https://reviews.apache.org/r/29647/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-1697) remove code related to ack>1 on the broker

2015-02-11 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1697:

Attachment: KAFKA-1697_2015-02-11_18:45:42.patch

 remove code related to ack>1 on the broker
 --

 Key: KAFKA-1697
 URL: https://issues.apache.org/jira/browse/KAFKA-1697
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1697.patch, KAFKA-1697_2015-01-14_15:41:37.patch, 
 KAFKA-1697_2015-02-10_17:06:51.patch, KAFKA-1697_2015-02-11_18:45:42.patch


 We removed the ack>1 support from the producer client in kafka-1555. We can 
 completely remove the code in the broker that supports ack>1.
 Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable 
 exception and let the client decide what to do.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1697) remove code related to ack>1 on the broker

2015-02-11 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317471#comment-14317471
 ] 

Gwen Shapira commented on KAFKA-1697:
-

Updated reviewboard https://reviews.apache.org/r/29647/diff/
 against branch trunk

 remove code related to ack>1 on the broker
 --

 Key: KAFKA-1697
 URL: https://issues.apache.org/jira/browse/KAFKA-1697
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1697.patch, KAFKA-1697_2015-01-14_15:41:37.patch, 
 KAFKA-1697_2015-02-10_17:06:51.patch, KAFKA-1697_2015-02-11_18:45:42.patch


 We removed the ack>1 support from the producer client in kafka-1555. We can 
 completely remove the code in the broker that supports ack>1.
 Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable 
 exception and let the client decide what to do.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1947) can't explicitly set replica-assignment when add partitions

2015-02-11 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317513#comment-14317513
 ] 

Honghai Chen commented on KAFKA-1947:
-

After fixing the command line, the command still does not work; seemingly there 
are more bugs in it. Try the commands below:
bin\kafka.cmd topiccmd --create --topic mvlogs --partition 1 
--replication-factor 2 --zookeeper localhost
bin\kafka.cmd topiccmd --alter --topic mvlogs  --partitions 2   
--replica-assignment  2:3   --zookeeper localhost

The log says:
[2015-02-11 19:18:30,791] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
[2015-02-11 19:18:31,427] INFO Add partition list for mvlogs is Map() (kafka.admin.AdminUtils$)
[2015-02-11 19:18:31,470] INFO Topic update 
{version:1,partitions:{0:[3,1]}} (kafka.admin.AdminUtils$)
Adding partitions succeeded!
[2015-02-11 19:18:31,479] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)


Seemingly the JSON string is wrong!
Will continue checking it.



 can't explicitly set replica-assignment when add partitions
 ---

 Key: KAFKA-1947
 URL: https://issues.apache.org/jira/browse/KAFKA-1947
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: Honghai Chen

 When creating a topic, the replicaAssignmentOpt should not appear with partitions.
 But when adding partitions, they should be able to appear together. From the code 
 below, you can see that when altering a topic with partitions in the arguments, it 
 tries to get replica-assignment
 https://git1-us-west.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/admin/TopicCommand.scala;h=285c0333ff43543d3e46444c1cd9374bb883bb59;hb=HEAD#l114
  
 The root cause is the code below:
 CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt,
  305 allTopicLevelOpts -- Set(alterOpt, createOpt) + partitionsOpt + 
 replicationFactorOpt)
 https://git1-us-west.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/admin/TopicCommand.scala;h=285c0333ff43543d3e46444c1cd9374bb883bb59;hb=HEAD#l304
 Related:  
 https://issues.apache.org/jira/browse/KAFKA-1052



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1947) can't explicitly set replica-assignment when add partitions

2015-02-11 Thread Honghai Chen (JIRA)
Honghai Chen created KAFKA-1947:
---

 Summary: can't explicitly set replica-assignment when add 
partitions
 Key: KAFKA-1947
 URL: https://issues.apache.org/jira/browse/KAFKA-1947
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: Honghai Chen


When creating a topic, the replicaAssignmentOpt should not appear with partitions.
But when adding partitions, they should be able to appear together. From the code 
below, you can see that when altering a topic with partitions in the arguments, it 
tries to get replica-assignment
https://git1-us-west.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/admin/TopicCommand.scala;h=285c0333ff43543d3e46444c1cd9374bb883bb59;hb=HEAD#l114
 

The root cause is the code below:
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt,
 305 allTopicLevelOpts -- Set(alterOpt, createOpt) + partitionsOpt + 
replicationFactorOpt)

https://git1-us-west.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/admin/TopicCommand.scala;h=285c0333ff43543d3e46444c1cd9374bb883bb59;hb=HEAD#l304

Related:  
https://issues.apache.org/jira/browse/KAFKA-1052





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1938) [doc] Quick start example should reference appropriate Kafka version

2015-02-11 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317390#comment-14317390
 ] 

Gwen Shapira commented on KAFKA-1938:
-

+1 LGTM (non-binding)

 [doc] Quick start example should reference appropriate Kafka version
 

 Key: KAFKA-1938
 URL: https://issues.apache.org/jira/browse/KAFKA-1938
 Project: Kafka
  Issue Type: Improvement
  Components: website
Affects Versions: 0.8.2.0
Reporter: Stevo Slavic
Priority: Trivial
 Attachments: remove-081-references.patch


 Kafka 0.8.2.0 documentation, quick start example on 
 https://kafka.apache.org/documentation.html#quickstart in step 1 links and 
 instructs reader to download Kafka 0.8.1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28769: Patch for KAFKA-1809

2015-02-11 Thread Gwen Shapira


 On Feb. 11, 2015, 9:45 a.m., Jun Rao wrote:
  Thanks for the new patch. Some more comments.
  
  1. We should think through whether we need to add security protocol to 
  existing tools like SimleConsumerShell and UpdateOffsetsInZk.
  2. There are unused imports.
  3. The patch needs rebase.
  4. The following unit test fails consistently.
  kafka.network.SocketServerTest  testSocketsCloseOnShutdown FAILED
  org.scalatest.junit.JUnitTestFailedError: expected exception when 
  writing to closed plain socket
  at 
  org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
  at 
  kafka.network.SocketServerTest.newAssertionFailedException(SocketServerTest.scala:37)
  at org.scalatest.Assertions$class.fail(Assertions.scala:711)
  at kafka.network.SocketServerTest.fail(SocketServerTest.scala:37)
  at 
  kafka.network.SocketServerTest.testSocketsCloseOnShutdown(SocketServerTest.scala:162)

1. Since the tools are going through major rewrites now (SimpleConsumer may go 
away, ConsumerOffsetChecker went away...) and since some tools don't really need 
to support security protocols at all (I think? I definitely can't see why they 
need to support both SSL and Kerberos), I'd rather modify the tools as a 
separate patch once we have security implemented. The larger part of the change 
will be implementing SSL/Kerberos in the tools anyway...
3. I know :( chasing commits between reviews is the biggest pain here...
4. Will check it out.


 On Feb. 11, 2015, 9:45 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/client/ClientUtils.scala, line 111
  https://reviews.apache.org/r/28769/diff/17/?file=846148#file846148line111
 
  Should we default protocolType to PLAINTEXT?

I prefer not to default. It makes it more challenging to catch cases where the 
wrong protocol is used accidentally.
Do you see a case for defaulting here?


 On Feb. 11, 2015, 9:45 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/KafkaConfig.scala, lines 148-149
  https://reviews.apache.org/r/28769/diff/17/?file=846168#file846168line148
 
  Do we use PLAINTEXT://0.0.0.0:9092 to bind to all interface or 
  PLAINTEXT://:9092? If it's the former, does it work for ipv6 and do we need 
  to change getListeners() accordingly?

0.0.0.0 binds to all interfaces. A missing hostname binds to the default interface.
It's pretty standard (i.e. matches the NIO bind behavior, and the one for lower 
level sockets too) and works the same way as it did before this patch.
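
For illustration, the two endpoint forms under discussion (the listeners
property name here is an assumption; the patch may name the config differently):

    # Bind the PLAINTEXT endpoint on all interfaces (IPv4 and IPv6).
    listeners=PLAINTEXT://0.0.0.0:9092

    # Omit the host to bind the default interface.
    listeners=PLAINTEXT://:9092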


 On Feb. 11, 2015, 9:45 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/KafkaConfig.scala, line 186
  https://reviews.apache.org/r/28769/diff/17/?file=846168#file846168line186
 
  This should default to 0.8.2.0. The user will upgrade the broker jar 
  with the default setting first, followed by another rolling bounce after 
  changing the config to 0.8.3.0.

Actually, hardcoding the version means having to remember to change this with 
every release.
I'll try to figure out something more reasonable for this.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28769/#review71916
---


On Feb. 3, 2015, 6:52 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/28769/
 ---
 
 (Updated Feb. 3, 2015, 6:52 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1809
 https://issues.apache.org/jira/browse/KAFKA-1809
 
 
 Repository: kafka
 
 
 Description
 ---
 
 changed topicmetadata to include brokerendpoints and fixed few unit tests
 
 
 fixing systest and support for binding to default address
 
 
 fixed system tests
 
 
 fix default address binding and ipv6 support
 
 
 fix some issues regarding endpoint parsing. Also, larger segments for systest 
 make the validation much faster
 
 
 added link to security wiki in doc
 
 
 fixing unit test after rename of ProtocolType to SecurityProtocol
 
 
 Following Joe's advice, added security protocol enum on client side, and 
 modified protocol to use ID instead of string.
 
 
 validate producer config against enum
 
 
 add a second protocol for testing and modify SocketServerTests to check on 
 multi-ports
 
 
 Reverted the metadata request changes and removed the explicit security 
 protocol argument. Instead the socketserver will determine the protocol based 
 on the port and add this to the request
 
 
 bump version for UpdateMetadataRequest and added support for rolling upgrades 
 with new config
 
 
 following tests - fixed LeaderAndISR protocol and ZK registration for 
 backward compatibility
 
 
 cleaned up some changes that were actually not necessary. hopefully making 
 this patch slightly easier to review
 
 
 undoing 

Re: [DISCUSSION] KIP-2: Refactor Brokers to Allow Multiple Endpoints

2015-02-11 Thread Todd Palino
Thanks, Gwen. This looks good to me as far as the wire protocol versioning
goes. I agree with you on defaulting to the new wire protocol version for
new installs. I think it will also need to be very clear (to general
installer of Kafka, and not just developers) in documentation when the wire
protocol version changes moving forwards, and what the risk/benefit of
changing to the new version is.

Since a rolling upgrade of the intra-cluster protocol is supported, will a
rolling downgrade work as well? Should a flaw (bug, security, or otherwise)
be discovered after upgrade, is it possible to change the wire.protocol.version
back to 0.8.2 and do a rolling bounce?

On the host/port/protocol specification, specifically the ZK config format,
is it possible to have an un-advertised endpoint? I would see this as
potentially useful if you wanted to have an endpoint that you are reserving
for intra-cluster communication, and you would prefer to not have it
advertised at all. Perhaps it is blocked by a firewall rule or other
authentication method. This could also allow you to duplicate a security
protocol type but segregate it on a different port or interface (if it is
unadvertised, there is no ambiguity to the clients as to which endpoint
should be selected). I believe I asked about that previously, and I didn't
track what the final outcome was or even if it was discussed further.


-Todd


On Wed, Feb 11, 2015 at 4:38 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Added Jun's notes to the KIP (Thanks for explaining so clearly, Jun. I was
 clearly struggling with this...) and removed the reference to
 use.new.wire.protocol.

 On Wed, Feb 11, 2015 at 4:19 PM, Joel Koshy jjkosh...@gmail.com wrote:

  The description that Jun gave for (2) was the detail I was looking for
  - Gwen can you update the KIP with that for completeness/clarity?
 
  I'm +1 as well overall. However, I think it would be good if we also
  get an ack from someone who is more experienced on the operations side
  (say, Todd) to review especially the upgrade plan.
 
  On Wed, Feb 11, 2015 at 09:40:50AM -0800, Jun Rao wrote:
   +1 for proposed changes in 1 and 2.
  
   1. The impact is that if someone uses SimpleConsumer and references
  Broker
   explicitly, the application needs code change to compile with 0.8.3.
  Since
   SimpleConsumer is not widely used, breaking the API in SimpleConsumer
 but
   maintaining overall code cleanness seems to be a better tradeoff.
  
   2. For clarification, the issue is the following. In 0.8.3, we will be
    evolving the wire protocol of UpdateMetadataRequest (to send info about
   endpoints for different security protocols). Since this is used in
   intra-cluster communication, we need to do the upgrade in two steps.
 The
   idea is that in 0.8.3, we will default wire.protocol.version to 0.8.2.
  When
   upgrading to 0.8.3, in step 1, we do a rolling upgrade to 0.8.3. After
  step
   1, all brokers will be capable of processing the new protocol in
 0.8.3,
   but without actually using it. In step 2, we
   configure wire.protocol.version to 0.8.3 in each broker and do another
   rolling restart. After step 2, all brokers will start using the new
   protocol in 0.8.3. Let's say that in the next release 0.9, we are
  changing
   the intra-cluster wire protocol again. We will do the similar thing:
   defaulting wire.protocol.version to 0.8.3 in 0.9 so that people can
  upgrade
   from 0.8.3 to 0.9 in two steps. For people who want to upgrade from
 0.8.2
   to 0.9 directly, they will have to configure wire.protocol.version to
  0.8.2
   first and then do the two-step upgrade to 0.9.
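
   To make the two-step bounce concrete, a hypothetical server.properties sketch
   (the exact config name was still under discussion in this thread, so treat it
   as illustrative only):

       # Step 1: roll out the 0.8.3 jars, keeping the old intra-cluster protocol in use.
       wire.protocol.version=0.8.2

       # Step 2: once every broker runs 0.8.3 code, switch the protocol and bounce again.
       wire.protocol.version=0.8.3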
  
   Gwen,
  
   In KIP2, there is still a reference to use.new.protocol. This needs to
 be
   removed. Also, would it be better to use
  intra.cluster.wire.protocol.version
   since this only applies to the wire protocol among brokers?
  
   Others,
  
   The patch in KAFKA-1809 is almost ready. It would be good to wrap up
 the
   discussion on KIP2 soon. So, if you haven't looked at this KIP, please
  take
   a look and send your comments.
  
   Thanks,
  
   Jun
  
  
   On Mon, Jan 26, 2015 at 8:02 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
  
Hi Kafka Devs,
   
While reviewing the patch for KAFKA-1809, we came across two
 questions
that we are interested in hearing the community out on.
   
1. This patch changes the Broker class and adds a new class
BrokerEndPoint that behaves like the previous broker.
   
While technically kafka.cluster.Broker is not part of the public API,
it is returned by javaapi, used with the SimpleConsumer.
   
Getting replicas from PartitionMetadata will now return
 BrokerEndPoint
instead of Broker. All method calls remain the same, but since we
return a new type, we break the API.
   
Note that this breakage does not prevent upgrades - existing
SimpleConsumers will continue working (because we are
wire-compatible).
The only thing that won't work is 

Re: [KIP-DISCUSSION] Mirror Maker Enhancement

2015-02-11 Thread Neha Narkhede
Thanks for the explanation, Joel! Would love to see the results of the
throughput experiment and I'm a +1 on everything else, including the
rebalance callback and record handler.

-Neha

On Wed, Feb 11, 2015 at 1:13 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Cool, I agree with all that.

 I agree about the need for a rebalancing callback.

 Totally agree about record handler.

 It would be great to see if a prototype of this is workable.

 Thanks guys!

 -Jay

 On Wed, Feb 11, 2015 at 12:36 PM, Joel Koshy jjkosh...@gmail.com wrote:

  Hey Jay,
 
  Guozhang, Becket and I got together to discuss this and we think:
 
  - It seems that your proposal based on the new consumer and flush call
should work.
  - We would likely need to call the poll with a timeout that matches
the offset commit interval in order to deal with low volume
mirroring pipelines.
  - We will still need a rebalance callback to reduce duplicates - the
    rebalance callback would need to flush and commit offsets (see the
    sketch after this list).
  - The only remaining question is if the overall throughput is
sufficient. I think someone at LinkedIn (I don't remember who) did
some experiments with data channel size == 1 and ran into issues.
That was not thoroughly investigated though.
  - The addition of flush may actually make this solution viable for the
current mirror-maker (with the old consumer). We can prototype that
offline and if it works out well we can redo KAFKA-1650 (i.e.,
refactor the current mirror maker). The flush call and the new
consumer didn't exist at the time we did KAFKA-1650 so this did not
occur to us.
  - We think the RecordHandler is still a useful small addition for the
use-cases mentioned earlier in this thread.
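  
   As a reference for the rebalance-callback point above, a rough Scala sketch;
   the trait and method names are assumptions (the new consumer's callback API
   was still in flux at this point), and producer/consumer stand for the mirror
   maker's own instances:

       // Illustrative only - not the final new-consumer API.
       val rebalanceCallback = new ConsumerRebalanceCallback {
         override def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit = {
           producer.flush()   // block until every record already handed to the producer is acked
           consumer.commit()  // then commit the offsets of what has been consumed and produced
         }
         override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit = { }
       }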
 
  Thanks,
 
  Joel
 
  On Wed, Feb 11, 2015 at 09:05:39AM -0800, Jay Kreps wrote:
   Guozhang, I agree with 1-3, I do think what I was proposing was simpler
  but
   perhaps there are gaps in that?
  
   Hey Joel--Here was a sketch of what I was proposing. I do think this
  get's
   rid of manual offset tracking, especially doing so across threads with
   dedicated commit threads, which I think is pretty complex.
  
    while(true) {
      val recs = consumer.poll(Long.MaxValue);
      for (rec <- recs)
        producer.send(rec, logErrorCallback)
      if(System.currentTimeMillis - lastCommit > commitInterval) {
        producer.flush()
        consumer.commit()
        lastCommit = System.currentTimeMillis
      }
    }
  
   (See the previous email for details). I think the question is: is there
  any
   reason--performance, correctness, etc--that this won't work? Basically
 I
   think you guys have thought about this more so I may be missing
  something.
   If so let's flag it while we still have leeway on the consumer.
  
   If we think that will work, well I do think it is conceptually a lot
   simpler than the current code, though I suppose one could disagree on
  that.
  
   -Jay
  
   On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy jjkosh...@gmail.com
 wrote:
  
Hi Jay,
   
 The data channels are actually a big part of the complexity of the
  zero
 data loss design, though, right? Because then you need some reverse
channel
 to flow the acks back to the consumer based on where you are versus
  just
 acking what you have read and written (as in the code snippet I put
  up).
   
I'm not sure if we are on the same page. Even if the data channel was
not there the current handling for zero data loss would remain very
similar - you would need to maintain lists of unacked source offsets.
I'm wondering if the KIP needs more detail on how it is currently
implemented; or are suggesting a different approach (in which case I
have not fully understood). I'm not sure what you mean by flowing
 acks
back to the consumer - the MM commits offsets after the producer ack
has been received. There is some additional complexity introduced in
reducing duplicates on a rebalance - this is actually optional (since
duplicates are currently a given). The reason that was done anyway is
that with the auto-commit turned off duplicates are almost guaranteed
on a rebalance.
   
 I think the point that Neha and I were trying to make was that the
 motivation to embed stuff into MM kind of is related to how
 complex a
 simple consume and produce with good throughput will be. If it is
simple
 to write such a thing in a few lines, the pain of embedding a bunch
  of
 stuff won't be worth it, if it has to be as complex as the current
 mm
then
 of course we will need all kinds of plug ins because no one will be
  able
to
 write such a thing. I don't have a huge concern with a simple
 plug-in
but I
 think if it turns into something more complex with filtering and
 aggregation or whatever we really need to stop and think a bit
 about
  the
 design.
   
I agree - I don't think there is a 

[jira] [Updated] (KAFKA-1697) remove code related to ack>1 on the broker

2015-02-11 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1697:

Attachment: KAFKA-1697_2015-02-11_18:47:53.patch

 remove code related to ack>1 on the broker
 --

 Key: KAFKA-1697
 URL: https://issues.apache.org/jira/browse/KAFKA-1697
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1697.patch, KAFKA-1697_2015-01-14_15:41:37.patch, 
 KAFKA-1697_2015-02-10_17:06:51.patch, KAFKA-1697_2015-02-11_18:45:42.patch, 
 KAFKA-1697_2015-02-11_18:47:53.patch


 We removed the ack>1 support from the producer client in KAFKA-1555. We can 
 completely remove the code in the broker that supports ack>1.
 Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable 
 exception and let the client decide what to do.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29647: Patch for KAFKA-1697

2015-02-11 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29647/
---

(Updated Feb. 12, 2015, 2:47 a.m.)


Review request for kafka.


Bugs: KAFKA-1697
https://issues.apache.org/jira/browse/KAFKA-1697


Repository: kafka


Description (updated)
---

added early handling of invalid number of acks to handler and a test


merging with current trunk


moved check for legal requiredAcks to append and fixed the tests accordingly


changing exception back to retriable


cleaning unused exceptions


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
 a6107b818947d6d6818c85cdffcb2b13f69a55c0 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
a8deac4ce5149129d0a6f44c0526af9d55649a36 
  core/src/main/scala/kafka/cluster/Partition.scala 
e6ad8be5e33b6fb31c078ad78f8de709869ddc04 
  core/src/main/scala/kafka/server/KafkaApis.scala 
6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
fb948b9ab28c516e81dab14dcbe211dcd99842b6 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
a1f72f8c2042ff2a43af503b2e06f84706dad9db 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
faa907131ed0aa94a7eacb78c1ffb576062be87a 

Diff: https://reviews.apache.org/r/29647/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Commented] (KAFKA-1697) remove code related to ack>1 on the broker

2015-02-11 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317475#comment-14317475
 ] 

Gwen Shapira commented on KAFKA-1697:
-

Updated reviewboard https://reviews.apache.org/r/29647/diff/
 against branch trunk

 remove code related to ack>1 on the broker
 --

 Key: KAFKA-1697
 URL: https://issues.apache.org/jira/browse/KAFKA-1697
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-1697.patch, KAFKA-1697_2015-01-14_15:41:37.patch, 
 KAFKA-1697_2015-02-10_17:06:51.patch, KAFKA-1697_2015-02-11_18:45:42.patch, 
 KAFKA-1697_2015-02-11_18:47:53.patch


 We removed the ack>1 support from the producer client in KAFKA-1555. We can 
 completely remove the code in the broker that supports ack>1.
 Also, we probably want to make NotEnoughReplicasAfterAppend a non-retriable 
 exception and let the client decide what to do.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29647: Patch for KAFKA-1697

2015-02-11 Thread Gwen Shapira


 On Feb. 11, 2015, 1:48 p.m., Joel Koshy wrote:
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala, line 96
  https://reviews.apache.org/r/29647/diff/3/?file=860605#file860605line96
 
  Do we need this here?

IMO, it's a good idea to have this in any test that starts new threads - to 
verify that we close them.
We have this in a bunch of places.


 On Feb. 11, 2015, 1:48 p.m., Joel Koshy wrote:
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala, line 83
  https://reviews.apache.org/r/29647/diff/3/?file=860605#file860605line83
 
  Can we just do the assert here instead of throwing an exception?
  
  i.e., `assertEquals(responseStatus.values.size, 
  responseStatus.values.count(_.error == INVALID_REQUIRED_ACKS))`
  
  Either way is fine. Whichever is clearer although the above may be more 
  concise if it works.

good idea.


 On Feb. 11, 2015, 1:48 p.m., Joel Koshy wrote:
  clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java,
   line 23
  https://reviews.apache.org/r/29647/diff/3/?file=860600#file860600line23
 
  Pending discussion.

changing back to retriable, per discussion in mailing list. 
I'm leaving this as a separate exception and error code, in case client 
developers want to do something with the extra information.


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29647/#review71960
---


On Feb. 11, 2015, 1:06 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29647/
 ---
 
 (Updated Feb. 11, 2015, 1:06 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1697
 https://issues.apache.org/jira/browse/KAFKA-1697
 
 
 Repository: kafka
 
 
 Description
 ---
 
 added early handling of invalid number of acks to handler and a test
 
 
 merging with current trunk
 
 
 moved check for legal requiredAcks to append and fixed the tests accordingly
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
  a6107b818947d6d6818c85cdffcb2b13f69a55c0 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 a8deac4ce5149129d0a6f44c0526af9d55649a36 
   core/src/main/scala/kafka/cluster/Partition.scala 
 e6ad8be5e33b6fb31c078ad78f8de709869ddc04 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 fb948b9ab28c516e81dab14dcbe211dcd99842b6 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 faa907131ed0aa94a7eacb78c1ffb576062be87a 
 
 Diff: https://reviews.apache.org/r/29647/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 




Re: [DISCUSSION] KAFKA-1697 - make NotEnoughReplicasAfterAppend a non-retriable exception

2015-02-11 Thread Neha Narkhede

 As for me, my preference is to leave it as retriable since it is clear
 that the produce may succeed on a retry (and may introduce a
 duplicate). I agree that idempotence will bring full closure to this
 though.


+1

On Wed, Feb 11, 2015 at 5:24 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Yeah, hey Joel, that was a super long winded way of saying let's leave it
 at Retriable. I agree there is another concept which is non duplicate
 producing but when we do the idempotence stuff then all things will have
 that property so it may be okay just to leave it for now since network
 errors make it impossible to really do this properly without idempotence.

 -Jay

 On Wed, Feb 11, 2015 at 5:41 AM, Joel Koshy jjkosh...@gmail.com wrote:

  Thanks for the comments - however, it is not clear to me what your
  preference is on making NotEnoughReplicasAfterAppend retriable vs
  non-retriable.
 
  As for me, my preference is to leave it as retriable since it is clear
  that the produce may succeed on a retry (and may introduce a
  duplicate). I agree that idempotence will bring full closure to this
  though.
 
  Anyone else have a preference on this?
 
  Thanks,
 
  Joel
 
  On Tue, Feb 10, 2015 at 08:23:08PM -0800, Jay Kreps wrote:
   Yeah there are really two concepts here as I think you noted:
   1. Retry safe: we know that the write did not occur
   2. Retry fixable: if you send that again it could work
  
   (probably there are better names for these).
  
   Some things we know did not do a write and may be fixed by retrying (no
   leader). Some things we know didn't do a write and are not worth
 retrying
   (message too large). Somethings we don't know and are worth retrying
   (network error), and probably some things we don't know and aren't
 worth
  it
   (can't think of one though).
  
   (I feel like Donald Rumsfeld with the known unknowns thing).
  
   In the current world if you set retries > 0 you are saying "I accept
   duplicates but want to ensure my stuff gets written", and if you set
   retries = 0 you are saying "I can't abide duplicates and am willing to
   tolerate loss". So Retryable for us means "retry may succeed".
  
   Originally I thought of maybe trying to model both concepts. However
 the
   two arguments against it are:
   1. Even if you do this the guarantee remains at least once delivery
   because: (1) in the network error case you just don't know, (2)
 consumer
   failure.
   2. The proper fix for this is to add idempotence support on the server,
   which we should do.
  
   Doing idempotence support on the server will actually fix all duplicate
   problems, including the network error case (because of course the
 server
   knows whether your write went through even though the client doesn't).
  When
   we have that then the client can always just retry anything marked
   Retriable (i.e. retry may work) without fear of duplicates.
  
   This gives exactly once delivery to the log, and a co-operating
 consumer
   can use the offset to dedupe and get it end-to-end.
  
   So that was why I had just left one type of Retriable and used it to
 mean
   retry may work and don't try to flag anything for duplicates.
  
   -Jay
  
  
  
  
   On Tue, Feb 10, 2015 at 4:32 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
  
Hi Kafka Devs,
   
Need your thoughts on retriable exceptions:
   
    If a user configures Kafka with min.isr > 1 and there are not enough
replicas to safely store the data, there are two possibilities:
   
1. The lack of replicas was discovered before the message was
 written.
  We
throw NotEnoughReplicas.
2. The lack of replicas was discovered after the message was written
 to
leader. In this case, we throw  NotEnoughReplicasAfterAppend.
   
Currently, both errors are Retriable. Which means that the new
 producer
will retry multiple times.
In case of the second exception, this will cause duplicates.
   
KAFKA-1697 suggests:
we probably want to make NotEnoughReplicasAfterAppend a
 non-retriable
exception and let the client decide what to do.
   
    I agreed that the client (the one using the Producer) should weigh the
    problems duplicates will cause vs. the probability of losing the message
    and do something sensible, and made the exception non-retriable.
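
    As a sketch of what letting the client decide could look like on the
    application side, here is an illustrative Scala callback for the new
    producer; the handling shown (logging to stderr) is just one option, not a
    recommendation:

        import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
        import org.apache.kafka.common.errors.RetriableException

        // Once the producer's own retries are exhausted, the callback can at least
        // distinguish "a retry might have worked" from "retrying will not help".
        val produceCallback = new Callback {
          override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
            if (exception == null) {
              // acked - nothing to do
            } else if (exception.isInstanceOf[RetriableException]) {
              System.err.println("send failed but might have succeeded on retry (resending risks a duplicate): " + exception)
            } else {
              System.err.println("send failed permanently: " + exception)
            }
          }
        }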
   
In the RB (https://reviews.apache.org/r/29647/) Joel raised a good
  point:
(Joel, feel free to correct me if I misrepresented your point)
   
I think our interpretation of retriable is as follows (but we can
  discuss
on the list if that needs to change): if the produce request hits an
  error,
and there is absolutely no point in retrying then that is a
  non-retriable
error. MessageSizeTooLarge is an example - since unless the producer
changes the request to make the messages smaller there is no point in
retrying.
   
...
Duplicates can arise even for other errors (e.g., request timed out).
  So
that side-effect is not 

[jira] [Updated] (KAFKA-1938) [doc] Quick start example should reference appropriate Kafka version

2015-02-11 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1938:

Assignee: Manikumar Reddy

 [doc] Quick start example should reference appropriate Kafka version
 

 Key: KAFKA-1938
 URL: https://issues.apache.org/jira/browse/KAFKA-1938
 Project: Kafka
  Issue Type: Improvement
  Components: website
Affects Versions: 0.8.2.0
Reporter: Stevo Slavic
Assignee: Manikumar Reddy
Priority: Trivial
 Attachments: remove-081-references.patch


 Kafka 0.8.2.0 documentation, quick start example on 
 https://kafka.apache.org/documentation.html#quickstart in step 1 links and 
 instructs reader to download Kafka 0.8.1.1.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29647: Patch for KAFKA-1697

2015-02-11 Thread Gwen Shapira


 On Feb. 11, 2015, 1:48 p.m., Joel Koshy wrote:
  clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java,
   line 23
  https://reviews.apache.org/r/29647/diff/3/?file=860600#file860600line23
 
  Pending discussion.
 
 Gwen Shapira wrote:
 changing back to retriable, per discussion in mailing list. 
 I'm leaving this as a separate exception and error code, in case client 
 developers want to do something with the extra information.
 
 Gwen Shapira wrote:
 Actually, since we are here, I'll remove the error code from kafka.common 
 and use the o.a.k.common.errors everywhere.
 We are transitioning there anyway.

Never mind on the last comment. I have unit tests for NotEnoughReplicas that 
use the scala producer and that requires the kafka.common error.
I'll let Jeff figure out how to disentangle the Scala Producer from the 
existing scala error codes. It's way beyond scope here :)


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29647/#review71960
---


On Feb. 11, 2015, 1:06 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29647/
 ---
 
 (Updated Feb. 11, 2015, 1:06 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1697
 https://issues.apache.org/jira/browse/KAFKA-1697
 
 
 Repository: kafka
 
 
 Description
 ---
 
 added early handling of invalid number of acks to handler and a test
 
 
 merging with current trunk
 
 
 moved check for legal requiredAcks to append and fixed the tests accordingly
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
  a6107b818947d6d6818c85cdffcb2b13f69a55c0 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 a8deac4ce5149129d0a6f44c0526af9d55649a36 
   core/src/main/scala/kafka/cluster/Partition.scala 
 e6ad8be5e33b6fb31c078ad78f8de709869ddc04 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 fb948b9ab28c516e81dab14dcbe211dcd99842b6 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 faa907131ed0aa94a7eacb78c1ffb576062be87a 
 
 Diff: https://reviews.apache.org/r/29647/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 




[jira] [Commented] (KAFKA-1948) kafka.api.consumerTests are hanging

2015-02-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317722#comment-14317722
 ] 

Guozhang Wang commented on KAFKA-1948:
--

Gwen,

Just ran locally a few times and I do see this issue coming, will take a look 
at this asap.

 kafka.api.consumerTests are hanging
 ---

 Key: KAFKA-1948
 URL: https://issues.apache.org/jira/browse/KAFKA-1948
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira

 Noticed today that very often when I run the full test suite, it hangs on 
 kafka.api.consumerTest (not always same test though). It doesn't reproduce 
 100% of the time, but enough to be very annoying.
 I also saw it happening on trunk after KAFKA-1333:
 https://builds.apache.org/view/All/job/Kafka-trunk/389/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1948) kafka.api.consumerTests are hanging

2015-02-11 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317713#comment-14317713
 ] 

Gwen Shapira commented on KAFKA-1948:
-

[~guozhang] [~onurkaraman] - since you worked on KAFKA-1333, can you check if 
the hangs are related?

 kafka.api.consumerTests are hanging
 ---

 Key: KAFKA-1948
 URL: https://issues.apache.org/jira/browse/KAFKA-1948
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira

 Noticed today that very often when I run the full test suite, it hangs on 
 kafka.api.consumerTest (not always same test though). It doesn't reproduce 
 100% of the time, but enough to be very annoying.
 I also saw it happening on trunk after KAFKA-1333:
 https://builds.apache.org/view/All/job/Kafka-trunk/389/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1948) kafka.api.consumerTests are hanging

2015-02-11 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-1948:
---

 Summary: kafka.api.consumerTests are hanging
 Key: KAFKA-1948
 URL: https://issues.apache.org/jira/browse/KAFKA-1948
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira


Noticed today that very often when I run the full test suite, it hangs on 
kafka.api.consumerTest (not always same test though). It doesn't reproduce 100% 
of the time, but enough to be very annoying.

I also saw it happening on trunk after KAFKA-1333:
https://builds.apache.org/view/All/job/Kafka-trunk/389/



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1887) controller error message on shutting the last broker

2015-02-11 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14317695#comment-14317695
 ] 

Gwen Shapira commented on KAFKA-1887:
-

[~harsha_ch] I think the idea was simply to move kafkaHealthCheck.shutdown() to 
after kafkaController.shutdown() in the broker shutdown sequence. This happens 
after the ControlledShutdownRequest, so it should be ok, right?


 controller error message on shutting the last broker
 

 Key: KAFKA-1887
 URL: https://issues.apache.org/jira/browse/KAFKA-1887
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jun Rao
Assignee: Sriharsha Chintalapani
Priority: Minor
 Fix For: 0.8.3


 We always see the following error in state-change log on shutting down the 
 last broker.
 [2015-01-20 13:21:04,036] ERROR Controller 0 epoch 3 initiated state change 
 for partition [test,0] from OfflinePartition to OnlinePartition failed 
 (state.change.logger)
 kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is 
 alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
 at 
 kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75)
 at 
 kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:357)
 at 
 kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:206)
 at 
 kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
 at 
 kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
 at 
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at 
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
 at 
 kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:446)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:373)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at 
 kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
 at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
 at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: KafkaPreCommit #3

2015-02-11 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/3/

--
[...truncated 901 lines...]
kafka.log.LogManagerTest  testCheckpointRecoveryPoints PASSED

kafka.log.LogManagerTest  testRecoveryDirectoryMappingWithTrailingSlash PASSED

kafka.log.LogManagerTest  testRecoveryDirectoryMappingWithRelativeDirectory 
PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[19] PASSED

kafka.log.CleanerTest  testCleanSegments PASSED

kafka.log.CleanerTest  testCleaningWithDeletes PASSED

kafka.log.CleanerTest  testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest  testSegmentGrouping PASSED

kafka.log.CleanerTest  testBuildOffsetMap PASSED

kafka.log.OffsetMapTest  testBasicValidation PASSED

kafka.log.OffsetMapTest  testClear PASSED

kafka.log.FileMessageSetTest  testWrittenEqualsRead PASSED

kafka.log.FileMessageSetTest  testIteratorIsConsistent PASSED

kafka.log.FileMessageSetTest  testSizeInBytes PASSED

kafka.log.FileMessageSetTest  testWriteTo PASSED

kafka.log.FileMessageSetTest  testFileSize PASSED

kafka.log.FileMessageSetTest  testIterationOverPartialAndTruncation PASSED

kafka.log.FileMessageSetTest  testIterationDoesntChangePosition PASSED

kafka.log.FileMessageSetTest  testRead PASSED

kafka.log.FileMessageSetTest  testSearch PASSED

kafka.log.FileMessageSetTest  testIteratorWithLimits PASSED

kafka.log.FileMessageSetTest  testTruncate PASSED

kafka.log.LogCleanerIntegrationTest  cleanerTest PASSED

kafka.log.LogSegmentTest  testTruncate PASSED

kafka.log.LogSegmentTest  testReadOnEmptySegment PASSED

kafka.log.LogSegmentTest  testReadBeforeFirstOffset PASSED

kafka.log.LogSegmentTest  testMaxOffset PASSED

kafka.log.LogSegmentTest  testReadAfterLast PASSED

kafka.log.LogSegmentTest  testReadFromGap PASSED

kafka.log.LogSegmentTest  testTruncateFull PASSED

kafka.log.LogSegmentTest  testNextOffsetCalculation PASSED

kafka.log.LogSegmentTest  testChangeFileSuffixes PASSED

kafka.log.LogSegmentTest  testRecoveryFixesCorruptIndex PASSED

kafka.log.LogSegmentTest  testRecoveryWithCorruptMessage PASSED

kafka.producer.SyncProducerTest  testReachableServer PASSED

kafka.producer.SyncProducerTest  testEmptyProduceRequest PASSED

kafka.producer.SyncProducerTest  testMessageSizeTooLarge PASSED

kafka.producer.SyncProducerTest  testMessageSizeTooLargeWithAckZero PASSED

kafka.producer.SyncProducerTest  testProduceCorrectlyReceivesResponse PASSED

kafka.producer.SyncProducerTest  testProducerCanTimeout PASSED

kafka.producer.SyncProducerTest  testProduceRequestWithNoResponse PASSED

kafka.producer.SyncProducerTest  testNotEnoughReplicas PASSED

kafka.producer.AsyncProducerTest  testProducerQueueSize PASSED

kafka.producer.AsyncProducerTest  testProduceAfterClosed PASSED

kafka.producer.AsyncProducerTest  testBatchSize PASSED

kafka.producer.AsyncProducerTest  testQueueTimeExpired PASSED

kafka.producer.AsyncProducerTest  testPartitionAndCollateEvents PASSED

kafka.producer.AsyncProducerTest  testSerializeEvents PASSED

kafka.producer.AsyncProducerTest  testInvalidPartition PASSED

kafka.producer.AsyncProducerTest  testNoBroker PASSED

kafka.producer.AsyncProducerTest  testIncompatibleEncoder PASSED

kafka.producer.AsyncProducerTest  testRandomPartitioner PASSED

kafka.producer.AsyncProducerTest  testFailedSendRetryLogic PASSED

kafka.producer.AsyncProducerTest  testJavaProducer PASSED

kafka.producer.AsyncProducerTest  testInvalidConfiguration PASSED

kafka.producer.ProducerTest  testUpdateBrokerPartitionInfo PASSED

kafka.producer.ProducerTest  testSendToNewTopic PASSED

kafka.producer.ProducerTest  testSendWithDeadBroker PASSED


Re: Review Request 29647: Patch for KAFKA-1697

2015-02-11 Thread Gwen Shapira


 On Feb. 12, 2015, 4:20 a.m., Joe Stein wrote:
  core/src/main/scala/kafka/server/ReplicaManager.scala, line 307
  https://reviews.apache.org/r/29647/diff/5/?file=861501#file861501line307
 
  Could we change this to a match case? 
  requiredAcks match {
  case 0 => {}
  case 1 => {}
  case -1 => {}
  case _ => {}
  }
  make it more concise to read through and scalish

Unfortunately, I couldn't use a switch on requiredAcks. There's logic for 
whether we need to wait for replication based on both requiredAcks and other 
factors, and splitting this logic into multiple switches / conditions makes 
things less readable.

I do agree that it was challenging to read, so I did some refactoring. I hope 
the new version is clearer, even if not very scalish.
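
For context, a simplified Scala sketch of the kind of check being discussed; the
helper name is illustrative and this is not the actual ReplicaManager code:

    // Only acks of -1, 0 or 1 are legal now that ack > 1 support is gone.
    def isValidRequiredAcks(requiredAcks: Short): Boolean =
      requiredAcks == -1 || requiredAcks == 0 || requiredAcks == 1

    // In the append path, an illegal value short-circuits with an
    // INVALID_REQUIRED_ACKS error for every partition in the request,
    // instead of ever touching the log.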


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29647/#review72107
---


On Feb. 12, 2015, 7:14 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29647/
 ---
 
 (Updated Feb. 12, 2015, 7:14 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1697
 https://issues.apache.org/jira/browse/KAFKA-1697
 
 
 Repository: kafka
 
 
 Description
 ---
 
 added early handling of invalid number of acks to handler and a test
 
 
 merging with current trunk
 
 
 moved check for legal requiredAcks to append and fixed the tests accordingly
 
 
 changing exception back to retriable
 
 
 cleaning unused exceptions
 
 
 refactored appendToLog for clarity
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
  a6107b818947d6d6818c85cdffcb2b13f69a55c0 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 a8deac4ce5149129d0a6f44c0526af9d55649a36 
   core/src/main/scala/kafka/cluster/Partition.scala 
 e6ad8be5e33b6fb31c078ad78f8de709869ddc04 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 fb948b9ab28c516e81dab14dcbe211dcd99842b6 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 a1f72f8c2042ff2a43af503b2e06f84706dad9db 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 faa907131ed0aa94a7eacb78c1ffb576062be87a 
 
 Diff: https://reviews.apache.org/r/29647/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 




[jira] [Updated] (KAFKA-1945) MetaData Response - Broker hostname is wrong

2015-02-11 Thread saravana kumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

saravana kumar updated KAFKA-1945:
--
Description: 
I use python-kafka's SimpleConsumer to listen to a topic on a Kafka broker. The 
Kafka broker is running on a machine whose hostname is BROKER_HOST. Now, a 
SimpleConsumer from another machine requests topic metadata from the broker 
BROKER_HOST for a topic TOPIC and gets a python tuple

   (Broker metadata, Topic metadata)
Broker metadata comes as

 {0: BrokerMetadata(nodeId=0, host='localhost', port=9092)}

Ideally, the host value must be BROKER_HOST (the hostname cmd from the broker 
shell tty confirms it), but it comes as localhost...

How does the wrong broker metadata for a topic get into the Kafka system? And 
obviously, this breaks the system since my consumer tries to connect to 9092 on 
its own localhost.


 MetaData Response - Broker hostname is wrong
 

 Key: KAFKA-1945
 URL: https://issues.apache.org/jira/browse/KAFKA-1945
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: saravana kumar

 I use python-kafka's SimpleConsumer to listen to a topic in kafka broker. 
 Kafka broker is running on a machine with its hostname as BROKER_HOST. Now, 
 SimpleConsumer from another machine requests for topic metadata from the 
 broker BROKER_HOST for a topic TOPIC & gets a python tuple
(Broker metadata, Topic metadata)
 Broker metadata comes as,
  {0: BrokerMetadata(nodeId=0, host='localhost', port=9092)}
 ideally, host value must be BROKER_HOST(hostname cmd from broker shell tty 
 confirms it) but it comes as localhost...
 How does the wrong broker metadata for a topic get into kafka system? And 
 obviously, this breaks the system since my consumer tries to connect to 9092 
 on its localhost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka New(Java) Producer Connection reset by peer error and LB

2015-02-11 Thread Ewen Cheslack-Postava
Agree with Jay. It's unfortunate that this gets logged because in this case
it's just noise, but this is an exception that can happen both in
potentially bad cases (remote peer closed connection forcibly with
outstanding unprocessed data) or in normal cases that aren't problematic
(TCP connection timeout). I'm pretty sure some load balancers, e.g.
HAProxy, disable socket lingering to avoid time-wait (i.e. they send a RST
even when they could use a FIN), which helps them avoid socket starvation.

I think the generalized bug this is an instance of is that we're relying on
timeouts in lower layers, like TCP timeouts, to clean up after us. Ideally
anything that might trigger a timeout in a lower layer could, with the
correct settings, be caught and cleaned up earlier by Kafka. This means
adding timeouts on resources managed by Kafka, such as TCP connections to
brokers as KAFKA-1941 suggests.



On Tue, Feb 10, 2015 at 1:45 PM, Jay Kreps jay.kr...@gmail.com wrote:

 I don't think this is a bug. Currently we don't support timing out
 connections in the clients, which would be a good feature to add. As a
 result the connection remains until the LB kills it. When that happens you
 get a message logged that the connection was unexpectedly closed, which I
 think is what should happen (you can disable the logging in log4j if you
 don't want it).

 It would be nice to implement a client-side connection LRU for unused
 connections. I filed a ticket to track this:
 https://issues.apache.org/jira/browse/KAFKA-1941

 -Jay

 On Tue, Feb 10, 2015 at 11:33 AM, Bhavesh Mistry 
 mistry.p.bhav...@gmail.com
  wrote:

  HI Ewen,
 
  The root of the problem is a leak of a TCP connection which stays idle for a
  while. It is just a log message, as you mentioned, but suppose you have 50 or
  more producer instances created by an application and every one of them will
  print the above log; that becomes a little concerning.
 
  We configured the producer with the bootstrap list as LB:port1, which is set
  to TCP port forward to broker:port2. When the producer fetches cluster
  metadata and discovers that the TCP connection LB:port1 is not part of the
  broker cluster or topology, it should close the connection to LB:port1 (in my
  opinion, this would be the expected behavior).
 
  As you mentioned, producer behavior is normal and this error is harmless.
  If you consider this a bug, please let me know and I will file a jira
  ticket for this.
 
  We are on non-release 0.8.2 from trunk.
 
  Thanks,
 
  Bhavesh
 
 
 
 
  On Tue, Feb 10, 2015 at 12:29 AM, Ewen Cheslack-Postava 
 e...@confluent.io
  
  wrote:
 
   Bhavesh,
  
   I'm unclear what the impact is here. The line numbers don't match up
   exactly with trunk or 0.8.2.0, but it looks like this exception is just
   caught and logged. As far as I can tell the producer would continue to
   function normally. Does this have any impact on the producer or is the
   concern just that the exception is being logged?
  
   On Mon, Feb 9, 2015 at 11:21 PM, Bhavesh Mistry 
   mistry.p.bhav...@gmail.com
   wrote:
  
HI Kafka Team,
   
Please confirm if you would like to open Jira issue to track this ?
   
Thanks,
   
Bhavesh
   
On Mon, Feb 9, 2015 at 12:39 PM, Bhavesh Mistry 
mistry.p.bhav...@gmail.com
wrote:
   
 Hi Kafka Team,

 We are getting this connection reset by peer a couple of minutes after
 start-up of the producer, due to infrastructure deployment strategies we
 have adopted from LinkedIn.

 We have the LB hostname and port as the seed server, and all producers are
 getting the following exception because of the TCP idle connection timeout
 set on the LB (which is 2 minutes, while the Kafka TCP connection idle is
 set to 10 minutes). This seems to be a minor bug: the TCP connection should
 be closed immediately after discovering that the seed server is not part of
 the brokers list.


 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcher.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
 at sun.nio.ch.IOUtil.read(IOUtil.java:171)
 at
 sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
 at

   
  
 
 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
 at
  org.apache.kafka.common.network.Selector.poll(Selector.java:242)
 at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
 at

  org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at

  org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:662)
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcher.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
 at 

Re: Review Request 28769: Patch for KAFKA-1809

2015-02-11 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28769/#review71916
---


Thanks for the new patch. Some more comments.

1. We should think through whether we need to add security protocol to existing 
tools like SimleConsumerShell and UpdateOffsetsInZk.
2. There are unused imports.
3. The patch needs rebase.
4. The following unit test fails consistently.
kafka.network.SocketServerTest  testSocketsCloseOnShutdown FAILED
org.scalatest.junit.JUnitTestFailedError: expected exception when writing 
to closed plain socket
at 
org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
at 
kafka.network.SocketServerTest.newAssertionFailedException(SocketServerTest.scala:37)
at org.scalatest.Assertions$class.fail(Assertions.scala:711)
at kafka.network.SocketServerTest.fail(SocketServerTest.scala:37)
at 
kafka.network.SocketServerTest.testSocketsCloseOnShutdown(SocketServerTest.scala:162)


clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
https://reviews.apache.org/r/28769/#comment117835

Do we need to remove final?



core/src/main/scala/kafka/client/ClientUtils.scala
https://reviews.apache.org/r/28769/#comment117853

Should we default protocolType to PLAINTEXT?



core/src/main/scala/kafka/cluster/Broker.scala
https://reviews.apache.org/r/28769/#comment117854

Instead of representing endpoints as a single string, should we represent 
it as an array of strings, one for each endpoint?



core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
https://reviews.apache.org/r/28769/#comment117855

For ease of reading, perhaps we can add a comment on what the uri format is?
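
    For reference, the endpoint URI form under discussion is roughly 
    securityProtocol://host:port, e.g. PLAINTEXT://broker1.example.com:9092 
    (the hostname and port here are made-up examples).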



core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
https://reviews.apache.org/r/28769/#comment117856

Capitalize the first word of each sentence and end the sentence with . This 
applies to a few other places too.



core/src/main/scala/kafka/consumer/ConsumerConfig.scala
https://reviews.apache.org/r/28769/#comment117857

I thought we won't support security in the old client?



core/src/main/scala/kafka/server/KafkaConfig.scala
https://reviews.apache.org/r/28769/#comment117861

If host.name is not specified, should we generate PLAINTEXT://:6667 
instead? When converting that to a BrokerEndPoint, do we get a null for host? 
It will be useful to add a test for that.



core/src/main/scala/kafka/server/KafkaConfig.scala
https://reviews.apache.org/r/28769/#comment117863

Same comment as above, does a null string translate into a null in 
endPoint.host?



core/src/main/scala/kafka/server/KafkaConfig.scala
https://reviews.apache.org/r/28769/#comment117862

Do we use PLAINTEXT://0.0.0.0:9092 to bind to all interface or 
PLAINTEXT://:9092? If it's the former, does it work for ipv6 and do we need to 
change getListeners() accordingly?



core/src/main/scala/kafka/server/KafkaConfig.scala
https://reviews.apache.org/r/28769/#comment117859

Would it be better to make this a sequence of string (like logDirs)?



core/src/main/scala/kafka/server/KafkaConfig.scala
https://reviews.apache.org/r/28769/#comment117858

This should default to 0.8.2.0. The user will upgrade the broker jar with 
the default setting first, followed by another rolling bounce after changing 
the config to 0.8.3.0.


- Jun Rao


On Feb. 3, 2015, 6:52 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/28769/
 ---
 
 (Updated Feb. 3, 2015, 6:52 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1809
 https://issues.apache.org/jira/browse/KAFKA-1809
 
 
 Repository: kafka
 
 
 Description
 ---
 
 changed topicmetadata to include brokerendpoints and fixed few unit tests
 
 
 fixing systest and support for binding to default address
 
 
 fixed system tests
 
 
 fix default address binding and ipv6 support
 
 
 fix some issues regarding endpoint parsing. Also, larger segments for systest 
 make the validation much faster
 
 
 added link to security wiki in doc
 
 
 fixing unit test after rename of ProtocolType to SecurityProtocol
 
 
 Following Joe's advice, added security protocol enum on client side, and 
 modified protocol to use ID instead of string.
 
 
 validate producer config against enum
 
 
 add a second protocol for testing and modify SocketServerTests to check on 
 multi-ports
 
 
 Reverted the metadata request changes and removed the explicit security 
 protocol argument. Instead the socketserver will determine the protocol based 
 on the port and add this to the request
 
 
 bump version for UpdateMetadataRequest and added support for rolling upgrades 
 with new config
 
 
 

[jira] [Resolved] (KAFKA-1377) transient unit test failure in LogOffsetTest

2015-02-11 Thread Manikumar Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar Reddy resolved KAFKA-1377.

Resolution: Fixed

Now I am not getting this exception, so I am closing the issue.

 transient unit test failure in LogOffsetTest
 

 Key: KAFKA-1377
 URL: https://issues.apache.org/jira/browse/KAFKA-1377
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Jun Rao
Assignee: Jun Rao
  Labels: newbie
 Fix For: 0.9.0

 Attachments: KAFKA-1377.patch, KAFKA-1377_2014-04-11_17:42:13.patch, 
 KAFKA-1377_2014-04-11_18:14:45.patch


 Saw the following transient unit test failure.
 kafka.server.LogOffsetTest  testGetOffsetsBeforeEarliestTime FAILED
 junit.framework.AssertionFailedError: expected:List(0) but 
 was:Vector()
 at junit.framework.Assert.fail(Assert.java:47)
 at junit.framework.Assert.failNotEquals(Assert.java:277)
 at junit.framework.Assert.assertEquals(Assert.java:64)
 at junit.framework.Assert.assertEquals(Assert.java:71)
 at 
 kafka.server.LogOffsetTest.testGetOffsetsBeforeEarliestTime(LogOffsetTest.scala:198)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1945) MetaData Response - Broker hostname is wrong

2015-02-11 Thread saravana kumar (JIRA)
saravana kumar created KAFKA-1945:
-

 Summary: MetaData Response - Broker hostname is wrong
 Key: KAFKA-1945
 URL: https://issues.apache.org/jira/browse/KAFKA-1945
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: saravana kumar






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1945) MetaData Response - Broker hostname is wrong

2015-02-11 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14315841#comment-14315841
 ] 

Manikumar Reddy commented on KAFKA-1945:


You can set the host.name and advertised.host.name server config properties to 
bind to a particular host name.
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan'tmyconsumers/producersconnecttothebrokers?

Also, please post such questions to the mailing list 
(https://kafka.apache.org/contact.html)
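
An illustrative server.properties for the case above (BROKER_HOST is a
placeholder for the real hostname):

    host.name=BROKER_HOST                # hostname/interface the broker binds to
    advertised.host.name=BROKER_HOST     # hostname published to clients in metadata
    advertised.port=9092                 # port published to clients in metadata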

 MetaData Response - Broker hostname is wrong
 

 Key: KAFKA-1945
 URL: https://issues.apache.org/jira/browse/KAFKA-1945
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: saravana kumar

 I use python-kafka's SimpleConsumer to listen to a topic in kafka broker. 
 Kafka broker is running on a machine with its hostname as BROKER_HOST. Now, 
 SimpleConsumer from another machine requests for topic metadata from the 
 broker BROKER_HOST for a topic TOPIC & gets a python tuple
(Broker metadata, Topic metadata)
 Broker metadata comes as,
  {0: BrokerMetadata(nodeId=0, host='localhost', port=9092)}
 ideally, host value must be BROKER_HOST(hostname cmd from broker shell tty 
 confirms it) but it comes as localhost...
 How does the wrong broker metadata for a topic get into kafka system? And 
 obviously, this breaks the system since my consumer tries to connect to 9092 
 on its localhost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KAFKA-1697 - make NotEnoughReplicasAfterAppend a non-retriable exception

2015-02-11 Thread Joel Koshy
Thanks for the comments - however, it is not clear to me what your
preference is on making NotEnoughReplicasAfterAppend retriable vs
non-retriable.

As for me, my preference is to leave it as retriable since it is clear
that the produce may succeed on a retry (and may introduce a
duplicate). I agree that idempotence will bring full closure to this
though.

Anyone else have a preference on this?

Thanks,

Joel

On Tue, Feb 10, 2015 at 08:23:08PM -0800, Jay Kreps wrote:
 Yeah there are really two concepts here as I think you noted:
 1. Retry safe: we know that the write did not occur
 2. Retry fixable: if you send that again it could work
 
 (probably there are better names for these).
 
 Some things we know did not do a write and may be fixed by retrying (no
 leader). Some things we know didn't do a write and are not worth retrying
 (message too large). Somethings we don't know and are worth retrying
 (network error), and probably some things we don't know and aren't worth it
 (can't think of one though).
 
 (I feel like Donald Rumsfeld with the known unknowns thing).
 
 In the current world if you set retries > 0 you are saying "I accept
 duplicates but want to ensure my stuff gets written", and if you set retries =
 0 you are saying "I can't abide duplicates and am willing to tolerate
 loss". So Retryable for us means "retry may succeed".
 
 Originally I thought of maybe trying to model both concepts. However the
 two arguments against it are:
 1. Even if you do this the guarantee remains at least once delivery
 because: (1) in the network error case you just don't know, (2) consumer
 failure.
 2. The proper fix for this is to add idempotence support on the server,
 which we should do.
 
 Doing idempotence support on the server will actually fix all duplicate
 problems, including the network error case (because of course the server
 knows whether your write went through even though the client doesn't). When
 we have that then the client can always just retry anything marked
 Retriable (i.e. retry may work) without fear of duplicates.
 
 This gives exactly once delivery to the log, and a co-operating consumer
 can use the offset to dedupe and get it end-to-end.
 
 So that was why I had just left one type of Retriable and used it to mean
 retry may work and don't try to flag anything for duplicates.
 
 -Jay
 
 
 
 
 On Tue, Feb 10, 2015 at 4:32 PM, Gwen Shapira gshap...@cloudera.com wrote:
 
  Hi Kafka Devs,
 
  Need your thoughts on retriable exceptions:
 
  If a user configures Kafka with min.isr > 1 and there are not enough
  replicas to safely store the data, there are two possibilities:
 
  1. The lack of replicas was discovered before the message was written. We
  throw NotEnoughReplicas.
  2. The lack of replicas was discovered after the message was written to
  leader. In this case, we throw  NotEnoughReplicasAfterAppend.
 
  Currently, both errors are Retriable. Which means that the new producer
  will retry multiple times.
  In case of the second exception, this will cause duplicates.
 
  KAFKA-1697 suggests:
  we probably want to make NotEnoughReplicasAfterAppend a non-retriable
  exception and let the client decide what to do.
 
  I agreed that the client (the one using the Producer) should weigh the
  problems duplicates will cause vs. the probability of losing the message
  and do something sensible and made the exception non-retriable.
 
  In the RB (https://reviews.apache.org/r/29647/) Joel raised a good point:
  (Joel, feel free to correct me if I misrepresented your point)
 
  I think our interpretation of retriable is as follows (but we can discuss
  on the list if that needs to change): if the produce request hits an error,
  and there is absolutely no point in retrying then that is a non-retriable
  error. MessageSizeTooLarge is an example - since unless the producer
  changes the request to make the messages smaller there is no point in
  retrying.
 
  ...
  Duplicates can arise even for other errors (e.g., request timed out). So
  that side-effect is not compelling enough to warrant a change to make this
  non-retriable. 
 
  *(TL;DR)  Should exceptions where retries can cause duplicates still be*
  *retriable?*
 
  Gwen
 



Re: Review Request 29647: Patch for KAFKA-1697

2015-02-11 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29647/#review71960
---



clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
https://reviews.apache.org/r/29647/#comment117874

Pending discussion.



core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
https://reviews.apache.org/r/29647/#comment117872

Can we just do the assert here instead of throwing an exception?

i.e., `assertEquals(responseStatus.values.size, 
responseStatus.values.count(_.error == INVALID_REQUIRED_ACKS))`

Either way is fine. Whichever is clearer although the above may be more 
concise if it works.



core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
https://reviews.apache.org/r/29647/#comment117873

Do we need this here?


- Joel Koshy


On Feb. 11, 2015, 1:06 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29647/
 ---
 
 (Updated Feb. 11, 2015, 1:06 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1697
 https://issues.apache.org/jira/browse/KAFKA-1697
 
 
 Repository: kafka
 
 
 Description
 ---
 
 added early handling of invalid number of acks to handler and a test
 
 
 merging with current trunk
 
 
 moved check for legal requiredAcks to append and fixed the tests accordingly
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/errors/InvalidRequiredAcksException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
  a6107b818947d6d6818c85cdffcb2b13f69a55c0 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 a8deac4ce5149129d0a6f44c0526af9d55649a36 
   core/src/main/scala/kafka/cluster/Partition.scala 
 e6ad8be5e33b6fb31c078ad78f8de709869ddc04 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 6ee7d8819a9ef923f3a65c865a0a3d8ded8845f0 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 fb948b9ab28c516e81dab14dcbe211dcd99842b6 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 faa907131ed0aa94a7eacb78c1ffb576062be87a 
 
 Diff: https://reviews.apache.org/r/29647/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 



