Review Request 28694: Patch for KAFKA-1501

2014-12-04 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28694/
---

Review request for kafka.


Bugs: KAFKA-1501
https://issues.apache.org/jira/browse/KAFKA-1501


Repository: kafka


Description
---

KAFKA-1501 Ensure tests allocate all ports simultaneously so that ports won't 
be reused.
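
The idea, as a minimal sketch (plain java.net.ServerSocket; the actual patch 
modifies TestUtils.scala and the test harnesses, so everything below is 
illustrative only):

import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

public class ChoosePorts {
    // Bind all sockets first, record their ports, and only then close them,
    // so the OS cannot hand the same ephemeral port out twice within one run.
    public static List<Integer> choosePorts(int count) throws Exception {
        List<ServerSocket> sockets = new ArrayList<>();
        List<Integer> ports = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            ServerSocket socket = new ServerSocket(0); // 0 = any free port
            sockets.add(socket);
            ports.add(socket.getLocalPort());
        }
        for (ServerSocket socket : sockets) {
            socket.close();
        }
        return ports;
    }
}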


Diffs
-

  core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
6379f2b60af797b084981c94fd84b3d7740aa8a5 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
a913fe59ba6f7c86a48e264ff85158a345b4e9e4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
d407af9144ef6930d737a6dcf23591c1f6342f87 
  core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
1bf2667f47853585bc33ffb3e28256ec5f24ae84 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 
e28979827110dfbbb92fe5b152e7f1cc973de400 
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
29cc01bcef9cacd8dec1f5d662644fc6fe4994bc 
  core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala 
ac6dd2087de4538feff8a7b5206b992851f29f83 
  core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 
c0355cc0135c6af2e346b4715659353a31723b86 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
  core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala 
95303e098d40cd790fb370e9b5a47d20860a6da3 
  core/src/test/scala/unit/kafka/integration/FetcherTest.scala 
25845abbcad2e79f56f729e59239b738d3ddbc9d 
  core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
3cf7c9bcd64492d05590067a8ad11d31096a8e5e 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
a5386a03b62956bc440b40783247c8cdf7432315 
  core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala 
108c2e7f47ede038855e7fa3c3df582d86e8c5c3 
  core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala 
eab4b5f619015af42e4554660eafb5208e72ea33 
  core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 
35dc071b1056e775326981573c9618d8046e601d 
  core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
ba3bcdcd1de9843e75e5395dff2fc31b39a5a9d5 
  
core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
 d6248b09bb0f86ee7d3bd0ebce5b99135491453b 
  core/src/test/scala/unit/kafka/log/LogTest.scala 
d670ba76acd54e3e88855c56c152c7cc36dddfdc 
  core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala 
4ea0489c9fd36983fe190491a086b39413f3a9cd 
  core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
3cf23b3d6d4460535b90cfb36281714788fc681c 
  core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
5f4d85254c384dcc27a5a84f0836ea225d3a901a 
  core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 
1db6ac329f7b54e600802c8a623f80d159d4e69b 
  core/src/test/scala/unit/kafka/producer/ProducerTest.scala 
ce65dab4910d9182e6774f6ef1a7f45561ec0c23 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
d60d8e0f49443f4dc8bc2cad6e2f951eda28f5cb 
  core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala 
f0c4a56b61b4f081cf4bee799c6e9c523ff45e19 
  core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
ad121169a5e80ebe1d311b95b219841ed69388e2 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
8913fc1d59f717c6b3ed12c8362080fb5698986b 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
a703d2715048c5602635127451593903f8d20576 
  core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
2377abe4933e065d037828a214c3a87e1773a8ef 
  core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e 
  core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
c06ee756bf0fe07e5d3c92823a476c960b37afd6 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
d5d351c4f25933da0ba776a6a89a989f1ca6a902 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
8c5364fa97da1be09973c176d1baeb339455d319 
  core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala 
da4bafc1e2a94a436efe395aab1888fc21e55748 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
faa907131ed0aa94a7eacb78c1ffb576062be87a 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
67918f2842d474269f3009f8131f6ffb6278b041 
  core/src/test/scala/unit/kafka/server/ServerStartupTest.scala 
8fe7cd496f74ae1031019c2ca9ca013302294d9b 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
ccf5e2e36260b2484181b81d1b06e81de972674b 
  core/src/test/scala/unit/kafka/utils/NetworkTestHarness.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala 
84e08557de5acdcf0a98b192feac72836ea359b8 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
0da774d

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 3, 2014, 11:02 p.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
PRE-CREATION 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9d5a47fb8e04d0055cce820afde7f73affc0a984 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
6a85d7e494f6c88798133a17f6180b61029dff58 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 27430: Fix KAFKA-1720

2014-12-04 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27430/#review63803
---

Ship it!


Ship It!

- Joel Koshy


On Dec. 3, 2014, 9:34 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27430/
> ---
> 
> (Updated Dec. 3, 2014, 9:34 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1720
> https://issues.apache.org/jira/browse/KAFKA-1720
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Rebased
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> b9fde2aacbb1fae9e18942dc81fa63c53b9fdb5f 
>   core/src/main/scala/kafka/server/DelayedFetch.scala 
> 1e2e56f87a3c2e8079e4ad7241a1ab945eeb638c 
>   core/src/main/scala/kafka/server/DelayedOperationKey.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 
> 1603066d33f82290f3e9b8c5f1a9092d26cf86b8 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> b3566b0bc33fe335bd1009deee1e011ff245e01a 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 
> 323b12e765f981e9bba736a204e4a8159e5c5ada 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
> a7720d579ea15b71511c9da0e241bd087de3674e 
>   system_test/metrics.json cd3fc142176b8a3638db5230fb74f055c04c59d4 
> 
> Diff: https://reviews.apache.org/r/27430/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 4, 2014, 3:02 a.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
PRE-CREATION 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9d5a47fb8e04d0055cce820afde7f73affc0a984 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
6a85d7e494f6c88798133a17f6180b61029dff58 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Jiangjie Qin


> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613
> > 
> >
> > I think there may be a race condition here; for example, consider this 
> > sequence:
> > 
> > 1. The data channel contains only one message.
> > 2. The producer takes the message from the channel.
> > 3. dataChannel.clear() is called.
> > 4. numMessageUnacked.get() == 0, so offsets are committed.
> > 5. producer.send() is called, incrementing numMessageUnacked.
> > 6. A data duplicate happens when the rebalance finishes.
> > 
> > I think on line 599 we should use "while" instead of "if", but this 
> > alone does not fix it.
> 
> Jiangjie Qin wrote:
>     Yes, I actually have a comment on this race condition at line 581. The 
> reasons I'm not handling it here are:
>     1. The chance of this situation occurring is very slight.
>     2. A single duplicate message does not really hurt.
>     3. The fix increases the complexity of the code (looking into the 
> producer thread status) and I'm not sure it is worth doing.
>     4. Even if we fix this, duplicates could still happen on the producer 
> side.
> 
> Guozhang Wang wrote:
> Shall we change line 691 to "while (numMessageUnacked.get() > 0)" at 
> least?

Yes, it should be a while loop; I forgot to change it...
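
A minimal sketch of the agreed fix (MirrorMaker itself is Scala, and only the 
counter name numMessageUnacked comes from the review discussion; everything 
else below is an illustrative assumption):

import java.util.concurrent.atomic.AtomicInteger;

class AwaitAcks {
    private final AtomicInteger numMessageUnacked = new AtomicInteger(0);

    // Using "while" instead of "if" means a message taken by the producer
    // between dataChannel.clear() and this check is still waited on before
    // offsets are committed.
    void awaitAcksThenCommit() throws InterruptedException {
        while (numMessageUnacked.get() > 0) {
            Thread.sleep(100); // re-check until every in-flight send is acked
        }
        commitOffsets();
    }

    void commitOffsets() { /* offset commit would happen here */ }
}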


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63703
---


On Dec. 4, 2014, 3:02 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> ---
> 
> (Updated Dec. 4, 2014, 3:02 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
> https://issues.apache.org/jira/browse/KAFKA-1650
> https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
> 6a85d7e494f6c88798133a17f6180b61029dff58 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Jiangjie Qin


> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613
> > 
> >
> > I think there may be a race condition here; for example, consider this 
> > sequence:
> > 
> > 1. The data channel contains only one message.
> > 2. The producer takes the message from the channel.
> > 3. dataChannel.clear() is called.
> > 4. numMessageUnacked.get() == 0, so offsets are committed.
> > 5. producer.send() is called, incrementing numMessageUnacked.
> > 6. A data duplicate happens when the rebalance finishes.
> > 
> > I think on line 599 we should use "while" instead of "if", but this 
> > alone does not fix it.

Yes, I actually have a comment on this race condition at line 581. The 
reasons I'm not handling it here are:
1. The chance of this situation occurring is very slight.
2. A single duplicate message does not really hurt.
3. The fix increases the complexity of the code (looking into the producer 
thread status) and I'm not sure it is worth doing.
4. Even if we fix this, duplicates could still happen on the producer side.


> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 96
> > 
> >
> > Could you add a comment here about the in-flight-request config and its 
> > effects?

The comment was put at the very beginning with a note. Maybe we can put a 
comment here referring to that note.


> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 186
> > 
> >
> > I am wondering whether there are some scenarios where we want to allow a 
> > customized rebalance listener? Also, if we decide to make this 
> > customizable, we need to make it clear that the customized listener is 
> > expected to take the data channel as its constructor argument, since 
> > this is not checked at compile time.

Yes, we do foresee some use cases for this customized rebalance listener. 
I'll add the following comment (see the sketch below): 
"A customized consumer rebalance listener should extend 
MirrorMakerConsumerRebalanceListener and take the data channel as a 
constructor argument."
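
A minimal sketch of that convention (MirrorMakerConsumerRebalanceListener is 
the base class named above; every other name below is an illustrative 
assumption):

// Stand-in for the mirror maker's internal buffer.
class DataChannel {
    void clear() { /* drop buffered messages */ }
}

class MirrorMakerConsumerRebalanceListener {
    protected final DataChannel dataChannel;
    MirrorMakerConsumerRebalanceListener(DataChannel dataChannel) {
        this.dataChannel = dataChannel;
    }
    void beforeReleasingPartitions() { dataChannel.clear(); }
}

// A custom listener must keep the single DataChannel constructor argument,
// because MirrorMaker instantiates the configured class reflectively and
// cannot verify the constructor signature at compile time.
class MyRebalanceListener extends MirrorMakerConsumerRebalanceListener {
    MyRebalanceListener(DataChannel dataChannel) { super(dataChannel); }
    @Override
    void beforeReleasingPartitions() {
        super.beforeReleasingPartitions();
        // custom logic (e.g. flushing an external cache) goes here
    }
}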


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63703
---


On Nov. 24, 2014, 4:15 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> ---
> 
> (Updated Nov. 24, 2014, 4:15 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
> https://issues.apache.org/jira/browse/KAFKA-1650
> https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> 3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
> 6a85d7e494f6c88798133a17f6180b61029dff58 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Guozhang Wang


> On Dec. 3, 2014, 6:57 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 613
> > 
> >
> > I think there may be a race condition here; for example, consider this 
> > sequence:
> > 
> > 1. The data channel contains only one message.
> > 2. The producer takes the message from the channel.
> > 3. dataChannel.clear() is called.
> > 4. numMessageUnacked.get() == 0, so offsets are committed.
> > 5. producer.send() is called, incrementing numMessageUnacked.
> > 6. A data duplicate happens when the rebalance finishes.
> > 
> > I think on line 599 we should use "while" instead of "if", but this 
> > alone does not fix it.
> 
> Jiangjie Qin wrote:
>     Yes, I actually have a comment on this race condition at line 581. The 
> reasons I'm not handling it here are:
>     1. The chance of this situation occurring is very slight.
>     2. A single duplicate message does not really hurt.
>     3. The fix increases the complexity of the code (looking into the 
> producer thread status) and I'm not sure it is worth doing.
>     4. Even if we fix this, duplicates could still happen on the producer 
> side.

Shall we change line 691 to "while (numMessageUnacked.get() > 0)" at least?


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63703
---


On Dec. 3, 2014, 11:02 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> ---
> 
> (Updated Dec. 3, 2014, 11:02 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
> https://issues.apache.org/jira/browse/KAFKA-1650
> https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
> 6a85d7e494f6c88798133a17f6180b61029dff58 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



Re: Review Request 28481: Patch for KAFKA-1792

2014-12-04 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28481/#review63757
---



core/src/main/scala/kafka/admin/AdminUtils.scala


Would be good to clarify what i stands for



core/src/main/scala/kafka/admin/AdminUtils.scala


In this example, when you describe an assignment as 2 numbers, it is 
unclear what each number stands for. Would be good to clarify that



core/src/main/scala/kafka/admin/AdminUtils.scala


Would be good to clarify what you mean by "load". Is that # of replicas on 
the broker?



core/src/main/scala/kafka/admin/AdminUtils.scala


Wouldn't the number be negative here? Did you mean sum(abs(FL(i) - L(i)))?



core/src/main/scala/kafka/admin/AdminUtils.scala


Same here. What about when FL(i) < L(i)?



core/src/main/scala/kafka/admin/AdminUtils.scala


With every offload(i), the load status of the cluster changes. So, is one 
iteration of offload(i) on every broker sufficient to reach an ideal state? I 
guess it is, but this is unclear from the explanation.



core/src/main/scala/kafka/admin/AdminUtils.scala


This is confusing. How is it that you are offloading from every single 
broker?



core/src/main/scala/kafka/admin/AdminUtils.scala


Actually D(i) < 0 should mean that the actual load on the broker is more 
than the ideal or fair load, so shouldn't we offload a few replicas from this 
broker?



core/src/main/scala/kafka/admin/AdminUtils.scala


Would be good to clarify whether the load is spread in a round-robin fashion.



core/src/main/scala/kafka/admin/AdminUtils.scala


Because it's unclear what the - convention means, it is 
difficult to understand this part.



core/src/main/scala/kafka/admin/AdminUtils.scala


I think it may be illegal to create a topic with replication factor greater 
than # of brokers. Would be good to clarify the behavior here.



core/src/test/scala/unit/kafka/admin/AdminTest.scala


Same here. Please explain the convention. -|...


- Neha Narkhede


On Dec. 3, 2014, 5:25 p.m., Dmitry Pekar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28481/
> ---
> 
> (Updated Dec. 3, 2014, 5:25 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1792
> https://issues.apache.org/jira/browse/KAFKA-1792
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1792: CR
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> 28b12c7b89a56c113b665fbde1b95f873f8624a3 
>   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
> 979992b68af3723cd229845faff81c641123bb88 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
> e28979827110dfbbb92fe5b152e7f1cc973de400 
>   topics.json ff011ed381e781b9a177036001d44dca3eac586f 
> 
> Diff: https://reviews.apache.org/r/28481/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Dmitry Pekar
> 
>



Re: Review Request 27430: Fix KAFKA-1720

2014-12-04 Thread Guozhang Wang


> On Nov. 25, 2014, 5:50 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, line 87
> > 
> >
> > In KAFKA-1583 we had discussed renaming ReplicaManager to 
> > ReplicatedLogManager or something like that.

As discussed, I think this change will not be done until after the coordinator 
and the controller refactoring work.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27430/#review63003
---


On Nov. 1, 2014, 12:21 a.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27430/
> ---
> 
> (Updated Nov. 1, 2014, 12:21 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1720
> https://issues.apache.org/jira/browse/KAFKA-1720
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Rename delayed requests to delayed operations, change some class names
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> 1be57008e983fc3a831626ecf9a861f164fcca92 
>   core/src/main/scala/kafka/server/DelayedFetch.scala 
> 1ccbb4b6fdbbd4412ba77ffe7d4cf5adf939e439 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 
> 8049e07e5d6d65913ec2492f1e22e5ed3ecbbea8 
>   core/src/main/scala/kafka/server/DelayedRequestKey.scala 
> 628ef59564b9b9238d7b05d26aef79d3cfec174d 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 3007a6d89b637b93f71fdb7adab561a93d9c4c62 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 
> 323b12e765f981e9bba736a204e4a8159e5c5ada 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
> a7720d579ea15b71511c9da0e241bd087de3674e 
>   system_test/metrics.json cd3fc142176b8a3638db5230fb74f055c04c59d4 
> 
> Diff: https://reviews.apache.org/r/27430/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



Re: Review Request 27430: Fix KAFKA-1720

2014-12-04 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27430/
---

(Updated Dec. 3, 2014, 9:34 p.m.)


Review request for kafka.


Bugs: KAFKA-1720
https://issues.apache.org/jira/browse/KAFKA-1720


Repository: kafka


Description (updated)
---

Rebased


Diffs (updated)
-

  core/src/main/scala/kafka/cluster/Partition.scala 
b9fde2aacbb1fae9e18942dc81fa63c53b9fdb5f 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
1e2e56f87a3c2e8079e4ad7241a1ab945eeb638c 
  core/src/main/scala/kafka/server/DelayedOperationKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
1603066d33f82290f3e9b8c5f1a9092d26cf86b8 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
b3566b0bc33fe335bd1009deee1e011ff245e01a 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
323b12e765f981e9bba736a204e4a8159e5c5ada 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a7720d579ea15b71511c9da0e241bd087de3674e 
  system_test/metrics.json cd3fc142176b8a3638db5230fb74f055c04c59d4 

Diff: https://reviews.apache.org/r/27430/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 28481: Patch for KAFKA-1792

2014-12-04 Thread Dmitry Pekar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28481/
---

(Updated Dec. 3, 2014, 5:25 p.m.)


Review request for kafka.


Bugs: KAFKA-1792
https://issues.apache.org/jira/browse/KAFKA-1792


Repository: kafka


Description (updated)
---

KAFKA-1792: CR


Diffs (updated)
-

  core/src/main/scala/kafka/admin/AdminUtils.scala 
28b12c7b89a56c113b665fbde1b95f873f8624a3 
  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
979992b68af3723cd229845faff81c641123bb88 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala 
e28979827110dfbbb92fe5b152e7f1cc973de400 
  topics.json ff011ed381e781b9a177036001d44dca3eac586f 

Diff: https://reviews.apache.org/r/28481/diff/


Testing
---


Thanks,

Dmitry Pekar



Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63703
---



core/src/main/scala/kafka/tools/MirrorMaker.scala


Could you add a comment here about the in-flight-request config and its 
effects?



core/src/main/scala/kafka/tools/MirrorMaker.scala


I am wondering whether there are some scenarios where we want to allow a 
customized rebalance listener? Also, if we decide to make this customizable, 
we need to make it clear that the customized listener is expected to take the 
data channel as its constructor argument, since this is not checked at 
compile time.



core/src/main/scala/kafka/tools/MirrorMaker.scala


I think there may be a race condition here; for example, consider this 
sequence:

1. The data channel contains only one message.
2. The producer takes the message from the channel.
3. dataChannel.clear() is called.
4. numMessageUnacked.get() == 0, so offsets are committed.
5. producer.send() is called, incrementing numMessageUnacked.
6. A data duplicate happens when the rebalance finishes.

I think on line 599 we should use "while" instead of "if", but this alone 
does not fix it.


- Guozhang Wang


On Nov. 24, 2014, 4:15 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> ---
> 
> (Updated Nov. 24, 2014, 4:15 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
> https://issues.apache.org/jira/browse/KAFKA-1650
> https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> 3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
> 6a85d7e494f6c88798133a17f6180b61029dff58 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-12-04 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234297#comment-14234297
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

[~ewencp],

1) I will post toward KAFKA-1788 and perhaps link the issue.
2) True, some sort of measure would be great: the 5, 10, 25, 50, 95, and 99 
percentiles of execution time. The point is just to measure the duration and 
report the rate of execution.
3) I agree with what you are saying and I have observed the same behavior. My 
only recommendation is to add some intelligence to *timeouts*: if, over a 
long period, consecutive timeouts are zero, then there is a problem. (A 
little more defensive; see the back-off sketch below.)
4) Again, I agree with your point, but in your previous comments you 
mentioned that you may consider having back-off logic further up the chain, 
so I was just checking whether run() is the best place to do that check. 
Again, maybe add intelligence here: if you get consecutive "Exception"s, then 
the likelihood of high CPU is high.
5) Ok. I agree with what you are saying: data needs to be dequeued so that 
more data can be enqueued, even in the event of network loss. Is my 
understanding correct?
6) All I am saying is that a network firewall rule (such as only 2 TCP 
connections per source host), or brokers running out of file descriptors, can 
prevent a new connection to the broker from being established while the 
client still has a live and active TCP connection to the same broker. But 
based on what I see, the method *initiateConnect* will mark the entire broker 
or node status as disconnected. Is this expected behavior? So the question 
is: will the client continue to send data?

Thank you very much for entertaining my questions so far; I will test out 
the patch next week.
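
A minimal sketch of the back-off idea from points 3 and 4 (nothing here is 
the actual Sender code; pollOnce stands in for the real network call and is 
an assumption):

class BackoffLoop {
    private volatile boolean running = true;
    private int consecutiveFailures = 0;

    void run() throws InterruptedException {
        while (running) {
            try {
                pollOnce();              // assumed network call
                consecutiveFailures = 0; // success resets the back-off
            } catch (Exception e) {
                consecutiveFailures++;
                // Exponential back-off with a 1s cap keeps a dead connection
                // from spinning the I/O thread at 100% CPU.
                long backoffMs =
                    Math.min(1000L, 10L << Math.min(consecutiveFailures, 10));
                Thread.sleep(backoffMs);
            }
        }
    }

    void pollOnce() throws Exception { /* network I/O would happen here */ }
}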

Thanks,

Bhavesh 

> [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
> connection is lost
> ---
>
> Key: KAFKA-1642
> URL: https://issues.apache.org/jira/browse/KAFKA-1642
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.2
>Reporter: Bhavesh Mistry
>Assignee: Ewen Cheslack-Postava
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: 
> 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
> KAFKA-1642.patch, KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
> KAFKA-1642_2014-10-23_16:19:41.patch
>
>
> I see my CPU spike to 100% when the network connection is lost for a while. 
> It seems the network IO threads are very busy logging the following error 
> message. Is this expected behavior?
> 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
> org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
> producer I/O thread: 
> java.lang.IllegalStateException: No entry found for node -2
> at 
> org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
> at 
> org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
> at 
> org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:744)
> Thanks,
> Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1476) Get a list of consumer groups

2014-12-04 Thread BalajiSeshadri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234415#comment-14234415
 ] 

BalajiSeshadri commented on KAFKA-1476:
---

[~nehanarkhede] or [~junrao] or [~jkreps] Please assign the ticket to 
[~techybalaji].

Dish is not supporting my contribution during work hours, so I would like to 
continue this as personal work.

Please reassign the ticket to techybalaji in the future.

> Get a list of consumer groups
> -
>
> Key: KAFKA-1476
> URL: https://issues.apache.org/jira/browse/KAFKA-1476
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Williams
>Assignee: BalajiSeshadri
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, 
> KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, 
> KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, 
> KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch
>
>
> It would be useful to have a way to get a list of consumer groups currently 
> active via some tool/script that ships with kafka. This would be helpful so 
> that the system tools can be explored more easily.
> For example, when running the ConsumerOffsetChecker, it requires a group 
> option
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
> ?
> But, when just getting started with kafka, using the console producer and 
> consumer, it is not clear what value to use for the group option. If the 
> consumer groups could be listed, then it would be clear what value to use.
> Background:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e
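
A minimal sketch of what listing groups amounts to for the ZooKeeper-based 
consumer (the actual tool in the patch may differ; the ZkClient usage here is 
an illustrative assumption): each active group registers itself as a child of 
the /consumers path.

import java.util.List;
import org.I0Itec.zkclient.ZkClient;

public class ListConsumerGroups {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000);
        // Group ids are simply the children of /consumers.
        List<String> groups = zkClient.getChildren("/consumers");
        for (String group : groups) {
            System.out.println(group);
        }
        zkClient.close();
    }
}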



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-04 Thread Jiangjie Qin

I'm just thinking that instead of binding serialization to the producer,
another option is to bind the serializer/deserializer to
ProducerRecord/ConsumerRecord (please see the detailed proposal below).
The arguments for this option are:
A. A single producer could send different message types. There are
several use cases at LinkedIn for a per-record serializer:
- In Samza, there are some in-stream, order-sensitive control messages
that have a different deserializer from other messages.
- There are use cases which need support for sending both Avro messages
and raw bytes.
- Some use cases need to deserialize some Avro messages into generic
records and some other messages into specific records.
B. In the current proposal, the serializer/deserializer is instantiated
according to config. Compared with that, binding the serializer to
ProducerRecord and ConsumerRecord is less error prone.


This option includes the following changes:
A. Add serializer and deserializer interfaces to replace the serializer
instances from config.

public interface Serializer<K, V> {
    public byte[] serializeKey(K key);
    public byte[] serializeValue(V value);
}

public interface Deserializer<K, V> {
    public K deserializeKey(byte[] key);
    public V deserializeValue(byte[] value);
}

B. Make ProducerRecord and ConsumerRecord abstract classes implementing
Serializer<K, V> and Deserializer<K, V> respectively.

public abstract class ProducerRecord<K, V> implements Serializer<K, V> {...}
public abstract class ConsumerRecord<K, V> implements Deserializer<K, V> {...}

C. Instead of instantiating the serializer/deserializer from config, let
concrete ProducerRecord/ConsumerRecord classes extend the abstract class and
override the serialize/deserialize methods.

public class AvroProducerRecord extends ProducerRecord<String, GenericRecord> {
    ...
    @Override
    public byte[] serializeKey(String key) {...}
    @Override
    public byte[] serializeValue(GenericRecord value) {...}
}

public class AvroConsumerRecord extends ConsumerRecord<String, GenericRecord> {
    ...
    @Override
    public String deserializeKey(byte[] key) {...}
    @Override
    public GenericRecord deserializeValue(byte[] value) {...}
}

D. The producer API changes to:

public class KafkaProducer<K, V> {
    ...
    Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        ...
        byte[] key = record.serializeKey(record.key);
        byte[] value = record.serializeValue(record.value);
        BytesProducerRecord bytesProducerRecord =
            new BytesProducerRecord(topic, partition, key, value);
        ...
    }
    ...
}



We also had some brainstorming at LinkedIn, and here is the feedback:

If the community decides to add the serialization back to the new producer,
besides the current proposal, which changes the new producer API to be
templated, there are some other options raised during our discussion:
1) Rather than change the current new producer API, we can provide a wrapper
around the current new producer (e.g. KafkaSerializedProducer) and make it
available to users, as there is value in the simplicity of the current API.

2) If we decide to go with a templated new producer API, according to
experience at LinkedIn, it might be worth considering instantiating the
serializer in code instead of from config, so we can avoid runtime errors
due to dynamic instantiation from config, which is more error prone. If
that is the case, the producer API could be changed to something like:
producer = new Producer<K, V>(keySerializer, valueSerializer)
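
A minimal sketch of option 2 (every name below is an assumption, not the 
actual Kafka API): passing serializer instances to the constructor moves 
configuration mistakes from runtime class-name lookup to compile time.

interface Serializer<T> {
    byte[] serialize(T value);
}

class StringSerializer implements Serializer<String> {
    public byte[] serialize(String value) { return value.getBytes(); }
}

class Producer<K, V> {
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;

    Producer(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }

    void send(K key, V value) {
        byte[] k = keySerializer.serialize(key);
        byte[] v = valueSerializer.serialize(value);
        // hand the byte arrays to the underlying byte-oriented producer
    }
}

// Usage: the compiler now checks the key/value types.
// Producer<String, String> producer =
//     new Producer<>(new StringSerializer(), new StringSerializer());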

--Jiangjie (Becket) Qin


On 11/24/14, 5:58 PM, "Jun Rao"  wrote:

>Hi, Everyone,
>
>I'd like to start a discussion on whether it makes sense to add the
>serializer api back to the new java producer. Currently, the new java
>producer takes a byte array for both the key and the value. While this api
>is simple, it pushes the serialization logic into the application. This
>makes it hard to reason about what type of data is being sent to Kafka and
>also makes it hard to share an implementation of the serializer. For
>example, to support Avro, the serialization logic could be quite involved
>since it might need to register the Avro schema in some remote registry
>and
>maintain a schema cache locally, etc. Without a serialization api, it's
>impossible to share such an implementation so that people can easily
>reuse.
>We sort of overlooked this implication duri

Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review63878
---

Ship it!


Ship It!

- Guozhang Wang


On Dec. 4, 2014, 3:02 a.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> ---
> 
> (Updated Dec. 4, 2014, 3:02 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650 and KAKFA-1650
> https://issues.apache.org/jira/browse/KAFKA-1650
> https://issues.apache.org/jira/browse/KAKFA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments
> 
> 
> commit before switch to trunk
> 
> 
> commit before rebase
> 
> 
> Rebased on trunk, Addressed Guozhang's comments.
> 
> 
> Addressed Guozhang's comments on MaxInFlightRequests
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
> 
> 
> Added consumer rebalance listener to mirror maker, will test it later.
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> Conflicts:
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
> 
> added custom config for consumer rebalance listener
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Add configurable consumer rebalance listener
> 
> 
> Incorporated Guozhang's comments
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> mirrormaker-redesign
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Addressed Guozhang's comment.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> da29a8cb461099eb675161db2f11a9937424a5c6 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 9d5a47fb8e04d0055cce820afde7f73affc0a984 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> f399105087588946987bbc84e3759935d9498b6a 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
> 6a85d7e494f6c88798133a17f6180b61029dff58 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8c4687b2c96fddab7c8454a5a8011c3bab0897a0 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Commented] (KAFKA-1790) Remote controlled shutdown was removed

2014-12-04 Thread James Oliver (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234496#comment-14234496
 ] 

James Oliver commented on KAFKA-1790:
-

Thanks for the input, Joel. We'll keep that in mind as a possible solution.

Jun, we are using Apache Mesos which uses resource isolation instead of VMs to 
schedule distributed workloads (Kafka in this case). I'm not sure if it's 
considered the exception anymore - it is a different way of doing things. For 
example, Google uses isolation instead of VMs for its datacenter computing. In 
this context, there is no VM but simply a sandbox (path on some mesos-slave). 

To bring down an instance, the entire process tree is sent a signal to shut 
down. However, the shutdown timeouts in our scheduling framework are mostly 
out of our control - there's no good way to guarantee controlled shutdown 
occurs before the process is forced to terminate. In order to make sure it 
happens, we instead thought we might send a request directly to the broker to 
perform the controlled shutdown and, once everything is jolly, send the kill 
signal (which causes a restart). Since the remote controlled shutdown went 
away, we now think we might need to defer that responsibility to a wrapper 
script with a simple remote interface. Thoughts?

> Remote controlled shutdown was removed
> --
>
> Key: KAFKA-1790
> URL: https://issues.apache.org/jira/browse/KAFKA-1790
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: James Oliver
>Priority: Blocker
> Fix For: 0.8.2
>
>
> In core:
> kafka.admin.ShutdownBroker was removed, rendering remote controlled shutdowns 
> impossible. 
> A Kafka administrator needs to be able to perform a controlled shutdown 
> without issuing a SIGTERM/SIGKILL.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1772) Add an Admin message type for request response

2014-12-04 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234530#comment-14234530
 ] 

Jun Rao commented on KAFKA-1772:


It makes sense to have a single ClusterMetadata request to get all brokers and 
the controller, which is what KAFKA-1802 tries to do. However, I am not sure if 
it makes sense to have a generic "list" command that can list everything. The 
responses for different entities (topic, cluster, consumer, etc) will be quite 
different, and you will need to have a union of all those types. It will also 
be a bit weird that in order to find out about the controller, you need to 
provide a topic name. 

Another thought is that perhaps we can have a request per entity type that 
handles all actions on that entity. For example, we can probably have a single 
TopicRequest that handles all the create, alter, delete, and list on topics. 
That will reduce the type of requests needed without making the 
request/response format too complicated.
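
A minimal sketch of the per-entity request shape suggested above (every name 
below is an assumption, not an actual Kafka wire format):

enum TopicAction { CREATE, ALTER, DELETE, LIST }

class TopicRequest {
    final TopicAction action;
    final String topic;          // may be null for LIST
    final int partitions;        // used by CREATE and ALTER
    final int replicationFactor; // used by CREATE

    TopicRequest(TopicAction action, String topic,
                 int partitions, int replicationFactor) {
        this.action = action;
        this.topic = topic;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
    }
}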

> Add an Admin message type for request response
> --
>
> Key: KAFKA-1772
> URL: https://issues.apache.org/jira/browse/KAFKA-1772
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Joe Stein
>Assignee: Andrii Biletskyi
> Fix For: 0.8.3
>
> Attachments: KAFKA-1772.patch, KAFKA-1772_2014-12-02_16:23:26.patch
>
>
> - utility int8
> - command int8
> - format int8
> - args variable length bytes
> utility 
> 0 - Broker
> 1 - Topic
> 2 - Replication
> 3 - Controller
> 4 - Consumer
> 5 - Producer
> Command
> 0 - Create
> 1 - Alter
> 3 - Delete
> 4 - List
> 5 - Audit
> format
> 0 - JSON
> args e.g. (which would equate to the data structure values == 2,1,0)
> "meta-store": {
> {"zookeeper":"localhost:12913/kafka"}
> }"args": {
>  "partitions":
>   [
> {"topic": "topic1", "partition": "0"},
> {"topic": "topic1", "partition": "1"},
> {"topic": "topic1", "partition": "2"},
>  
> {"topic": "topic2", "partition": "0"},
> {"topic": "topic2", "partition": "1"},
>   ]
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1790) Remote controlled shutdown was removed

2014-12-04 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234545#comment-14234545
 ] 

Jun Rao commented on KAFKA-1790:


James,

Thanks for the explanation. It seems that not being able to customize the 
shutdown timeout can be a general issue, not one limited to Kafka. There are 
potentially other systems that may take a bit of time to shut down cleanly. 
Is that an issue with Mesos? Does it have plans to address it?

> Remote controlled shutdown was removed
> --
>
> Key: KAFKA-1790
> URL: https://issues.apache.org/jira/browse/KAFKA-1790
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2
>Reporter: James Oliver
>Assignee: James Oliver
>Priority: Blocker
> Fix For: 0.8.2
>
>
> In core:
> kafka.admin.ShutdownBroker was removed, rendering remote controlled shutdowns 
> impossible. 
> A Kafka administrator needs to be able to perform a controlled shutdown 
> without issuing a SIGTERM/SIGKILL.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25995: Patch for KAFKA-1650

2014-12-04 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/
---

(Updated Dec. 4, 2014, 7:59 p.m.)


Review request for kafka.


Bugs: KAFKA-1650 and KAKFA-1650
https://issues.apache.org/jira/browse/KAFKA-1650
https://issues.apache.org/jira/browse/KAKFA-1650


Repository: kafka


Description (updated)
---

Addressed Guozhang's comments.


Addressed Guozhang's comments


commit before switch to trunk


commit before rebase


Rebased on trunk, Addressed Guozhang's comments.


Addressed Guozhang's comments on MaxInFlightRequests


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.


Added consumer rebalance listener to mirror maker, will test it later.


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign

Conflicts:
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala

added custom config for consumer rebalance listener


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Add configurable consumer rebalance listener


Incorporated Guozhang's comments


Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
mirrormaker-redesign


Incorporated Guozhang's comments.


Addressed Guozhang's comment.


numMessageUnacked should be decremented whether or not the send was 
successful.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java 
PRE-CREATION 
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
9d5a47fb8e04d0055cce820afde7f73affc0a984 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 
f399105087588946987bbc84e3759935d9498b6a 
  core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala 
6a85d7e494f6c88798133a17f6180b61029dff58 
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
8c4687b2c96fddab7c8454a5a8011c3bab0897a0 

Diff: https://reviews.apache.org/r/25995/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-04 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234554#comment-14234554
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

Updated reviewboard https://reviews.apache.org/r/25995/diff/
 against branch origin/trunk

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch
>
>
> Currently, if the mirror maker is shut down uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue can be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-04 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1650:

Attachment: KAFKA-1650_2014-12-04_11:59:07.patch

> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-04 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234557#comment-14234557
 ] 

Jiangjie Qin commented on KAFKA-1650:
-

[~junrao] Could you also help review the patch? Joel thought it is quite a big 
patch with a new design for mirror maker. 
Some major changes are listed below to expedite the review (a sketch follows 
the list): 
1. ZookeeperConsumerConnector auto commit is turned off.
2. A map is used to keep track of the offsets of messages successfully sent to 
the target cluster. The map is updated in the new producer's callback.
3. ZookeeperConsumerConnector has been modified to take a rebalance listener 
and an external offset map for committing offsets.
4. Mirror maker wires in a consumer rebalance listener to avoid duplicates on 
consumer rebalance. On rebalance, before releasing partition ownership, it 
first drains the messages in the data channel, waits until the messages taken 
by the producer receive their callbacks, commits offsets, and then continues 
the rebalance.
5. An offset commit thread is responsible for committing offsets periodically.
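
To make points 2, 4 and 5 concrete, here is a minimal, hypothetical sketch of
the bookkeeping (class and method names are illustrative, not the patch's
actual code): offsets are recorded before send, removed in the producer
callback, and the commit thread never commits past the lowest unacked offset.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListSet;

    public class UnackedOffsetTracker {
        private final Map<String, ConcurrentSkipListSet<Long>> unacked =
            new ConcurrentHashMap<>();

        // Record the source offset before handing the message to the producer.
        public void beforeSend(String topicPartition, long offset) {
            unacked.computeIfAbsent(topicPartition, tp -> new ConcurrentSkipListSet<>())
                   .add(offset);
        }

        // Called from the producer callback once the send has completed.
        public void onAck(String topicPartition, long offset) {
            ConcurrentSkipListSet<Long> offsets = unacked.get(topicPartition);
            if (offsets != null)
                offsets.remove(offset);
        }

        // The periodic commit thread commits up to the lowest still-unacked
        // offset, or past the last consumed offset if nothing is in flight.
        public long safeCommitOffset(String topicPartition, long lastConsumed) {
            ConcurrentSkipListSet<Long> offsets = unacked.get(topicPartition);
            if (offsets == null || offsets.isEmpty())
                return lastConsumed + 1;
            return offsets.first();
        }
    }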


> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.

2014-12-04 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234557#comment-14234557
 ] 

Jiangjie Qin edited comment on KAFKA-1650 at 12/4/14 8:02 PM:
--

[~junrao] Could you also help review the patch? Joel thought it is quite a big 
patch with a new design for mirror maker. Thanks.
Some major changes are listed below to expedite the review: 
1. ZookeeperConsumerConnector auto commit is turned off.
2. A map is used to keep track of the offsets of messages successfully sent to 
the target cluster. The map is updated in the new producer's callback.
3. ZookeeperConsumerConnector has been modified to take a rebalance listener 
and an external offset map for committing offsets.
4. Mirror maker wires in a consumer rebalance listener to avoid duplicates on 
consumer rebalance. On rebalance, before releasing partition ownership, it 
first drains the messages in the data channel, waits until the messages taken 
by the producer receive their callbacks, commits offsets, and then continues 
the rebalance.
5. An offset commit thread is responsible for committing offsets periodically.




> Mirror Maker could lose data on unclean shutdown.
> -
>
> Key: KAFKA-1650
> URL: https://issues.apache.org/jira/browse/KAFKA-1650
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, 
> KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, 
> KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, 
> KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, 
> KAFKA-1650_2014-12-04_11:59:07.patch
>
>
> Currently if mirror maker got shutdown uncleanly, the data in the data 
> channel and buffer could potentially be lost. With the new producer's 
> callback, this issue could be solved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is not available

2014-12-04 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234595#comment-14234595
 ] 

Bhavesh Mistry commented on KAFKA-1788:
---

We also need to fix Producer.close(), which hangs the JVM because the IO 
thread's join() does not return.  Please refer to KAFKA-1642 for more details. 
The Kafka core devs need to give guidance on how to solve this problem.

Please see below comments from that linked issue.


1) The Producer.close() issue is not addressed by this patch. If the network 
connection is lost, or other failure events happen, the IO thread will not be 
killed and the close method hangs. In the patch I provided, I put a timeout on 
the join and interrupted the IO thread. I think we need a similar solution.

[~ewencp],

1. I'm specifically trying to address the CPU usage here. I realize from your 
perspective they are closely related, since both can be triggered by a loss of 
network connectivity, but internally they're really separate issues: the CPU 
usage has to do with incorrect timeouts, and the join() issue is due to the 
lack of timeouts on produce operations. That's why I pointed you toward 
KAFKA-1788. If a timeout is added for data in the producer, that would resolve 
the close issue as well, since any data waiting in the producer would 
eventually time out and the IO thread could exit. I think that's the cleanest 
solution since it solves both problems with a single setting (the amount of 
time you're willing to wait before discarding data). If you think a separate 
timeout specifically for Producer.close() is worthwhile, I'd suggest filing a 
separate JIRA for that.
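
A minimal sketch of the bounded shutdown both comments describe, assuming a
hypothetical ioThread handle (the real producer's internals differ): join with
a timeout, then interrupt if the sender is still alive.

    public class BoundedClose {
        public static void close(Thread ioThread, long timeoutMs)
                throws InterruptedException {
            ioThread.join(timeoutMs);  // give in-flight sends a chance to drain
            if (ioThread.isAlive()) {  // e.g. network down, sends cannot complete
                ioThread.interrupt();  // force the IO thread out of blocking calls
                ioThread.join();       // wait for it to exit, discarding queued data
            }
        }
    }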



> producer record can stay in RecordAccumulator forever if leader is not 
> available
> ---
>
> Key: KAFKA-1788
> URL: https://issues.apache.org/jira/browse/KAFKA-1788
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 0.8.2
>Reporter: Jun Rao
>Assignee: Jun Rao
>  Labels: newbie++
> Fix For: 0.8.3
>
>
> In the new producer, when a partition has no leader for a long time (e.g., 
> all replicas are down), the records for that partition will stay in the 
> RecordAccumulator until the leader is available. This may cause the 
> bufferpool to be full and the callback for the produced message to block for 
> a long time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-727) broker can still expose uncommitted data to a consumer

2014-12-04 Thread lokesh Birla (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234678#comment-14234678
 ] 

lokesh Birla commented on KAFKA-727:


Hi,

Is this really fixed? I still see this issue when using 4 topics, 3 partitions, 
and a replication factor of 3.  I am using kafka_2.9.2-0.8.1.1, with 3 broker 
nodes and 1 ZooKeeper node. I did not see this issue when I used 1, 2, or 3 
topics. 



[2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch request 
for partition [mmetopic4,2] offset 1940029 from consumer with correlation id 21 
(kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Attempt to read with a maximum offset 
(1818353) less than the start offset (1940029).
at kafka.log.LogSegment.read(LogSegment.scala:136)
at kafka.log.Log.read(Log.scala:386)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at scala.collection.immutable.Map$Map1.map(Map.scala:107)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
at 
kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
at 
kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
at 
kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
at java.lang.Thread.run(Thread.java:745)


Thanks for your help. 


> broker can still expose uncommitted data to a consumer
> --
>
> Key: KAFKA-727
> URL: https://issues.apache.org/jira/browse/KAFKA-727
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.0
>Reporter: Jun Rao
>Assignee: Jay Kreps
>Priority: Blocker
>  Labels: p1
> Attachments: KAFKA-727-v1.patch
>
>
> Even after kafka-698 is fixed, we still see consumer clients occasionally see 
> uncommitted data. The following is how this can happen.
> 1. In Log.read(), we pass in startOffset < HW and maxOffset = HW.
> 2. Then we call LogSegment.read(), in which we call translateOffset on the 
> maxOffset. The offset doesn't exist and translateOffset returns null.
> 3. Continue in LogSegment.read(), we then call messageSet.sizeInBytes() to 
> fetch and return the data.
> What can happen is that between step 2 and step 3, a new message is appended 
> to the log and is not committed yet. Now, we have exposed uncommitted data to 
> the client.
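
The sequence above is a check-then-act race. A minimal sketch of one fix
direction, with hypothetical names (not Kafka's actual code): snapshot the
exposed end of the segment once, so an append between the null check and the
size read cannot widen the result.

    import java.util.concurrent.atomic.AtomicLong;

    public class SegmentView {
        // Highest position that is safe to expose to consumers.
        private final AtomicLong exposedEnd = new AtomicLong();

        // translatedMaxPos is null when the max offset did not translate (step 2).
        public long readableBytes(long startPos, Long translatedMaxPos) {
            // Read the end position exactly once; a concurrent append after this
            // line is invisible to this read instead of leaking into it.
            long end = (translatedMaxPos != null) ? translatedMaxPos : exposedEnd.get();
            return Math.max(0, end - startPos);
        }

        public void append(int bytes) {
            exposedEnd.addAndGet(bytes);  // visible only to reads that start later
        }
    }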



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: Kafka-trunk #348

2014-12-04 Thread Apache Jenkins Server
See 

--
[...truncated 827 lines...]
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33)
at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:39)

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33)
at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:39)

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:39)
at 
kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33)
at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:39)

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh 
FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36)
at 
kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46)

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.jav

[jira] [Created] (KAFKA-1806) broker can still expose uncommitted data to a consumer

2014-12-04 Thread lokesh Birla (JIRA)
lokesh Birla created KAFKA-1806:
---

 Summary: broker can still expose uncommitted data to a consumer
 Key: KAFKA-1806
 URL: https://issues.apache.org/jira/browse/KAFKA-1806
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: lokesh Birla
Assignee: Neha Narkhede


Although the following issue, https://issues.apache.org/jira/browse/KAFKA-727,
is marked fixed, I still see this issue in 0.8.1.1. I am able to reproduce the 
issue consistently. 

[2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch 
request for partition [mmetopic4,2] offset 1940029 from consumer with 
correlation id 21 (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Attempt to read with a maximum offset 
(1818353) less than the start offset (1940029).
at kafka.log.LogSegment.read(LogSegment.scala:136)
at kafka.log.Log.read(Log.scala:386)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at scala.collection.immutable.Map$Map1.map(Map.scala:107)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
at 
kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
at 
kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
at 
kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
at java.lang.Thread.run(Thread.java:745)





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1806) broker can still expose uncommitted data to a consumer

2014-12-04 Thread lokesh Birla (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234713#comment-14234713
 ] 

lokesh Birla commented on KAFKA-1806:
-

I have a 3-node Kafka broker cluster, running one broker on each blade, and one 
ZooKeeper instance running on another blade. 
I created 4 partitions with a replication factor of 3 each; the producer sends 
messages from one blade and the consumer reads from another. I see the above 
issue consistently. 

However, this issue did not exist when I used the same configuration with up to 
3 topics. I increased the heap size from 4GB to 16GB, but saw the same issue. 

> broker can still expose uncommitted data to a consumer
> --
>
> Key: KAFKA-1806
> URL: https://issues.apache.org/jira/browse/KAFKA-1806
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: lokesh Birla
>Assignee: Neha Narkhede
>
> Although the following issue, https://issues.apache.org/jira/browse/KAFKA-727,
> is marked fixed, I still see this issue in 0.8.1.1. I am able to reproduce 
> the issue consistently. 
> [2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch 
> request for partition [mmetopic4,2] offset 1940029 from consumer with 
> correlation id 21 (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Attempt to read with a maximum offset 
> (1818353) less than the start offset (1940029).
> at kafka.log.LogSegment.read(LogSegment.scala:136)
> at kafka.log.Log.read(Log.scala:386)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
> at scala.collection.immutable.Map$Map1.map(Map.scala:107)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> at 
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
> at 
> kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
> at 
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
> at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1795) OOME - high level kafka consumer

2014-12-04 Thread Umesh Batra (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234738#comment-14234738
 ] 

Umesh Batra commented on KAFKA-1795:


Please ignore; I figured it out. The leak was actually in a downstream 
component after the consumer, not related to kafka. 

> OOME - high level kafka consumer
> 
>
> Key: KAFKA-1795
> URL: https://issues.apache.org/jira/browse/KAFKA-1795
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1, 0.9.0
>Reporter: Umesh Batra
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: newbie
>
> I am using the Kafka High Level Consumer, version kafka_2.8.0 - 0.8.1.1 with 
> zkclient - 0.3, and I get an OOME after just 10-15 minutes. My volume test 
> setup has just one topic with 10 partitions and a continuous flow of messages 
> (size ~500KB); my configuration is below: 
>  
> zookeeper.connect=localhost:2181,localhost:2181
> group.id=tc
> consumer.id=tc
> zookeeper.sync.time.ms=200
> zookeeper.connection.timeout.ms=1
> zookeeper.session.timeout.ms=1
> fetch.size=50
> fetch.message.max.bytes=100
> auto.commit.enable=true
> auto.commit.interval.ms=100
> auto.offset.reset=largest
> queued.max.message.chunks=1
> backoff.increment.ms=1000
> rebalance.max.retries=10
> rebalance.retries.max=10
> rebalance.backoff.ms=1
> refresh.leader.backoff.ms=2
> consumer.timeout.ms=5 
>  
> The memory histogram shows 60% of memory consumed by byte[] and most of the 
> remainder by char[] and HashMap$Node. In my various tries to recover from the 
> situation, I observed the "metric-*" threads still alive even after I shut 
> down the Kafka connector. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1795) OOME - high level kafka consumer

2014-12-04 Thread Umesh Batra (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Umesh Batra resolved KAFKA-1795.

Resolution: Not a Problem

The issue was with a downstream post-consumer component, not with Kafka. 

> OOME - high level kafka consumer
> 
>
> Key: KAFKA-1795
> URL: https://issues.apache.org/jira/browse/KAFKA-1795
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1, 0.9.0
>Reporter: Umesh Batra
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: newbie
>
> I am using the Kafka High Level Consumer, version kafka_2.8.0 - 0.8.1.1 with 
> zkclient - 0.3, and I get an OOME after just 10-15 minutes. My volume test 
> setup has just one topic with 10 partitions and a continuous flow of messages 
> (size ~500KB); my configuration is below: 
>  
> zookeeper.connect=localhost:2181,localhost:2181
> group.id=tc
> consumer.id=tc
> zookeeper.sync.time.ms=200
> zookeeper.connection.timeout.ms=1
> zookeeper.session.timeout.ms=1
> fetch.size=50
> fetch.message.max.bytes=100
> auto.commit.enable=true
> auto.commit.interval.ms=100
> auto.offset.reset=largest
> queued.max.message.chunks=1
> backoff.increment.ms=1000
> rebalance.max.retries=10
> rebalance.retries.max=10
> rebalance.backoff.ms=1
> refresh.leader.backoff.ms=2
> consumer.timeout.ms=5 
>  
> The memory histogram shows 60% of memory consumed by byte[] and most of the 
> remainder by char[] and HashMap$Node. In my various tries to recover from the 
> situation, I observed the "metric-*" threads still alive even after I shut 
> down the Kafka connector. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-1795) OOME - high level kafka consumer

2014-12-04 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede closed KAFKA-1795.


> OOME - high level kafka consumer
> 
>
> Key: KAFKA-1795
> URL: https://issues.apache.org/jira/browse/KAFKA-1795
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1, 0.9.0
>Reporter: Umesh Batra
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: newbie
>
> I am using the Kafka High Level Consumer, version kafka_2.8.0 - 0.8.1.1 with 
> zkclient - 0.3, and I get an OOME after just 10-15 minutes. My volume test 
> setup has just one topic with 10 partitions and a continuous flow of messages 
> (size ~500KB); my configuration is below: 
>  
> zookeeper.connect=localhost:2181,localhost:2181
> group.id=tc
> consumer.id=tc
> zookeeper.sync.time.ms=200
> zookeeper.connection.timeout.ms=1
> zookeeper.session.timeout.ms=1
> fetch.size=50
> fetch.message.max.bytes=100
> auto.commit.enable=true
> auto.commit.interval.ms=100
> auto.offset.reset=largest
> queued.max.message.chunks=1
> backoff.increment.ms=1000
> rebalance.max.retries=10
> rebalance.retries.max=10
> rebalance.backoff.ms=1
> refresh.leader.backoff.ms=2
> consumer.timeout.ms=5 
>  
> The memory histogram shows 60% of memory consumed by byte[] and most of the 
> remainder by char[] and HashMap$Node. In my various tries to recover from the 
> situation, I observed the "metric-*" threads still alive even after I shut 
> down the Kafka connector. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1476) Get a list of consumer groups

2014-12-04 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1476:
-
Assignee: (was: BalajiSeshadri)

> Get a list of consumer groups
> -
>
> Key: KAFKA-1476
> URL: https://issues.apache.org/jira/browse/KAFKA-1476
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Williams
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, 
> KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, 
> KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, 
> KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch
>
>
> It would be useful to have a way to get a list of consumer groups currently 
> active via some tool/script that ships with kafka. This would be helpful so 
> that the system tools can be explored more easily.
> For example, when running the ConsumerOffsetChecker, it requires a group 
> option
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
> ?
> But, when just getting started with kafka, using the console producer and 
> consumer, it is not clear what value to use for the group option.  If the 
> active consumer groups could be listed, then it would be clear what value to use.
> Background:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1476) Get a list of consumer groups

2014-12-04 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1476:
-
Assignee: Balaji Seshadri

> Get a list of consumer groups
> -
>
> Key: KAFKA-1476
> URL: https://issues.apache.org/jira/browse/KAFKA-1476
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Williams
>Assignee: Balaji Seshadri
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, 
> KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, 
> KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, 
> KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch
>
>
> It would be useful to have a way to get a list of consumer groups currently 
> active via some tool/script that ships with kafka. This would be helpful so 
> that the system tools can be explored more easily.
> For example, when running the ConsumerOffsetChecker, it requires a group 
> option
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
> ?
> But, when just getting started with kafka, using the console producer and 
> consumer, it is not clear what value to use for the group option.  If the 
> active consumer groups could be listed, then it would be clear what value to use.
> Background:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1476) Get a list of consumer groups

2014-12-04 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234757#comment-14234757
 ] 

Neha Narkhede commented on KAFKA-1476:
--

[~techybalaji] KAFKA-1694 is designing the command line tools, but it is a ways 
away from being completed. My guess is you may be able to get this done before 
that.

> Get a list of consumer groups
> -
>
> Key: KAFKA-1476
> URL: https://issues.apache.org/jira/browse/KAFKA-1476
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Williams
>Assignee: Balaji Seshadri
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: ConsumerCommand.scala, KAFKA-1476-LIST-GROUPS.patch, 
> KAFKA-1476-RENAME.patch, KAFKA-1476-REVIEW-COMMENTS.patch, KAFKA-1476.patch, 
> KAFKA-1476.patch, KAFKA-1476.patch, KAFKA-1476_2014-11-10_11:58:26.patch, 
> KAFKA-1476_2014-11-10_12:04:01.patch, KAFKA-1476_2014-11-10_12:06:35.patch
>
>
> It would be useful to have a way to get a list of consumer groups currently 
> active via some tool/script that ships with kafka. This would be helpful so 
> that the system tools can be explored more easily.
> For example, when running the ConsumerOffsetChecker, it requires a group 
> option
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
> ?
> But, when just getting started with kafka, using the console producer and 
> consumer, it is not clear what value to use for the group option.  If the 
> active consumer groups could be listed, then it would be clear what value to use.
> Background:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-12-04 Thread Jay Kreps
I agree that having the new Producer(KeySerializer,
ValueSerializer) interface would be useful.

People suggested cases where you want to mix and match serialization types.
The ByteArraySerializer is a no-op that would give the current behavior, so
any odd case where you need to mix and match serialization or opt out
entirely is totally possible and won't have any overhead other than the
syntactic burden of declaring the parametric type. However,
the expectation is that these cases are rare.
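
For reference, a minimal sketch of the shape under discussion; names and
signatures are illustrative, since this thread predates the final API:

    // Illustrative only: a serializer interface plus the pass-through byte[] case.
    public interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    public class ByteArraySerializer implements Serializer<byte[]> {
        public byte[] serialize(String topic, byte[] data) {
            return data;  // no-op: preserves today's hand-it-bytes behavior
        }
    }

Opting out of serialization is then just constructing the producer with two
ByteArraySerializer instances and byte[] type parameters.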

I really, really think we should avoid having a second producer interface
like KafkaSerializedProducer. KafkaProducer<byte[], byte[]> will give the
serialization-free behavior. I think our experience has been that surface
area really matters with these things, so let's not have two. That sounds
like a compromise but is actually the worst of all worlds, since it
duplicates everything over a fairly minor matter.
-Jay



On Thu, Dec 4, 2014 at 10:33 AM, Jiangjie Qin 
wrote:

>
> I'm just thinking that instead of binding serialization to the producer,
> another option is to bind the serializer/deserializer to
> ProducerRecord/ConsumerRecord (please see the detailed proposal below).
> The arguments for this option are:
> A. A single producer could send different message types. There are
> several use cases in LinkedIn for a per-record serializer:
> - In Samza, there are some in-stream order-sensitive control messages
> having a different deserializer from other messages.
> - There are use cases which need support for sending both Avro messages
> and raw bytes.
> - Some use cases need to deserialize some Avro messages into generic
> records and some other messages into specific records.
> B. In the current proposal, the serializer/deserializer is instantiated
> according to config. Compared with that, binding the serializer to
> ProducerRecord and ConsumerRecord is less error prone.
>
>
> This option includes the following changes:
> A. Add serializer and deserializer interfaces to replace the serializer
> instance from config.
> public interface Serializer<K, V> {
> public byte[] serializeKey(K key);
> public byte[] serializeValue(V value);
> }
> public interface Deserializer<K, V> {
> public K deserializeKey(byte[] key);
> public V deserializeValue(byte[] value);
> }
>
> B. Make ProducerRecord and ConsumerRecord abstract classes implementing
> Serializer<K, V> and Deserializer<K, V> respectively.
> public abstract class ProducerRecord<K, V> implements Serializer<K, V> {...}
> public abstract class ConsumerRecord<K, V> implements Deserializer<K, V> {...}
>
> C. Instead of instantiating the serializer/deserializer from config, let
> concrete ProducerRecord/ConsumerRecord classes extend the abstract class
> and override the serialize/deserialize methods.
>
> public class AvroProducerRecord extends ProducerRecord<String, GenericRecord> {
> ...
> @Override
> public byte[] serializeKey(String key) {...}
> @Override
> public byte[] serializeValue(GenericRecord value) {...}
> }
>
> public class AvroConsumerRecord extends ConsumerRecord<String, GenericRecord> {
> ...
> @Override
> public String deserializeKey(byte[] key) {...}
> @Override
> public GenericRecord deserializeValue(byte[] value) {...}
> }
>
> D. The producer API changes to:
> public class KafkaProducer<K, V> {
> ...
>
> Future<RecordMetadata> send(ProducerRecord<K, V> record) {
> ...
> byte[] key = record.serializeKey(record.key);
> byte[] value = record.serializeValue(record.value);
> BytesProducerRecord bytesProducerRecord =
> new BytesProducerRecord(topic, partition, key, value);
> ...
> }
> ...
> }
>
>
>
> We also had some brainstorming in LinkedIn, and here is the feedback:
>
> If the community decides to add serialization back to the new producer,
> besides the current proposal (which changes the new producer API to be
> templated), there are some other options raised during our discussion:
> 1) Rather than change the current new producer API, we can provide a
> wrapper of the current new producer (e.g. KafkaSerializedProducer) and make
> it available to users, as there is value in the simplicity of the current
> API.
>
> 2) If we decide to go with the templated new producer API, according to
> experience in LinkedIn, it might be worth considering to instantiate 

[jira] [Commented] (KAFKA-1755) Improve error handling in log cleaner

2014-12-04 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234780#comment-14234780
 ] 

Guozhang Wang commented on KAFKA-1755:
--

Here are my two cents:

1. At the end of the day, Kafka will have two types of topics: one type only 
accepts keyed messages and log compaction is used; the other accepts any 
message and log cleaning is used. These two types never interchange, i.e. once 
a topic is created with one of the two types, it will never change its type 
until deletion.

2. Compressed messages will be supported with log compaction, which will 
de-serialize the message set and re-serialize it.

3. With these two points in mind, I would suggest for now (a sketch of checks 
a and b follows below):
  a. The broker rejects non-keyed messages for compacted topics.
  b. The broker rejects compressed messages for compacted topics (this will be 
lifted after KAFKA-1374 is checked in).
  c. With this, the compactor thread should never encounter a non-keyed or 
compressed message (the compressed case again lifted after KAFKA-1374); if it 
does, that is a FATAL error and we should throw an exception and halt the 
server. It indicates that operator action and code fixes are needed before the 
server can be restarted.
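
A minimal sketch of checks (a) and (b), with simplified stand-in types rather
than actual broker code:

    public class CompactedTopicValidator {
        // Reject, at append time, messages a compacted topic can never clean.
        public static void validate(boolean topicIsCompacted, byte[] key,
                                    boolean compressed) {
            if (!topicIsCompacted)
                return;  // log-cleaned (non-compacted) topics accept anything
            if (key == null)
                throw new IllegalArgumentException(
                    "compacted topics require keyed messages");
            if (compressed)  // restriction to be lifted once KAFKA-1374 lands
                throw new IllegalArgumentException(
                    "compacted topics do not accept compressed messages yet");
        }
    }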

> Improve error handling in log cleaner
> -
>
> Key: KAFKA-1755
> URL: https://issues.apache.org/jira/browse/KAFKA-1755
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Joel Koshy
>  Labels: newbie++
> Fix For: 0.8.3
>
>
> The log cleaner is a critical process when using compacted topics.
> However, if there is any error in any topic (notably if a key is missing) 
> then the cleaner exits and all other compacted topics will also be adversely 
> affected - i.e., compaction stops across the board.
> This can be improved by just aborting compaction for a topic on any error and 
> keeping the thread from exiting.
> Another improvement would be to reject messages without keys that are sent to 
> compacted topics, although this is not enough by itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1173) Using Vagrant to get up and running with Apache Kafka

2014-12-04 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234816#comment-14234816
 ] 

Joe Stein commented on KAFKA-1173:
--

[~ewencp] This fell off my radar; I will circle back on it in the next day or 
so and catch back up.

> Using Vagrant to get up and running with Apache Kafka
> -
>
> Key: KAFKA-1173
> URL: https://issues.apache.org/jira/browse/KAFKA-1173
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Joe Stein
>Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.3
>
> Attachments: KAFKA-1173.patch, KAFKA-1173_2013-12-07_12:07:55.patch, 
> KAFKA-1173_2014-11-11_13:50:55.patch, KAFKA-1173_2014-11-12_11:32:09.patch, 
> KAFKA-1173_2014-11-18_16:01:33.patch
>
>
> Vagrant has been getting a lot of pickup in the tech communities.  I have 
> found it very useful for development and testing, and I am now working with a 
> few clients using it to help virtualize their environments in repeatable ways.
> Using Vagrant to get up and running.
> For 0.8.0 I have a patch on github https://github.com/stealthly/kafka
> 1) Install Vagrant [http://www.vagrantup.com/](http://www.vagrantup.com/)
> 2) Install Virtual Box 
> [https://www.virtualbox.org/](https://www.virtualbox.org/)
> In the main kafka folder
> 1) ./sbt update
> 2) ./sbt package
> 3) ./sbt assembly-package-dependency
> 4) vagrant up
> once this is done 
> * Zookeeper will be running 192.168.50.5
> * Broker 1 on 192.168.50.10
> * Broker 2 on 192.168.50.20
> * Broker 3 on 192.168.50.30
> When you are all up and running you will be back at a command prompt.  
> If you want you can log in to the machines using vagrant ssh  but 
> you don't need to.
> You can access the brokers and zookeeper by their IP
> e.g.
> bin/kafka-console-producer.sh --broker-list 
> 192.168.50.10:9092,192.168.50.20:9092,192.168.50.30:9092 --topic sandbox
> bin/kafka-console-consumer.sh --zookeeper 192.168.50.5:2181 --topic sandbox 
> --from-beginning



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1807) Improve accuracy of ProducerPerformance target throughput

2014-12-04 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-1807:


 Summary: Improve accuracy of ProducerPerformance target throughput
 Key: KAFKA-1807
 URL: https://issues.apache.org/jira/browse/KAFKA-1807
 Project: Kafka
  Issue Type: Improvement
  Components: clients, tools
Affects Versions: 0.8.1.1
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Priority: Minor
 Fix For: 0.8.3


The code in ProducerPerformance that tries to match a target throughput is very 
inaccurate because it doesn't account for time spent sending messages. Since we 
have to get the current time to timestamp the messages, we can be much more 
accurate by computing the current rate over the entire run and only adding to 
the sleep deficit if we're above the target rate.
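
A minimal sketch of that whole-run pacing, with illustrative names (not the
patch itself): compare the cumulative rate against the target and sleep only
for the accumulated deficit.

    public class RateMatcher {
        private final double targetPerSec;
        private final long startMs;

        public RateMatcher(double targetPerSec, long startMs) {
            this.targetPerSec = targetPerSec;
            this.startMs = startMs;
        }

        // Call after each send; sleeps only when the whole-run average rate
        // has drifted above the target.
        public void maybeSleep(long sentSoFar, long nowMs)
                throws InterruptedException {
            long elapsedMs = Math.max(1, nowMs - startMs);
            double currentRate = sentSoFar * 1000.0 / elapsedMs;
            if (currentRate > targetPerSec) {
                // Time the run should have taken so far, minus time it did take.
                long deficitMs = (long) (sentSoFar * 1000.0 / targetPerSec) - elapsedMs;
                if (deficitMs > 0)
                    Thread.sleep(deficitMs);
            }
        }
    }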



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 28735: Patch for KAFKA-1807

2014-12-04 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28735/
---

Review request for kafka.


Bugs: KAFKA-1807
https://issues.apache.org/jira/browse/KAFKA-1807


Repository: kafka


Description
---

KAFKA-1807 Improve accuracy of ProducerPerformance target throughput.


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
ac8615082e21c34417d966fb5594784d6a1eec6c 

Diff: https://reviews.apache.org/r/28735/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-1807) Improve accuracy of ProducerPerformance target throughput

2014-12-04 Thread Lam Thien Son (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lam Thien Son updated KAFKA-1807:
-
Status: Patch Available  (was: Open)

> Improve accuracy of ProducerPerformance target throughput
> -
>
> Key: KAFKA-1807
> URL: https://issues.apache.org/jira/browse/KAFKA-1807
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, tools
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Fix For: 0.8.3
>
>
> The code in ProducerPerformance that tries to match a target throughput is 
> very inaccurate because it doesn't account for time spent sending messages. 
> Since we have to get the current time to timestamp the messages, we can be 
> much more accurate by computing the current rate over the entire run and only 
> adding to the sleep deficit if we're above the target rate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1807) Improve accuracy of ProducerPerformance target throughput

2014-12-04 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14234940#comment-14234940
 ] 

Ewen Cheslack-Postava commented on KAFKA-1807:
--

Created reviewboard https://reviews.apache.org/r/28735/diff/
 against branch origin/trunk

> Improve accuracy of ProducerPerformance target throughput
> -
>
> Key: KAFKA-1807
> URL: https://issues.apache.org/jira/browse/KAFKA-1807
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, tools
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-1807.patch
>
>
> The code in ProducerPerformance that tries to match a target throughput is 
> very inaccurate because it doesn't account for time spent sending messages. 
> Since we have to get the current time to timestamp the messages, we can be 
> much more accurate by computing the current rate over the entire run and only 
> adding to the sleep deficit if we're above the target rate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1807) Improve accuracy of ProducerPerformance target throughput

2014-12-04 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1807:
-
Attachment: KAFKA-1807.patch

> Improve accuracy of ProducerPerformance target throughput
> -
>
> Key: KAFKA-1807
> URL: https://issues.apache.org/jira/browse/KAFKA-1807
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, tools
>Affects Versions: 0.8.1.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-1807.patch
>
>
> The code in ProducerPerformance that tries to match a target throughput is 
> very inaccurate because it doesn't account for time spent sending messages. 
> Since we have to get the current time to timestamp the messages, we can be 
> much more accurate by computing the current rate over the entire run and only 
> adding to the sleep deficit if we're above the target rate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28735: Patch for KAFKA-1807

2014-12-04 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28735/#review63953
---

Ship it!


Ship It!

- Neha Narkhede


On Dec. 5, 2014, 2:21 a.m., Ewen Cheslack-Postava wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/28735/
> ---
> 
> (Updated Dec. 5, 2014, 2:21 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1807
> https://issues.apache.org/jira/browse/KAFKA-1807
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1807 Improve accuracy of ProducerPerformance target throughput.
> 
> 
> Diffs
> -
> 
>   
> clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java 
> ac8615082e21c34417d966fb5594784d6a1eec6c 
> 
> Diff: https://reviews.apache.org/r/28735/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>