[jira] [Commented] (KAFKA-690) TopicMetadataRequest throws exception when no topics are specified

2013-01-09 Thread Maxime Brugidou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548436#comment-13548436
 ] 

Maxime Brugidou commented on KAFKA-690:
---

This would resolve KAFKA-653.

 TopicMetadataRequest throws exception when no topics are specified
 --

 Key: KAFKA-690
 URL: https://issues.apache.org/jira/browse/KAFKA-690
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: David Arthur
 Fix For: 0.8

 Attachments: KAFKA-690.patch


 If no topics are sent in a TopicMetadataRequest, `readFrom` throws an 
 exception when trying to get the head of the topic list for a debug 
 statement.
 java.util.NoSuchElementException: head of empty list
   at scala.collection.immutable.Nil$.head(List.scala:386)
   at scala.collection.immutable.Nil$.head(List.scala:383)
   at 
 kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
   at 
 kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
   at kafka.utils.Logging$class.debug(Logging.scala:51)
   at kafka.api.TopicMetadataRequest$.debug(TopicMetadataRequest.scala:25)
   at 
 kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
   at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:47)
   at kafka.network.Processor.read(SocketServer.scala:320)
   at kafka.network.Processor.run(SocketServer.scala:231)
   at java.lang.Thread.run(Thread.java:680)
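
For illustration, a minimal self-contained Scala sketch of the failure mode above and an obvious guard; this is not the actual TopicMetadataRequest code or the attached patch, just the head-on-empty-list pattern:

  object EmptyTopicListSketch {
    def debug(msg: => String): Unit = println(msg)   // stand-in for kafka.utils.Logging.debug

    def main(args: Array[String]): Unit = {
      val topics: List[String] = List()              // a TopicMetadataRequest with no topics
      // debug("first topic: " + topics.head)        // java.util.NoSuchElementException: head of empty list
      debug("topics: " + topics.mkString(","))       // safe even when the list is empty
    }
  }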

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-634) ConsoleProducer compresses messages and ignores the --compress flag

2013-01-09 Thread Maxime Brugidou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548441#comment-13548441
 ] 

Maxime Brugidou commented on KAFKA-634:
---

KAFKA-506 fixed this (commit f64fd3dcbaace1dba7bbd72398bb3e7d28b41d61 in the 
0.8 branch)

This will be fixed in 0.8, I guess.

 ConsoleProducer compresses messages and ignores the --compress flag
 ---

 Key: KAFKA-634
 URL: https://issues.apache.org/jira/browse/KAFKA-634
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7
Reporter: Anentropic
  Labels: console, producer

 I am using the kafka-producer-shell.sh script without the --compress option,
 however my messages seem to be gzipped.
 The docs say compression is off by default:
 http://incubator.apache.org/kafka/configuration.html
 The only producer.properties file I can find is at:
 /home/ubuntu/kafka-0.7.2-incubating-src/config/producer.properties
 In there is:
 compression.codec=0
 My process looks like:
 root  1748  1746  0 Nov19 ?00:02:37 java -Xmx512M -server 
 -Dlog4j.configuration=file:/usr/local/bin/kafka/../config/log4j.properties 
 -Dcom.sun.management.jmxremote 
 -Dcom.sun.management.jmxremote.authenticate=false 
 -Dcom.sun.management.jmxremote.ssl=false -cp 
 :/usr/local/bin/kafka/../project/boot/scala-2.8.0/lib/scala-compiler.jar:/usr/local/bin/kafka/../project/boot/scala-2.8.0/lib/scala-library.jar:/usr/local/bin/kafka/../core/target/scala_2.8.0/kafka-0.7.2.jar:/usr/local/bin/kafka/../core/lib/*.jar:/usr/local/bin/kafka/../perf/target/scala_2.8.0/kafka-perf-0.7.2.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/snappy-java-1.0.4.1.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/zkclient-0.1.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.4.jar
  kafka.producer.ConsoleProducer --topic logtail --zookeeper x.x.x.x:2181
 But the messages come out as gobbledegook unless I use a client that understands 
 compressed messages, and that client identifies the compression bit as set to 1, 
 gzip compression.
 Jun Rao jun...@gmail.com via incubator.apache.org 
 Nov 26 (1 day ago)
 to kafka-users 
 This seems to be a bug in ConsoleProducer. It also compresses messages and
 ignores the --compress flag. Could you file a jira?
 Thanks,
 Jun
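
For reference, a hedged Scala sketch of the wiring the --compress flag is supposed to have; the option parsing here is made up and this is not the actual ConsoleProducer code, but the codec values (0 = no compression, 1 = gzip) match the ones mentioned above:

  object CompressFlagSketch {
    def main(args: Array[String]): Unit = {
      val compress = args.contains("--compress")     // only compress when the flag is given
      val props = new java.util.Properties()
      props.put("compression.codec", if (compress) "1" else "0")
      println("compression.codec=" + props.get("compression.codec"))
    }
  }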

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: kafka 0.8 producer throughput

2013-01-09 Thread S Ahmed
What's the ack for?  If it fails, will it try another broker?  Can this be
disabled, or is it a major design change?


On Wed, Jan 9, 2013 at 12:40 AM, Jun Rao jun...@gmail.com wrote:

 The 50MB/s number is for 0.7. We haven't carefully measured the performance
 in 0.8 yet. We do expect the throughput that a single producer can drive in
 0.8 to be less. This is because the 0.8 producer needs to wait for an RPC
 response from the broker while in 0.7, there is no ack for the producer.
 Nevertheless, 2MB/s seems low. Could you try increasing flush interval to
 sth bigger, like 2?

 Thanks,

 Jun

 On Tue, Jan 8, 2013 at 8:32 PM, Jun Guo -X (jungu - CIIC at Cisco) 
 ju...@cisco.com wrote:

  According to the official Kafka documentation, the producer throughput is about
  50MB/s. But when I run my own test, the producer throughput is only about 2MB/s.
  The test environment is the same as the document describes: one producer, one
  broker and one ZooKeeper, each on an independent machine. Message size is 100
  bytes, batch size is 200, flush interval is 600 messages. The environment and
  configuration are the same, so why is there such a big gap between my test
  result and what the document says?
 



[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist

2013-01-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548613#comment-13548613
 ] 

Jay Kreps commented on KAFKA-689:
-

This is pretty hacky though, no? Fetching metadata should not create 
topics -- that is like a getter subtly changing values underneath you. I think 
this is more evidence for needing to expose a proper create_topic API.

 Can't append to a topic/partition that does not already exist
 -

 Key: KAFKA-689
 URL: https://issues.apache.org/jira/browse/KAFKA-689
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8
Reporter: David Arthur
 Attachments: kafka.log, produce-payload.bin


 With a totally fresh Kafka (empty logs dir and empty ZK), if I send a 
 ProduceRequest for a new topic, Kafka responds with 
 kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 
 doesn't exist on 0. This is when sending a ProduceRequest over the network 
 (from Python, in this case).
 If I use the console producer it works fine (topic and partition get 
 created). If I then send the same payload from before over the network, it 
 works.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: Kafka 0.8 without replication - handle broker failure/availability

2013-01-09 Thread Jay Kreps
This is a good point. We have discussed this a little bit before. The key
constraint is that with replication factor 1 you can choose one of the
following: (1) high availability, (2) correct semantic partitioning. That
is to say, if a particular partition is unavailable you have no choice but
to give up and throw an error or else send the message elsewhere.

Obviously replication fixes this by just making the partitions highly
available.

It isn't really correct for us to choose one of these for the user. If they
are depending on partitioning, silently sending data elsewhere may be worse
than giving an error. So the user needs to somehow specify which behavior
they want.

Here is a JIRA where we can work out the details. I suspect this is a
blocker for 0.8:
https://issues.apache.org/jira/browse/KAFKA-691

As a workaround in the meantime you can probably run with
replication -- although it sounds like you don't really need it, it shouldn't
hurt.

-Jay


On Wed, Jan 9, 2013 at 2:38 AM, Maxime Brugidou
maxime.brugi...@gmail.comwrote:

 Hello, I am currently testing the 0.8 branch (and it works quite well). We
 plan not to use the replication feature for now since we don't really need
 it: we can afford to lose data in case of an unrecoverable failure of a
 broker.

 However, we really don't want to have producers/consumers fail if a broker
 is down. The ideal scenario (that was working on 0.7) is that producers
 would just produce to available partitions and consumers would consume from
 available partitions. If the broker comes back online, the consumer will
 catch up; if not, we can decide to throw away the data.

 Is this feasible with 0.8? Right now if I kill a broker it just makes
 everything fail...

 Multiple issues will come up:
 - Since the partitions are now set globally and never change, the
 availability of a topic varies depending on where the partitions are located
 - We would need tools to make sure topics are spread enough and to rebalance
 them accordingly (using the DDL I heard about -- I'm not sure yet
 how it works; I tried editing the JSON strings in ZK, and it somehow works, and
 there's the reassignment admin command too)

 That looks rather complicated, or maybe I'm missing something? The model
 that was used in 0.7 looked much easier to operate (it had drawbacks, and
 couldn't do intra-cluster replication, but at least the availability of the
 cluster was much higher).

 Thanks in advance for any help/clues,

 Maxime



[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist

2013-01-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548646#comment-13548646
 ] 

Jay Kreps commented on KAFKA-689:
-

Well I guess what I am saying is that getting metadata is not at all intuitively 
related to creating topics. I had noticed this code before but hadn't 
really thought about it. I assume the reason for this is that to make a 
correct produce request you have to know the host, so the old strategy of doing 
auto-create on produce doesn't work in 0.8.

I think there are two sensible strategies for auto-create:
1. Auto-create on produce. This is tricky because you have to somehow ensure 
that the local node would hold the partitions used (and how did the client come 
up with those partitions anyway?)
2. Add a public API for creating topics and make the client implement auto-create 
client-side.

I would favor (2).

There is no harm in the current scheme as long as people are warned that we 
intend to change it.
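
A purely hypothetical Scala sketch of strategy (2); createTopic stands in for the proposed public create-topic API, which does not exist in 0.8, and the stubs below exist only to make the client-side flow concrete:

  class UnknownTopicOrPartitionError(topic: String) extends RuntimeException(topic)

  object AutoCreateSketch {
    private val existing = scala.collection.mutable.Set[String]()

    def createTopic(topic: String, partitions: Int, replicationFactor: Int): Unit =
      existing += topic                                  // stand-in for the proposed admin API

    def produce(topic: String, payload: Array[Byte]): Unit =
      if (!existing(topic)) throw new UnknownTopicOrPartitionError(topic)
      else println("appended " + payload.length + " bytes to " + topic)

    def sendWithAutoCreate(topic: String, payload: Array[Byte]): Unit =
      try produce(topic, payload)
      catch {
        case _: UnknownTopicOrPartitionError =>
          createTopic(topic, partitions = 1, replicationFactor = 1)
          produce(topic, payload)                        // retry once the topic exists
      }
  }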

 Can't append to a topic/partition that does not already exist
 -

 Key: KAFKA-689
 URL: https://issues.apache.org/jira/browse/KAFKA-689
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8
Reporter: David Arthur
 Attachments: kafka.log, produce-payload.bin


 With a totally fresh Kafka (empty logs dir and empty ZK), if I send a 
 ProduceRequest for a new topic, Kafka responds with 
 kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 
 doesn't exist on 0. This is when sending a ProduceRequest over the network 
 (from Python, in this case).
 If I use the console producer it works fine (topic and partition get 
 created). If I then send the same payload from before over the network, it 
 works.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: kafka 0.8 producer throughput

2013-01-09 Thread Jay Kreps
We haven't done a ton of performance work on 0.8 yet.

Regardless, requiring the ack will certainly reduce per-producer
throughput, but it is too early to say by how much. Obviously this won't
impact broker throughput (so if you have many producers you may not notice).

The plan to fix this is just to make the produce request non-blocking. This
will allow the same kind of throughput we had before but still allow us to
give you back an error response if you want it. The hope would be to make
this change in 0.9.

-Jay


On Wed, Jan 9, 2013 at 8:24 AM, Jun Rao jun...@gmail.com wrote:

 In 0.8, ack is always required. The ack returns an error code that indicates
 the reason if a produce request fails (e.g., the request is sent to a
 broker that's not a leader). It also returns the offset of the produced
 messages. However, the producer can choose when to receive the acks (e.g.,
 when data reaches 1 replica or all replicas). If the ack indicates an
 error, the client can choose to retry. The retry logic is built into our
 high level producer.

 Thanks,

 Jun
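
For reference, a minimal Scala sketch of the producer-side knob Jun describes; request.required.acks is the 0.8 producer property that controls when the ack is considered received (1 = once the leader has the data, -1 = once all replicas have it), while the broker address and serializer below are illustrative assumptions only:

  import java.util.Properties

  object AckConfigSketch {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put("broker.list", "localhost:9092")            // assumed local 0.8 broker
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      props.put("request.required.acks", "1")               // or "-1" to wait for all replicas
      // val producer = new kafka.producer.Producer[String, String](new kafka.producer.ProducerConfig(props))
      println(props)
    }
  }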

 On Wed, Jan 9, 2013 at 6:20 AM, S Ahmed sahmed1...@gmail.com wrote:

  What's the ack for?  If it fails, it will try another broker?  Can this
 be
  disabled or it's a major design change?
 
 
  On Wed, Jan 9, 2013 at 12:40 AM, Jun Rao jun...@gmail.com wrote:
 
   The 50MB/s number is for 0.7. We haven't carefully measured the
  performance
   in 0.8 yet. We do expect the throughput that a single producer can
 drive
  in
   0.8 to be less. This is because the 0.8 producer needs to wait for an
 RPC
   response from the broker while in 0.7, there is no ack for the
 producer.
   Nevertheless, 2MB/s seems low. Could you try increasing flush interval
 to
   sth bigger, like 2?
  
   Thanks,
  
   Jun
  
   On Tue, Jan 8, 2013 at 8:32 PM, Jun Guo -X (jungu - CIIC at Cisco) 
   ju...@cisco.com wrote:
  
According to Kafka official document, the producer throughput is
 about
50MB/S. But I do some test, the producer throughput is only about
  2MB/S.
The test environment is the same with document says. One producer,
 One
broker, One Zookeeper are in independent machine. Message size is 100
bytes, batch size is 200, flush interval is 600 messages. The test
environment is the same, the configuration is the same. The why there
  is
such big gap the my test result and the document says?
   
  
 



[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Maxime Brugidou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548667#comment-13548667
 ] 

Maxime Brugidou commented on KAFKA-691:
---

I think the workaround is not really acceptable for me since it will consume 
3x the resources (because a replication factor of 3 is the minimum acceptable) and it 
will still make the cluster less available anyway (unless I have only 3 
brokers).

The thing is that 0.7 was making the cluster 100% available (for my use case, 
accepting data loss) as long as a single broker was alive.

A way to handle this would be to:
1. Have a lot of partitions per topic (more than the # of brokers)
2. Have something that rebalances the partitions and makes sure a broker has 
at least one partition for each topic (to make every topic available)
3. Have a setting in the consumer/producer that says "I don't care about 
partitioning, just produce/consume wherever you can"

 Fault tolerance broken with replication factor 1
 

 Key: KAFKA-691
 URL: https://issues.apache.org/jira/browse/KAFKA-691
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps

 In 0.7 if a partition was down we would just send the message elsewhere. This 
 meant that the partitioning was really more of a stickiness than a hard 
 guarantee. This made it impossible to depend on it for partitioned, stateful 
 processing.
 In 0.8 when running with replication this should not be a problem generally 
 as the partitions are now highly available and fail over to other replicas. 
 However in the case of replication factor = 1 this no longer really works for 
 most cases, as now a dead broker will give errors for that broker.
 I am not sure of the best fix. Intuitively I think this is something that 
 should be handled by the Partitioner interface. However currently the 
 partitioner has no knowledge of which nodes are available. So you could use a 
 random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-690) TopicMetadataRequest throws exception when no topics are specified

2013-01-09 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-690.
-

Resolution: Fixed
  Assignee: David Arthur

 TopicMetadataRequest throws exception when no topics are specified
 --

 Key: KAFKA-690
 URL: https://issues.apache.org/jira/browse/KAFKA-690
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: David Arthur
Assignee: David Arthur
 Fix For: 0.8

 Attachments: KAFKA-690.patch


 If no topics are sent in a TopicMetadataRequest, `readFrom` throws an 
 exception when trying to get the head of the topic list for a debug 
 statement.
 java.util.NoSuchElementException: head of empty list
   at scala.collection.immutable.Nil$.head(List.scala:386)
   at scala.collection.immutable.Nil$.head(List.scala:383)
   at 
 kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
   at 
 kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
   at kafka.utils.Logging$class.debug(Logging.scala:51)
   at kafka.api.TopicMetadataRequest$.debug(TopicMetadataRequest.scala:25)
   at 
 kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
   at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
   at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:47)
   at kafka.network.Processor.read(SocketServer.scala:320)
   at kafka.network.Processor.run(SocketServer.scala:231)
   at java.lang.Thread.run(Thread.java:680)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-662) Create testcases for unclean shut down

2013-01-09 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548689#comment-13548689
 ] 

John Fung commented on KAFKA-662:
-

2. testcase_9072 - This testcase is to test the basic behavior of unclean 
shutdown where log truncation takes place.

B1   B2
===  ===

a. Both brokers are up running

b. Send 5 messages

m0    m0
m1    m1
m2    m2
m3    m3
m4    m4

c. Shut down B2

d. Send 10 messages

m5
m6
m7
m8
m9
m10
m11
m12
m13
m14

e. Shut down B1 (B1 & B2 are both down)

f. Sleep 5 seconds

g. Start B2 (B1 is still down)

h. Send 5 messages

   m15
   m16
   m17
   m18
   m19

i. Start B1 (both B1 & B2 are up running)

Recovering ...

j. Since there is a discrepancy in the number of messages maintained by the internal 
index, there will be log truncation in B1's data log:

m0    m0
m1    m1
m2    m2
m3    m3
m4    m4
m15   m15
m16   m16
m17   m17
m18   m18
m19   m19


$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files 
/tmp/kafka_server_1_logs/t001-0/.log
Dumping /tmp/kafka_server_1_logs/t001-0/.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2307843899 payload: 
Topic:t001:ThreadID:0:MessageID:00
offset: 1 position: 126 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3770868426 payload: 
Topic:t001:ThreadID:0:MessageID:01
offset: 2 position: 252 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1528229081 payload: 
Topic:t001:ThreadID:0:MessageID:02
offset: 3 position: 378 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 844818728 payload: 
Topic:t001:ThreadID:0:MessageID:03
offset: 4 position: 504 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4157601470 payload: 
Topic:t001:ThreadID:0:MessageID:04
offset: 5 position: 630 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1414561216 payload: 
Topic:t001:ThreadID:0:MessageID:15
offset: 6 position: 756 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4018435027 payload: 
Topic:t001:ThreadID:0:MessageID:16
offset: 7 position: 882 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2261284386 payload: 
Topic:t001:ThreadID:0:MessageID:17
offset: 8 position: 1008 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3219081918 payload: 
Topic:t001:ThreadID:0:MessageID:18
offset: 9 position: 1134 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3599978319 payload: 
Topic:t001:ThreadID:0:MessageID:19

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files 
/tmp/kafka_server_2_logs/t001-0/.log
Dumping /tmp/kafka_server_2_logs/t001-0/.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2307843899 payload: 
Topic:t001:ThreadID:0:MessageID:00
offset: 1 position: 126 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3770868426 payload: 
Topic:t001:ThreadID:0:MessageID:01
offset: 2 position: 252 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1528229081 payload: 
Topic:t001:ThreadID:0:MessageID:02
offset: 3 position: 378 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 844818728 payload: 
Topic:t001:ThreadID:0:MessageID:03
offset: 4 position: 504 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4157601470 payload: 
Topic:t001:ThreadID:0:MessageID:04
offset: 5 position: 630 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 1414561216 payload: 
Topic:t001:ThreadID:0:MessageID:15
offset: 6 position: 756 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 4018435027 payload: 
Topic:t001:ThreadID:0:MessageID:16
offset: 7 position: 882 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 2261284386 payload: 
Topic:t001:ThreadID:0:MessageID:17
offset: 8 position: 1008 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3219081918 payload: 
Topic:t001:ThreadID:0:MessageID:18
offset: 9 position: 1134 isvalid: true payloadsize: 100 magic: 0 compresscodec: 
NoCompressionCodec crc: 3599978319 payload: 
Topic:t001:ThreadID:0:MessageID:19



  

[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist

2013-01-09 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548748#comment-13548748
 ] 

Jun Rao commented on KAFKA-689:
---

Currently, auto-creation is a broker-side flag. Basically, the broker controls 
whether a topic can be created automatically or not. This is likely useful for 
admins. The getMetadata API implicitly triggers auto-creation, subject to the 
server-side config. This is probably a bit hacky, but it does save one extra RPC. 
We can think a bit more about whether adding a separate create-topic API is a better 
strategy.

 Can't append to a topic/partition that does not already exist
 -

 Key: KAFKA-689
 URL: https://issues.apache.org/jira/browse/KAFKA-689
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8
Reporter: David Arthur
 Attachments: kafka.log, produce-payload.bin


 With a totally fresh Kafka (empty logs dir and empty ZK), if I send a 
 ProduceRequest for a new topic, Kafka responds with 
 kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 
 doesn't exist on 0. This is when sending a ProduceRequest over the network 
 (from Python, in this case).
 If I use the console producer it works fine (topic and partition get 
 created). If I then send the same payload from before over the network, it 
 works.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-643) Refactor api definition layer

2013-01-09 Thread David Arthur (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548797#comment-13548797
 ] 

David Arthur commented on KAFKA-643:


+1 for splitting generic/specific parts of the API (this is basically what I do 
in my Python client).

+1 for specifying the protocol in a ~BNF form. This would require protocols to 
be specified as LL grammars (which they all are), which is required for 
efficient ByteBuffer packing/unpacking anyways. 

However, how would this scheme handle recursive definitions (like MessageSet)? 
I've always felt the depth of this should be limited to one, meaning a single 
Message can contain a compressed MessageSet which can only be composed of 
regular (uncompressed) Messages. In 
https://github.com/mumrah/kafka-python/blob/master/kafka/client.py#L355, I have 
to endlessly recurse to ensure I've fully consumed the messages - kind of a 
pain. If the depth was limited, I could decode it non-recursively. 
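
A small self-contained Scala sketch of the depth-one rule being proposed here; the types are simplified stand-ins, not the real kafka.message classes:

  object DepthOneSketch {
    sealed trait Msg
    case class Plain(payload: Array[Byte]) extends Msg
    case class Compressed(inner: Seq[Plain]) extends Msg   // nesting limited to one by construction

    // a single non-recursive pass is enough to fully expand a message set
    def flatten(messages: Seq[Msg]): Seq[Plain] = messages.flatMap {
      case p: Plain          => Seq(p)
      case Compressed(inner) => inner
    }

    def main(args: Array[String]): Unit = {
      val batch = Seq(Plain("a".getBytes),
                      Compressed(Seq(Plain("b".getBytes), Plain("c".getBytes))))
      println(flatten(batch).length)   // 3
    }
  }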

+0 for not using Avro et al. I understand the performance implications of using 
one of these frameworks, but it sure does make client development easier. 
However, as long as the protocol spec is clear (and correct) implementing a 
client is not so bad.

What about the Java API? As far as I can tell, the purpose of these classes is 
to delegate to the real APIs and handle Java <-> Scala data type conversion. It 
seems like this should be able to be automatic/automagic. Although, I guess for 
the implicits stuff to work the Java classes must be present.

I know it's very new (Scala 2.10) and experimental, but macros might help in 
simplifying the APIs: http://docs.scala-lang.org/overviews/macros/overview.html.

 Refactor api definition layer
 -

 Key: KAFKA-643
 URL: https://issues.apache.org/jira/browse/KAFKA-643
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Jay Kreps
Assignee: Jay Kreps

 The way we are defining our protocol is really a bit embarrassing. It is full 
 of ad hoc serialization code for each API. This code is very fiddly and 
 opaque and when it has errors they are hard to debug. Since it is all done 
 one-off it is also very easy for it to become inconsistent. This was 
 tolerable when there were only two apis with a few fields each, but now there 
 are a half dozen more complex APIs. By my count there are now over 1000 lines 
 of code in kafka.apis.*.
 One option would be to use protocol buffers or thrift or another 
 schema-oriented code gen RPC language. However I think this is probably the 
 wrong direction for a couple reasons. One is that we want something that 
 works well with our I/O model, both network and disk, which is very 
 NIO-centric. So it should work directly with ByteBuffers. Second I feel that 
 these systems complicate the specification of the protocol. They give a 
 schema, which is a great high-level description, but the translation of that 
 to bytes is essentially whatever their code-gen engine chooses to do. These 
 things are a great way to build application services, but not great for 
 something like what we are building.
 Instead I think we should do what we have done, specify the protocol as a 
 wiki. However we should write a little helper code to make our lives easier.
 Here is my recommendation for how this code would work. We add two helper 
 classes: Schema and Record.
 You define message formats like this:
 import Types._
 val FetchRequestProtocol = 
   Schema(ReplicaId -> int32, 
          MaxWaitTime -> int32, 
          MinBytes -> int32,
          Seq(TopicName -> utf8,
              Seq(Partition -> int32, 
                  FetchOffset -> int64, 
                  MaxBytes -> int32)))
 Note that this almost exactly matches the BNF for the fetch request: 
   
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
 Once defined this schema can be used to parse messages:
   val record: Record = FetchRequestProtocol.readFrom(buffer)
 A record is just a wrapper around an array. The readFrom method parses out 
 the fields specified in the schema and populates the array. Fields in the 
 record can be accessed by name, e.g. 
   record(ReplicaId)
 For common access this is probably good enough. However since the position is 
 fixed, it is also possible to get the element by a Field object, which gets 
 rid of the hashmap lookup and goes directly to the right slot. E.g.
   val ReplicaIdField = FetchRequestProtocol(ReplicaId) // do this as a 
 global variable
   ...
   record(ReplicaIdField)
 This will be for cases where we are a bit performance conscious and don't 
 want to do umpteen hashmap lookups to resolve string field names.
 Likewise the other direction, to write out a record:
   record.writeTo(buffer)
 and to get 
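
As a rough illustration of the Schema/Record helpers described above, here is a minimal self-contained Scala sketch; it only handles flat int32 fields and skips nesting, type descriptors and the real ByteBuffer layout, so it is a toy, not the proposed Kafka code:

  import java.nio.ByteBuffer

  case class Field(name: String, index: Int)

  class Schema(fieldNames: String*) {
    private val fields = fieldNames.zipWithIndex.map { case (n, i) => n -> Field(n, i) }.toMap
    val size: Int = fieldNames.size
    def apply(name: String): Field = fields(name)             // resolve a name to a fixed slot

    def readFrom(buffer: ByteBuffer): Record =                // every field is an int32 in this toy
      new Record(this, Array.fill(size)(buffer.getInt))
  }

  class Record(schema: Schema, values: Array[Int]) {
    def apply(name: String): Int = values(schema(name).index) // by-name access (hashmap lookup)
    def apply(field: Field): Int = values(field.index)        // by-field access (direct slot)
    def writeTo(buffer: ByteBuffer): Unit = values.foreach(v => buffer.putInt(v))
  }

  object SchemaSketch {
    val FetchHeader = new Schema("ReplicaId", "MaxWaitTime", "MinBytes")
    val ReplicaIdField = FetchHeader("ReplicaId")             // resolve once, as a global

    def main(args: Array[String]): Unit = {
      val buf = ByteBuffer.allocate(12)
      buf.putInt(1); buf.putInt(100); buf.putInt(64 * 1024)
      buf.flip()
      val record = FetchHeader.readFrom(buf)
      println(record("ReplicaId"))      // 1, by name
      println(record(ReplicaIdField))   // 1, by pre-resolved field
    }
  }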

[jira] [Updated] (KAFKA-688) System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention

2013-01-09 Thread John Fung (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Fung updated KAFKA-688:


Attachment: kafka-688-v1.patch

 System Test - Update all testcase_xxxx_properties.json for properties keys 
 uniform naming convention
 

 Key: KAFKA-688
 URL: https://issues.apache.org/jira/browse/KAFKA-688
 Project: Kafka
  Issue Type: Task
Reporter: John Fung
Assignee: John Fung
  Labels: replication-testing
 Attachments: kafka-688-v1.patch


 After the changes made for the uniform naming convention of properties keys 
 (KAFKA-648), all testcase_xxxx_properties.json files need to be updated with 
 the following properties key changes:
 brokerid => broker.id
 log.file.size => log.segment.size
 groupid => group.id

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-688) System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention

2013-01-09 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548886#comment-13548886
 ] 

John Fung commented on KAFKA-688:
-

Uploaded kafka-688-v1.patch which is required after checking in the patch in 
KAFKA-648.

The changes include:

1. Updated all testcase_xxxx_properties.json files with the following properties key changes:
brokerid => broker.id
log.file.size => log.segment.size
groupid => group.id 

2. Added 07_client property to 
migration_tool_testsuite/testcase_xxxx/testcase_xxxx_properties.json files 
for those brokers running on 0.7 code

3. Modified system_test/utils/kafka_system_test_utils.py to add brokerid key 
to server.properties file if the broker is running under 0.7

 System Test - Update all testcase_xxxx_properties.json for properties keys 
 uniform naming convention
 

 Key: KAFKA-688
 URL: https://issues.apache.org/jira/browse/KAFKA-688
 Project: Kafka
  Issue Type: Task
Reporter: John Fung
Assignee: John Fung
  Labels: replication-testing
 Attachments: kafka-688-v1.patch


 After the changes made for the uniform naming convention of properties keys 
 (KAFKA-648), all testcase_xxxx_properties.json files need to be updated with 
 the following properties key changes:
 brokerid => broker.id
 log.file.size => log.segment.size
 groupid => group.id

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-688) System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention

2013-01-09 Thread John Fung (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Fung updated KAFKA-688:


Status: Patch Available  (was: Open)

 System Test - Update all testcase_xxxx_properties.json for properties keys 
 uniform naming convention
 

 Key: KAFKA-688
 URL: https://issues.apache.org/jira/browse/KAFKA-688
 Project: Kafka
  Issue Type: Task
Reporter: John Fung
Assignee: John Fung
  Labels: replication-testing
 Attachments: kafka-688-v1.patch


 After the changes made for the uniform naming convention of properties keys 
 (KAFKA-648), all testcase_xxxx_properties.json files need to be updated with 
 the following properties key changes:
 brokerid => broker.id
 log.file.size => log.segment.size
 groupid => group.id

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-648) Use uniform convention for naming properties keys

2013-01-09 Thread Sriram Subramanian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriram Subramanian updated KAFKA-648:
-

Attachment: configchanges-v4.patch

Reverted the change from migrationtool/consumer.properties

 Use uniform convention for naming properties keys 
 --

 Key: KAFKA-648
 URL: https://issues.apache.org/jira/browse/KAFKA-648
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Sriram Subramanian
Priority: Blocker
 Fix For: 0.8, 0.8.1

 Attachments: configchanges-1.patch, configchanges-v2.patch, 
 configchanges-v3.patch, configchanges-v4.patch


 Currently, the convention that we seem to use to get a property value in 
 *Config is as follows:
 val configVal = property.getType(config.val, ...) // dot is used to 
 separate two words in the key, and the first letter of the second word is 
 capitalized in configVal.
 We should use a similar convention for groupId, consumerId, clientId, 
 correlationId.
 This change will probably be backward-incompatible.
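
For example, under the proposed convention a key like group.id maps to a camel-cased val (illustrative only; the props helper below is a plain java.util.Properties, not Kafka's actual config class):

  object NamingConventionSketch {
    val props = new java.util.Properties()
    props.put("group.id", "my-group")        // key: dot-separated lower-case words
    props.put("consumer.id", "consumer-1")

    val groupId: String = props.getProperty("group.id")       // val: camel-case of the key
    val consumerId: String = props.getProperty("consumer.id")

    def main(args: Array[String]): Unit = println(groupId + " / " + consumerId)
  }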

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-648) Use uniform convention for naming properties keys

2013-01-09 Thread John Fung (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548989#comment-13548989
 ] 

John Fung commented on KAFKA-648:
-

Tested the v4 patch together with kafka-688-v1.patch and it works fine.

 Use uniform convention for naming properties keys 
 --

 Key: KAFKA-648
 URL: https://issues.apache.org/jira/browse/KAFKA-648
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Sriram Subramanian
Priority: Blocker
 Fix For: 0.8, 0.8.1

 Attachments: configchanges-1.patch, configchanges-v2.patch, 
 configchanges-v3.patch, configchanges-v4.patch


 Currently, the convention that we seem to use to get a property value in 
 *Config is as follows:
 val configVal = property.getType(config.val, ...) // dot is used to 
 separate two words in the key, and the first letter of the second word is 
 capitalized in configVal.
 We should use a similar convention for groupId, consumerId, clientId, 
 correlationId.
 This change will probably be backward-incompatible.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-648) Use uniform convention for naming properties keys

2013-01-09 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549075#comment-13549075
 ] 

Jun Rao commented on KAFKA-648:
---

Thanks for patch v4. Made another pass. Should we make the following changes?

40. KafkaConfig:
max.message.size => max.message.bytes
socket.send.buffer => socket.send.buffer.bytes
socket.receive.buffer => socket.receive.buffer.bytes
log.segment.size => log.segment.bytes
log.retention.size => log.retention.bytes
log.flush.interval => log.flush.interval.messages
log.default.flush.interval.ms => log.flush.interval.ms
log.flush.intervals.ms.per.topic => log.flush.interval.ms.per.topic
replica.socket.buffersize => replica.socket.receive.buffer.bytes
replica.fetch.size => replica.max.fetch.bytes
fetch.request.purgatory.purge.interval => fetch.purgatory.purge.interval.requests
producer.request.purgatory.purge.interval => producer.purgatory.purge.interval.requests
replica.max.lag.time.ms => max.replica.lag.time.ms
replica.max.lag.bytes => max.replica.lag.bytes
replica.fetch.max.wait.ms => max.replica.fetch.wait.ms
replica.fetch.min.bytes => min.replica.fetch.bytes

41. ProducerConfig: producer.retry.count. In other configs we have 
num.network.threads. To be consistent, shouldn't we use num.producer.retries?

42. AsyncProducerConfig:
queue.time => queue.time.ms
buffer.size => socket.send.buffer.bytes

43. ConsumerConfig:
socket.buffer.size => socket.receive.buffer.bytes
fetch.size => max.fetch.bytes


 Use uniform convention for naming properties keys 
 --

 Key: KAFKA-648
 URL: https://issues.apache.org/jira/browse/KAFKA-648
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Sriram Subramanian
Priority: Blocker
 Fix For: 0.8, 0.8.1

 Attachments: configchanges-1.patch, configchanges-v2.patch, 
 configchanges-v3.patch, configchanges-v4.patch


 Currently, the convention that we seem to use to get a property value in 
 *Config is as follows:
 val configVal = property.getType(config.val, ...) // dot is used to 
 separate two words in the key, and the first letter of the second word is 
 capitalized in configVal.
 We should use a similar convention for groupId, consumerId, clientId, 
 correlationId.
 This change will probably be backward-incompatible.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Maxime Brugidou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549105#comment-13549105
 ] 

Maxime Brugidou commented on KAFKA-691:
---

I agree with Jun's solution; this would solve 3 (1 and 2 can be done manually 
already -- just send a ReassignPartition command when you add a broker).

I could probably implement this very quickly. I'm just not sure how you get 
the availability of a partition, but I'll try to figure it out and submit a 
first patch tomorrow.

 Fault tolerance broken with replication factor 1
 

 Key: KAFKA-691
 URL: https://issues.apache.org/jira/browse/KAFKA-691
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps

 In 0.7 if a partition was down we would just send the message elsewhere. This 
 meant that the partitioning was really more of a stickiness than a hard 
 guarantee. This made it impossible to depend on it for partitioned, stateful 
 processing.
 In 0.8 when running with replication this should not be a problem generally 
 as the partitions are now highly available and fail over to other replicas. 
 However in the case of replication factor = 1 this no longer really works for 
 most cases, as now a dead broker will give errors for that broker.
 I am not sure of the best fix. Intuitively I think this is something that 
 should be handled by the Partitioner interface. However currently the 
 partitioner has no knowledge of which nodes are available. So you could use a 
 random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: kafka mavenification

2013-01-09 Thread Joe Stein
ok, in progress doing that now.

On Mon, Jan 7, 2013 at 12:24 PM, Jun Rao jun...@gmail.com wrote:

 Joe,

 0.8 will be released from the 0.8 branch and trunk is for post 0.8. So, you
 will need to commit the maven changes to 0.8 and then merge them to trunk.

 Thanks,

 Jun

 On Sun, Jan 6, 2013 at 10:53 AM, Joe Stein crypt...@gmail.com wrote:

  Ok, I will commit this patch then just to trunk and we can have the 0.8
  beta mavenized.  That works!
 
  /*
  Joe Stein, Chief Architect
  http://www.medialets.com
  Twitter: @allthingshadoop
  Mobile: 917-597-9771
  */
 
  On Jan 6, 2013, at 1:31 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:
 
   Where do we feel we are with 0.8 ?
  
    We can release a public beta by the end of the month, but we are still a
  few
   weeks away from announcing a release.
  
   Thanks,
   Neha
  
   On Sat, Jan 5, 2013 at 10:50 PM, Joe Stein crypt...@gmail.com wrote:
  
   Where do we feel we are with 0.8 ?
 




-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
*/


[jira] [Commented] (KAFKA-133) Publish kafka jar to a public maven repository

2013-01-09 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549250#comment-13549250
 ] 

Joe Stein commented on KAFKA-133:
-

Committed to the 0.8 branch also.

 Publish kafka jar to a public maven repository
 --

 Key: KAFKA-133
 URL: https://issues.apache.org/jira/browse/KAFKA-133
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.6, 0.8
Reporter: Neha Narkhede
  Labels: patch
 Fix For: 0.8

 Attachments: KAFKA-133.patch, pom.xml


 The released kafka jar must be downloaded manually and then deployed to a private 
 repository before it can be used by a developer using maven2.
 Similar to other Apache projects, it will be nice to have a way to publish 
 Kafka releases to a public maven repo. 
 In the past, we gave it a try using sbt publish to Sonatype Nexus maven repo, 
 but ran into some authentication problems. It will be good to revisit this 
 and get it resolved.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549276#comment-13549276
 ] 

Jay Kreps commented on KAFKA-691:
-

That would be awesome. If you don't mind, just give the proposed set of changes 
on the JIRA first and let's get everyone on board with how it should work, since 
it is a reasonably important change (or, if you don't mind revising your patch, 
we can start with that).

 Fault tolerance broken with replication factor 1
 

 Key: KAFKA-691
 URL: https://issues.apache.org/jira/browse/KAFKA-691
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps

 In 0.7 if a partition was down we would just send the message elsewhere. This 
 meant that the partitioning was really more of a stickiness than a hard 
 guarantee. This made it impossible to depend on it for partitioned, stateful 
 processing.
 In 0.8 when running with replication this should not be a problem generally 
 as the partitions are now highly available and fail over to other replicas. 
 However in the case of replication factor = 1 this no longer really works for 
 most cases, as now a dead broker will give errors for that broker.
 I am not sure of the best fix. Intuitively I think this is something that 
 should be handled by the Partitioner interface. However currently the 
 partitioner has no knowledge of which nodes are available. So you could use a 
 random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


Re: About Kafka 0.8 producer

2013-01-09 Thread 杨涛
Hi,

 That is to say: for Kafka 0.8, when you add a new broker into the
cluster, you do not need to change the producer broker setting, is that right?

 

 

Bruce



[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549373#comment-13549373
 ] 

Jun Rao commented on KAFKA-691:
---

DefaultEventHandler.getPartitionListForTopic() returns Seq[PartitionAndLeader]. 
If PartitionAndLeader.leaderBrokerIdOpt is None, the partition is not 
available. 

There is another tricky issue. If a partition is not available, when do we 
refresh the metadata to check if the partition becomes available again? 
Currently, we refresh the metadata if we fail to send the data. However, if we 
always route the messages to available partitions, we may never fail to send. 
One possible solution is that if there is at least one partition not available 
in Seq[PartitionAndLeader], we refresh the metadata if a configurable amount of 
time has passed (e.g., 10 mins).
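
A self-contained Scala sketch of the availability check described above; PartitionAndLeader and leaderBrokerIdOpt mirror the 0.8 names Jun mentions, but the timing logic and stubs are illustrative only, not a patch:

  object AvailablePartitionSketch {
    case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

    val metadataRefreshIntervalMs: Long = 10 * 60 * 1000L     // e.g. 10 mins, as suggested above
    private var lastRefreshMs = 0L

    def refreshMetadata(topic: String): Unit = {              // stub for the real metadata fetch
      lastRefreshMs = System.currentTimeMillis
      println("refreshing metadata for " + topic)
    }

    def choosePartition(topic: String, all: Seq[PartitionAndLeader]): Option[PartitionAndLeader] = {
      val available = all.filter(_.leaderBrokerIdOpt.isDefined)
      // if some partitions are unavailable, refresh metadata once a configurable interval has passed
      if (available.size < all.size &&
          System.currentTimeMillis - lastRefreshMs > metadataRefreshIntervalMs)
        refreshMetadata(topic)
      if (available.isEmpty) None
      else Some(available(scala.util.Random.nextInt(available.size)))
    }

    def main(args: Array[String]): Unit = {
      val parts = Seq(PartitionAndLeader("t001", 0, Some(1)), PartitionAndLeader("t001", 1, None))
      println(choosePartition("t001", parts))                 // only partition 0 is eligible
    }
  }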

 Fault tolerance broken with replication factor 1
 

 Key: KAFKA-691
 URL: https://issues.apache.org/jira/browse/KAFKA-691
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps

 In 0.7 if a partition was down we would just send the message elsewhere. This 
 meant that the partitioning was really more of a stickiness than a hard 
 guarantee. This made it impossible to depend on it for partitioned, stateful 
 processing.
 In 0.8 when running with replication this should not be a problem generally 
 as the partitions are now highly available and fail over to other replicas. 
 However in the case of replication factor = 1 this no longer really works for 
 most cases, as now a dead broker will give errors for that broker.
 I am not sure of the best fix. Intuitively I think this is something that 
 should be handled by the Partitioner interface. However currently the 
 partitioner has no knowledge of which nodes are available. So you could use a 
 random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira