Re: Review Request 29091: Patch for KAFKA-1646

2014-12-16 Thread Qianlin Xia

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29091/
---

(Updated Dec. 16, 2014, 8:13 a.m.)


Review request for kafka.


Bugs: KAFKA-1646
https://issues.apache.org/jira/browse/KAFKA-1646


Repository: kafka


Description
---

truncate off trailing zeros on broker restart if broker is gracefully stopped.


Diffs
-

  core/src/main/scala/kafka/log/FileMessageSet.scala 
e1f8b979c3e6f62ea235bd47bc1587a1291443f9 
  core/src/main/scala/kafka/log/Log.scala 
46df8d99d977a3b010a9b9f4698187fa9bfb2498 
  core/src/main/scala/kafka/log/LogSegment.scala 
0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/utils/Utils.scala 
a89b0463685e6224d263bc9177075e1bb6b93d04 

Diff: https://reviews.apache.org/r/29091/diff/


Testing
---


Thanks,

Qianlin Xia



Re: Review Request 29091: Patch for KAFKA-1646

2014-12-16 Thread Qianlin Xia

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29091/
---

(Updated Dec. 16, 2014, 8:18 a.m.)


Review request for kafka.


Bugs: KAFKA-1646
https://issues.apache.org/jira/browse/KAFKA-1646


Repository: kafka


Description
---

truncate off trailing zeros on broker restart if broker is gracefully stopped.


Diffs (updated)
-

  core/src/main/scala/kafka/log/FileMessageSet.scala 
e1f8b979c3e6f62ea235bd47bc1587a1291443f9 
  core/src/main/scala/kafka/log/Log.scala 
46df8d99d977a3b010a9b9f4698187fa9bfb2498 
  core/src/main/scala/kafka/log/LogSegment.scala 
0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/utils/Utils.scala 
a89b0463685e6224d263bc9177075e1bb6b93d04 

Diff: https://reviews.apache.org/r/29091/diff/


Testing
---


Thanks,

Qianlin Xia



Re: Review Request 29091: Patch for KAFKA-1646

2014-12-16 Thread Qianlin Xia

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29091/
---

(Updated Dec. 16, 2014, 8:22 a.m.)


Review request for kafka.


Bugs: KAFKA-1646
https://issues.apache.org/jira/browse/KAFKA-1646


Repository: kafka


Description
---

truncate off trailing zeros on broker restart if broker is gracefully stopped.


Diffs (updated)
-

  core/src/main/scala/kafka/log/FileMessageSet.scala 
e1f8b979c3e6f62ea235bd47bc1587a1291443f9 
  core/src/main/scala/kafka/log/Log.scala 
46df8d99d977a3b010a9b9f4698187fa9bfb2498 
  core/src/main/scala/kafka/log/LogSegment.scala 
0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/utils/Utils.scala 
a89b0463685e6224d263bc9177075e1bb6b93d04 

Diff: https://reviews.apache.org/r/29091/diff/


Testing
---


Thanks,

Qianlin Xia



Re: Review Request 29091: Patch for KAFKA-1646

2014-12-16 Thread Qianlin Xia

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29091/
---

(Updated Dec. 16, 2014, 8:30 a.m.)


Review request for kafka.


Bugs: KAFKA-1646
https://issues.apache.org/jira/browse/KAFKA-1646


Repository: kafka


Description
---

truncate off trailing zeros on broker restart if broker is gracefully stopped.


Diffs (updated)
-

  core/src/main/scala/kafka/log/FileMessageSet.scala 
e1f8b979c3e6f62ea235bd47bc1587a1291443f9 
  core/src/main/scala/kafka/log/Log.scala 
46df8d99d977a3b010a9b9f4698187fa9bfb2498 
  core/src/main/scala/kafka/log/LogSegment.scala 
0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/utils/Utils.scala 
a89b0463685e6224d263bc9177075e1bb6b93d04 

Diff: https://reviews.apache.org/r/29091/diff/


Testing
---


Thanks,

Qianlin Xia



[jira] [Updated] (KAFKA-1646) Improve consumer read performance for Windows

2014-12-16 Thread Qianlin Xia (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qianlin Xia updated KAFKA-1646:
---
Attachment: KAFKA-1646_20141216_163008.patch

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646_20141216_163008.patch


 This patch is for the Windows platform only. On Windows, if there is more 
 than one replica writing to disk, the segment log files will not be 
 consistent on disk and consumer read performance will drop greatly. This fix 
 allocates more disk space when rolling a new segment, which improves consumer 
 read performance on the NTFS file system. The patch doesn't affect file 
 allocation on other filesystems, since it only adds statements like 
 'if(Os.iswindow)' or methods used only on Windows.
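
 A minimal sketch of the kind of preallocation described above, assuming a check 
 on the OS name and a configured segment size (illustrative only, not the actual 
 patch):

    import java.io.{File, RandomAccessFile}

    object SegmentPreallocation {
      // Illustrative only: reserve the full segment size up front on Windows so NTFS
      // keeps the file contiguous when a new segment is rolled.
      def openSegmentFile(file: File, preallocate: Boolean, segmentSizeBytes: Long): RandomAccessFile = {
        val raf = new RandomAccessFile(file, "rw")
        val isWindows = System.getProperty("os.name").toLowerCase.startsWith("windows")
        if (preallocate && isWindows)
          raf.setLength(segmentSizeBytes)
        raf
      }
    }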



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2014-12-16 Thread Qianlin Xia (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14247936#comment-14247936
 ] 

Qianlin Xia commented on KAFKA-1646:


Updated reviewboard https://reviews.apache.org/r/29091/diff/
 against branch origin/0.8.1

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646_20141216_163008.patch


 This patch is for the Windows platform only. On Windows, if there is more 
 than one replica writing to disk, the segment log files will not be 
 consistent on disk and consumer read performance will drop greatly. This fix 
 allocates more disk space when rolling a new segment, which improves consumer 
 read performance on the NTFS file system. The patch doesn't affect file 
 allocation on other filesystems, since it only adds statements like 
 'if(Os.iswindow)' or methods used only on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Optimistic locking

2014-12-16 Thread Daniel Schierbeck
I'm trying to design a system that uses Kafka as its primary data store by
persisting immutable events into a topic and keeping a secondary index in
another data store. The secondary index would store the entities. Each
event would pertain to some entity, e.g. a user, and those entities are
stored in an easily queriable way.

Kafka seems well suited for this, but there's one thing I'm having problems
with. I cannot guarantee that only one process writes events about an
entity, which makes the design vulnerable to integrity issues.

For example, say that a user can have multiple email addresses assigned,
and the EmailAddressRemoved event is published when the user removes one.
There's an integrity constraint, though: every user MUST have at least one
email address. As far as I can see, there's no way to stop two separate
processes from looking up a user entity, seeing that there are two email
addresses assigned, and each publishing an event. The end result would violate
the constraint.

If I'm wrong in saying that this isn't possible I'd love some feedback!

My current thinking is that Kafka could relatively easily support this kind
of application with a small additional API. Kafka already has the abstract
notion of entities through its key-based retention policy. If the produce
API was modified in order to allow an integer OffsetConstraint, the
following algorithm could determine whether the request should proceed:

1. For every key seen, keep track of the offset of the latest message
referencing the key.
2. When an OffsetConstraint is specified in the produce API call, compare
that value with the latest offset for the message key.
2.1. If they're identical, allow the operation to continue.
2.2. If they're not identical, fail with some OptimisticLockingFailure.

Would such a feature be completely out of scope for Kafka?
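
A rough illustration of the compare-and-set semantics described above (the names
OffsetConstraint/OptimisticLockingFailure and the in-memory map are assumptions,
not an existing Kafka API):

    import scala.collection.mutable

    object OffsetConstraintSketch {
      // Step 1: offset of the latest message seen for each key.
      private val latestOffsetByKey = mutable.Map[String, Long]()

      // Step 2: a produce request carrying an expected offset succeeds only if it
      // matches the latest offset recorded for that key; otherwise it is rejected.
      def tryAppend(key: String, expectedOffset: Long, newOffset: Long): Either[String, Long] = {
        val current = latestOffsetByKey.getOrElse(key, -1L)
        if (current == expectedOffset) {
          latestOffsetByKey(key) = newOffset
          Right(newOffset)                  // 2.1: offsets match, allow the write
        } else {
          Left(s"OptimisticLockingFailure: expected $expectedOffset, latest is $current") // 2.2
        }
      }
    }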

Best regards,
Daniel Schierbeck


[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-16 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14248407#comment-14248407
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

 Mirror Maker could lose data on unclean shutdown.
 -

 Key: KAFKA-1650
 URL: https://issues.apache.org/jira/browse/KAFKA-1650
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
 KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
 KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
 KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
 KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, 
 KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch


 Currently, if the mirror maker is shut down uncleanly, the data in the data 
 channel and buffer could potentially be lost. With the new producer's 
 callback, this issue could be solved.
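
 A minimal sketch of the callback-based accounting this refers to (names are 
 illustrative, not the MirrorMaker code): count in-flight sends and only let a 
 clean shutdown or offset commit proceed once every send has been acknowledged.

    import java.util.concurrent.atomic.AtomicInteger

    class UnackedCounter {
      private val numMessageUnacked = new AtomicInteger(0)

      def beforeSend(): Unit = numMessageUnacked.incrementAndGet()

      // Invoked from the producer callback; decremented whether or not the send succeeded.
      def onCompletion(error: Option[Throwable]): Unit = numMessageUnacked.decrementAndGet()

      // Block the shutdown / offset-commit path until everything in flight is acked.
      def awaitAllAcked(): Unit = while (numMessageUnacked.get() > 0) Thread.sleep(10)
    }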



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-16 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 16, 2014, 4:03 p.m.)


Review request for kafka.


Bugs: KAFKA-1650
https://issues.apache.org/jira/browse/KAFKA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented whether or not the send was successful.


Addressed Jun's comments.


Incorporated Jun's comments


Incorporated Jun's comments and rebased on trunk


Rebased on current trunk


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 
62c0686e816d2888772d5a911becf625eedee397 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
e991d2187d03241f639eeaf6769fb59c8c99664c 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9baad34a9793e5067d11289ece2154ba87b388af 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
77d951d13b8d8ad00af40257fe51623cc2caa61a 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1499) Broker-side compression configuration

2014-12-16 Thread Manikumar Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar Reddy updated KAFKA-1499:
---
Attachment: KAFKA-1499_2014-12-16_22:39:10.patch

 Broker-side compression configuration
 -

 Key: KAFKA-1499
 URL: https://issues.apache.org/jira/browse/KAFKA-1499
 Project: Kafka
  Issue Type: New Feature
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1499.patch, KAFKA-1499.patch, 
 KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, 
 KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, 
 KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, 
 KAFKA-1499_2014-09-25_11:05:57.patch, KAFKA-1499_2014-10-27_13:13:55.patch, 
 KAFKA-1499_2014-12-16_22:39:10.patch

   Original Estimate: 72h
  Remaining Estimate: 72h

 A given topic can have messages in mixed compression codecs. i.e., it can
 also have a mix of uncompressed/compressed messages.
 It will be useful to support a broker-side configuration to recompress
 messages to a specific compression codec. i.e., all messages (for all
 topics) on the broker will be compressed to this codec. We could have
 per-topic overrides as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 24704: Patch for KAFKA-1499

2014-12-16 Thread Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24704/
---

(Updated Dec. 16, 2014, 5:10 p.m.)


Review request for kafka.


Bugs: KAFKA-1499
https://issues.apache.org/jira/browse/KAFKA-1499


Repository: kafka


Description (updated)
---

Support given for Broker-side compression


Diffs (updated)
-

  core/src/main/scala/kafka/log/Log.scala 
4fae2f0d339b256832baa62ca4995d10546716b4 
  core/src/main/scala/kafka/log/LogConfig.scala 
ca7a99e99f641b2694848b88bf4ae94657071d03 
  core/src/main/scala/kafka/message/BrokerCompression.scala PRE-CREATION 
  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
788c7864bc881b935975ab4a4e877b690e65f1f1 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
6e26c5436feb4629d17f199011f3ebb674aa767f 
  core/src/main/scala/kafka/server/KafkaServer.scala 
1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
  core/src/test/scala/kafka/log/LogConfigTest.scala 
99b0df7b69c5e0a1b6251c54592c6ef63b1800fe 
  core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
4e45d965bc423192ac704883ee75e9727006f89b 
  core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
2377abe4933e065d037828a214c3a87e1773a8ef 

Diff: https://reviews.apache.org/r/24704/diff/


Testing
---


Thanks,

Manikumar Reddy O



[jira] [Commented] (KAFKA-1499) Broker-side compression configuration

2014-12-16 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14248520#comment-14248520
 ] 

Manikumar Reddy commented on KAFKA-1499:


[~jjkoshy]  can you review this patch?

 Broker-side compression configuration
 -

 Key: KAFKA-1499
 URL: https://issues.apache.org/jira/browse/KAFKA-1499
 Project: Kafka
  Issue Type: New Feature
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1499.patch, KAFKA-1499.patch, 
 KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, 
 KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, 
 KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, 
 KAFKA-1499_2014-09-25_11:05:57.patch, KAFKA-1499_2014-10-27_13:13:55.patch, 
 KAFKA-1499_2014-12-16_22:39:10.patch

   Original Estimate: 72h
  Remaining Estimate: 72h

 A given topic can have messages in mixed compression codecs. i.e., it can
 also have a mix of uncompressed/compressed messages.
 It will be useful to support a broker-side configuration to recompress
 messages to a specific compression codec. i.e., all messages (for all
 topics) on the broker will be compressed to this codec. We could have
 per-topic overrides as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


API Annotations

2014-12-16 Thread Gwen Shapira
Hi,

Kafka has public APIs in Java and Scala, intended for use by external
developers.
In addition, Kafka also exposes many public methods that are intended
for use within Kafka but are not intended to be called by external
developers.
Also, some of the external APIs are less stable than others (the new
producer for example).

In Hadoop we have a similar situation, and to avoid misunderstandings
or miscommunications on which APIs are external and which are stable,
we use annotations to communicate this information.
We find it very useful in preventing our customers from accidentally
getting into trouble by using internal methods or unstable APIs.

Here are the annotations Hadoop uses:
https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceStability.html
https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceAudience.html

I'm wondering what others think about using something similar in Kafka.

Gwen
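
For reference, a minimal sketch of what such stability/audience annotations could
look like on the Kafka side (names mirror Hadoop's scheme and are purely
illustrative, not an existing Kafka package):

    import scala.annotation.StaticAnnotation

    // Hypothetical Kafka-side equivalents of Hadoop's annotations.
    object InterfaceAudience {
      class Public  extends StaticAnnotation   // safe for external developers
      class Private extends StaticAnnotation   // internal to Kafka, no compatibility promise
    }

    object InterfaceStability {
      class Stable   extends StaticAnnotation  // compatible across minor releases
      class Evolving extends StaticAnnotation  // may change between minor releases
      class Unstable extends StaticAnnotation  // may change at any time (e.g. the new producer)
    }

    // Example usage on a public-but-still-evolving client class.
    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    class ExampleClientApi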


Re: Review Request 29091: Patch for KAFKA-1646

2014-12-16 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29091/#review65216
---

Ship it!


Overall it looks good to me.

- Sriharsha Chintalapani


On Dec. 16, 2014, 8:30 a.m., Qianlin Xia wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29091/
 ---
 
 (Updated Dec. 16, 2014, 8:30 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1646
 https://issues.apache.org/jira/browse/KAFKA-1646
 
 
 Repository: kafka
 
 
 Description
 ---
 
 truncate off trailing zeros on broker restart if broker is gracefully stopped.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/FileMessageSet.scala 
 e1f8b979c3e6f62ea235bd47bc1587a1291443f9 
   core/src/main/scala/kafka/log/Log.scala 
 46df8d99d977a3b010a9b9f4698187fa9bfb2498 
   core/src/main/scala/kafka/log/LogSegment.scala 
 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
   core/src/main/scala/kafka/utils/Utils.scala 
 a89b0463685e6224d263bc9177075e1bb6b93d04 
 
 Diff: https://reviews.apache.org/r/29091/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Qianlin Xia
 




[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2014-12-16 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14248597#comment-14248597
 ] 

Sriharsha Chintalapani commented on KAFKA-1646:
---

Thanks [~qixia] for the reviewboard patch. [~jkreps] [~junrao], could you please 
give your feedback on the updated patch? Thanks.

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
 KAFKA-1646_20141216_163008.patch


 This patch is for the Windows platform only. On Windows, if there is more 
 than one replica writing to disk, the segment log files will not be 
 consistent on disk and consumer read performance will drop greatly. This fix 
 allocates more disk space when rolling a new segment, which improves consumer 
 read performance on the NTFS file system. The patch doesn't affect file 
 allocation on other filesystems, since it only adds statements like 
 'if(Os.iswindow)' or methods used only on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1070) Auto-assign node id

2014-12-16 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14248599#comment-14248599
 ] 

Sriharsha Chintalapani commented on KAFKA-1070:
---

[~nehanarkhede] Can you please take a look at the patch and also reply to your 
comments. Thanks.

 Auto-assign node id
 ---

 Key: KAFKA-1070
 URL: https://issues.apache.org/jira/browse/KAFKA-1070
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
  Labels: usability
 Fix For: 0.8.3

 Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, 
 KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, 
 KAFKA-1070_2014-07-24_21:05:33.patch, KAFKA-1070_2014-08-21_10:26:20.patch, 
 KAFKA-1070_2014-11-20_10:50:04.patch, KAFKA-1070_2014-11-25_20:29:37.patch


 It would be nice to have Kafka brokers auto-assign node ids rather than 
 having that be a configuration. Having a configuration is irritating because 
 (1) you have to generate a custom config for each broker and (2) even though 
 it is in configuration, changing the node id can cause all kinds of bad 
 things to happen.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14248602#comment-14248602
 ] 

Gwen Shapira commented on KAFKA-1819:
-

Thanks for pointing out the similar issue, [~jjkoshy].

The log is indeed removed from the pool in LogManager.deleteLog, and we could 
remove them in doneCleaning. 

However, I think we want to be able to force cleaning as part of the topic 
delete.
If we don't, the checkpoint file will only get updated some time later 
when doneCleaning is called. This can be more challenging to troubleshoot and 
also may not happen before Gian creates a new topic with the same name.
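
A small sketch of the idea being discussed, with an in-memory map standing in for
the cleaner-offset-checkpoint file (names are illustrative, not the LogCleaner
internals): dropping the entry at topic-delete time means a re-created topic
starts cleaning from offset 0.

    import scala.collection.mutable

    class CleanerCheckpoints {
      // Stand-in for cleaner-offset-checkpoint, keyed by "topic-partition".
      private val lastCleanOffset = mutable.Map[String, Long]()

      def update(tp: String, offset: Long): Unit = lastCleanOffset(tp) = offset

      // Called as part of topic deletion instead of waiting for the next doneCleaning pass.
      def forget(tp: String): Unit = lastCleanOffset.remove(tp)

      // buildOffsetMap would otherwise see a stale "last clean offset" for a fresh log.
      def checkpointFor(tp: String): Long = lastCleanOffset.getOrElse(tp, 0L)
    }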

 Cleaner gets confused about deleted and re-created topics
 -

 Key: KAFKA-1819
 URL: https://issues.apache.org/jira/browse/KAFKA-1819
 Project: Kafka
  Issue Type: Bug
Reporter: Gian Merlino
Priority: Blocker
 Fix For: 0.8.2


 I get an error like this after deleting a compacted topic and re-creating it. 
 I think it's because the brokers don't remove cleaning checkpoints from the 
 cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
 java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
 587607 but segment base offset is 0 for log foo-6.
 at scala.Predef$.require(Predef.scala:233)
 at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
 at kafka.log.Cleaner.clean(LogCleaner.scala:300)
 at 
 kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: API Annotations

2014-12-16 Thread Joe Stein
This is definitely an area we can improve upon; thanks for bringing it up,
Gwen. We should also decide on a structure for how we publish the docs; that
hasn't been something I have been consistent about for each release, and it
should be standard.

We can publish to SVN, but I always feel like drift happens there. For
0.8.2-beta I pushed them so they are in dist now
(https://archive.apache.org/dist/kafka/0.8.2-beta/) as java-doc and scala-doc;
maybe that is OK for the final release?

The content should be as helpful as it can be to folks, for sure.

Gwen, do you want to create a JIRA and work on a patch for this? I feel
like it is something important for 0.8.2 that wouldn't impact code (and if
it does, it is for the good reason of stabilizing things). Not sure what other
folks think of that, though? For 0.8.3, +1 for sure.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Tue, Dec 16, 2014 at 1:04 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,

 Kafka has public APIs in Java and Scala, intended for use by external
 developers.
 In addition, Kafka also exposes many public methods that are intended
 to use within Kafka but are not intended to be called by external
 developers.
 Also, some of the external APIs are less stable than others (the new
 producer for example).

 In Hadoop we have a similar situation, and to avoid misunderstandings
 or miscommunications on which APIs are external and which are stable,
 we use annotations to communicate this information.
 We find it very useful in preventing our customers from accidentally
 getting into trouble by using internal methods or unstable APIs.

 Here are the annotations Hadoop uses:

 https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceStability.html

 https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceAudience.html

 I'm wondering what others think about using something similar in Kafka.

 Gwen



Re: Review Request 29030: KAFKA-742: Existing directories under the Kafka data directory without any data cause process to not start

2014-12-16 Thread Ashish Singh

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29030/
---

(Updated Dec. 16, 2014, 6:27 p.m.)


Review request for kafka.


Changes
---

Have the exception print the whole path of the directory causing the issue, 
rather than just the directory name.


Bugs: KAFKA-742
https://issues.apache.org/jira/browse/KAFKA-742


Repository: kafka


Description
---

KAFKA-742: Existing directories under the Kafka data directory without any data 
cause process to not start


Diffs (updated)
-

  core/src/main/scala/kafka/log/Log.scala 
4fae2f0d339b256832baa62ca4995d10546716b4 
  core/src/main/scala/kafka/log/LogManager.scala 
4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 
  core/src/test/scala/unit/kafka/log/LogTest.scala 
d670ba76acd54e3e88855c56c152c7cc36dddfdc 

Diff: https://reviews.apache.org/r/29030/diff/


Testing
---


Thanks,

Ashish Singh



[jira] [Updated] (KAFKA-742) Existing directories under the Kafka data directory without any data cause process to not start

2014-12-16 Thread Ashish Kumar Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Kumar Singh updated KAFKA-742:
-
Attachment: KAFKA-742.1.patch

Makes sense. Updated patch.

 Existing directories under the Kafka data directory without any data cause 
 process to not start
 ---

 Key: KAFKA-742
 URL: https://issues.apache.org/jira/browse/KAFKA-742
 Project: Kafka
  Issue Type: Bug
  Components: config
Affects Versions: 0.8.0
Reporter: Chris Curtin
Assignee: Ashish Kumar Singh
 Fix For: 0.8.3

 Attachments: KAFKA-742.1.patch, KAFKA-742.patch


 I incorrectly setup the configuration file to have the metrics go to 
 /var/kafka/metrics while the logs were in /var/kafka. On startup I received 
 the following error then the daemon exited:
 30   [main] INFO  kafka.log.LogManager  - [Log Manager on Broker 0] Loading 
 log 'metrics'
 32   [main] FATAL kafka.server.KafkaServerStartable  - Fatal error during 
 KafkaServerStable startup. Prepare to shutdown
 java.lang.StringIndexOutOfBoundsException: String index out of range: -1
 at java.lang.String.substring(String.java:1937)
 at 
 kafka.log.LogManager.kafka$log$LogManager$$parseTopicPartitionName(LogManager.scala:335)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:112)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:109)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
 at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:109)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:101)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
 at 
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
 at kafka.log.LogManager.loadLogs(LogManager.scala:101)
 at kafka.log.LogManager.init(LogManager.scala:62)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:59)
 at 
 kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
 at kafka.Kafka$.main(Kafka.scala:46)
 at kafka.Kafka.main(Kafka.scala)
 34   [main] INFO  kafka.server.KafkaServer  - [Kafka Server 0], shutting down
 This was on a brand new cluster so no data or metrics logs existed yet.
 Moving the metrics to their own directory (not a child of the logs) allowed 
 the daemon to start.
 Took a few minutes to figure out what was wrong.
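
 A hedged sketch of the kind of check such a fix might add (illustrative only, 
 not the actual patch): validate the directory name and report the full path 
 when it doesn't look like a topic-partition directory, instead of failing with 
 a raw StringIndexOutOfBoundsException.

    import java.io.File

    object LogDirNameCheck {
      // Expects directories named "topic-partition", e.g. "mytopic-0"; anything else
      // fails fast with a message that includes the absolute path.
      def parseTopicPartitionName(dir: File): (String, Int) = {
        val name = dir.getName
        val idx = name.lastIndexOf('-')
        if (idx <= 0 || idx == name.length - 1)
          throw new IllegalArgumentException(
            s"Found directory ${dir.getAbsolutePath} which is not in the form of topic-partition")
        (name.substring(0, idx), name.substring(idx + 1).toInt)
      }
    }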



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 24704: Patch for KAFKA-1499

2014-12-16 Thread Jonathan Creasy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24704/#review65221
---



core/src/main/scala/kafka/log/Log.scala
https://reviews.apache.org/r/24704/#comment108254

I wouldn't block on this since there are lots of other existing cases of this, 
but if you happen to do another patch, remove this whitespace error.


- Jonathan Creasy


On Dec. 16, 2014, 5:10 p.m., Manikumar Reddy O wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24704/
 ---
 
 (Updated Dec. 16, 2014, 5:10 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1499
 https://issues.apache.org/jira/browse/KAFKA-1499
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Support given for Broker-side compression
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/Log.scala 
 4fae2f0d339b256832baa62ca4995d10546716b4 
   core/src/main/scala/kafka/log/LogConfig.scala 
 ca7a99e99f641b2694848b88bf4ae94657071d03 
   core/src/main/scala/kafka/message/BrokerCompression.scala PRE-CREATION 
   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
 788c7864bc881b935975ab4a4e877b690e65f1f1 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 6e26c5436feb4629d17f199011f3ebb674aa767f 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
   core/src/test/scala/kafka/log/LogConfigTest.scala 
 99b0df7b69c5e0a1b6251c54592c6ef63b1800fe 
   core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
 4e45d965bc423192ac704883ee75e9727006f89b 
   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
 2377abe4933e065d037828a214c3a87e1773a8ef 
 
 Diff: https://reviews.apache.org/r/24704/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Manikumar Reddy O
 




Re: Review Request 24704: Patch for KAFKA-1499

2014-12-16 Thread Jonathan Creasy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24704/#review65223
---

Ship it!


Ship It!

- Jonathan Creasy


On Dec. 16, 2014, 5:10 p.m., Manikumar Reddy O wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24704/
 ---
 
 (Updated Dec. 16, 2014, 5:10 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1499
 https://issues.apache.org/jira/browse/KAFKA-1499
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Support given for Broker-side compression
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/Log.scala 
 4fae2f0d339b256832baa62ca4995d10546716b4 
   core/src/main/scala/kafka/log/LogConfig.scala 
 ca7a99e99f641b2694848b88bf4ae94657071d03 
   core/src/main/scala/kafka/message/BrokerCompression.scala PRE-CREATION 
   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
 788c7864bc881b935975ab4a4e877b690e65f1f1 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 6e26c5436feb4629d17f199011f3ebb674aa767f 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
   core/src/test/scala/kafka/log/LogConfigTest.scala 
 99b0df7b69c5e0a1b6251c54592c6ef63b1800fe 
   core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 
 4e45d965bc423192ac704883ee75e9727006f89b 
   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
 2377abe4933e065d037828a214c3a87e1773a8ef 
 
 Diff: https://reviews.apache.org/r/24704/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Manikumar Reddy O
 




Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-16 Thread Jun Rao
Joel,

With a byte array interface, of course there is nothing that one can't do.
However, the real question is that whether we want to encourage people to
use it this way or not. Being able to flow just bytes is definitely easier
to get started. That's why many early adopters choose to do it that way.
However, it's often the case that they start feeling the pain later when
some producers change the data format. Their Hive/Pig queries start to
break and it's a painful process to have the issue fixed. So, the purpose
of this api change is really to encourage people to standardize on a single
serializer/deserializer that supports things like data validation and
schema evolution upstream in the producer. Now, suppose there is an Avro
serializer/deserializer implementation. How do we make it easy for people
to adopt? If the serializer is part of the api, we can just say, wire in
the Avro serializer for key and/or value in the config and then you can
start sending Avro records to the producer. If the serializer is not part
of the api, we have to say, first instantiate a key and/or value serializer
this way, send the key to the key serializer to get the key bytes, send the
value to the value serializer to get the value bytes, and finally send the
bytes to the producer. The former will be simpler and likely makes the
adoption easier.
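
To make the contrast concrete, here is a small, self-contained sketch of the two
usage patterns (the class names below are illustrative stand-ins, not the actual
producer API):

    trait Serializer[T] { def serialize(t: T): Array[Byte] }

    class StringSerializer extends Serializer[String] {
      def serialize(s: String): Array[Byte] = s.getBytes("UTF-8")
    }

    // Style 1: serializers are wired in once (e.g. via config), so call sites send typed records.
    class TypedProducer[K, V](keySer: Serializer[K], valSer: Serializer[V]) {
      def send(topic: String, key: K, value: V): Unit = {
        val (k, v) = (keySer.serialize(key), valSer.serialize(value))
        println(s"$topic: ${k.length} key bytes, ${v.length} value bytes")
      }
    }

    // Style 2: the producer only accepts bytes; every call site serializes explicitly first.
    class BytesProducer {
      def send(topic: String, key: Array[Byte], value: Array[Byte]): Unit =
        println(s"$topic: ${key.length} key bytes, ${value.length} value bytes")
    }

    object SerializerStyles extends App {
      val ser = new StringSerializer
      new TypedProducer(ser, ser).send("events", "user-1", "hello")
      new BytesProducer().send("events", ser.serialize("user-1"), ser.serialize("hello"))
    }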

Thanks,

Jun

On Mon, Dec 15, 2014 at 7:20 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Documentation is inevitable even if the serializer/deserializer is
 part of the API - since the user has to set it up in the configs. So
 again, you can only encourage people to use it through documentation.
 The simpler byte-oriented API seems clearer to me because anyone who
 needs to send (or receive) a specific data type will _be forced to_
 (or actually, _intuitively_) select a serializer (or deserializer) and
 will definitely pick an already available implementation if a good one
 already exists.

 Sorry I still don't get it and this is really the only sticking point
 for me, albeit a minor one (which is why I have been +0 all along on
 the change). I (and I think many others) would appreciate it if
 someone can help me understand this better.  So I will repeat the
 question: What usage pattern cannot be supported easily by the
 simpler API without adding burden on the user?

 Thanks,

 Joel

 On Mon, Dec 15, 2014 at 11:59:34AM -0800, Jun Rao wrote:
  Joel,
 
  It's just that if the serializer/deserializer is not part of the API, you
  can only encourage people to use it through documentation. However, not
  everyone will read the documentation if it's not directly used in the
 API.
 
  Thanks,
 
  Jun
 
  On Mon, Dec 15, 2014 at 2:11 AM, Joel Koshy jjkosh...@gmail.com wrote:
 
   (sorry about the late follow-up - I'm traveling most of this
   month)
  
   I'm likely missing something obvious, but I find the following to be a
   somewhat vague point that has been mentioned more than once in this
   thread without a clear explanation. i.e., why is it hard to share a
   serializer/deserializer implementation and just have the clients call
   it before a send/receive? What usage pattern cannot be supported by
   the simpler API?
  
1. Can we keep the serialization semantics outside the Producer interface
and have simple bytes in / bytes out for the interface (this is what we
have today)?
    
The points for this are to keep the interface simple and usage easy to
understand. The points against are that it gets hard to share common
usage patterns around serialization/message validations in the future.
  
  
   On Tue, Dec 09, 2014 at 03:51:08AM +, Sriram Subramanian wrote:
Thank you Jay. I agree with the issue that you point out w.r.t. paired
serializers. I also think having mixed serialization types is rare. To get
the current behavior, one can simply use a ByteArraySerializer. This is
best understood by talking with many customers, and you seem to have done
that. I am convinced about the change.
   
For the rest who gave -1 or 0 for this proposal, do the answers for the
three points (updated) below seem reasonable? Are these explanations
convincing?
   
   
1. Can we keep the serialization semantics outside the Producer interface
and have simple bytes in / bytes out for the interface (this is what we
have today)?
    
The points for this are to keep the interface simple and usage easy to
understand. The points against are that it gets hard to share common
usage patterns around serialization/message validations in the future.
   
2. Can we create a wrapper producer that does the serialization and have
different variants of it for different data formats?
    
The points for this are again to keep the main API clean. The points
against are that it duplicates the API, increases the surface area, and
creates redundancy for a minor addition.
   
3. Do we need to 

Re: API Annotations

2014-12-16 Thread Jay Kreps
Hey Gwen,

We discussed this a bit about this when starting on the new clients.

We were super sloppy about this in initial Kafka development--single jar,
no real differentiation between public and private apis.

The plan was something like the following:
1. Start to consider this with the new clients.
2. Do the public/private designation at the package level. The public
packages are o.a.k.common, o.a.k.errors, o.a.k.producer, o.a.k.consumer,
o.a.k.tools. This makes javadoc and things like that easier, and it makes
it easy to see at a glance all the public classes. It would be even better
to enforce this in the build if that is possible (i.e. no class from a
non-public package is leaked) but we haven't done this. This approach
obviously wasn't possible in Hadoop since they started without a clear
delineation as we did in the original scala code.

Thoughts?

-Jay


On Tue, Dec 16, 2014 at 10:04 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 Hi,

 Kafka has public APIs in Java and Scala, intended for use by external
 developers.
 In addition, Kafka also exposes many public methods that are intended
 to use within Kafka but are not intended to be called by external
 developers.
 Also, some of the external APIs are less stable than others (the new
 producer for example).

 In Hadoop we have a similar situation, and to avoid misunderstandings
 or miscommunications on which APIs are external and which are stable,
 we use annotations to communicate this information.
 We find it very useful in preventing our customers from accidentally
 getting into trouble by using internal methods or unstable APIs.

 Here are the annotations Hadoop uses:

 https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceStability.html

 https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceAudience.html

 I'm wondering what others think about using something similar in Kafka.

 Gwen



[jira] [Commented] (KAFKA-1806) broker can still expose uncommitted data to a consumer

2014-12-16 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14248764#comment-14248764
 ] 

Evan Huus commented on KAFKA-1806:
--

Sarama client maintainer here (via 
https://github.com/Shopify/sarama/issues/226); this looks like a kafka bug to 
me since the error in the log message is from a ReplicaFetcherThread, but I'm 
happy to provide extra information on the behaviour of the client if you think 
it's relevant.

 broker can still expose uncommitted data to a consumer
 --

 Key: KAFKA-1806
 URL: https://issues.apache.org/jira/browse/KAFKA-1806
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: lokesh Birla
Assignee: Neha Narkhede

 Although the following issue, https://issues.apache.org/jira/browse/KAFKA-727,
 is marked fixed, I still see this issue in 0.8.1.1. I am able to 
 reproduce the issue consistently. 
 [2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch 
 request for partition [mmetopic4,2] offset 1940029 from consumer with 
 correlation id 21 (kafka.server.Kaf
 kaApis)
 java.lang.IllegalArgumentException: Attempt to read with a maximum offset 
 (1818353) less than the start offset (1940029).
 at kafka.log.LogSegment.read(LogSegment.scala:136)
 at kafka.log.Log.read(Log.scala:386)
 at 
 kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
 at 
 kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
 at 
 kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
 at scala.collection.immutable.Map$Map1.map(Map.scala:107)
 at 
 kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
 at 
 kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
 at 
 kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
 at 
 kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
 at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1806) broker can still expose uncommitted data to a consumer

2014-12-16 Thread lokesh Birla (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14248993#comment-14248993
 ] 

lokesh Birla commented on KAFKA-1806:
-

This problem occurs multiple times in server.log. 
Currently I am using:

 #added replica fetchers
num.replica.fetchers=4


[2014-08-30 04:00:58,419] ERROR [ReplicaFetcherThread-1-2], Current offset 
7343326909 for partition [mmetopic1,0] out of range; reset offset to 7351079341 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:01:58,351] ERROR [ReplicaFetcherThread-1-2], Current offset 
7352830699 for partition [mmetopic1,0] out of range; reset offset to 7360600212 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:01:58,398] ERROR [ReplicaFetcherThread-2-2], Current offset 
7362122784 for partition [mmetopic1,1] out of range; reset offset to 7369788902 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:01:58,428] ERROR [ReplicaFetcherThread-3-2], Current offset 
7349217662 for partition [mmetopic1,2] out of range; reset offset to 7356979468 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:02:58,380] ERROR [ReplicaFetcherThread-3-2], Current offset 
7358748697 for partition [mmetopic1,2] out of range; reset offset to 7366511359 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:02:58,431] ERROR [ReplicaFetcherThread-2-2], Current offset 
7371546217 for partition [mmetopic1,1] out of range; reset offset to 7379322019 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:02:58,491] ERROR [ReplicaFetcherThread-1-2], Current offset 
7362381355 for partition [mmetopic1,0] out of range; reset offset to 7370131818 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:03:58,553] ERROR [ReplicaFetcherThread-3-2], Current offset 
7368280588 for partition [mmetopic1,2] out of range; reset offset to 7376042337 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:03:58,606] ERROR [ReplicaFetcherThread-1-2], Current offset 
7371895090 for partition [mmetopic1,0] out of range; reset offset to 7379659373 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:03:58,745] ERROR [ReplicaFetcherThread-2-2], Current offset 
7381073377 for partition [mmetopic1,1] out of range; reset offset to 7388856060 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:04:58,377] ERROR [ReplicaFetcherThread-2-2], Current offset 
7390601461 for partition [mmetopic1,1] out of range; reset offset to 7398383811 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:04:58,378] ERROR [ReplicaFetcherThread-1-2], Current offset 
7381410731 for partition [mmetopic1,0] out of range; reset offset to 7389193402 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:04:58,462] ERROR [ReplicaFetcherThread-3-2], Current offset 
7377936663 for partition [mmetopic1,2] out of range; reset offset to 7385573885 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:05:58,440] ERROR [ReplicaFetcherThread-2-2], Current offset 
7400170911 for partition [mmetopic1,1] out of range; reset offset to 7407915357 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:05:58,441] ERROR [ReplicaFetcherThread-1-2], Current offset 
7390968588 for partition [mmetopic1,0] out of range; reset offset to 7398725995 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:05:58,442] ERROR [ReplicaFetcherThread-3-2], Current offset 
7387325243 for partition [mmetopic1,2] out of range; reset offset to 7395096361 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:06:58,326] ERROR [ReplicaFetcherThread-1-2], Current offset 
7400572665 for partition [mmetopic1,0] out of range; reset offset to 7411422730 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:06:58,346] ERROR [ReplicaFetcherThread-2-2], Current offset 
7409827554 for partition [mmetopic1,1] out of range; reset offset to 7417436416 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:06:58,511] ERROR [ReplicaFetcherThread-3-2], Current offset 
7396889418 for partition [mmetopic1,2] out of range; reset offset to 7404620618 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:07:58,328] ERROR [ReplicaFetcherThread-2-2], Current offset 
7419467753 for partition [mmetopic1,1] out of range; reset offset to 7420615385 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:07:58,362] ERROR [ReplicaFetcherThread-3-2], Current offset 
7406461331 for partition [mmetopic1,2] out of range; reset offset to 7410977640 
(kafka.server.ReplicaFetcherThread)
[2014-08-30 04:07:58,588] ERROR [ReplicaFetcherThread-1-2], Current offset 
7413376626 for partition [mmetopic1,0] out of range; reset offset to 7414599975 
(kafka.server.ReplicaFetcherThread)



 broker can still expose uncommitted data to a consumer
 --

 Key: KAFKA-1806
 URL: https://issues.apache.org/jira/browse/KAFKA-1806
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: lokesh Birla

[jira] [Commented] (KAFKA-1479) Logs filling up while Kafka ReplicaFetcherThread tries to retrieve partition info for deleted topics

2014-12-16 Thread Arup Malakar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249071#comment-14249071
 ] 

Arup Malakar commented on KAFKA-1479:
-

For people who may stumble upon this JIRA, the steps mentioned by [~manasi] in 
https://issues.apache.org/jira/browse/KAFKA-1479?focusedCommentId=14017044page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14017044
 worked for me as well.

 Logs filling up while Kafka ReplicaFetcherThread tries to retrieve partition 
 info for deleted topics
 

 Key: KAFKA-1479
 URL: https://issues.apache.org/jira/browse/KAFKA-1479
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.8.1
 Environment: CentOS
Reporter: Manasi Manasi

 Started noticing that logs are filling up fast with lines like this:
 {quote}
 [2014-06-01 15:18:08,218] WARN [KafkaApi-2] Fetch request with correlation id 
 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-27,26] 
 failed due to Topic sams_2014-05-27 either doesn't exist or is in the process 
 of being deleted (kafka.server.KafkaApis)
 [2014-06-01 15:18:08,218] WARN [KafkaApi-2] Fetch request with correlation id 
 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-28,38] 
 failed due to Topic sams_2014-05-28 either doesn't exist or is in the process 
 of being deleted (kafka.server.KafkaApis)
 [2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 
 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-30,20] 
 failed due to Topic sams_2014-05-30 either doesn't exist or is in the process 
 of being deleted (kafka.server.KafkaApis)
 [2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 
 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-22,46] 
 failed due to Topic sams_2014-05-22 either doesn't exist or is in the process 
 of being deleted (kafka.server.KafkaApis)
 [2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 
 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-27,8] 
 failed due to Topic sams_2014-05-27 either doesn't exist or is in the process 
 of being deleted (kafka.server.KafkaApis)
 {quote}
 The above is from kafkaServer.out. Also seeing errors in server.log:
 {quote}
 [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for 
 partition [sams_2014-05-26,19] to broker 0:class 
 kafka.common.UnknownTopicOrPartitionException 
 (kafka.server.ReplicaFetcherThread)
 [2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 
 10887 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-30,4] 
 failed due to Topic sams_2014-05-30 either doesn't exist or is in the process 
 of being deleted (kafka.server.KafkaApis)
 [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for 
 partition [sams_2014-05-24,34] to broker 0:class 
 kafka.common.UnknownTopicOrPartitionException 
 (kafka.server.ReplicaFetcherThread)
 [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for 
 partition [sams_2014-05-26,41] to broker 0:class 
 kafka.common.UnknownTopicOrPartitionException 
 (kafka.server.ReplicaFetcherThread)
 [2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 
 10887 from client ReplicaFetcherThread-0-2 on partition [2014-05-21,0] failed 
 due to Topic 2014-05-21 either doesn't exist or is in the process of being 
 deleted (kafka.server.KafkaApis)
 [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for 
 partition [sams_2014-05-28,42] to broker 0:class 
 kafka.common.UnknownTopicOrPartitionException 
 (kafka.server.ReplicaFetcherThread)
 [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for 
 partition [sams_2014-05-22,21] to broker 0:class 
 kafka.common.UnknownTopicOrPartitionException 
 (kafka.server.ReplicaFetcherThread)
 [2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 
 10887 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-20,26] 
 failed due to Topic sams_2014-05-20 either doesn't exist or is in the process 
 of being deleted (kafka.server.KafkaApis)
 {quote}
 All these partitions belong to deleted topics. Nothing changed on our end 
 when we started noticing these logs filling up. Any ideas what is going on?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available

2014-12-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249131#comment-14249131
 ] 

Ewen Cheslack-Postava commented on KAFKA-1788:
--

[~bpot] that sounds right, I'm pretty sure metadata never gets cleared if all 
brokers become unavailable -- it's only updated when the producer starts and 
when it gets a metadataResponse message.

You can actually get into the state you're talking about for a long time 
without losing all the brokers. Metadata update requests use 
NetworkClient.leastLoadedNode to select which node to send the request to, 
which means requests may repeatedly go to the same node even if its connection 
isn't getting any data through but the TCP connection hasn't timed out yet. 
That can result in waiting for many minutes even though the metadata might be 
retrievable from a different node.

But I'm not sure it's really a distinct problem, just another variant -- the 
batch stays in the RecordAccumulator eating up bufferpool space until there's a 
network error or response to the request that included the batch. This means 
any failure to make progress sending data would trigger the same issue. I think 
a proper fix for this bug would add a timeout for messages as soon as send() is 
called, and would need to be able to remove them from any point in the pipeline 
after that timeout, cleaning up any resources they use.

The metadata issue is another interesting problem. If you reset the metadata, 
the current implementation will block on any subsequent send() calls since the 
first thing send() does is waitOnMetadata(). Arguably, given the interface of 
send() I'm not sure that blocking that way should ever be allowed, although at 
least now it's restricted to the initial send() call and probably simplifies a 
bunch of code. Resetting the metadata could also be counterproductive since the 
set of bootstrap nodes could be smaller than the subset of the cluster you had 
metadata for. One alternative idea: change the use of leastLoadedNode and after 
a certain amount of time, allow it to start considering the bootstrap nodes as 
well as the set currently in the metadata.
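
A tiny sketch of that alternative, purely illustrative (not the NetworkClient
code): once no metadata has been fetched for longer than some threshold, widen
the candidate set to include the bootstrap nodes.

    case class Node(id: Int, host: String, port: Int)

    object NodeSelectionSketch {
      // After a long stall, consider bootstrap nodes in addition to the nodes
      // currently known from metadata, so one bad connection can't wedge updates.
      def candidateNodes(metadataNodes: Seq[Node],
                         bootstrapNodes: Seq[Node],
                         msSinceLastMetadataUpdate: Long,
                         fallbackAfterMs: Long = 60000L): Seq[Node] =
        if (msSinceLastMetadataUpdate > fallbackAfterMs)
          (metadataNodes ++ bootstrapNodes).distinct
        else
          metadataNodes
    }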

 producer record can stay in RecordAccumulator forever if leader is no 
 available
 ---

 Key: KAFKA-1788
 URL: https://issues.apache.org/jira/browse/KAFKA-1788
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
  Labels: newbie++
 Fix For: 0.8.3


 In the new producer, when a partition has no leader for a long time (e.g., 
 all replicas are down), the records for that partition will stay in the 
 RecordAccumulator until the leader is available. This may cause the 
 bufferpool to be full and the callback for the produced message to block for 
 a long time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1822) Add echo request

2014-12-16 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-1822:
---

 Summary: Add echo request
 Key: KAFKA-1822
 URL: https://issues.apache.org/jira/browse/KAFKA-1822
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Gwen Shapira
Assignee: Gwen Shapira



Currently there is no simple way to generate a request and validate we receive 
a response without adding a lot of dependencies for the test.
Kafka request classes have quite a few dependencies, so they are not really 
usable when testing infrastructure components or clients.
Generating a byte array with a meaningless request key id, as is done in 
SocketServerTest, results in an unknown-request exception that must be handled. 

I suggest adding an EchoRequest, EchoResponse and EchoHandler. The Request will 
be the usual header and a bytearray. The Response will be a response header and 
the same bytearray echoed back.

Should be useful for client developers and when testing infrastructure changes.
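
A rough sketch of the proposed round trip (the exact names and wire layout are
assumptions for illustration; the ticket does not pin them down):

    import java.nio.ByteBuffer

    case class EchoRequest(correlationId: Int, payload: Array[Byte]) {
      def writeTo(buf: ByteBuffer): Unit = {
        buf.putInt(correlationId)      // the usual request header fields would go here
        buf.putInt(payload.length)
        buf.put(payload)
      }
    }

    case class EchoResponse(correlationId: Int, payload: Array[Byte])

    object EchoHandler {
      // The handler simply echoes the payload back under the same correlation id,
      // so a client can validate a full request/response cycle with no other dependencies.
      def handle(req: EchoRequest): EchoResponse = EchoResponse(req.correlationId, req.payload)
    }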





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available

2014-12-16 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249255#comment-14249255
 ] 

Jay Kreps commented on KAFKA-1788:
--

Currently the producer supports either blocking or dropping when it cannot send 
to the cluster as fast as data is arriving. This could occur because the 
cluster is down, or just because it isn't fast enough to keep up.

Kafka provides high availability for partitions so the case where a partition 
is permanently unavailable should be rare.

Timing out requests might be nice, but it's not 100% clear that it is better than 
the current strategy. The current strategy is just to buffer as long as 
possible and then either block or drop data when the buffer is exhausted. 
Arguably dropping when you are out of space is better than dropping after a 
fixed time (since in any case you have to drop when you are out of space).

As Ewen says, we can't reset the metadata because the bootstrap servers may no 
longer exist, and if they do, they are by definition a subset of the current 
cluster metadata. I think Ewen's solution of just making sure leastLoadedNode 
eventually tries all nodes is the right way to go. We'll have to be careful, 
though, as that method is pretty constrained.




 producer record can stay in RecordAccumulator forever if leader is no 
 available
 ---

 Key: KAFKA-1788
 URL: https://issues.apache.org/jira/browse/KAFKA-1788
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
  Labels: newbie++
 Fix For: 0.8.3


 In the new producer, when a partition has no leader for a long time (e.g., 
 all replicas are down), the records for that partition will stay in the 
 RecordAccumulator until the leader is available. This may cause the 
 bufferpool to be full and the callback for the produced message to block for 
 a long time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available

2014-12-16 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249266#comment-14249266
 ] 

Bhavesh Mistry commented on KAFKA-1788:
---

[~jkreps],

Can we just take a quick look at the NodeConnectionState? If all registered 
nodes are down, then exit quickly or attempt to reconnect? That way we would 
have an accurate status for all registered nodes (maybe we can do a TCP ping to 
all nodes). I am not sure, if the producer key is fixed to only one broker, 
whether it still has the status of all nodes.
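
A minimal sketch of that check, assuming a hypothetical per-node connection
state map (the real client keeps this in its own connection-state bookkeeping;
the names below are not its API):

// Illustrative only: fail fast when every known node is disconnected instead
// of letting records sit in the accumulator indefinitely.
object ConnectionState extends Enumeration {
  val Connected, Connecting, Disconnected = Value
}

class NodeStateSketch(states: Map[String, ConnectionState.Value]) {
  // True when there is at least one registered node and none of them is
  // connected or connecting.
  def allNodesDown: Boolean =
    states.nonEmpty && states.values.forall(_ == ConnectionState.Disconnected)
}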

Here is reference code:
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
 
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java

I did this in an experimental path for KAFKA-1642 (but used a hard-coded 
timeout for the join method).

Thanks,

Bhavesh 

 producer record can stay in RecordAccumulator forever if leader is no 
 available
 ---

 Key: KAFKA-1788
 URL: https://issues.apache.org/jira/browse/KAFKA-1788
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
  Labels: newbie++
 Fix For: 0.8.3


 In the new producer, when a partition has no leader for a long time (e.g., 
 all replicas are down), the records for that partition will stay in the 
 RecordAccumulator until the leader is available. This may cause the 
 bufferpool to be full and the callback for the produced message to block for 
 a long time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available

2014-12-16 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249266#comment-14249266
 ] 

Bhavesh Mistry edited comment on KAFKA-1788 at 12/17/14 1:26 AM:
-

[~jkreps],

Can we just take a quick look at the NodeConnectionState? If all registered 
nodes are down, then exit quickly or attempt to reconnect? That way we would 
have an accurate status for all registered nodes (maybe we can do a TCP ping to 
all nodes). I am not sure, if the producer key is fixed to only one broker, 
whether it still has the status of all nodes.

Here is reference code:
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
 
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java

I did this in an experimental path for KAFKA-1642 (but used a hard-coded 
timeout for the join method for the IO thread and interrupted it if it did not 
get killed).

Thanks,

Bhavesh 


was (Author: bmis13):
[~jkreps],

Can we just take quick look at the NodeConnectionState ?  If all registered 
Nodes are down, then  exit it quickly or attempt to connect ?  This will have 
accurate status of al Nodes registered... (may we can do TCP ping for all 
nodes).  I am not sure if producer key is fixed to only one brokers then does 
it still have all Node status ?

Here is reference code:
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
 
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java

I did this in experimental path for o KAFKA-1642   (but used hard coded timeout 
for join method).  

Thanks,

Bhavesh 

 producer record can stay in RecordAccumulator forever if leader is no 
 available
 ---

 Key: KAFKA-1788
 URL: https://issues.apache.org/jira/browse/KAFKA-1788
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
  Labels: newbie++
 Fix For: 0.8.3


 In the new producer, when a partition has no leader for a long time (e.g., 
 all replicas are down), the records for that partition will stay in the 
 RecordAccumulator until the leader is available. This may cause the 
 bufferpool to be full and the callback for the produced message to block for 
 a long time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249443#comment-14249443
 ] 

Neha Narkhede edited comment on KAFKA-1819 at 12/17/14 4:08 AM:


[~jjkoshy] Shouldn't we remove the deleted topic from all files that maintain 
either a cleaner or recovery checkpoint before the topic deletion is considered 
complete?


was (Author: nehanarkhede):
[~jjkoshy] Shouldn't we remove the deleted topic from all files that maintain 
either a cleaner or recovery checkpoint?

 Cleaner gets confused about deleted and re-created topics
 -

 Key: KAFKA-1819
 URL: https://issues.apache.org/jira/browse/KAFKA-1819
 Project: Kafka
  Issue Type: Bug
Reporter: Gian Merlino
Priority: Blocker
 Fix For: 0.8.2


 I get an error like this after deleting a compacted topic and re-creating it. 
 I think it's because the brokers don't remove cleaning checkpoints from the 
 cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
 java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
 587607 but segment base offset is 0 for log foo-6.
 at scala.Predef$.require(Predef.scala:233)
 at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
 at kafka.log.Cleaner.clean(LogCleaner.scala:300)
 at 
 kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249443#comment-14249443
 ] 

Neha Narkhede commented on KAFKA-1819:
--

[~jjkoshy] Shouldn't we remove the deleted topic from all files that maintain 
either a cleaner or recovery checkpoint?

 Cleaner gets confused about deleted and re-created topics
 -

 Key: KAFKA-1819
 URL: https://issues.apache.org/jira/browse/KAFKA-1819
 Project: Kafka
  Issue Type: Bug
Reporter: Gian Merlino
Priority: Blocker
 Fix For: 0.8.2


 I get an error like this after deleting a compacted topic and re-creating it. 
 I think it's because the brokers don't remove cleaning checkpoints from the 
 cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
 java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
 587607 but segment base offset is 0 for log foo-6.
 at scala.Predef$.require(Predef.scala:233)
 at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
 at kafka.log.Cleaner.clean(LogCleaner.scala:300)
 at 
 kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29030: KAFKA-742: Existing directories under the Kafka data directory without any data cause process to not start

2014-12-16 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29030/#review65287
---

Ship it!


Ship It!

- Neha Narkhede


On Dec. 16, 2014, 6:27 p.m., Ashish Singh wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29030/
 ---
 
 (Updated Dec. 16, 2014, 6:27 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-742
 https://issues.apache.org/jira/browse/KAFKA-742
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-742: Existing directories under the Kafka data directory without any 
 data cause process to not start
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/Log.scala 
 4fae2f0d339b256832baa62ca4995d10546716b4 
   core/src/main/scala/kafka/log/LogManager.scala 
 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 
   core/src/test/scala/unit/kafka/log/LogTest.scala 
 d670ba76acd54e3e88855c56c152c7cc36dddfdc 
 
 Diff: https://reviews.apache.org/r/29030/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ashish Singh
 




[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249451#comment-14249451
 ] 

Gwen Shapira commented on KAFKA-1819:
-

The recovery checkpoints are currently handled correctly (by LogManager, I 
think?); the only issue is with the cleaner file.

 Cleaner gets confused about deleted and re-created topics
 -

 Key: KAFKA-1819
 URL: https://issues.apache.org/jira/browse/KAFKA-1819
 Project: Kafka
  Issue Type: Bug
Reporter: Gian Merlino
Priority: Blocker
 Fix For: 0.8.2


 I get an error like this after deleting a compacted topic and re-creating it. 
 I think it's because the brokers don't remove cleaning checkpoints from the 
 cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
 java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
 587607 but segment base offset is 0 for log foo-6.
 at scala.Predef$.require(Predef.scala:233)
 at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
 at kafka.log.Cleaner.clean(LogCleaner.scala:300)
 at 
 kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-742) Existing directories under the Kafka data directory without any data cause process to not start

2014-12-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-742:

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the patch, [~singhashish]. Pushed to trunk

 Existing directories under the Kafka data directory without any data cause 
 process to not start
 ---

 Key: KAFKA-742
 URL: https://issues.apache.org/jira/browse/KAFKA-742
 Project: Kafka
  Issue Type: Bug
  Components: config
Affects Versions: 0.8.0
Reporter: Chris Curtin
Assignee: Ashish Kumar Singh
 Fix For: 0.8.3

 Attachments: KAFKA-742.1.patch, KAFKA-742.patch


 I incorrectly setup the configuration file to have the metrics go to 
 /var/kafka/metrics while the logs were in /var/kafka. On startup I received 
 the following error then the daemon exited:
 30   [main] INFO  kafka.log.LogManager  - [Log Manager on Broker 0] Loading 
 log 'metrics'
 32   [main] FATAL kafka.server.KafkaServerStartable  - Fatal error during 
 KafkaServerStable startup. Prepare to shutdown
 java.lang.StringIndexOutOfBoundsException: String index out of range: -1
 at java.lang.String.substring(String.java:1937)
 at 
 kafka.log.LogManager.kafka$log$LogManager$$parseTopicPartitionName(LogManager.scala:335)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:112)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:109)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
 at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:109)
 at 
 kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:101)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
 at 
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
 at kafka.log.LogManager.loadLogs(LogManager.scala:101)
 at kafka.log.LogManager.init(LogManager.scala:62)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:59)
 at 
 kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
 at kafka.Kafka$.main(Kafka.scala:46)
 at kafka.Kafka.main(Kafka.scala)
 34   [main] INFO  kafka.server.KafkaServer  - [Kafka Server 0], shutting down
 This was on a brand new cluster so no data or metrics logs existed yet.
 Moving the metrics to their own directory (not a child of the logs) allowed 
 the daemon to start.
 Took a few minutes to figure out what was wrong.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 23702: Patch for KAFKA-1070

2014-12-16 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review65289
---


Overall, this looks good. How about adding some unit tests before checking it 
in?

- Neha Narkhede


On Nov. 26, 2014, 4:29 a.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23702/
 ---
 
 (Updated Nov. 26, 2014, 4:29 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1070
 https://issues.apache.org/jira/browse/KAFKA-1070
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1070. Auto-assign node id.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/GenerateBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 6e26c5436feb4629d17f199011f3ebb674aa767f 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 0da774d0ed015bdc0461b854e3540ee6e48d1838 
 
 Diff: https://reviews.apache.org/r/23702/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 23702: Patch for KAFKA-1070

2014-12-16 Thread Neha Narkhede


 On Dec. 17, 2014, 4:58 a.m., Neha Narkhede wrote:
  Overall, this looks good. How about adding some unit tests before checking 
  it in?

Please ignore this. I was looking at only part of the patch.


- Neha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review65289
---


On Nov. 26, 2014, 4:29 a.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23702/
 ---
 
 (Updated Nov. 26, 2014, 4:29 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1070
 https://issues.apache.org/jira/browse/KAFKA-1070
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1070. Auto-assign node id.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/GenerateBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 6e26c5436feb4629d17f199011f3ebb674aa767f 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 0da774d0ed015bdc0461b854e3540ee6e48d1838 
 
 Diff: https://reviews.apache.org/r/23702/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 23702: Patch for KAFKA-1070

2014-12-16 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review65293
---



core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
https://reviews.apache.org/r/23702/#comment108372

Can you fix the doc?



core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
https://reviews.apache.org/r/23702/#comment108373

typo



core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
https://reviews.apache.org/r/23702/#comment108379

Please include more constructors in both exception classes that allow 
passing in (message, cause), just message, just cause or nothing.



core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala
https://reviews.apache.org/r/23702/#comment108382

version shouldn't be passed in. It should just live in this file.



core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala
https://reviews.apache.org/r/23702/#comment108376

If this crashes before syncing the data, it might lead to a corrupted 
meta.properties file. We should probably write to a tmp file and atomically 
swap it in, similar to how we handle writes in other checkpoint files (e.g. 
OffsetCheckpoint). 

Also, how about naming this BrokerMetadata?
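
A minimal sketch of that write-to-tmp-and-swap pattern, with illustrative names
(the real code would live behind whatever this class ends up being called and
would reuse the existing checkpoint helpers):

import java.io.{File, FileOutputStream}
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.Properties

// Illustrative only: serialize to a temp file, fsync it, then atomically
// rename it over the target so a crash never leaves a half-written
// meta.properties behind.
object BrokerMetadataWriteSketch {
  def write(dir: File, brokerId: Int, version: Int = 0): Unit = {
    val props = new Properties()
    props.setProperty("version", version.toString)
    props.setProperty("broker.id", brokerId.toString)

    val tmp = new File(dir, "meta.properties.tmp")
    val out = new FileOutputStream(tmp)
    try {
      props.store(out, "broker metadata")
      out.getFD.sync() // make sure the bytes hit disk before the swap
    } finally {
      out.close()
    }
    Files.move(tmp.toPath, Paths.get(dir.getPath, "meta.properties"),
      StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
  }
}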



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment108377

Can you document the broker ids that don't match?

Configured brokerId %d doesn't match stored brokerId %d in meta.properties



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment108378

Can you use error(..., e)?



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment108380

Please include the cause in the exception constructor. 

throw new GenerateBrokerIdException(Failed..., e);



core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
https://reviews.apache.org/r/23702/#comment108381

offset checkpoint?


- Neha Narkhede


On Nov. 26, 2014, 4:29 a.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23702/
 ---
 
 (Updated Nov. 26, 2014, 4:29 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1070
 https://issues.apache.org/jira/browse/KAFKA-1070
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1070. Auto-assign node id.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/GenerateBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 6e26c5436feb4629d17f199011f3ebb674aa767f 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 0da774d0ed015bdc0461b854e3540ee6e48d1838 
 
 Diff: https://reviews.apache.org/r/23702/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Build failed in Jenkins: Kafka-trunk #355

2014-12-16 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/355/changes

Changes:

[neha.narkhede] KAFKA-742: Existing directories under the Kafka data directory 
without any data cause process to not start; reviewed by Neha Narkhede

--
[...truncated 1516 lines...]
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33)
at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:39)

kafka.integration.PrimitiveApiTest  testConsumerEmptyTopic FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33)
at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:39)

kafka.integration.PrimitiveApiTest  testPipelinedProduceRequests FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33)
at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:39)

kafka.integration.AutoOffsetResetTest  testResetToEarliestWhenOffsetTooHigh 
FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36)
at 
kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46)

kafka.integration.AutoOffsetResetTest  testResetToEarliestWhenOffsetTooLow 
FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 

[jira] [Commented] (KAFKA-1070) Auto-assign node id

2014-12-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249492#comment-14249492
 ] 

Neha Narkhede commented on KAFKA-1070:
--

Thanks for the updated patch [~sriharsha]. Almost there. Left a few more 
cleanup comments.

 Auto-assign node id
 ---

 Key: KAFKA-1070
 URL: https://issues.apache.org/jira/browse/KAFKA-1070
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
  Labels: usability
 Fix For: 0.8.3

 Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, 
 KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, 
 KAFKA-1070_2014-07-24_21:05:33.patch, KAFKA-1070_2014-08-21_10:26:20.patch, 
 KAFKA-1070_2014-11-20_10:50:04.patch, KAFKA-1070_2014-11-25_20:29:37.patch


 It would be nice to have Kafka brokers auto-assign node ids rather than 
 having that be a configuration. Having a configuration is irritating because 
 (1) you have to generate a custom config for each broker and (2) even though 
 it is in configuration, changing the node id can cause all kinds of bad 
 things to happen.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29072: rebased for 0.8.2 branch

2014-12-16 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29072/#review65296
---


I have the same review comments here as I had on trunk. Can you update this 
patch after addressing the review comments on trunk?


clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
https://reviews.apache.org/r/29072/#comment108383

minor nit pick to be addressed on checkin: do nothing



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
https://reviews.apache.org/r/29072/#comment108384

Same as the review on trunk. Can you also make sure the examples are 
updated?

Actually, maybe the review comments from trunk all apply here.



clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
https://reviews.apache.org/r/29072/#comment108385

ditto



clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
https://reviews.apache.org/r/29072/#comment108386

same as review on trunk


- Neha Narkhede


On Dec. 16, 2014, 12:49 a.m., Jun Rao wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/29072/
 ---
 
 (Updated Dec. 16, 2014, 12:49 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: kafka-1797
 https://issues.apache.org/jira/browse/kafka-1797
 
 
 Repository: kafka
 
 
 Description
 ---
 
 fix imports
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 227f5646ee708af1b861c15237eda2140cfd4900 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
 46efc0c8483acacf42b2984ac3f3b9e0a4566187 
   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
 436d8a479166eda29f2672b50fc99f288bbe3fa9 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 2ecfc8aaea90a7353bd0dabc4c0ebcc6fd9535ec 
   clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 fe93afa24fc20b03830f1d190a276041d15bd3b9 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 c3aad3b4d6b677f759583f309061193f2f109250 
   
 clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 32f444ebbd27892275af7a0947b86a6b8317a374 
   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
 36e8398416036cab84faad1f07159e5adefd8086 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 9095caf0db1e41a4acb4216fb197626fbd85b806 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 
 c3181b368b6cf15e7134b04e8ff5655a9321ee0b 
   clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
  40e8234f8771098b097bf757a86d5ac98604df5e 
   
 clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java
  PRE-CREATION 
   core/src/main/scala/kafka/producer/BaseProducer.scala 
 b0207930dd0543f2c51f0b35002e13bf104340ff 
   core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala 
 4b5b823b85477394cd50eb2a66877a3b8b35b57f 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 f399105087588946987bbc84e3759935d9498b6a 
   core/src/main/scala/kafka/tools/ReplayLogProducer.scala 
 3393a3dd574ac45a27bf7eda646b737146c55038 
   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 
 67196f30af1cfcd40ded20ca970082b78504f6af 
   core/src/main/scala/kafka/tools/TestLogCleaning.scala 
 1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
   core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
 6379f2b60af797b084981c94fd84b3d7740aa8a5 
   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
 d407af9144ef6930d737a6dcf23591c1f6342f87 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 0da774d0ed015bdc0461b854e3540ee6e48d1838 
 
 Diff: https://reviews.apache.org/r/29072/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jun Rao
 




[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics

2014-12-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249530#comment-14249530
 ] 

Joel Koshy commented on KAFKA-1819:
---

[~gwenshap] that's right - that's what I meant by force-writing the checkpoints 
if cleaning was not in progress. As you said, it needs to happen proactively 
when deleting a log, but we probably don't need to force a cleaning for that 
since we just need to update the cleaner checkpoint file. So I was thinking we 
could refactor the code a tiny bit to have a helper write out the checkpoint 
file and call that from both doneCleaning as well as when deleting logs.
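
Roughly the shape of that refactor, with illustrative names (the real code
would sit in the cleaner/log manager and reuse the existing checkpoint file
writer rather than the stub below):

import java.io.File
import scala.collection.mutable

// Illustrative only: one helper owns rewriting the cleaner checkpoint file, so
// both the normal doneCleaning path and log deletion go through it and the
// file never keeps an offset for a partition that no longer exists.
class CleanerCheckpointSketch(checkpointFile: File) {
  private val checkpoints = mutable.Map[String, Long]() // topic-partition -> last clean offset

  def doneCleaning(topicPartition: String, endOffset: Long): Unit = synchronized {
    checkpoints(topicPartition) = endOffset
    writeCheckpointFile()
  }

  def handleLogDeleted(topicPartition: String): Unit = synchronized {
    checkpoints.remove(topicPartition)
    writeCheckpointFile()
  }

  private def writeCheckpointFile(): Unit = {
    // In the real code this would go through the existing tmp-file-and-swap
    // checkpoint write against checkpointFile; elided here.
  }
}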

 Cleaner gets confused about deleted and re-created topics
 -

 Key: KAFKA-1819
 URL: https://issues.apache.org/jira/browse/KAFKA-1819
 Project: Kafka
  Issue Type: Bug
Reporter: Gian Merlino
Priority: Blocker
 Fix For: 0.8.2


 I get an error like this after deleting a compacted topic and re-creating it. 
 I think it's because the brokers don't remove cleaning checkpoints from the 
 cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.
 java.lang.IllegalArgumentException: requirement failed: Last clean offset is 
 587607 but segment base offset is 0 for log foo-6.
 at scala.Predef$.require(Predef.scala:233)
 at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
 at kafka.log.Cleaner.clean(LogCleaner.scala:300)
 at 
 kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
 at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1499) Broker-side compression configuration

2014-12-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14249536#comment-14249536
 ] 

Joel Koshy commented on KAFKA-1499:
---

Thank you for the ping, and sorry about the delayed review. I should be able to 
get to this within a day.

 Broker-side compression configuration
 -

 Key: KAFKA-1499
 URL: https://issues.apache.org/jira/browse/KAFKA-1499
 Project: Kafka
  Issue Type: New Feature
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.3

 Attachments: KAFKA-1499.patch, KAFKA-1499.patch, 
 KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, 
 KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, 
 KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, 
 KAFKA-1499_2014-09-25_11:05:57.patch, KAFKA-1499_2014-10-27_13:13:55.patch, 
 KAFKA-1499_2014-12-16_22:39:10.patch

   Original Estimate: 72h
  Remaining Estimate: 72h

 A given topic can have messages in mixed compression codecs. i.e., it can
 also have a mix of uncompressed/compressed messages.
 It will be useful to support a broker-side configuration to recompress
 messages to a specific compression codec. i.e., all messages (for all
 topics) on the broker will be compressed to this codec. We could have
 per-topic overrides as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-16 Thread Joel Koshy
Jun,

Thanks for summarizing this - it helps confirm for me that I did not
misunderstand anything in this thread so far, and that I disagree with
the premise that the steps in using the current byte-oriented API are
cumbersome or inflexible. It involves instantiating the K-V
serializers in code (as opposed to config) and an extra (but explicit
- i.e., making it very clear to the user) and simple call to serialize
before sending.
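
For concreteness, that explicit flow amounts to something like the sketch
below, where the Serializer trait and sendBytes call stand in for whatever
byte-oriented producer API is in use (they are not the exact client interface):

// Illustrative only: with a bytes-in/bytes-out producer the caller picks the
// serializers explicitly and invokes them once per record before sending.
trait Serializer[T] {
  def serialize(topic: String, data: T): Array[Byte]
}

object StringSerializer extends Serializer[String] {
  def serialize(topic: String, data: String): Array[Byte] = data.getBytes("UTF-8")
}

object ExplicitSerializationExample {
  // Stand-in for a producer.send(topic, keyBytes, valueBytes) style call.
  def sendBytes(topic: String, key: Array[Byte], value: Array[Byte]): Unit = ()

  def main(args: Array[String]): Unit = {
    val keySer = StringSerializer
    val valueSer = StringSerializer
    val topic = "page-views"
    sendBytes(topic, keySer.serialize(topic, "user-42"), valueSer.serialize(topic, "home"))
  }
}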

The problem of downstream queries breaking can happen just as well
with the implicit serializers/deserializers - since ultimately people
have to instantiate the specific type in their code, and if they want
to send it they will.

I think adoption is also equivalent since people will just instantiate
whatever serializer/deserializer they want in one line. Plugging in a
new serializer implementation does require a code change, but that can
also be avoided via a config-driven factory.

So I'm still +0 on the change, but I'm definitely not against moving
forward with it - i.e., unless there is a strong -1 on the proposal
from anyone else.

Thanks,

Joel

 With a byte array interface, of course there is nothing that one can't do.
 However, the real question is that whether we want to encourage people to
 use it this way or not. Being able to flow just bytes is definitely easier
 to get started. That's why many early adopters choose to do it that way.
 However, it's often the case that they start feeling the pain later when
 some producers change the data format. Their Hive/Pig queries start to
 break and it's a painful process to have the issue fixed. So, the purpose
 of this api change is really to encourage people to standardize on a single
 serializer/deserializer that supports things like data validation and
 schema evolution upstream in the producer. Now, suppose there is an Avro
 serializer/deserializer implementation. How do we make it easy for people
 to adopt? If the serializer is part of the api, we can just say, wire in
 the Avro serializer for key and/or value in the config and then you can
 start sending Avro records to the producer. If the serializer is not part
 of the api, we have to say, first instantiate a key and/or value serializer
 this way, send the key to the key serializer to get the key bytes, send the
 value to the value serializer to get the value bytes, and finally send the
 bytes to the producer. The former will be simpler and likely makes the
 adoption easier.
 
 Thanks,
 
 Jun
 
 On Mon, Dec 15, 2014 at 7:20 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
  Documentation is inevitable even if the serializer/deserializer is
  part of the API - since the user has to set it up in the configs. So
  again, you can only encourage people to use it through documentation.
  The simpler byte-oriented API seems clearer to me because anyone who
  needs to send (or receive) a specific data type will _be forced to_
  (or actually, _intuitively_) select a serializer (or deserializer) and
  will definitely pick an already available implementation if a good one
  already exists.
 
  Sorry I still don't get it and this is really the only sticking point
  for me, albeit a minor one (which is why I have been +0 all along on
  the change). I (and I think many others) would appreciate it if
  someone can help me understand this better.  So I will repeat the
  question: What usage pattern cannot be supported by easily by the
  simpler API without adding burden on the user?
 
  Thanks,
 
  Joel
 
  On Mon, Dec 15, 2014 at 11:59:34AM -0800, Jun Rao wrote:
   Joel,
  
   It's just that if the serializer/deserializer is not part of the API, you
   can only encourage people to use it through documentation. However, not
   everyone will read the documentation if it's not directly used in the
  API.
  
   Thanks,
  
   Jun
  
   On Mon, Dec 15, 2014 at 2:11 AM, Joel Koshy jjkosh...@gmail.com wrote:
  
(sorry about the late follow-up late - I'm traveling most of this
month)
   
I'm likely missing something obvious, but I find the following to be a
somewhat vague point that has been mentioned more than once in this
thread without a clear explanation. i.e., why is it hard to share a
serializer/deserializer implementation and just have the clients call
it before a send/receive? What usage pattern cannot be supported by
the simpler API?
   
 1. Can we keep the serialization semantics outside the Producer
  interface
 and have simple bytes in / bytes out for the interface (This is what
  we
 have today).

 The points for this is to keep the interface simple and usage easy to
 understand. The points against this is that it gets hard to share
  common
 usage patterns around serialization/message validations for the
  future.
   
   
On Tue, Dec 09, 2014 at 03:51:08AM +, Sriram Subramanian wrote:
 Thank you Jay. I agree with the issue that you point w.r.t paired
 serializers. I also think having mix serialization