[DISCUSSION] Error Handling and Logging at Kafka

2014-08-26 Thread Guozhang Wang
Hello all,

We want to kick off some discussions about error handling and logging
conventions. With a number of great patch contributions to Kafka recently,
it is a good time for us to sit down and think a little more about the
coding style guidelines we have (http://kafka.apache.org/coding-guide.html).

People at LinkedIn have discussed some observed issues with error handling
and logging verbosity, and here is a summary of the bullet points covered:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Error+Handling+and+Logging

We would like to collect as many comments as possible before we move ahead
with the logging cleanup JIRAs mentioned in the wiki, so please feel free
to shoot over any thoughts and suggestions on this topic.

-- Guozhang


Re: Review Request 24676: Fix KAFKA-1583

2014-08-27 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Aug. 27, 2014, 4:44 p.m.)


Review request for kafka.


Summary (updated)
-

Fix KAFKA-1583


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

TBD


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala 
af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 
0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 24676: Fix KAFKA-1583

2014-08-27 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Aug. 27, 2014, 5 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Incorporated Jun's comments.

1. I left some cases in Log since they are return values for some of its APIs.
2. I kept the fetch info in the delayed fetch metadata since it needs to be 
used for re-reading the log.
3. I kept the name callbackOnComplete, following the principle that only 
the caller knows what the callback is used for: the callers name the 
callback responseCallback (from KafkaApis) and putCacheCallback (from 
OffsetManager), while all callees take the callback as callbackOnComplete.

Unit tests passed, with some other notes:

1. Found and fixed a bug in the current delayed-fetch satisfaction logic: 
previously, when calculating the available bytes, we did not take 
fetchMaxBytes into consideration as an upper limit for a single partition's 
log, but simply took the diff between the current HW/LEO and the fetch offset.
2. Found and fixed a bug in the unit tests: we used to create a replica 
manager on the fly but did not shut it down upon completing the test, which 
leaks the background thread (i.e., the purgatory's reaper thread).
3. Changed the RequestPurgatory API a bit per Jun's comments. Now it has two 
implemented functions, forceComplete() and isCompleted(), and two functions 
that need to be implemented in subclasses, tryComplete() and complete(). 
Please let me know if people have more comments on the current API.
4. Cleaned up the SimpleFetch test: previously it was overly complicated, 
even though it only tests a simple piece of replica-manager logic.
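
The split described in note 3 can be sketched roughly as follows. This is a simplified illustration of the shape of the API, not the actual Kafka code; DelayedFlag is a hypothetical subclass added only for demonstration:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Sketch of the reworked delayed-request API: the base class implements the
// at-most-once completion bookkeeping, while subclasses supply the condition
// check (tryComplete) and the completion action (complete).
abstract class DelayedRequest {
  private val completed = new AtomicBoolean(false)

  // Implemented once in the base class: runs complete() at most once.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { complete(); true } else false

  def isCompleted: Boolean = completed.get

  // Implemented by subclasses.
  def tryComplete(): Boolean // check the condition; call forceComplete() if satisfied
  def complete(): Unit       // the action performed exactly once on completion
}

// Hypothetical subclass, for illustration only.
class DelayedFlag(ready: () => Boolean, onDone: () => Unit) extends DelayedRequest {
  def tryComplete(): Boolean = ready() && forceComplete()
  def complete(): Unit = onDone()
}
```

Concentrating the compare-and-set inside forceComplete() means a request that is completed concurrently by the expiration reaper and by a satisfied condition still runs its completion action only once.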

One concern I have now is the on-the-fly creation of new callback functions 
(i.e., the defs inside the handler functions and the offset manager's 
storeOffset function); when I run the unit tests with the patch, it seems to 
cause higher CPU consumption than trunk. Could someone lend another pair of 
eyes by running the unit tests and checking the CPU usage?


Diffs
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala 
af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 
0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing (updated)
---

Unit tests


Thanks,

Guozhang Wang



Re: [DISCUSSION] Error Handling and Logging at Kafka

2014-08-27 Thread Guozhang Wang
Thanks Joe / Gwen for the great input. I have commented on the page
accordingly also.

There is a ticket for the background connection issue: KAFKA-1592
https://issues.apache.org/jira/browse/KAFKA-1592.


On Wed, Aug 27, 2014 at 10:25 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 The wiki is great place to record the results or make suggestions that
 require formatting and graphics to explain, but I think I prefer
 mailing lists for discussion.

 So, let me repeat here a question I raised in the wiki, since I don't
 want it to get lost :)

 Is there a JIRA for the issues mentioned in the first background
 example? (closing socket and connection unsuccessful). I'll be more
 than happy to pick those up as they've made my life pretty painful.

 Gwen

 On Tue, Aug 26, 2014 at 7:21 PM, Joe Stein joe.st...@stealth.ly wrote:
  Hi Guozhang thanks for kicking this off.  I made some comments in the
 Wiki
  (and we can continue the discussion there) but think this type of
  collaborative mailing list discussion and confluence writeup is a great
 way
  for different discussions about the same thing in different organizations
  to coalesce together in code.
 
  This may also be a good place for folks looking to get their feet wet
  contributing code to do so.
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
 
  On Tue, Aug 26, 2014 at 6:22 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
  Hello all,
 
  We want to kick off some discussions about error handling and logging
  conventions. With a number of great patch contributions to Kafka
 recently,
  it is a good time for us to sit down and think a little more about the
  coding style guidelines we have (
 http://kafka.apache.org/coding-guide.html
  ).
 
  People at LinkedIn have discussed some observed issues with error handling 
  and logging verbosity, and here is a summary of the bullet points covered:
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Error+Handling+and+Logging
 
  We would like to collect as many comments as possible before we move ahead 
  with the logging cleanup JIRAs mentioned in the wiki, so please feel free 
  to shoot over any thoughts and suggestions on this topic.
 
  -- Guozhang
 




-- 
-- Guozhang


Review Request 25155: Fix KAFKA-1616

2014-08-28 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25155/
---

Review request for kafka.


Bugs: KAFKA-1616
https://issues.apache.org/jira/browse/KAFKA-1616


Repository: kafka


Description
---

Purgatory size to be the sum of watched list sizes; delayed request to be the 
expiry queue length; remove atomic integers for metrics
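
The description above maps onto on-demand gauges; a rough sketch of the idea (class and field names here are illustrative, not the actual RequestPurgatory internals) is to derive both numbers from the data structures themselves rather than maintaining AtomicInteger counters that have to be kept in sync manually:

```scala
import scala.collection.mutable

// Illustrative sketch: metrics computed from the data structures on demand.
class PurgatorySketch[K] {
  private val watchersForKey = mutable.Map.empty[K, mutable.Buffer[AnyRef]]
  private val expiryQueue    = mutable.Queue.empty[AnyRef]

  def watch(op: AnyRef, keys: Seq[K]): Unit = {
    // Watched once per key, but enqueued for expiration only once.
    keys.foreach(k => watchersForKey.getOrElseUpdate(k, mutable.Buffer.empty) += op)
    expiryQueue.enqueue(op)
  }

  // "Purgatory size": the sum of the watched list sizes.
  def purgatorySize: Int = watchersForKey.values.map(_.size).sum

  // "Delayed requests": the expiry queue length.
  def delayedRequests: Int = expiryQueue.size
}
```

Note that under this definition an operation watched on several keys contributes once per key to the purgatory size, but only once to the delayed-request count.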


Diffs
-

  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 

Diff: https://reviews.apache.org/r/25155/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 25155: Fix KAFKA-1616

2014-08-28 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25155/
---

(Updated Aug. 28, 2014, 5:12 p.m.)


Review request for kafka.


Bugs: KAFKA-1616
https://issues.apache.org/jira/browse/KAFKA-1616


Repository: kafka


Description
---

Purgatory size to be the sum of watched list sizes; delayed request to be the 
expiry queue length; remove atomic integers for metrics


Diffs (updated)
-

  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 

Diff: https://reviews.apache.org/r/25155/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 25136: KAFKA-1610-Review Request

2014-08-28 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25136/#review51793
---


Thanks for the patch, some general comments: 

1. In general we would like to avoid using ._1 and ._2, simply for clarity of 
the code; instead we can use { case (key, value) => }.
2. After thinking about it twice: even if the resulting collection is passed 
to some function as a parameter, as long as we know that function will only 
read the values (for example ZkUtils.updatePartitionReassignmentData) and has 
no intention of modifying them, we can probably still use mapValues, which 
gives the benefit of not creating one more Java collection on the JVM. What do 
you think?
3. For places where we do need to use map instead of mapValues (for example in 
ReplicaManager when we create the delayed request's response status), add 
comments explaining why we do so (for the above example, because acksPending 
and errorCode may be modified after the collection is created).

Some detailed comments below.
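
For context on points 2 and 3: in Scala 2.x, mapValues returns a lazy view rather than a new collection, so the mapping function is re-applied on each access. A small sketch of why map is required when the values are mutable (Status and acksPending here are illustrative stand-ins, not the actual patch code):

```scala
// mapValues returns a view: the mapping function runs again on every lookup.
// If it allocates mutable value objects, any mutation made through the view
// lands on a throwaway copy and is silently lost.
class Status(var acksPending: Boolean = true)

val partitions = Map("topic-0" -> 1, "topic-1" -> 2)

// Buggy: a fresh Status is created on *each* access.
val viewed = partitions.mapValues(_ => new Status())
viewed("topic-0").acksPending = false
assert(viewed("topic-0").acksPending)        // still true: the write was lost

// Fixed: map materializes the collection exactly once, so mutations stick.
val materialized = partitions.map { case (k, _) => k -> new Status() }
materialized("topic-0").acksPending = false
assert(!materialized("topic-0").acksPending) // the write is visible
```

Conversely, when the result is only ever read, mapValues avoids allocating one more collection, which is the benefit point 2 is after.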


core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
https://reviews.apache.org/r/25136/#comment90400

Is this necessary? ReassignedPartitionContext does not seem to be used anywhere.



core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
https://reviews.apache.org/r/25136/#comment90401

Is this intentional?



core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
https://reviews.apache.org/r/25136/#comment90402

Is this intentional?



core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
https://reviews.apache.org/r/25136/#comment90404

Could we use a new line for the nested map?



core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
https://reviews.apache.org/r/25136/#comment90406

For resulting maps that are used as constructor parameters, as long as the 
constructor will not modify them we can use mapValues.


- Guozhang Wang


On Aug. 28, 2014, 2:28 a.m., Mayuresh Gharat wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25136/
 ---
 
 (Updated Aug. 28, 2014, 2:28 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1610
 https://issues.apache.org/jira/browse/KAFKA-1610
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Patch for replacing mapValues by map wherever necessary so that local 
 modifications to collections are not lost
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
 691d69a49a240f38883d2025afaec26fd61281b5 
   core/src/main/scala/kafka/controller/KafkaController.scala 
 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153 
   core/src/main/scala/kafka/log/LogManager.scala 
 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c584b559416b3ee4bcbec5966be4891e0a03eefb 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 28711182aaa70eaa623de858bc063cb2613b2a4d 
   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
 af4783646803e58714770c21f8c3352370f26854 
   core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
 c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e 
 
 Diff: https://reviews.apache.org/r/25136/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Mayuresh Gharat
 




Re: Review Request 25044: KAFKA-1611 - Improve system test configuration

2014-08-28 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25044/#review51803
---

Ship it!


Thanks for the patch. LGTM.

- Guozhang Wang


On Aug. 26, 2014, 12:04 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25044/
 ---
 
 (Updated Aug. 26, 2014, 12:04 a.m.)
 
 
 Review request for kafka.
 
 
 Repository: kafka
 
 
 Description
 ---
 
 make the config a bit more out of the box for the common case of a local 
  cluster. This includes:
  1. Fix cluster_config.json of the migration testsuite; it has a hardcoded 
  path that prevents it from working out of the box at all.
  2. Use the JAVA_HOME environment variable if default is specified and 
  JAVA_HOME is defined. The current guessing method is a bit broken, and using 
  JAVA_HOME will allow devs to configure their default java dir without editing 
  multiple cluster_config.json files in multiple places.
 
 
 Diffs
 -
 
   system_test/migration_tool_testsuite/cluster_config.json 8353e56 
   system_test/utils/system_test_utils.py 50340f0 
 
 Diff: https://reviews.apache.org/r/25044/diff/
 
 
 Testing
 ---
 
 Running the system tests a bunch of times :)
 
 
 Thanks,
 
 Gwen Shapira
 




Re: Review Request 25136: KAFKA-1610-Review Request

2014-08-28 Thread Guozhang Wang


 On Aug. 28, 2014, 5:36 p.m., Guozhang Wang wrote:
  Thanks for the patch, some general comments: 
  
  1. In general we would like to avoid using ._1 and ._2, simply for 
  clarity of the code; instead we can use { case (key, value) => }.
  2. After thinking about it twice: even if the resulting collection 
  is passed to some function as a parameter, as long as we know that function 
  will only read the values (for example 
  ZkUtils.updatePartitionReassignmentData) and has no intention of modifying 
  them, we can probably still use mapValues, which gives the benefit of not 
  creating one more Java collection on the JVM. What do you think?
  3. For places where we do need to use map instead of mapValues (for example 
  in ReplicaManager when we create the delayed request's response status), add 
  comments explaining why we do so (for the above example, because acksPending 
  and errorCode may be modified after the collection is created).
  
  Some detailed comments below.
 
 Mayuresh Gharat wrote:
 I agree with point 1.
 Regarding point 2: even if the function only reads the collection, what 
 happens when the collection gets changed and that function then reads a 
 different value? Of course, if the collection is created locally and passed 
 to a function, then it's better to use mapValues.
 Regarding point 3: I will add those comments.

2) Yeah, I agree. If the collection is used outside the current block, then we 
should use map.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25136/#review51793
---


On Aug. 28, 2014, 6:12 p.m., Mayuresh Gharat wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25136/
 ---
 
 (Updated Aug. 28, 2014, 6:12 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1610
 https://issues.apache.org/jira/browse/KAFKA-1610
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Patch for replacing mapValues by map wherever necessary so that local 
 modifications to collections are not lost
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
 691d69a49a240f38883d2025afaec26fd61281b5 
   core/src/main/scala/kafka/controller/KafkaController.scala 
 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153 
   core/src/main/scala/kafka/log/LogManager.scala 
 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c584b559416b3ee4bcbec5966be4891e0a03eefb 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 28711182aaa70eaa623de858bc063cb2613b2a4d 
   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
 af4783646803e58714770c21f8c3352370f26854 
   core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
 c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e 
 
 Diff: https://reviews.apache.org/r/25136/diff/
 
 
 Testing
 ---
 
 Ran the unit tests and everything passed and the build succeeded
 
 
 Thanks,
 
 Mayuresh Gharat
 




Re: Review Request 25136: Patch for KAFKA-1610

2014-08-29 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25136/#review51909
---


LGTM. One minor thing about comments: we do not need to say "Changing 
mapValues to map", since comments should describe the purpose of the code 
rather than the code change. We can generally say something like "Create a new 
collection with map since it may be modified outside this block / in another 
function call", etc.

- Guozhang Wang


On Aug. 29, 2014, 5:04 p.m., Mayuresh Gharat wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25136/
 ---
 
 (Updated Aug. 29, 2014, 5:04 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1610
 https://issues.apache.org/jira/browse/KAFKA-1610
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Added comments explaining the changes and reverted back some changes as per 
 comments on the reviewboard
 
 
 Removed the unnecessary import
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
 691d69a49a240f38883d2025afaec26fd61281b5 
   core/src/main/scala/kafka/controller/KafkaController.scala 
 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c584b559416b3ee4bcbec5966be4891e0a03eefb 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 28711182aaa70eaa623de858bc063cb2613b2a4d 
   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
 af4783646803e58714770c21f8c3352370f26854 
   core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
 c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e 
 
 Diff: https://reviews.apache.org/r/25136/diff/
 
 
 Testing
 ---
 
 Ran the unit tests and everything passed and the build succeeded
 
 
 Thanks,
 
 Mayuresh Gharat
 




Re: Review Request 25155: Fix KAFKA-1616

2014-09-01 Thread Guozhang Wang


 On Aug. 28, 2014, 11:44 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/RequestPurgatory.scala, line 258
  https://reviews.apache.org/r/25155/diff/2/?file=671427#file671427line258
 
  I wonder if we should do two separate tests: (1) if enqueued() >= 
  purgeInterval, we purge the delayed queue, and (2) if size >= purgeInterval, 
  we purge the watchers.

Added one test in RequestPurgatoryTest, which tests the number of watched 
elements and the enqueued requests before and after the purging.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25155/#review51850
---


On Aug. 28, 2014, 5:12 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25155/
 ---
 
 (Updated Aug. 28, 2014, 5:12 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1616
 https://issues.apache.org/jira/browse/KAFKA-1616
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Purgatory size to be the sum of watched list sizes; delayed request to be the 
 expiry queue length; remove atomic integers for metrics
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/RequestPurgatory.scala 
 ce06d2c381348deef8559374869fcaed923da1d1 
 
 Diff: https://reviews.apache.org/r/25155/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 24676: Fix KAFKA-1583

2014-09-01 Thread Guozhang Wang


 On Aug. 29, 2014, 1:42 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/ReplicaManager.scala, line 725
  https://reviews.apache.org/r/24676/diff/3-4/?file=666252#file666252line725
 
  Do we need to add the new parameter? Does it hurt to write the 
  checkpoint file in unit tests?

The reason is that in some unit tests with mocks, the checkpoint function will 
try to access Log.dir(), which is not easy to mock.


 On Aug. 29, 2014, 1:42 a.m., Jun Rao wrote:
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala, lines 171-172
  https://reviews.apache.org/r/24676/diff/3-4/?file=666256#file666256line171
 
  Would it be better to return the correct offset and just return an empty 
  MessageSet? The equality test can then be on the offset.

The LogOffsetMetadata is only for the fetch start offset, which would be 0 in 
both cases; we need to make sure that the fetched data's end offset does not 
exceed the limit.


 On Aug. 29, 2014, 1:42 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/KafkaApis.scala, line 169
  https://reviews.apache.org/r/24676/diff/4/?file=670284#file670284line169
 
  Yes, I think this should be debug level logging.

I will fix the other request-handling places as well.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review51855
---


On Aug. 27, 2014, 5 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24676/
 ---
 
 (Updated Aug. 27, 2014, 5 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1583
 https://issues.apache.org/jira/browse/KAFKA-1583
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Incorporated Jun's comments.
 
 1. I left some cases in Log since they are return values for some of its 
  APIs.
  2. I kept the fetch info in the delayed fetch metadata since it needs to be 
  used for re-reading the log.
  3. I kept the name callbackOnComplete, following the principle that 
  only the caller knows what the callback is used for: the callers name 
  the callback responseCallback (from KafkaApis) and putCacheCallback (from 
  OffsetManager), while all callees take the callback as callbackOnComplete.
 
 Unit tests passed, with some other notes:
 
 1. Found and fixed a bug in the current delayed-fetch satisfaction logic: 
  previously, when calculating the available bytes, we did not take 
  fetchMaxBytes into consideration as an upper limit for a single partition's 
  log, but simply took the diff between the current HW/LEO and the fetch offset.
  2. Found and fixed a bug in the unit tests: we used to create a replica 
  manager on the fly but did not shut it down upon completing the test, which 
  leaks the background thread (i.e., the purgatory's reaper thread).
  3. Changed the RequestPurgatory API a bit per Jun's comments. Now it has two 
  implemented functions, forceComplete() and isCompleted(), and two functions 
  that need to be implemented in subclasses, tryComplete() and complete(). 
  Please let me know if people have more comments on the current API.
  4. Cleaned up the SimpleFetch test: previously it was overly complicated, 
  even though it only tests a simple piece of replica-manager logic.
 
 One concern I have now is the on-the-fly creation of new callback 
  functions (i.e., the defs inside the handler functions and the offset 
  manager's storeOffset function); when I run the unit tests with the patch, 
  it seems to cause higher CPU consumption than trunk. Could someone lend 
  another pair of eyes by running the unit tests and checking the CPU usage?
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 51cdccf7f90eb530cc62b094ed822b8469d50b12 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 af9308737bf7832eca018c2b3ede703f7d1209f1 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
   core/src/main/scala/kafka/api/ProducerResponse.scala 
 a286272c834b6f40164999ff8b7f8998875f2cfe 
   core/src/main/scala/kafka/cluster/Partition.scala 
 ff106b47e6ee194cea1cf589474fef975b9dd7e2 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 3fae7910e4ce17bc8325887a046f383e0c151d44 
   core/src/main/scala/kafka/log/Log.scala 
 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
 a624359fb2059340bb8dc1619c5b5f226e26eb9b 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka

Re: Review Request 24676: Fix KAFKA-1583

2014-09-01 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Sept. 2, 2014, 1:07 a.m.)


Review request for kafka.


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

TBD


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala 
af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 
0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



Re: Review Request 24676: Fix KAFKA-1583

2014-09-01 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Sept. 2, 2014, 1:09 a.m.)


Review request for kafka.


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Incorporated Jun's comments round two.

Incorporated Jun's comments.

1. I left some cases in Log since they are return values for some of their APIs.
2. I kept the fetch info in the delayed fetch metadata since it needs to be 
used for re-reading the log.
3. I kept the name of callbackOnComplete by following the principle that only 
the caller knows what the callback is used for, and hence it can name the 
callback responseCallback (from KafkaApis) or putCacheCallback (from 
OffsetManager), while the callee always takes the callback as callbackOnComplete.

Unit tests passed, with some other notes:

1. Found and fixed a bug in the current delayed fetch satisfaction logic: 
previously, when calculating the accumulated bytes, we did not take 
fetchMaxBytes into consideration as an upper limit for a single partition's 
log, but simply took the diff between the current HW/LEO and the fetch offset.
2. Found and fixed a bug in the unit tests: we used to create a replica 
manager on the fly but did not shut it down upon completing the test, which 
leaks its background thread (i.e. the purgatory's reaper thread).
3. Changed the RequestPurgatory API a bit per Jun's comments. Now it has two 
implemented functions, forceComplete() and isCompleted(), and two functions 
that need to be implemented in the subclasses, tryComplete() and complete(). 
Please let me know if people have more comments on the current API.
4. Cleaned up the SimpleFetch test; previously this test was overly 
complicated even though it only exercises a simple piece of the replica 
manager logic.
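The two-level API split described in point 3 can be illustrated with a small, self-contained sketch. This is Python for brevity and is only an approximation of the discussion above — the actual code is Scala, and the method and class names here (e.g. DelayedFetch's constructor arguments) are illustrative, not copied from the patch:

```python
import threading

class DelayedOperation:
    """Base class: forceComplete()/isCompleted() are provided here;
    tryComplete()/onComplete() are supplied by subclasses."""

    def __init__(self):
        self._completed = False
        self._lock = threading.Lock()

    def force_complete(self):
        # Complete at most once; returns True only for the caller that wins the race.
        with self._lock:
            if self._completed:
                return False
            self._completed = True
        self.on_complete()
        return True

    def is_completed(self):
        return self._completed

    def try_complete(self):
        raise NotImplementedError  # subclass decides whether it can complete now

    def on_complete(self):
        raise NotImplementedError  # subclass builds and sends the response

class DelayedFetch(DelayedOperation):
    """Hypothetical subclass: a fetch is satisfied once enough bytes accumulate."""

    def __init__(self, min_bytes, available_bytes_fn, respond):
        super().__init__()
        self.min_bytes = min_bytes
        self.available_bytes_fn = available_bytes_fn
        self.respond = respond

    def try_complete(self):
        if self.available_bytes_fn() >= self.min_bytes:
            return self.force_complete()
        return False

    def on_complete(self):
        self.respond()
```

The point of the split is that completion bookkeeping (done exactly once, thread-safe) lives in the base class, while only the satisfaction condition and the response logic vary per request type.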


Diffs
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala 
af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 
0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



Re: Review Request 25155: Fix KAFKA-1616

2014-09-02 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25155/
---

(Updated Sept. 2, 2014, 8:22 p.m.)


Review request for kafka.


Bugs: KAFKA-1616
https://issues.apache.org/jira/browse/KAFKA-1616


Repository: kafka


Description
---

Purgatory size to be the sum of watched list sizes; delayed request to be the 
expiry queue length; remove atomic integers for metrics; add a unit test for 
watched list sizes and enqueued requests
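The metric definitions in the description above can be sketched roughly as follows (Python, with illustrative names only — the real implementation is the Scala RequestPurgatory). The idea is to derive the gauges from the data structures themselves instead of maintaining separate atomic counters:

```python
from collections import deque

class PurgatorySketch:
    """Gauges computed from the watcher lists and expiry queue directly,
    so they cannot drift the way separately-updated counters can."""

    def __init__(self):
        self.watched = {}            # key -> list of operations watched on that key
        self.expiry_queue = deque()  # operations awaiting expiration, enqueued once

    def watch(self, key, op):
        self.watched.setdefault(key, []).append(op)

    def enqueue(self, op):
        self.expiry_queue.append(op)

    @property
    def purgatory_size(self):
        # Sum of watcher-list sizes: an op watched under two keys counts twice.
        return sum(len(ops) for ops in self.watched.values())

    @property
    def num_delayed_requests(self):
        # The expiry queue length: each enqueued op counts once.
        return len(self.expiry_queue)
```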


Diffs (updated)
-

  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
168712de241125982d556c188c76514fceb93779 

Diff: https://reviews.apache.org/r/25155/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 24676: Fix KAFKA-1583

2014-09-02 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Sept. 2, 2014, 8:37 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Incorporated Jun's comments round three


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala 
af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 
0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



Re: Offset Request with timestamp

2014-09-02 Thread Guozhang Wang
The semantics of the offset API are to return the latest possible offset of a
message appended no later than the given timestamp. In the implementation, it
returns the starting offset of the log segment that was created no later than
the given timestamp; hence if your log segment contains data for a long period
of time, the offset API may return just the starting offset of the current log
segment.

If your traffic is small and you still want a finer-grained offset response,
you can try reducing the log segment size (default: 1 GB); however, doing so
will increase the number of open file handles due to more frequent log segment
rolling.
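To make the segment-granularity behavior concrete, here is a small self-contained sketch of the lookup logic described above (Python for illustration; the broker's actual implementation lives in the Scala Log/LogSegment code and differs in detail):

```python
def offset_before(segments, target_ts):
    """segments: list of (created_timestamp, start_offset) pairs,
    ascending by creation time. Returns the start offset of the latest
    segment created no later than target_ts, or None if every segment
    is newer than target_ts."""
    result = None
    for created_ts, start_offset in segments:
        if created_ts <= target_ts:
            result = start_offset
        else:
            break
    return result

# One large segment covering a long time range: any timestamp falling inside
# it maps back to that segment's start offset, which is exactly the behavior
# observed in the question above.
segments = [(1000, 0), (2000, 50_000)]
print(offset_before(segments, 2500))  # prints 50000
```

This is why a timestamp of (now() - X) resolves to the beginning of the current segment rather than the offset from X milliseconds ago.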

Guozhang


On Tue, Sep 2, 2014 at 10:21 AM, Manjunath Shivakumar 
manjunath.shivaku...@betfair.com wrote:

 Hi,

 My usecase is to fetch the offsets for a given topic from X milliseconds
 ago.
 If I use the offset api


 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI

 to do this and pass in a timestamp of (now() - X), I get the earliest
 offset in the current log segment and not the offset from X milliseconds
 ago.

 Is this the correct usage or behaviour?

 Thanks,
 Manju

 

 




-- 
-- Guozhang


Re: Review Request 24676: Fix KAFKA-1583

2014-09-02 Thread Guozhang Wang


 On Sept. 2, 2014, 5:53 p.m., Jun Rao wrote:
  Are you including the changes in kafka-1616 too? That would be fine. 
  However, the comments in the other jira also need to be addressed in this 
  patch.

I was not intending to include the changes of KAFKA-1616. The plan is to first 
check in K-1616, then rebase K-1583 on that. However some of the changes may 
reflect some review comments in K-1616 just for ease of rebasing. I can revert 
these back if you want.


 On Sept. 2, 2014, 5:53 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/cluster/Partition.scala, line 282
  https://reviews.apache.org/r/24676/diff/5/?file=673601#file673601line282
 
  may now in = may not be in

I think it should be may now be in?


 On Sept. 2, 2014, 5:53 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 343-349
  https://reviews.apache.org/r/24676/diff/5/?file=673612#file673612line343
 
  Not sure if we need the while loop any more. The comments may also need 
  to be adjusted.

Good point. For the case curr != null && curr.forceComplete() == false we can 
just return as well.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review52044
---


On Sept. 2, 2014, 8:37 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24676/
 ---
 
 (Updated Sept. 2, 2014, 8:37 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1583
 https://issues.apache.org/jira/browse/KAFKA-1583
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Incorporated Jun's comments round three
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 51cdccf7f90eb530cc62b094ed822b8469d50b12 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 af9308737bf7832eca018c2b3ede703f7d1209f1 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
   core/src/main/scala/kafka/api/ProducerResponse.scala 
 a286272c834b6f40164999ff8b7f8998875f2cfe 
   core/src/main/scala/kafka/cluster/Partition.scala 
 ff106b47e6ee194cea1cf589474fef975b9dd7e2 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 3fae7910e4ce17bc8325887a046f383e0c151d44 
   core/src/main/scala/kafka/log/Log.scala 
 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
 a624359fb2059340bb8dc1619c5b5f226e26eb9b 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
 ed1318891253556cdf4d908033b704495acd5724 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c584b559416b3ee4bcbec5966be4891e0a03eefb 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
 d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 68758e35d496a4659819960ae8e809d6e215568e 
   core/src/main/scala/kafka/server/RequestPurgatory.scala 
 ce06d2c381348deef8559374869fcaed923da1d1 
   core/src/main/scala/kafka/utils/DelayedItem.scala 
 d7276494072f14f1cdf7d23f755ac32678c5675c 
   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
 03a424d45215e1e7780567d9559dae4d0ae6fc29 
   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
 cd302aa51eb8377d88b752d48274e403926439f2 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 a9c4ddc78df0b3695a77a12cf8cf25521a203122 
   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
 168712de241125982d556c188c76514fceb93779 
   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
 ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
 
 Diff: https://reviews.apache.org/r/24676/diff/
 
 
 Testing
 ---
 
 Unit tests
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 25155: Fix KAFKA-1616

2014-09-03 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25155/
---

(Updated Sept. 3, 2014, 7:52 p.m.)


Review request for kafka.


Bugs: KAFKA-1616
https://issues.apache.org/jira/browse/KAFKA-1616


Repository: kafka


Description (updated)
---

Incorporated Jun's comments round four


Diffs (updated)
-

  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
168712de241125982d556c188c76514fceb93779 

Diff: https://reviews.apache.org/r/25155/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 25136: Patch for KAFKA-1610

2014-09-03 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25136/#review52234
---

Ship it!


Ship It!

- Guozhang Wang


On Sept. 3, 2014, 6:27 p.m., Mayuresh Gharat wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25136/
 ---
 
 (Updated Sept. 3, 2014, 6:27 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1610
 https://issues.apache.org/jira/browse/KAFKA-1610
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Added comments explaining the changes and reverted back some changes as per 
 comments on the reviewboard
 
 
 Removed the unnecessary import
 
 
 Made changes to comments as per the suggestions on the reviewboard
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
 691d69a49a240f38883d2025afaec26fd61281b5 
   core/src/main/scala/kafka/controller/KafkaController.scala 
 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c584b559416b3ee4bcbec5966be4891e0a03eefb 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 28711182aaa70eaa623de858bc063cb2613b2a4d 
   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
 af4783646803e58714770c21f8c3352370f26854 
   core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
 c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e 
 
 Diff: https://reviews.apache.org/r/25136/diff/
 
 
 Testing
 ---
 
 Ran the unit tests and everything passed and the build succeeded
 
 
 Thanks,
 
 Mayuresh Gharat
 




Re: Review Request 24676: Fix KAFKA-1583

2014-09-03 Thread Guozhang Wang


 On Sept. 3, 2014, 6:23 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/DelayedFetch.scala, line 100
  https://reviews.apache.org/r/24676/diff/6/?file=674148#file674148line100
 
  Should we log topicAndPartition as well?

fetchMetadata includes fetchPartitionStatus, which contains the mapping from 
topicAndPartition to each partition's fetch status.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review52092
---


On Sept. 2, 2014, 8:37 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24676/
 ---
 
 (Updated Sept. 2, 2014, 8:37 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1583
 https://issues.apache.org/jira/browse/KAFKA-1583
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Incorporated Jun's comments round three
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 51cdccf7f90eb530cc62b094ed822b8469d50b12 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 af9308737bf7832eca018c2b3ede703f7d1209f1 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
   core/src/main/scala/kafka/api/ProducerResponse.scala 
 a286272c834b6f40164999ff8b7f8998875f2cfe 
   core/src/main/scala/kafka/cluster/Partition.scala 
 ff106b47e6ee194cea1cf589474fef975b9dd7e2 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 3fae7910e4ce17bc8325887a046f383e0c151d44 
   core/src/main/scala/kafka/log/Log.scala 
 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
 a624359fb2059340bb8dc1619c5b5f226e26eb9b 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
 ed1318891253556cdf4d908033b704495acd5724 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c584b559416b3ee4bcbec5966be4891e0a03eefb 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
 d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 68758e35d496a4659819960ae8e809d6e215568e 
   core/src/main/scala/kafka/server/RequestPurgatory.scala 
 ce06d2c381348deef8559374869fcaed923da1d1 
   core/src/main/scala/kafka/utils/DelayedItem.scala 
 d7276494072f14f1cdf7d23f755ac32678c5675c 
   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
 03a424d45215e1e7780567d9559dae4d0ae6fc29 
   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
 cd302aa51eb8377d88b752d48274e403926439f2 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 a9c4ddc78df0b3695a77a12cf8cf25521a203122 
   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
 168712de241125982d556c188c76514fceb93779 
   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
 ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
 
 Diff: https://reviews.apache.org/r/24676/diff/
 
 
 Testing
 ---
 
 Unit tests
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 24676: Fix KAFKA-1583

2014-09-03 Thread Guozhang Wang


 On Sept. 3, 2014, 6:23 p.m., Jun Rao wrote:
  Looks good. I only have the following minor comments.

Thanks Jun. If there are no more comments for now, I will wait for KAFKA-1616 to 
be checked in first, and then do the rebase and the class/function renaming 
(which will make the diff file quite hard to review), as well as the 
corresponding comment modifications.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review52092
---


On Sept. 2, 2014, 8:37 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24676/
 ---
 
 (Updated Sept. 2, 2014, 8:37 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1583
 https://issues.apache.org/jira/browse/KAFKA-1583
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Incorporated Jun's comments round three
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 51cdccf7f90eb530cc62b094ed822b8469d50b12 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 af9308737bf7832eca018c2b3ede703f7d1209f1 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
   core/src/main/scala/kafka/api/ProducerResponse.scala 
 a286272c834b6f40164999ff8b7f8998875f2cfe 
   core/src/main/scala/kafka/cluster/Partition.scala 
 ff106b47e6ee194cea1cf589474fef975b9dd7e2 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 3fae7910e4ce17bc8325887a046f383e0c151d44 
   core/src/main/scala/kafka/log/Log.scala 
 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
 a624359fb2059340bb8dc1619c5b5f226e26eb9b 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
 ed1318891253556cdf4d908033b704495acd5724 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c584b559416b3ee4bcbec5966be4891e0a03eefb 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
 d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 68758e35d496a4659819960ae8e809d6e215568e 
   core/src/main/scala/kafka/server/RequestPurgatory.scala 
 ce06d2c381348deef8559374869fcaed923da1d1 
   core/src/main/scala/kafka/utils/DelayedItem.scala 
 d7276494072f14f1cdf7d23f755ac32678c5675c 
   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
 03a424d45215e1e7780567d9559dae4d0ae6fc29 
   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
 cd302aa51eb8377d88b752d48274e403926439f2 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 a9c4ddc78df0b3695a77a12cf8cf25521a203122 
   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
 168712de241125982d556c188c76514fceb93779 
   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
 ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
 
 Diff: https://reviews.apache.org/r/24676/diff/
 
 
 Testing
 ---
 
 Unit tests
 
 
 Thanks,
 
 Guozhang Wang
 




Re: [DISCUSS] 0.8.2 release branch, unofficial release candidates(s), 0.8.1.2 release

2014-09-04 Thread Guozhang Wang
Just made a pass over the unresolved tickets tagged for 0.8.2, I think many
of them can be pushed to 0.8.3 / 0.9.


On Wed, Sep 3, 2014 at 8:05 PM, Jonathan Weeks jonathanbwe...@gmail.com
wrote:


 +1 on a 0.8.1.2 release as described.

 I manually applied patches to cobble together a working gradle build for
 kafka for scala 2.11, but would really appreciate an official release —
 i.e. 0.8.1.2, as we also have other dependent libraries we use as well
 (e.g. akka-kafka) that would be much easier to migrate and support if the
 build was public and official.

 There were at least several others on the “users” list that expressed
 interest in scala 2.11 support, who knows how many more “lurkers” are out
 there.

 Best Regards,

 -Jonathan

  Hey, I wanted to take a quick pulse to see if we are getting closer to a
  branch for 0.8.2.
 
  1) There still seems to be a lot of open issues
 
 https://issues.apache.org/jira/browse/KAFKA/fixforversion/12326167/?selectedTab=com.atlassian.jira.jira-projects-plugin:version-issues-panel
  and our 30 day summary is showing issues: 51 created and *34* resolved
 and not
  sure how much of that we could really just decide to push off to 0.8.3 or
  0.9.0 vs working on 0.8.2 as stable for release.  There is already so
 much
  goodness on trunk.  I appreciate the double commit pain especially as
 trunk
  and branch drift (ugh).
 
  2) Also, I wanted to float the idea of after making the 0.8.2 branch
 that I
  would do some unofficial release candidates for folks to test prior to
  release and vote.  What I was thinking was I would build, upload and
 stage
  like I was preparing artifacts for vote but let the community know to go
 in
  and have at it well prior to the vote release.  We don't get a lot of
  community votes during a release but issues after (which is natural
 because
  of how things are done).  I have seen four Apache projects doing this
 very
  successfully not only have they had less iterations of RC votes
 (sensitive
  to that myself) but the community kicked back issues they saw by giving
  them some pre release time to go through their own test and staging
  environments as the release are coming about.
 
  3) Checking again on should we have a 0.8.1.2 release if folks in the
  community find important features (this might be best asked on the user
  list maybe not sure) they don't want/can't wait for which wouldn't be too
  much pain/dangerous to back port. Two things that spring to the top of my
  head are 2.11 Scala support and fixing the source jars.  Both of these
 are
  easy to patch personally I don't mind but want to gauge more from the
  community on this too.  I have heard gripes ad hoc from folks in direct
  communication but no complains really in the public forum and wanted to
  open the floor if folks had a need.
 
  4) 0.9 work I feel is being held up some (or at least resourcing it from
 my
  perspective).  We decided to hold up including SSL (even though we have a
  path for it). Jay did a nice update recently to the Security wiki which I
  think we should move forward with.  I have some more to add/change/update
  and want to start getting down to more details and getting specific
 people
  working on specific tasks but without knowing what we are doing when it
 is
  hard to manage.
 
  5) I just updated https://issues.apache.org/jira/browse/KAFKA-1555 I
 think
  it is a really important feature update doesn't have to be in 0.8.2 but
 we
  need consensus (no pun intended). It fundamentally allows for data in min
  two rack requirement which A LOT of data requires for successful save to
  occur.
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /




-- 
-- Guozhang


Re: Review Request 25155: Fix KAFKA-1616

2014-09-04 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25155/
---

(Updated Sept. 4, 2014, 8:26 p.m.)


Review request for kafka.


Bugs: KAFKA-1616
https://issues.apache.org/jira/browse/KAFKA-1616


Repository: kafka


Description (updated)
---

Purgatory size to be the sum of watched list sizes; delayed request to be the 
expiry queue length; remove atomic integers for metrics; add a unit test for 
watched list sizes and enqueued requests


Diffs (updated)
-

  core/src/main/scala/kafka/server/RequestPurgatory.scala 
ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
168712de241125982d556c188c76514fceb93779 

Diff: https://reviews.apache.org/r/25155/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 24676: Fix KAFKA-1583

2014-09-05 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Sept. 5, 2014, 9:08 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

rebase on KAFKA-1616 for checking diff files, please do not review


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala 
af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 
0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
cf3ed4c8f197d1197658645ccb55df0bce86bdd4 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



Re: Review Request 25136: Patch for KAFKA-1610

2014-09-09 Thread Guozhang Wang


 On Sept. 9, 2014, 1:38 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala, line 125
  https://reviews.apache.org/r/25136/diff/4/?file=675768#file675768line125
 
  I don't think this is required, since we are subsequently doing a map 
  over the view which materializes the correct final collection. So it seems 
  this map creates an unnecessary intermediate collection.

Actually, we are passing the view into another function, 
checkIfPartitionReassignmentSucceeded, inside the subsequent map, and hence we 
cannot really control how that function will use this view.


On Sept. 9, 2014, 1:38 a.m., Mayuresh Gharat wrote:
  I only reviewed up to DelayedFetch, because I think these changes are 
  mostly unnecessary and also add overhead. Can you re-evaluate the above and 
  the rest of the patch to see if there are any _correctness_ issues?
  
  I agree with the premise of this jira - i.e., we should fix places where we 
  use mapValues *and* cannot have a non-strict (i.e., lazily materialized) 
  view. However, most of the existing uses of mapValues are as far as I can 
  see legitimate uses of views where we don't need to materialize 
  intermediate collections.

Joel, I see your point. I think we should discuss a little bit the principle 
of using map vs. mapValues views; I was originally thinking that any view that is 
passed to another function or used outside of the current code block 
should be converted into an intermediate collection, since it is out of 
our control. We can discuss this convention.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25136/#review52660
---


On Sept. 3, 2014, 6:27 p.m., Mayuresh Gharat wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25136/
 ---
 
 (Updated Sept. 3, 2014, 6:27 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1610
 https://issues.apache.org/jira/browse/KAFKA-1610
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Added comments explaining the changes and reverted back some changes as per 
 comments on the reviewboard
 
 
 Removed the unnecessary import
 
 
 Made changes to comments as per the suggestions on the reviewboard
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
 691d69a49a240f38883d2025afaec26fd61281b5 
   core/src/main/scala/kafka/controller/KafkaController.scala 
 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c584b559416b3ee4bcbec5966be4891e0a03eefb 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 28711182aaa70eaa623de858bc063cb2613b2a4d 
   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
 af4783646803e58714770c21f8c3352370f26854 
   core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
 c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e 
 
 Diff: https://reviews.apache.org/r/25136/diff/
 
 
 Testing
 ---
 
 Ran the unit tests and everything passed and the build succeeded
 
 
 Thanks,
 
 Mayuresh Gharat
 




Re: Error handling in new Java Producer

2014-09-09 Thread Guozhang Wang
Hi Steve:

1. the new producer will be included in the 0.8.2 release for production
usage.
2. The error you reported has been changed to a WARN in the latest trunk.
We realized this should not really be an error case. In general, any errors
will be propagated to the producer in the following two ways:

a. producer.send() can throw ApiException and runtime exceptions,
indicating the sending was not triggered in the underlying sender thread.
b. producer.send().get() can throw KafkaException such as leader not found,
indicating the broker's response says it has failed to
receive the message.
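The distinction between the two paths can be sketched with a plain CompletableFuture standing in for the future that send() returns (the helper below is illustrative and simulates failures locally; it is not Kafka's API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class SendErrorPaths {
    // A stand-in for producer.send(): path (a) throws synchronously before
    // anything reaches the sender thread; path (b) fails the returned future,
    // so the error only surfaces on get().
    static CompletableFuture<Long> send(boolean validRecord) {
        if (!validRecord)
            throw new IllegalArgumentException("rejected up front"); // like an ApiException from send()
        CompletableFuture<Long> ack = new CompletableFuture<>();
        ack.completeExceptionally(new RuntimeException("leader not found")); // like a broker-side failure
        return ack;
    }

    public static void main(String[] args) {
        try {
            send(false);
        } catch (IllegalArgumentException e) {
            System.out.println("path (a), sync error: " + e.getMessage());
        }
        try {
            send(true).get();
        } catch (ExecutionException | InterruptedException e) {
            System.out.println("path (b), async error: " + e.getCause().getMessage());
        }
    }
}
```

Catching around send() handles path (a); inspecting the returned future, via get() or a callback, handles path (b).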

Guozhang


On Mon, Sep 8, 2014 at 8:16 AM, Tarzia star...@signal.co wrote:

 Hello,

 I am trying to use the new org.apache.kafka.clients.producer.KafkaProducer
 to take advantage of error reporting that is lacking in the current
 stable Scala client (import kafka.javaapi.producer.Producer).  Two
 questions:

 * I know that 0.8.2 is not yet released but is the new Producer
 feature-complete and ready for testing?

 * If so, how should I check for errors in KafkaProducer#send()?  In my
 tests I brought down the Kafka server and hoped to detect errors in the
 producer so that I could respond by re-queueing failed requests.  However,
 I was not getting any exceptions on KafkaProducer#send(), instead I got an
 exception inside the producer Thread:

 WARN org.apache.kafka.common.network.Selector - Error in I/O with
 localhost.localdomain/127.0.0.1
 java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at
 sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:232)
 at
 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
 at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)

 Should this be bubbling up to the send() method, or should there be a
 getError() method in the RecordMetadata that is asynchronously returned?
 Basically, I don't understand the error-reporting API.

 Thanks,
 Steve Tarzia




-- 
-- Guozhang


Re: Error handling in new Java Producer

2014-09-09 Thread Guozhang Wang
Hi Steve,

You can try reducing the metadata.fetch.timeout.ms config value, which
controls how long the send() call can block when broker metadata is
not available (e.g., because the broker itself is unavailable).
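For example (a config sketch; the value here is illustrative, and metadata.fetch.timeout.ms defaults to 60 seconds in the 0.8.2 producer):

```properties
# New-producer config sketch: fail send() faster when broker metadata
# cannot be fetched (e.g., no broker reachable).
bootstrap.servers=localhost:9092
metadata.fetch.timeout.ms=5000
```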

Guozhang

On Tue, Sep 9, 2014 at 7:00 PM, Tarzia star...@signal.co wrote:

 Thanks Guozhang,

 Another case I am seeing is that producer.send() seems to block when the
 brokers are unavailable.  This is not the behavior I want (I would rather
 have it throw an exception immediately so I can queue the messages for
 replay).  I will try to confirm this tomorrow.

 -Steve


 On Sep 9, 2014, at 8:55 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi Steve:
 
  1. the new producer will be included in the 0.8.2 release for production
  usage.
  2. The error you reported has been changed as a WARN in the latest trunk.
  We realize this should not really be an error case. In general, any
 errors
  will be propagated to the producer in the following two ways:
 
  a. producer.send() can throw ApiException and runtime exceptions,
  indicating the sending is not triggered in the underlying sending thread.
  b. producer.send().get() can throw KafkaException such as leader not
 found,
  indicating the sending response from the broker says it has failed
  receiving the message.
 
  Guozhang
 
 
  On Mon, Sep 8, 2014 at 8:16 AM, Tarzia star...@signal.co wrote:
 
  Hello,
 
  I am trying to use the new
 org.apache.kafka.clients.producer.KafkaProducer
  to take advantage of error reporting that is lacking in the current
  stable Scala client (import kafka.javaapi.producer.Producer).  Two
  questions:
 
  * I know that 0.8.2 is not yet released but is the new Producer
  feature-complete and ready for testing?
 
  * If so, how should I check for errors in KafkaProducer#send()?  In my
  tests I brought down the Kafka server and hoped to detect errors in the
  producer so that I could respond by re-queueing failed requests.
 However,
  I was not getting any exceptions on KafkaProducer#send(), instead I got
 an
  exception inside the producer Thread:
 
  WARN org.apache.kafka.common.network.Selector - Error in I/O with
  localhost.localdomain/127.0.0.1
  java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at
  sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
 at
 org.apache.kafka.common.network.Selector.poll(Selector.java:232)
 at
  org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
 at
  org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at
  org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 
  Should this be bubbling up to the send() method, or should there be a
  getError() method in the RecordMetadata that is asynchronously returned?
  Basically, I don't understand the error-reporting API.
 
  Thanks,
  Steve Tarzia
 
 
 
 
  --
  -- Guozhang




-- 
-- Guozhang


Re: Review Request 25420: Patch for KAFKA-686

2014-09-10 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25420/#review52980
---


Thanks for the patch. Overall it is a great clean-up. Some comments below:


core/src/main/scala/kafka/controller/PartitionStateMachine.scala
https://reviews.apache.org/r/25420/#comment92277

The controller will only be shut down when the Kafka server shuts down, so we 
should probably just call controller.onControllerResignation()



core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/25420/#comment92282

Do we still need this function any more? Shall we just change the usages of 
getReplicasForPartition to getPartitionAssignmentForTopic?



core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/25420/#comment92285

Why do we want it to be mutable?



core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/25420/#comment92286

Shall we also use failAsValue on catching ZkNoNodeException in 
getChildrenParentMayNotExist, and make its return type an Option?



core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/25420/#comment92288

Can we rename to consumerOffsetForPartitionDir for naming consistency?


- Guozhang Wang


On Sept. 7, 2014, 7:22 p.m., Viktor Tarananeko wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25420/
 ---
 
 (Updated Sept. 7, 2014, 7:22 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-686
 https://issues.apache.org/jira/browse/KAFKA-686
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Merge branch 'trunk' into fix-null-pointer-in-zk-utils
 
 
 unify topic partition path constructing; refactoring and better code reuse
 
 
 reuse method to fetch broker data
 
 
 unify fetching topics
 
 
 raise InvalidTopicException if unable to properly parse json data from 
 Zookeeper
 
 
 TopicRegistrationInfo class
 
 
 base controller failure test on invalid topic data
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/TopicRegistrationInfo.scala PRE-CREATION 
   core/src/main/scala/kafka/consumer/TopicCount.scala 
 0954b3c3ff8b3b7a7a4095436bc9e6c494a38c37 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
   core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
 e20b63a6ec1c1a848bc3823701b0f8ceaeb6100d 
   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
 4acdd70fe9c1ee78d6510741006c2ece65450671 
   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
 d1e7c434e77859d746b8dc68dd5d5a3740425e79 
   core/src/main/scala/kafka/tools/ExportZkOffsets.scala 
 4d051bc2db12f0e15aa6a3289abeb9dd25d373d2 
   core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala 
 111c9a8b94ce45d95551482e9fd3f8c1cccbf548 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 a7b1fdcb50d5cf930352d37e39cb4fc9a080cb12 
   core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala 
 95303e098d40cd790fb370e9b5a47d20860a6da3 
   core/src/test/scala/unit/kafka/server/ControllerFailureTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 c4e13c5240c8303853d08cc3b40088f8c7dae460 
 
 Diff: https://reviews.apache.org/r/25420/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Viktor Tarananeko
 




Re: Review Request 25136: Patch for KAFKA-1610

2014-09-15 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25136/#review53391
---


Have talked with Mayuresh and Joel about which mapValues should be converted to 
map. The summary is that:

1. For now the only place we do in-place modification of a map is in 
ProducePartitionStatus, on isAcksPending and ErrorCode; however, it is not 
guaranteed that we will never make in-place modifications in the future that are 
actually on a view created by mapValues.
2. On the other hand, converting all mapValues whose resulting collection 
may be used outside the current block to map will create many new objects, 
especially in places where a map is called for each instance in a map loop (for 
example the fetch response).

So the suggested solution is that we should keep this risk of 
in-place modifications in Scala in mind (fortunately no-one does that often) by:

1. Adding comments on where map is already used and MUST be used since its 
resulting collection items will be modified (for now the only place I know of is 
ProducePartitionStatus; Mayuresh, could you double check if there are other 
cases?).
2. Adding comments on function parameters that are passed in as a view, indicating 
the given map should not be modified in-place.
3. Before making an in-place modification to a Map in Scala, checking whether the 
collection could have been created as a view.
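The hazard being guarded against can be sketched outside of Scala as well; below, a recomputing supplier stands in for a mapValues view and an eagerly built map stands in for map (names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class ViewVsCopy {
    static Map<String, Integer> errorCodes = new HashMap<>();
    static {
        errorCodes.put("partition-0", 0);
    }

    // Like a mapValues view: values are re-derived from the underlying map
    // on every access, so writes into the result are silently lost.
    static Supplier<Map<String, Integer>> view = () -> {
        Map<String, Integer> m = new HashMap<>();
        errorCodes.forEach((k, v) -> m.put(k, v + 100));
        return m;
    };

    // Like map: materialized once; safe to modify in place afterwards.
    static Map<String, Integer> copy = view.get();

    public static void main(String[] args) {
        view.get().put("partition-0", 999);                // "in-place" write into the view...
        System.out.println(view.get().get("partition-0")); // ...is gone on the next access: 100
        copy.put("partition-0", 999);                      // write into the materialized map sticks
        System.out.println(copy.get("partition-0"));       // 999
    }
}
```

This is exactly the risk discussed above: an in-place update applied to a collection that happens to be a lazily recomputed view is a silent no-op.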

Mayuresh, could you submit another patch following this suggestion and also 
incorporating Neha's comments?

- Guozhang Wang


On Sept. 3, 2014, 6:27 p.m., Mayuresh Gharat wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25136/
 ---
 
 (Updated Sept. 3, 2014, 6:27 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1610
 https://issues.apache.org/jira/browse/KAFKA-1610
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Added comments explaining the changes and reverted back some changes as per 
 comments on the reviewboard
 
 
 Removed the unnecessary import
 
 
 Made changes to comments as per the suggestions on the reviewboard
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
 691d69a49a240f38883d2025afaec26fd61281b5 
   core/src/main/scala/kafka/controller/KafkaController.scala 
 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c584b559416b3ee4bcbec5966be4891e0a03eefb 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 28711182aaa70eaa623de858bc063cb2613b2a4d 
   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
 af4783646803e58714770c21f8c3352370f26854 
   core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
 c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e 
 
 Diff: https://reviews.apache.org/r/25136/diff/
 
 
 Testing
 ---
 
 Ran the unit tests and everything passed and the build succeeded
 
 
 Thanks,
 
 Mayuresh Gharat
 




Re: (info)kafka truncate data of specific topic

2014-09-16 Thread Guozhang Wang
Hi Jacky,

Could you elaborate a bit on your use cases, like why you want to manually
truncate logs? Kafka provides a set of configs for data retention based on
data size and time (for example, maintaining at most 100 GB or up to 7
days of old data); would that be sufficient for you?
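For reference, the retention knobs look roughly like this in the broker config (values illustrative; note that log.retention.bytes applies per partition):

```properties
# Delete old log segments once either bound is exceeded, with no downtime.
# Keep up to 7 days of data:
log.retention.hours=168
# Keep up to ~100 GB per partition:
log.retention.bytes=107374182400
```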

Guozhang



On Mon, Sep 15, 2014 at 8:00 PM, Jacky.J.Wang (mis.cnxa01.Newegg) 43048 
jacky.j.w...@newegg.com.invalid wrote:

 Hello kafka

 I truncate data of kafka as follow

 1:stop kafka service
 2:delete zookeeper /broker/topics/topic and /consumers/group
 3:delete kafka log dir
 4:restart kafka service
 5:recreate topic info
 but this approach needs the service to be stopped, so how can I truncate Kafka
 topic data without stopping the Kafka service?

 Eagerly awaiting your reply,thanks


 Best regards,
 Jacky.J.Wang
 Eng Software Engineer,NESC-XA
 Newegg Tech (Xian) Co., Ltd.
 15th to 16th floor, 01 Plaza, Xi’an Software Park, No. 72 Keji 2nd Road,
 Xi’an P.R.China(710075)
 Once you know, you Newegg.

 -
 CONFIDENTIALITY NOTICE: This email and any files transmitted with it may
 contain privileged or otherwise confidential information.  It is intended
 only for the person or persons to whom it is addressed. If you received
 this message in error, you are not authorized to read, print, retain, copy,
 disclose, disseminate, distribute, or use this message any part thereof or
 any information contained therein. Please notify the sender immediately and
 delete all copies of this message. Thank you in advance for your
 cooperation.





-- 
-- Guozhang


Re: How producer gets the acknowledgement back

2014-09-24 Thread Guozhang Wang
Hi,

In the new (Java) producer, you can pass in a callback function in the

Future<RecordMetadata> send(ProducerRecord record, Callback callback)

call, which will be triggered when the ack is received.

Alternatively, you can also call Future.get() on the returned future
metadata, which will block until the ack is received (i.e., synchronous sending).
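The two styles look roughly like this (a local sketch: the Callback interface mirrors the shape of the producer's onCompletion contract, and the send here is simulated rather than a real Kafka call):

```java
import java.util.concurrent.CompletableFuture;

class AckDemo {
    interface Callback {
        void onCompletion(Long offset, Exception exception);
    }

    // Simulated async send: completes the returned future with a fake broker
    // ack and fires the callback once the "ack" arrives.
    static CompletableFuture<Long> send(String record, Callback cb) {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> 42L);
        future.whenComplete((offset, err) -> cb.onCompletion(offset, (Exception) err));
        return future;
    }

    public static void main(String[] args) throws Exception {
        // Style 1: react asynchronously when the ack is received.
        CompletableFuture<Long> f = send("msg", (offset, e) -> {
            if (e == null) System.out.println("acked at offset " + offset);
        });
        // Style 2: block until the ack is received (synchronous sending).
        System.out.println("offset = " + f.get());
    }
}
```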

Guozhang

On Wed, Sep 24, 2014 at 6:10 AM, Sreenivasulu Nallapati 
sreenu.nallap...@gmail.com wrote:

 Hello,

 Can you please help me to get the acknowledgement in the producer?


 After setting the property *request.required.acks* to 1, how does the producer get
 the acknowledgement back? I am trying to get the acknowledgement in the Java
 producer.





 Thanks

 Sreeni




-- 
-- Guozhang


Re: Review Request 25944: Patch for KAFKA-1013

2014-09-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25944/#review54609
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94796

This comment line is for code line 320, better move it above it.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94797

This comment line is for code line 320, better move it above it.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94799

Is there a difference between these two:

Thread.sleep()

TimeUnit.MILLISECONDS.sleep()

since we do the former everywhere in the code base.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94800

Do we still need this variable?



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94802

Can we omit collection.Seq() here and just use groupId = config.groupId?



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94804

merge in a single line



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94806

revert



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94807

revert



core/src/main/scala/kafka/tools/ExportOffsets.scala
https://reviews.apache.org/r/25944/#comment94812

Can we actually use the same format for ZK / offset manager? Having two 
different formats makes parsing harder, since the user needs to 
know whether ZK or the offset manager is used.



core/src/main/scala/kafka/tools/ExportOffsets.scala
https://reviews.apache.org/r/25944/#comment94810

We can make a parseBrokerList in Utils and use it there, since I have seen 
similar parsing logic in multiple places.



core/src/main/scala/kafka/tools/ExportOffsets.scala
https://reviews.apache.org/r/25944/#comment94811

You can take a look at KAFKA-686's latest patch which did some cleanup on 
the util functions; these functions can probably be merged into Utils.



core/src/main/scala/kafka/tools/ImportOffsets.scala
https://reviews.apache.org/r/25944/#comment94813

Ditto as above, can we unify the input offset format?



core/src/main/scala/kafka/tools/ImportOffsets.scala
https://reviews.apache.org/r/25944/#comment94814

Ditto as above.



core/src/main/scala/kafka/tools/ImportOffsets.scala
https://reviews.apache.org/r/25944/#comment94815

Same as Joel suggested: we can just use config's default values.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94816

The apache header is missing.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94819

This could be error.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94822

This can be trace, or we can just print the offset manager id in debug 
if it does not contain an error code.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94820

Could be error(Error while connecting to %s:%d for fetching consumer 
metadata), since this is not a general exception.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94821

When an exception is thrown and caught here, we should skip the rest of the 
loop.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94823

Could be error(Error while connecting to offset manager %s)



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94824

Some logging inconsistency: 

broker [%s:%d]

broker %s:%d

%s:%d



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94825

Why does this API need to return an Option while the previous one can directly 
return the response?



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94826

We do not need Possible cause any more, just print the exception's 
message if necessary.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94827

Are there any other exceptions that can be caught besides IOExceptions? We 
need to be careful about always catching Throwable and printing stack trace.


- Guozhang Wang


On Sept. 23, 2014, 5:48 p.m., Mayuresh Gharat wrote

Re: Review Request 25995: Patch for KAFKA-1650

2014-09-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review54620
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment94830

Do we need to do this check every time in the loop?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94831

no need empty line here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94832

No need bracket



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94833

No need bracket



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94835

Maximum bytes that can be buffered in the data channels



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94834

in terms of bytes



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94836

Inconsistency indentation.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94838

Capitalize: Offset commit interval in ms



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94841

Do you need to turn off auto commit on the consumer threads here?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94840

We can add some more comments here, explaining:

1) why we add the offset commit thread for new producer, but not old 
producer;

2) what risks does the old producer have (for not having offset commit 
thread).



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94842

For clean shutdown, you need to

1) halt consumer threads first.

2) wait for producer to drain all the messages in data channel.

3) manually commit offsets on consumer threads.

4) shut down consumer threads.

Otherwise we will have data duplicates as we commit offsets based on min.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94844

queueId



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94846

How about having a histogram for each queue instead of getting the sum? The 
update call would be a bit less expensive and we can monitor if some queues are 
empty while others get all the data.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94847

Ditto above.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94849

Add comments explaining why we force an unclean shutdown with System.exit 
here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94857

Unfortunately this may not be the case, as we can have multiple connectors 
which are using different consumer configs with different group ids. We need to 
either 1) change the config settings to enforce this to be true, or 2) use a 
separate offset client that remembers which topics belong to which groups.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94858

Capitalize first word



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94859

Capitalize first word



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94863

Add a comment explaining how this logic works. Also a few questions:

1) is the map() call synchronized with other threads putting new offsets 
into the map?

2) after the sorting, the logic may be clearer as

var committableOffsetIndex = 0
while (committableOffsetIndex < offsets.size && 
offsets(committableOffsetIndex) - offsets.head == committableOffsetIndex) 
committableOffsetIndex += 1

offsetToCommit = offsets(committableOffsetIndex - 1) + 1
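That scan amounts to committing one past the longest consecutive run of acked offsets; a plain-Java sketch of the same idea (method name illustrative, acked list assumed non-empty):

```java
import java.util.Arrays;

class CommittableOffset {
    // Given the offsets acked so far, return the offset to commit:
    // one past the end of the longest consecutive run starting at the smallest.
    static long offsetToCommit(long[] acked) {
        long[] offsets = acked.clone();
        Arrays.sort(offsets);
        int i = 0;
        while (i < offsets.length && offsets[i] - offsets[0] == i)
            i++;
        // the committed offset is the first un-consumed offset, hence +1
        return offsets[i - 1] + 1;
    }

    public static void main(String[] args) {
        // acked {5, 6, 7, 9}: 8 is missing, so commit 8 rather than 10
        System.out.println(offsetToCommit(new long[]{5, 6, 7, 9})); // 8
    }
}
```

With {5, 6, 7, 9} this commits 8: offset 9 is acked but not yet safe, because committing past the missing 8 could lose it on restart.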



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94855

The send().get() call is missing.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/25995/#comment94853

Apache header missing.


- Guozhang Wang


On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Sept. 24, 2014, 4:26 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description

Re: Review Request 26306: Patch for KAFKA-1663

2014-10-03 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26306/#review55415
---



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
https://reviews.apache.org/r/26306/#comment95770

Can we change this comment accordingly, since we have other places calling 
resumeTopicDeletionThread() besides these three cases.


- Guozhang Wang


On Oct. 3, 2014, 1:31 a.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26306/
 ---
 
 (Updated Oct. 3, 2014, 1:31 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1663
 https://issues.apache.org/jira/browse/KAFKA-1663
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1663. Controller unable to shutdown after a soft failure.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
 219c4136e905a59a745c3a596c95d59b550e7383 
 
 Diff: https://reviews.apache.org/r/26306/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Review Request 26390: Fix KAFKA-1641

2014-10-06 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26390/
---

Review request for kafka.


Bugs: KAFKA-1641
https://issues.apache.org/jira/browse/KAFKA-1641


Repository: kafka


Description
---

Reset cleaning start offset upon abnormal log truncation


Diffs
-

  core/src/main/scala/kafka/log/LogCleanerManager.scala 
e8ced6a5922508ea3274905be7c3d6e728f320ac 

Diff: https://reviews.apache.org/r/26390/diff/


Testing
---


Thanks,

Guozhang Wang



Review Request 26393: Follow-up KAFKA-1468

2014-10-06 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26393/
---

Review request for kafka.


Bugs: KAFKA-1468
https://issues.apache.org/jira/browse/KAFKA-1468


Repository: kafka


Description
---

Change array list back to linked list for watchers


Diffs
-

  core/src/main/scala/kafka/server/RequestPurgatory.scala 
cf3ed4c8f197d1197658645ccb55df0bce86bdd4 

Diff: https://reviews.apache.org/r/26393/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 26373: Patch for KAFKA-1647

2014-10-06 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review55615
---


Since the first iteration of if statements is now only used for logging, 
could we just merge it into the second check?

- Guozhang Wang


On Oct. 6, 2014, 5:06 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26373/
 ---
 
 (Updated Oct. 6, 2014, 5:06 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1647
 https://issues.apache.org/jira/browse/KAFKA-1647
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Fix for Kafka-1647.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
 
 Diff: https://reviews.apache.org/r/26373/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-10-07 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review55612
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment95974

Do we need to add = here?



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/25995/#comment95975

We should keep the changes of KAFKA-1647 in its own RB and not merge 
them here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95978

Could we add some introduction comment here on:

1. The architecture of the MM: producer / consumer thread, data channel per 
producer thread, offset commit thread, and how different modules interact with 
each other.
2. Why we need a separate offset commit thread, and how it works.
3. The startup / shutdown process, like which modules to start / shutdown 
first (this could be moved to the head of the corresponding functions also).



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95979

Embedded consumer config for consuming from the source cluster.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95980

Embedded producer config for producing to the target cluster.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment95981

The offset commit thread periodically commits consumed offsets to the 
source cluster. With the new producer, the offsets are updated upon the 
returned future metadata of the send() call; with the old producer, the offsets 
are updated when the consumer's iterator advances. By doing this, it is 
guaranteed that no data is lost even when the mirror maker is uncleanly shut 
down with the new producer, while with the old producer messages inside the data 
channel could be lost upon an unclean mirror maker shutdown.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96019

numMessageCapacity and byteCapacity? numGetters and numPutters 
(since the producer is the consumer of this buffer and vice versa)?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96021

How about MirrorMaker-DataChannel-queue%d-NumMessages and 
MirrorMaker-DataChannel-queue%d-Bytes? and variable name 
channelNumMessageHists and channelByteHists?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96020

Can we define put(record, queueId) and put(record), where the latter includes 
the logic of determining the queueId and then calls the former?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96022

comment on why we use the hashCode of source topic / partition here.
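The reason for hashing on the source topic / partition is that it pins each source partition to a fixed data channel queue (and hence a fixed producer thread), preserving per-partition ordering. A minimal Python sketch of the idea (function name invented; Kafka's actual code uses the JVM hashCode of the topic/partition):

```python
# Illustrative sketch: pinning a source (topic, partition) to a fixed data
# channel queue preserves per-partition ordering across producer threads.
def queue_id_for(topic, partition, num_queues):
    # hash() on a tuple is stable within one Python process run, which is
    # all the pinning needs; the JVM's hashCode plays the same role.
    return abs(hash((topic, partition))) % num_queues

qid1 = queue_id_for("clicks", 3, num_queues=4)
qid2 = queue_id_for("clicks", 3, num_queues=4)
assert qid1 == qid2          # same partition always maps to the same queue
assert 0 <= qid1 < 4
```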



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96026

Instead of letting the consumer check the global shutdown flag, could 
we just add a shutdown function which sets its own flag like the producer thread 
and the commit thread? Then the shutdown process becomes

consumers.shutdown
consumers.awaitShutdown
producers.shutdown
producers.awaitShutdown
committer.shutdown
committer.awaitShutdown
connector.shutdown
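The ordering above drains the pipeline upstream-to-downstream so no in-flight data is stranded. A small Python sketch of the per-component shutdown-flag pattern (class and component names invented for illustration):

```python
# Illustrative sketch of the proposed shutdown ordering: each component owns
# its shutdown flag, and components are stopped upstream-to-downstream.
import threading

class Worker(threading.Thread):
    def __init__(self, name, log):
        super().__init__(name=name)
        self._stop = threading.Event()
        self._log = log

    def run(self):
        self._stop.wait()          # stand-in for the real consume/produce loop
        self._log.append(self.name)

    def shutdown(self):
        self._stop.set()           # set our own flag, not a global one

    def await_shutdown(self):
        self.join()

log = []
consumers = [Worker("consumer-0", log)]
producers = [Worker("producer-0", log)]
committer = Worker("committer", log)
for w in consumers + producers + [committer]:
    w.start()

# consumers first, then producers, then the offset committer
for group in (consumers, producers, [committer]):
    for w in group:
        w.shutdown()
    for w in group:
        w.await_shutdown()

print(log)   # ['consumer-0', 'producer-0', 'committer']
```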



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96023

Maybe just // if it exits accidentally, stop the entire mirror maker as 
we did below?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96024

// if it exits accidentally, stop the entire mirror maker



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment96025

// the committed offset will be the first offset of the un-consumed 
message, hence we need to increment by one.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/25995/#comment96027

queueNumItemCapacity and queueByteCapacity?


- Guozhang Wang


On Oct. 6, 2014, 5:20 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Oct. 6, 2014, 5:20 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 Talked with Joel and decided to remove multi-connector support as people can 
 always create multiple mirror maker instances if they want to consume from 
 multiple clusters.
 
 
 Diffs
 -
 
   core/src

Re: Review Request 26373: Patch for KAFKA-1647

2014-10-08 Thread Guozhang Wang


 On Oct. 7, 2014, 12:15 a.m., Guozhang Wang wrote:
  Since now the first iteration of if statements is only used for logging, 
  could we just merge it into the second check?
 
 Jiangjie Qin wrote:
 Guozhang, thanks for the review. I actually thought about it before. I 
 agree that the code looks simpler if we just call partition.makeFollower 
 without checking whether leader is up or not. The reason I retained the first 
 if statement is that it seems more reasonable to create the remote replicas 
 only when the leader broker is online, which is done in 
 partition.makeFollower. And I'm not 100% sure whether it matters if we just 
 create the remote replica objects without checking the liveness of the leader 
 broker.

The checking of the liveness of the leader is just for assertion (i.e. the 
leader should always be alive when the leaderAndISR request is received; 
otherwise something bad has happened and we need to log the errors, but still 
proceed by skipping this request).


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review55615
---


On Oct. 6, 2014, 5:06 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26373/
 ---
 
 (Updated Oct. 6, 2014, 5:06 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1647
 https://issues.apache.org/jira/browse/KAFKA-1647
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Fix for Kafka-1647.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
 
 Diff: https://reviews.apache.org/r/26373/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 26390: Fix KAFKA-1641

2014-10-09 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26390/
---

(Updated Oct. 9, 2014, 8:04 p.m.)


Review request for kafka.


Bugs: KAFKA-1641
https://issues.apache.org/jira/browse/KAFKA-1641


Repository: kafka


Description
---

Reset cleaning start offset upon abnormal log truncation


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleanerManager.scala 
e8ced6a5922508ea3274905be7c3d6e728f320ac 

Diff: https://reviews.apache.org/r/26390/diff/


Testing
---


Thanks,

Guozhang Wang



[DISCUSSION] Message Metadata

2014-10-10 Thread Guozhang Wang
Hello all,

I put some thoughts on enhancing our current message metadata format to
solve a bunch of existing issues:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

This wiki page is for kicking off some discussions about the feasibility of
adding more info into the message header, and if possible how we would add
them.

-- Guozhang


Re: Review Request 24676: Rebase KAFKA-1583

2014-10-13 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 14, 2014, 2:42 a.m.)


Review request for kafka.


Summary (updated)
-

Rebase KAFKA-1583


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor 
changes


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 
8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 
a123cdc52f341a802b3e4bfeb29a6154332e5f73 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
67f2833804cb15976680e42b9dc49e275c89d266 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



Re: [DISCUSSION] Message Metadata

2014-10-13 Thread Guozhang Wang
 and give feedback.
 
  Also, wrt discussion in the past we have used a mix of wiki comments
  and the mailing list. Personally, I think it is better to discuss on
  the mailing list (for more visibility) and just post a bold link to
  the (archived) mailing list thread on the wiki.
 
  Joel
 
  On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
   Hello all,
  
   I put some thoughts on enhancing our current message metadata format to
   solve a bunch of existing issues:
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
  
   This wiki page is for kicking off some discussions about the
 feasibility
  of
   adding more info into the message header, and if possible how we would
  add
   them.
  
   -- Guozhang
 
 




-- 
-- Guozhang


Re: Review Request 26373: Patch for KAFKA-1647

2014-10-15 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review56736
---

Ship it!


Ship It!

- Guozhang Wang


On Oct. 13, 2014, 11:38 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26373/
 ---
 
 (Updated Oct. 13, 2014, 11:38 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1647
 https://issues.apache.org/jira/browse/KAFKA-1647
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Joel's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
 
 Diff: https://reviews.apache.org/r/26373/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: [DISCUSSION] Message Metadata

2014-10-15 Thread Guozhang Wang
Thanks Joe,

I think we now have a few open questions to discuss around this topic:

1. Shall we make core Kafka properties as first class fields in message
header or put them as tags?

The pros of the first approach are a more compact format and hence less
message header overhead; the cons are that any future message header change
needs a protocol bump and possibly multi-versioned handling on the server
side.

Vice versa for the second approach.

2. Shall we leave app properties still in message content and enforce
schema based topics or make them as extensible tags?

The pros of the first approach are again saving message header overhead for
app properties; the cons are that it enforces schema usage so that the message
content can be partially de-serialized just for the app header. At LinkedIn we
enforce Avro schemas for auditing purposes, and as a result the Kafka team
has to manage the schema registration process / schema repository as well.

3. Which properties should be core KAFKA and which should be app
properties? For example, shall we make properties that only MM cares about
as app properties or Kafka properties?

Guozhang

On Tue, Oct 14, 2014 at 5:10 AM, Joe Stein joe.st...@stealth.ly wrote:

 I think we could add schemaId(binary) to the MessageAndMetadata

 With the schemaId you can implement different downstream software pattern
 on the messages reliably. I wrote up more thoughts on this use
 https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics it
 should strive to encompass all implementation needs for producer, broker,
 consumer hooks.

 So if the application and tagged fields are important you can package that
 into a specific Kafka topic plug-in and assign it to topic(s).  Kafka
 server should be able to validate your expected formats (like
 encoders/decoders but in broker by topic regardless of producer) to the
 topics that have it enabled. We should have these maintained in the project
 under contrib.

 =- Joestein

 On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang wangg...@gmail.com
 wrote:

  Hi Jay,
 
  Thanks for the comments. Replied inline.
 
  Guozhang
 
  On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
   I need to take more time to think about this. Here are a few
 off-the-cuff
   remarks:
  
   - To date we have tried really, really hard to keep the data model for
   message simple since after all you can always add whatever you like
  inside
   the message body.
  
   - For system tags, why not just make these fields first class fields in
   message? The purpose of a system tag is presumably that Why have a
 bunch
  of
   key-value pairs versus first-class fields?
  
 
  Yes, we can alternatively make system tags as first class fields in the
  message header to make the format / processing logic simpler.
 
  The main reasons I put them as system tags are 1) when I think about
 these
  possible system tags, some of them are for all types of messages (e.g.
  timestamps), but some of them may be for a specific type of message
  (compressed, control message) and for those not all of them are
 necessarily
  required all the time, hence making them as compact tags may save us some
  space when not all of them are available; 2) with tags we do not need to
  bump up the protocol version every time we make a change to it, which
  includes keeping the logic to handle all versions on the broker until the
  old ones are officially discarded; instead, the broker can just ignore a
  tag if its id is not recognizable since the client is on a newer version,
  or use some default value / throw exception if a required tag is missing
  since the client is on an older version.
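The tag-handling logic described in point 2) can be sketched as follows. This is an illustrative Python sketch of the idea, not Kafka's wire format; the tag ids and names are invented. The point is that a broker on any version can skip unrecognized tag ids rather than requiring a protocol version bump:

```python
# Illustrative sketch of the tagged-header idea: skip unknown tag ids from
# newer clients, and fail only when a required tag is actually missing.
KNOWN_TAGS = {1: "timestamp", 2: "audit_host"}   # invented tag registry
REQUIRED_TAGS = {1}

def parse_tags(tags):
    """tags: list of (tag_id, value) pairs as decoded from the wire."""
    parsed, unknown = {}, []
    for tag_id, value in tags:
        if tag_id in KNOWN_TAGS:
            parsed[KNOWN_TAGS[tag_id]] = value
        else:
            unknown.append(tag_id)     # newer client: ignore, don't fail
    missing = REQUIRED_TAGS - {t for t, _ in tags}
    if missing:
        raise ValueError(f"missing required tags: {missing}")
    return parsed, unknown

parsed, unknown = parse_tags([(1, 1697000000), (9, b"future")])
print(sorted(parsed), unknown)   # ['timestamp'] [9]
```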
 
 
  
   - You don't necessarily need application-level tags explicitly
  represented
   in the message format for efficiency. The application can define their
  own
   header (e.g. their message could be a size delimited header followed
 by a
   size delimited body). But actually if you use Avro you don't even need
  this
   I don't think. Avro has the ability to just deserialize the header
  fields
   in your message. Avro has a notion of reader and writer schemas. The
  writer
   schema is whatever the message was written with. If the reader schema
 is
   just the header, avro will skip any fields it doesn't need and just
   deserialize the fields it does need. This is actually a much more
 usable
   and flexible way to define a header since you get all the types avro
  allows
   instead of just bytes.
  
 
  I agree that we can use a reader schema to just read out the header
 without
  de-serializing the full message, and probably for compressed message we
 can
  add an Avro / etc header for the compressed wrapper message also, but
 that
  would force these applications (MM, auditor, clients) to be
 schema-aware,
  which would usually require the people who manage this data pipeline also
  manage the schemas, whereas ideally Kafka itself should just consider
  bytes-in and bytes-out

Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Guozhang Wang
Regarding 1634, I intended to work on that after 1583 since it will
change the commit offset request handling logic a lot. If people think
1583 is only a few days away from check-in, we can leave it in
0.8.2-beta; otherwise we can push it to 0.8.3.

Guozhang

On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote:

 +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.

 I agree to the tickets you brought up to have in 0.8.2-beta and also
 https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.

 /***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
 /
 On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

  Another JIRA that will be nice to include as part of 0.8.2-beta is
  https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
  naming. Looking for people's thoughts on 2 things here -
 
  1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
  final 4-5 weeks later?
  2. Do people want to include any JIRAs (other than the ones mentioned
  above) in 0.8.2-beta? If so, it will be great to know now so it will
 allow
  us to move forward with the beta release quickly.
 
  Thanks,
  Neha
 
  On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   Hi,
  
   We have accumulated an impressive list of pretty major features in
 0.8.2
  -
   Delete topic
   Automated leader rebalancing
   Controlled shutdown
   Offset management
   Parallel recovery
   min.isr and
   clean leader election
  
   In the past, what has worked for major feature releases is a beta
 release
   prior to a final release. I'm proposing we do the same for 0.8.2. The
  only
   blockers for 0.8.2-beta, that I know of are -
  
   https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change
 and
   requires some thinking about the new dependency. Since it is not fully
   ready and there are things to think about, I suggest we take it out,
  think
   it end to end and then include it in 0.8.3.)
   https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
   Guozhang Wang)
   https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
   waiting on a review by Joe Stein)
  
   It seems that 1634 and 1671 can get wrapped up in a week. Do people
 think
   we should cut 0.8.2-beta by next week?
  
   Thanks,
   Neha
  
 




-- 
-- Guozhang


Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Guozhang Wang
Agree.
On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote:

 +1 on doing an 0.8.2 beta.

 Guozhang,

 kafka-1583 is relatively large. Given that we are getting close to
 releasing 0.8.2 beta, my feeling is that we probably shouldn't include it
 in 0.8.2 beta even if we can commit it in a few days.

 Thanks,

 Jun

 On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com wrote:

  Regarding 1634, I intended to work on that after 1583 since it will
  change the commit offset request handling logic a lot. If people think
  1583 is only a few days away from check-in, we can leave it in
  0.8.2-beta; otherwise we can push it to 0.8.3.
 
  Guozhang
 
  On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
  
   I agree to the tickets you brought up to have in 0.8.2-beta and also
   https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.
  
   /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop
   /
   On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
  
Another JIRA that will be nice to include as part of 0.8.2-beta is
https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the
 mbean
naming. Looking for people's thoughts on 2 things here -
   
1. How do folks feel about doing a 0.8.2-beta release right now and
  0.8.2
final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned
above) in 0.8.2-beta? If so, it will be great to know now so it will
   allow
us to move forward with the beta release quickly.
   
Thanks,
Neha
   
On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede 
  neha.narkh...@gmail.com
wrote:
   
 Hi,

 We have accumulated an impressive list of pretty major features in
   0.8.2
-
 Delete topic
 Automated leader rebalancing
 Controlled shutdown
 Offset management
 Parallel recovery
 min.isr and
 clean leader election

 In the past, what has worked for major feature releases is a beta
   release
 prior to a final release. I'm proposing we do the same for 0.8.2.
 The
only
 blockers for 0.8.2-beta, that I know of are -

 https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major
 change
   and
 requires some thinking about the new dependency. Since it is not
  fully
 ready and there are things to think about, I suggest we take it
 out,
think
 it end to end and then include it in 0.8.3.)
 https://issues.apache.org/jira/browse/KAFKA-1634 (This has an
 owner:
 Guozhang Wang)
 https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and
 is
 waiting on a review by Joe Stein)

 It seems that 1634 and 1671 can get wrapped up in a week. Do people
   think
 we should cut 0.8.2-beta by next week?

 Thanks,
 Neha

   
  
 
 
 
  --
  -- Guozhang
 



Re: Review Request 24676: Rebase KAFKA-1583

2014-10-16 Thread Guozhang Wang


 On Oct. 16, 2014, 1:29 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/KafkaApis.scala, line 167
  https://reviews.apache.org/r/24676/diff/9/?file=720184#file720184line167
 
  Should replica manager be offset manager?

This is replica manager actually, when it tries to write the commit message 
to the local log. I have changed the comment a bit to make it more clear.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review56843
---


On Oct. 14, 2014, 2:42 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24676/
 ---
 
 (Updated Oct. 14, 2014, 2:42 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1583
 https://issues.apache.org/jira/browse/KAFKA-1583
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + 
 minor changes
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 59c09155dd25fad7bed07d3d00039e3dc66db95c 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
   core/src/main/scala/kafka/api/ProducerResponse.scala 
 a286272c834b6f40164999ff8b7f8998875f2cfe 
   core/src/main/scala/kafka/cluster/Partition.scala 
 e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
   core/src/main/scala/kafka/log/Log.scala 
 a123cdc52f341a802b3e4bfeb29a6154332e5f73 
   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
 a624359fb2059340bb8dc1619c5b5f226e26eb9b 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
 ed1318891253556cdf4d908033b704495acd5724 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 67f2833804cb15976680e42b9dc49e275c89d266 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
 d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
   core/src/main/scala/kafka/server/RequestPurgatory.scala 
 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
   core/src/main/scala/kafka/utils/DelayedItem.scala 
 d7276494072f14f1cdf7d23f755ac32678c5675c 
   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
 fb61d552f2320fedec547400fbbe402a0b2f5d87 
   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
 03a424d45215e1e7780567d9559dae4d0ae6fc29 
   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
 cd302aa51eb8377d88b752d48274e403926439f2 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 a9c4ddc78df0b3695a77a12cf8cf25521a203122 
   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
 a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
 3804a114e97c849cae48308997037786614173fc 
   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
 
 Diff: https://reviews.apache.org/r/24676/diff/
 
 
 Testing
 ---
 
 Unit tests
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 24676: Fix KAFKA-1583

2014-10-16 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 17, 2014, 4:15 a.m.)


Review request for kafka.


Summary (updated)
-

Fix KAFKA-1583


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Incoporate Jun's comments after rebase


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 
8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 
157d67369baabd2206a2356b2aa421e848adab17 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



Re: [DISCUSSION] Message Metadata

2014-10-20 Thread Guozhang Wang
.

 What I was proposing is an alternative solution given that we have this
message header enhancement; with this we do not need to add separate logic
for the leadership map and checkpoint file, but just logic on the
replica manager to handle this extra control message and remember the
current leader epoch instead of a map.


 Thanks,

 Jun

 On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hello all,
 
  I put some thoughts on enhancing our current message metadata format to
  solve a bunch of existing issues:
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
 
  This wiki page is for kicking off some discussions about the feasibility
 of
  adding more info into the message header, and if possible how we would
 add
  them.
 
  -- Guozhang
 




-- 
-- Guozhang


Re: [DISCUSSION] Message Metadata

2014-10-20 Thread Guozhang Wang
I have updated the wiki page incorporating received comments. We can
discuss some more details on:

1. How do we want to do auditing? Whether we want built-in auditing on
brokers or even MMs, or to use an audit consumer to fetch all messages from
just the brokers.

2. How we can avoid de-/re-compression on brokers and MMs with log
compaction turned on.

3. How we can resolve the data inconsistency resulting from unclean leader
election with control messages.

Guozhang

On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com wrote:

 Thanks for the detailed comments Jun! Some replies inlined.

 On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote:

 Hi, Guozhang,

 Thanks for the writeup.

 A few high level comments.

 1. Associating (versioned) schemas to a topic can be a good thing overall.
 Yes, this could add a bit more management overhead in Kafka. However, it
 makes sure that the data format contract between a producer and a consumer
 is kept and managed in a central place, instead of in the application. The
 latter is probably easier to start with, but is likely to be brittle in
 the
 long run.


 I am actually not proposing that we not support associated versioned schemas
 for topics, but that we not let core Kafka functionality like auditing
 depend on schemas. I think this alone can separate the schema
 management from Kafka piping management (i.e. making sure every single
 message is delivered, and within some latency, etc). Adding additional
 auditing info into an existing schema will force Kafka to be aware of the
 schema systems (Avro, JSON, etc).



 2. Auditing can be a general feature that's useful for many applications.
 Such a feature can be implemented by extending the low level message
 format
 with a header. However, it can also be added as part of the schema
 management. For example, you can imagine a type of audited schema that
 adds
 additional auditing info to an existing schema automatically. Performance
 wise, it probably doesn't make a big difference whether the auditing info
 is added in the message header or the schema header.


 See replies above.


 3. We talked about avoiding the overhead of decompressing in both the
 broker and the mirror maker. We probably need to think through how this
 works with auditing. In the more general case where you want to audit
 every
 message, one has to do the decompression to get the individual message,
 independent of how the auditing info is stored. This means that if we want
 to audit the broker directly or the consumer in mirror maker, we have to
 pay the decompression cost anyway. Similarly, if we want to extend mirror
 maker to support some customized filtering/transformation logic, we also
 have to pay the decompression cost.


 I see your point. For that I would prefer an MM implementation that
 decompresses / re-compresses ONLY if required, for example by
 auditing, etc. I agree that we have not thought through whether we should
 enable auditing on MM, and if yes how to do that, and we could discuss
 about that in a different thread. Overall, this proposal is not just for
 tackling de-compression on MM but about the feasibility of extending Kafka
 message header for system properties / app properties.


 Some low level comments.

 4. Broker offset reassignment (kafka-527):  This probably can be done with
 just a format change on the compressed message set.

 That is true. As I mentioned in the wiki each of the problems may be
 resolvable separately but I am thinking about a general way to get all of
 them.


 5. MirrorMaker refactoring: We probably can think through how general we
 want mirror maker to be. If we want it to be more general, we likely
 need to decompress every message just like in a normal consumer. There
 will
 definitely be overhead. However, as long as mirror maker is made scalable,
 we can overcome the overhead by just running more instances on more
 hardware resources. As for the proposed message format change, we probably
 need to think through it a bit more. The honor-ship flag seems a bit hacky
 to me.


 Replied as part of 3). Sure we can discuss more about that, will update
 the wiki for collected comments.


 6. Adding a timestamp in each message can be a useful thing. It (1) allows
 log segments to be rolled more accurately; (2) allows finding an offset
 for
 a particular timestamp more accurately. I am thinking that the timestamp
 in
 the message should probably be the time when the leader receives the
 message. Followers preserve the timestamp set by leader. To avoid time
 going back during leader change, the leader can probably set the timestamp
 to be the  max of current time and the timestamp of the last message, if
 present. That timestamp can potentially be added to the index file to
 answer offsetBeforeTimestamp queries more efficiently.


 Agreed.
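 The monotonic rule in point 6 can be sketched as follows; the class and
 method names are illustrative, not the actual broker code. The leader
 stamps each message with max(now, lastTimestamp), so timestamps never move
 backwards even if the wall clock does during a leader change:

```java
import java.util.concurrent.atomic.AtomicLong;

public class MonotonicClock {
    // Highest timestamp handed out so far; starts below any real clock value.
    private final AtomicLong lastTimestamp = new AtomicLong(Long.MIN_VALUE);

    /** Returns a timestamp that is >= every previously returned one. */
    public long nextTimestamp(long wallClockMs) {
        return lastTimestamp.updateAndGet(prev -> Math.max(prev, wallClockMs));
    }
}
```

 A follower would simply preserve the leader-assigned value rather than
 calling this itself.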


 7. Log compaction: It seems that you are suggesting an improvement to
 compact the active segment as well. This can

Re: Review Request 26994: Patch for KAFKA-1719

2014-10-21 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review57680
---



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment98497

Just a stylish comment: could you group java imports with scala / other lib 
imports, and leave kafka imports at the top?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment98503

 Can we add a FATAL log entry here: Consumer thread exited abnormally, 
 stopping the whole mirror maker?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment98501

Ditto above.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment98499

Is this change intended?


- Guozhang Wang


On Oct. 21, 2014, 8:37 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 21, 2014, 8:37 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 make mirror maker exit when one consumer/producer thread exits.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: [DISCUSSION] Message Metadata

2014-10-21 Thread Guozhang Wang
Hi Jun,

Regarding 4) in your comment, after thinking about it for a while I cannot
come up with a way to do it along with log compaction without adding new
fields to the current message set format. Do you have a better way that
does not require protocol changes?

Guozhang

On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang wangg...@gmail.com wrote:

 I have updated the wiki page incorporating received comments. We can
 discuss some more details on:

 1. How do we want to do auditing? Do we want built-in auditing on
 brokers or even MMs, or an audit consumer that fetches all messages
 directly from brokers?

 2. How we can avoid de-/re-compression on brokers and MMs with log
 compaction turned on.

 3. How we can resolve the data inconsistency resulting from unclean
 leader election with control messages.

 Guozhang

 On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com
 wrote:

 Thanks for the detailed comments Jun! Some replies inlined.

 On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote:

 Hi, Guozhang,

 Thanks for the writeup.

 A few high level comments.

 1. Associating (versioned) schemas to a topic can be a good thing
 overall.
 Yes, this could add a bit more management overhead in Kafka. However, it
 makes sure that the data format contract between a producer and a
 consumer
 is kept and managed in a central place, instead of in the application.
 The
 latter is probably easier to start with, but is likely to be brittle in
 the
 long run.


 I am actually not proposing to not support associated versioned schemas
 for topics, but to not let some core Kafka functionality like auditing
 be dependent on schemas. I think this alone can separate schema
 management from Kafka piping management (i.e. making sure every single
 message is delivered, within some latency, etc.). Adding additional
 auditing info into an existing schema would force Kafka to be aware of
 the schema systems (Avro, JSON, etc.).



 2. Auditing can be a general feature that's useful for many applications.
 Such a feature can be implemented by extending the low level message
 format
 with a header. However, it can also be added as part of the schema
 management. For example, you can imagine a type of audited schema that
 adds
 additional auditing info to an existing schema automatically. Performance
 wise, it probably doesn't make a big difference whether the auditing info
 is added in the message header or the schema header.


 See replies above.


 3. We talked about avoiding the overhead of decompressing in both the
 broker and the mirror maker. We probably need to think through how this
 works with auditing. In the more general case where you want to audit
 every
 message, one has to do the decompression to get the individual message,
 independent of how the auditing info is stored. This means that if we
 want
 to audit the broker directly or the consumer in mirror maker, we have to
 pay the decompression cost anyway. Similarly, if we want to extend mirror
 maker to support some customized filtering/transformation logic, we also
 have to pay the decompression cost.


  I see your point. For that I would prefer to have a MM implementation
  that is able to do de-compression / re-compression ONLY if required, for
  example by auditing, etc. I agree that we have not thought through whether
  we should enable auditing on MM, and if so how to do it, and we could
  discuss that in a different thread. Overall, this proposal is not just
  about tackling de-compression on MM but about the feasibility of extending
  the Kafka message header for system properties / app properties.


 Some low level comments.

 4. Broker offset reassignment (kafka-527):  This probably can be done
 with
 just a format change on the compressed message set.

 That is true. As I mentioned in the wiki each of the problems may be
 resolvable separately but I am thinking about a general way to get all of
 them.


 5. MirrorMaker refactoring: We probably can think through how general we
  want mirror maker to be. If we want it to be more general, we likely
 need to decompress every message just like in a normal consumer. There
 will
 definitely be overhead. However, as long as mirror maker is made
 scalable,
 we can overcome the overhead by just running more instances on more
 hardware resources. As for the proposed message format change, we
 probably
 need to think through it a bit more. The honor-ship flag seems a bit
 hacky
 to me.


 Replied as part of 3). Sure we can discuss more about that, will update
 the wiki for collected comments.


 6. Adding a timestamp in each message can be a useful thing. It (1)
 allows
 log segments to be rolled more accurately; (2) allows finding an offset
 for
 a particular timestamp more accurately. I am thinking that the timestamp
 in
 the message should probably be the time when the leader receives the
 message. Followers preserve the timestamp set by leader. To avoid time
 going back during leader change, the leader can

Re: Review Request 24676: Fix KAFKA-1583

2014-10-22 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 23, 2014, 1:53 a.m.)


Review request for kafka.


Summary (updated)
-

Fix KAFKA-1583


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Incorporate Joel's comments after rebase


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 
8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 
157d67369baabd2206a2356b2aa421e848adab17 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



Re: Review Request 24676: Fix KAFKA-1583

2014-10-22 Thread Guozhang Wang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57235
---


On Oct. 23, 2014, 1:53 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24676/
 ---
 
 (Updated Oct. 23, 2014, 1:53 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1583
 https://issues.apache.org/jira/browse/KAFKA-1583
 
 
 Repository: kafka
 
 
 Description
 ---
 
  Incorporate Joel's comments after rebase
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 59c09155dd25fad7bed07d3d00039e3dc66db95c 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
   core/src/main/scala/kafka/api/ProducerResponse.scala 
 a286272c834b6f40164999ff8b7f8998875f2cfe 
   core/src/main/scala/kafka/cluster/Partition.scala 
 e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
   core/src/main/scala/kafka/log/Log.scala 
 157d67369baabd2206a2356b2aa421e848adab17 
   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
 a624359fb2059340bb8dc1619c5b5f226e26eb9b 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
 ed1318891253556cdf4d908033b704495acd5724 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
 d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
   core/src/main/scala/kafka/server/RequestPurgatory.scala 
 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
   core/src/main/scala/kafka/utils/DelayedItem.scala 
 d7276494072f14f1cdf7d23f755ac32678c5675c 
   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
 fb61d552f2320fedec547400fbbe402a0b2f5d87 
   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
 03a424d45215e1e7780567d9559dae4d0ae6fc29 
   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
 cd302aa51eb8377d88b752d48274e403926439f2 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 a9c4ddc78df0b3695a77a12cf8cf25521a203122 
   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
 a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
 3804a114e97c849cae48308997037786614173fc 
   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
 
 Diff: https://reviews.apache.org/r/24676/diff/
 
 
 Testing
 ---
 
 Unit tests
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 26994: Patch for KAFKA-1719

2014-10-22 Thread Guozhang Wang


 On Oct. 22, 2014, 9:32 p.m., Neha Narkhede wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 271
  https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line271
 
  Is there any value in setting this to true? It seems that just checking 
  if it is false and exiting the process suffices. Setting to true something 
  that is called cleanShutdown, when in fact, it isn't a clean shutdown is 
  confusing to read.
  
  Also good to add a FATAL log entry as suggested by Guozhang as well.

The boolean is used when the internal threads try to exit: when it is not 
set, a thread knows it is closing abnormally and hence goes on to handle it. 
I agree its name is a bit misleading, and probably we can just rename it to 
isShuttingDown.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review57907
---


On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 22, 2014, 10:04 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Review Request 27101: Fix KAFKA-1501 by enabling reuse address

2014-10-23 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27101/
---

Review request for kafka.


Bugs: KAFKA-1501
https://issues.apache.org/jira/browse/KAFKA-1501


Repository: kafka


Description
---

needs repeated unit tests to verify


Diffs
-

  core/src/main/scala/kafka/network/SocketServer.scala 
cee76b323e5f3e4c783749ac9e78e1ef02897e3b 

Diff: https://reviews.apache.org/r/27101/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Guozhang Wang


 On Oct. 22, 2014, 9:32 p.m., Neha Narkhede wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 271
  https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line271
 
  Is there any value in setting this to true? It seems that just checking 
  if it is false and exiting the process suffices. Setting to true something 
  that is called cleanShutdown, when in fact, it isn't a clean shutdown is 
  confusing to read.
  
  Also good to add a FATAL log entry as suggested by Guozhang as well.
 
 Guozhang Wang wrote:
  The boolean is used when the internal threads try to exit: when it is 
  not set, a thread knows it is closing abnormally and hence goes on to 
  handle it. I agree its name is a bit misleading, and probably we can just 
  rename it to isShuttingDown.
 
 Jiangjie Qin wrote:
 I'm thinking maybe we can use a separate flag in each thread to indicate 
 whether it exits normally or not. So in the catch clause we set that flag to 
 indicate the thread is exiting abnormally. That might be clearer.

I personally think the flag-per-thread is overkill here: when each thread is 
about to exit (i.e. in the finally block), all it needs to check is whether 
the whole MM is currently shutting down. If it is, then the thread knows it 
exited normally; otherwise, log the FATAL and force-kill the MM.
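
The single-flag approach described above can be sketched as follows; the 
names (MirrorMakerSketch, newWorker, isShuttingDown) are illustrative, not 
the actual MirrorMaker code. Worker threads check a shared flag in their 
finally block: if the flag is not yet set, the exit was abnormal, so log 
FATAL and force-kill the whole tool:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class MirrorMakerSketch {
    private static final AtomicBoolean isShuttingDown = new AtomicBoolean(false);

    static boolean shuttingDown() {
        return isShuttingDown.get();
    }

    static void shutdown() {
        // Mark the clean shutdown BEFORE stopping workers, so their
        // finally blocks see the flag and exit quietly.
        isShuttingDown.set(true);
    }

    static Thread newWorker(Runnable loop) {
        return new Thread(() -> {
            try {
                loop.run();
            } finally {
                // Flag not set => this thread is dying unexpectedly.
                if (!isShuttingDown.get()) {
                    System.err.println("FATAL: worker exited abnormally, stopping mirror maker");
                    // Runtime.getRuntime().halt(1); // the real tool would force-kill here
                }
            }
        });
    }
}
```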


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review57907
---


On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 22, 2014, 10:04 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 26994: Patch for KAFKA-1719

2014-10-23 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26994/#review58168
---

Ship it!


LGTM, one minor thing upon check in.


core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/26994/#comment99094

fatal("Consumer thread failure due to ", t)


- Guozhang Wang


On Oct. 23, 2014, 11:20 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26994/
 ---
 
 (Updated Oct. 23, 2014, 11:20 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1719
 https://issues.apache.org/jira/browse/KAFKA-1719
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Neha and Guzhang's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
 
 Diff: https://reviews.apache.org/r/26994/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: ConsumerFetcherThread deadlock?

2014-10-23 Thread Guozhang Wang
Jack,

The fetchers are blocked on the queue since it is full. Is your consumer
iterator stalled and hence not pulling more data from it?

Guozhang

On Thu, Oct 23, 2014 at 3:53 PM, Jack Foy j...@whitepages.com wrote:

 Hi all,

 We run kafka 0.8.1.1. We’re tracking down a problem where consumer groups
 stop pulling from their respective partitions a few minutes or hours into
 execution. It looks like all ConsumerFetcherThreads associated with that
 consumer are blocking while waiting to write data to a LinkedBlockingQueue.
 They are waiting on ConditionObjects with different object IDs, and those
 object IDs do not occur elsewhere within our snapshot of thread data. It
 appears that those threads never make progress once they enter this waiting
 state.
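
 The parked stack traces below come down to the semantics of a bounded
 LinkedBlockingQueue: once it is full, put() parks the caller until a
 consumer drains an element. A minimal, self-contained illustration (not
 Kafka code) that makes the behavior observable without hanging, by using
 offer() with a timeout instead of put():

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FullQueueDemo {
    public static boolean tryEnqueue() throws InterruptedException {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        queue.put(1); // fills the queue to capacity
        // queue.put(2) would now park this thread indefinitely, exactly
        // like the stuck fetcher threads; offer() returns false instead.
        return queue.offer(2, 100, TimeUnit.MILLISECONDS);
    }
}
```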

 KAFKA-937 looks like a very similar symptom:
 https://issues.apache.org/jira/browse/KAFKA-937 According to Jun Rao’s
 comments on that issue, a ConsumerFetcherThread should never be blocked. Is
 that still the case?

 Here’s the thread dump for the relevant threads. I can provide more
 information if needed.

 ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-0
 prio=10 tid=0x7f1954a9c800 nid=0xbf0 waiting on condition
 [0x7f19339f8000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x8ac24dd8 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
 at
 kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at
 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka.utils.Utils$.inLock(Utils.scala:538)
 at
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

 ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-1
 prio=10 tid=0x7f1955657000 nid=0xbf3 waiting on condition
 [0x7f19321e]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x8ad280e8 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
 at
 kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at
 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka.utils.Utils$.inLock(Utils.scala:538)
 at
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

 ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-2
 prio=10 tid=0x7f1954001000 nid=0xbf1 waiting on condition
 [0x7f19326e5000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)

Review Request 27214: Fix KAFKA-1501 by enabling reuse address

2014-10-26 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27214/
---

Review request for kafka.


Bugs: KAFKA-1501
https://issues.apache.org/jira/browse/KAFKA-1501


Repository: kafka


Description
---

unit tests 10/10


Diffs
-

  core/src/main/scala/kafka/network/SocketServer.scala 
cee76b323e5f3e4c783749ac9e78e1ef02897e3b 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
dd3640f47b26a4ff1515c2dc7e964550ac35b7b2 

Diff: https://reviews.apache.org/r/27214/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 26885: Patch for KAFKA-1642

2014-10-26 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26885/#review58575
---

Ship it!


LGTM, with one minor comment below.


clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/26885/#comment99633

    The comment "When connecting or connected, this handles slow/stalled 
    connections" here is a bit misleading: after checking the code I realize 
    connectionDelay is only used to determine the delay in millis after which 
    we can re-check connectivity for a node that is not connected; hence, if 
    the node becomes connected again while we are determining its delay, we 
    just set it to MAX.

    Instead of making it general to the KafkaClient interface, shall we just 
    add this to the code block at line 155?


- Guozhang Wang


On Oct. 23, 2014, 11:19 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26885/
 ---
 
 (Updated Oct. 23, 2014, 11:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1642
 https://issues.apache.org/jira/browse/KAFKA-1642
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Fixes two issues with the computation of ready nodes and poll timeouts in
 Sender/RecordAccumulator:
 
 1. The timeout was computed incorrectly because it took into account all 
 nodes,
 even if they had data to send such that their timeout would be 0. However, 
 nodes
 were then filtered based on whether it was possible to send (i.e. their
 connection was still good) which could result in nothing to send and a 0
 timeout, resulting in busy looping. Instead, the timeout needs to be computed
 only using data that cannot be immediately sent, i.e. where the timeout will 
 be
 greater than 0. This timeout is only used if, after filtering by whether
 connections are ready for sending, there is no data to be sent. Other events 
 can
 wake the thread up earlier, e.g. a client reconnects and becomes ready again.
 
 2. One of the conditions indicating whether data is sendable is whether a
 timeout has expired -- either the linger time or the retry backoff. This
 condition wasn't accounting for both cases properly, always using the linger
 time. This means the retry backoff was probably not being respected.
 
 KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but 
 none can send data because they are in a connection backoff period.
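
 The timeout fix in point 1 can be sketched with a toy data model (absolute
 due times per batch; this is not the actual RecordAccumulator API): only
 batches that are not yet sendable should contribute to the poll timeout,
 while already-due batches belong to the ready set and must not drag the
 timeout down to 0:

```java
import java.util.List;

public class PollTimeoutSketch {
    /** Each entry is the absolute time (ms) at which a batch becomes sendable. */
    public static long pollTimeout(List<Long> batchDueTimesMs, long nowMs, long maxWaitMs) {
        long timeout = maxWaitMs;
        for (long due : batchDueTimesMs) {
            if (due > nowMs) // not yet sendable: wait at most until it is due
                timeout = Math.min(timeout, due - nowMs);
            // due <= nowMs: the batch is already ready and is handled by the
            // send path, so it is excluded from the timeout computation.
        }
        return timeout;
    }
}
```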
 
 
 Addressing Jun's comments.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 d304660f29246e9600efe3ddb28cfcc2b074bed3 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 29658d4a15f112dc0af5ce517eaab93e6f00134b 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 eea270abb16f40c9f3b47c4ea96be412fb4fdc8b 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  c5d470011d334318d5ee801021aadd0c000974a6 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 aae8d4a1e98279470587d397cc779a9baf6fee6c 
   
 clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
  0762b35abba0551f23047348c5893bb8c9acff14 
 
 Diff: https://reviews.apache.org/r/26885/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




[DISCUSSION] Nested compression in Kafka?

2014-10-27 Thread Guozhang Wang
Hello folks,

I came across this testComplexCompressDecompress in
kafka.message.MessageCompressionTest while I'm working some consumer
decompression optimization. This test checks if nested compression is
supported.

I remember vaguely that some time ago we decided not to support nested
compression in Kafka, and in the new producer's MemoryRecords I also make
this assumption in the iterator implementation. Is that still the case? If
so, shall we remove this test case?

-- Guozhang


Review Request 27256: Fix KAFKA-1735

2014-10-27 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27256/
---

Review request for kafka.


Bugs: KAFKA-1735
https://issues.apache.org/jira/browse/KAFKA-1735


Repository: kafka


Description
---

Handle partial reads from compressed stream


Diffs
-

  clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 

Diff: https://reviews.apache.org/r/27256/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27256: Fix KAFKA-1735

2014-10-28 Thread Guozhang Wang


 On Oct. 28, 2014, 1:17 a.m., Neha Narkhede wrote:
  clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, 
  line 207
  https://reviews.apache.org/r/27256/diff/1/?file=734746#file734746line207
 
  Would it be possible to add a unit test for this?

Sure. This scenario is covered for the old consumer in 
ZookeeperConsumerCompressionTest, and I originally planned to migrate it to 
the new consumer unit tests in the future; thinking about it twice, I now 
feel it is better to add it to MemoryRecordsTest.
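
For context, the partial-read pitfall the patch title refers to is a general 
InputStream property: read(buf, off, len) on a decompressing stream may 
return fewer than len bytes, so exact-size reads must loop. A sketch of the 
idea (ReadFully is an illustrative helper, not the MemoryRecords code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ReadFully {
    /** Reads exactly buf.length bytes, or throws if the stream ends early. */
    public static void readFully(InputStream in, byte[] buf) throws IOException {
        int off = 0;
        while (off < buf.length) {
            int n = in.read(buf, off, buf.length - off); // may be a partial read
            if (n < 0)
                throw new IOException("stream ended after " + off + " bytes");
            off += n;
        }
    }

    /** Compresses data with gzip, then reads it back with the looping helper. */
    public static byte[] gzipRoundTrip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        byte[] out = new byte[data.length];
        readFully(new GZIPInputStream(new ByteArrayInputStream(bos.toByteArray())), out);
        return out;
    }
}
```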


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27256/#review58744
---


On Oct. 27, 2014, 11:59 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27256/
 ---
 
 (Updated Oct. 27, 2014, 11:59 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1735
 https://issues.apache.org/jira/browse/KAFKA-1735
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Handle partial reads from compressed stream
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 
 
 Diff: https://reviews.apache.org/r/27256/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 24676: Fix KAFKA-1583

2014-10-28 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 28, 2014, 10:09 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Incorporated Joel's comments, round two


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 
8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 
157d67369baabd2206a2356b2aa421e848adab17 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in millis) and disable the per-offset timestamp

2014-10-30 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
---

The timestamp field of OffsetAndMetadata is preserved since we need to be 
backward compatible with older versions


Diffs
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/server/KafkaApis.scala 
968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
  core/src/main/scala/kafka/server/OffsetManager.scala 
2957bc435102bc4004d8f100dbcdd56287c8ffae 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: [DISCUSSION] Nested compression in Kafka?

2014-10-30 Thread Guozhang Wang
Thanks Jun and Neha. I will go ahead and file the JIRA to remove this test
case then.


Review Request 27430: Fix KAFKA-1720

2014-10-31 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27430/
---

Review request for kafka.


Bugs: KAFKA-1720
https://issues.apache.org/jira/browse/KAFKA-1720


Repository: kafka


Description
---

Rename delayed requests to delayed operations, change some class names


Diffs
-

  core/src/main/scala/kafka/cluster/Partition.scala 
1be57008e983fc3a831626ecf9a861f164fcca92 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
1ccbb4b6fdbbd4412ba77ffe7d4cf5adf939e439 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
8049e07e5d6d65913ec2492f1e22e5ed3ecbbea8 
  core/src/main/scala/kafka/server/DelayedRequestKey.scala 
628ef59564b9b9238d7b05d26aef79d3cfec174d 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
02fa3821271e97b24fd2ae25163933222797585d 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
323b12e765f981e9bba736a204e4a8159e5c5ada 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a7720d579ea15b71511c9da0e241bd087de3674e 
  system_test/metrics.json cd3fc142176b8a3638db5230fb74f055c04c59d4 

Diff: https://reviews.apache.org/r/27430/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 24676: Fix KAFKA-1583

2014-10-31 Thread Guozhang Wang


 On Oct. 31, 2014, 6:08 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/cluster/Partition.scala, line 236
  https://reviews.apache.org/r/24676/diff/13/?file=735940#file735940line236
 
  Could we get rid of "=", since this method is not supposed to return any 
  value?

Thanks Jun. I will address this comment in KAFKA-1720


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review59370
---




Re: Review Request 27430: Fix KAFKA-1720

2014-10-31 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27430/
---

(Updated Nov. 1, 2014, 12:21 a.m.)


Review request for kafka.


Bugs: KAFKA-1720
https://issues.apache.org/jira/browse/KAFKA-1720


Repository: kafka


Description
---

Rename delayed requests to delayed operations, change some class names


Diffs (updated)
-

  core/src/main/scala/kafka/cluster/Partition.scala 
1be57008e983fc3a831626ecf9a861f164fcca92 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
1ccbb4b6fdbbd4412ba77ffe7d4cf5adf939e439 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
8049e07e5d6d65913ec2492f1e22e5ed3ecbbea8 
  core/src/main/scala/kafka/server/DelayedRequestKey.scala 
628ef59564b9b9238d7b05d26aef79d3cfec174d 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
3007a6d89b637b93f71fdb7adab561a93d9c4c62 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
323b12e765f981e9bba736a204e4a8159e5c5ada 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a7720d579ea15b71511c9da0e241bd087de3674e 
  system_test/metrics.json cd3fc142176b8a3638db5230fb74f055c04c59d4 

Diff: https://reviews.apache.org/r/27430/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27534: Patch for KAFKA-1746

2014-11-04 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27534/#review59898
---


Where will testcaseEnv.validationStatusDict["Test completed"] be used?

- Guozhang Wang


On Nov. 3, 2014, 7:46 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27534/
 ---
 
 (Updated Nov. 3, 2014, 7:46 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1746
 https://issues.apache.org/jira/browse/KAFKA-1746
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1746 Make system tests return a useful exit code.
 
 
 KAFKA-1746 Check the exit code when running DumpLogSegments to verify data.
 
 
 Diffs
 -
 
   system_test/mirror_maker_testsuite/mirror_maker_test.py 
 c0117c64cbb7687ca8fbcec6b5c188eb880300ef 
   system_test/offset_management_testsuite/offset_management_test.py 
 12b5cd25140e1eb407dd57eef63d9783257688b2 
   system_test/replication_testsuite/replica_basic_test.py 
 660006cc253bbae3e7cd9f02601f1c1937dd1714 
   system_test/system_test_runner.py ee7aa252333553e8eb0bc046edf968ec99dddb70 
   system_test/utils/kafka_system_test_utils.py 
 1093b660ebd0cb5ab6d3731d26f151e1bf717f8a 
 
 Diff: https://reviews.apache.org/r/27534/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




Re: Review Request 27535: Patch for KAFKA-1747

2014-11-04 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27535/#review59899
---

Ship it!


Thanks for the patch. LGTM.

- Guozhang Wang


On Nov. 3, 2014, 7:46 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27535/
 ---
 
 (Updated Nov. 3, 2014, 7:46 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1747
 https://issues.apache.org/jira/browse/KAFKA-1747
 
 
 Repository: kafka
 
 
 Description
 ---
 
  KAFKA-1747 Fix TestcaseEnv so state isn't shared between instances.
 
 
 Diffs
 -
 
   system_test/utils/testcase_env.py b3c29105c04348f036efbbdc430e14e099ca8c70 
 
 Diff: https://reviews.apache.org/r/27535/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




Re: Review Request 27534: Patch for KAFKA-1746

2014-11-05 Thread Guozhang Wang


 On Nov. 5, 2014, 1 a.m., Guozhang Wang wrote:
  Where will testcaseEnv.validationStatusDict["Test completed"] be used?
 
 Ewen Cheslack-Postava wrote:
 That's where all the validation results (the test's assertions) are 
 stored. It gets scanned through at the end of the test run in 
 system_test_runner.py to generate the report and count the number of 
  failures. The setup is a bit confusing because the same dict is referenced 
  both as testcaseEnv.validationStatusDict and as 
  testcaseEnv.testcaseResultsDict["validation_status"] (see TestcaseEnv's 
  constructor).

I see. Thanks.
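The KAFKA-1746 idea discussed above (scan the validation results at the end of
the run and turn failures into a useful exit code) can be sketched as follows.
This is a hypothetical illustration, not the actual system_test_runner.py
code; the function and key names are illustrative:

```python
def exit_code_from_results(validation_status):
    """Scan each test case's validation dict and return a non-zero
    process exit code if any assertion did not pass."""
    failed = sum(
        1
        for case in validation_status
        for outcome in case.values()
        if outcome != "PASSED"
    )
    return 1 if failed else 0

all_pass = [{"Test completed": "PASSED"}]
one_fail = [{"Test completed": "PASSED"}, {"Validate data": "FAILED"}]
```

A CI system would then call sys.exit(exit_code_from_results(...)) so that a
red test run actually fails the build.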


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27534/#review59898
---





Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in millis) and disable the per-offset timestamp

2014-11-06 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Nov. 6, 2014, 11:35 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
---

The timestamp field of OffsetAndMetadata is preserved since we need to be 
backward compatible with older versions


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/server/KafkaApis.scala 
968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
  core/src/main/scala/kafka/server/KafkaServer.scala 
4de812374e8fb1fed834d2be3f9655f55b511a74 
  core/src/main/scala/kafka/server/OffsetManager.scala 
2957bc435102bc4004d8f100dbcdd56287c8ffae 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27723: Patch for KAFKA-1739

2014-11-07 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27723/#review60359
---

Ship it!


- Guozhang Wang


On Nov. 7, 2014, 12:26 p.m., Manikumar Reddy O wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27723/
 ---
 
 (Updated Nov. 7, 2014, 12:26 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1739
 https://issues.apache.org/jira/browse/KAFKA-1739
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Removed testComplexCompressDecompress from MessageCompressionTest class 
 
 
 Diffs
 -
 
   core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 
 0bb275d0dc840e40289e488b1a00aca2e26fe6f9 
 
 Diff: https://reviews.apache.org/r/27723/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Manikumar Reddy O
 




Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in millis) and disable the per-offset timestamp

2014-11-07 Thread Guozhang Wang


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java,
   line 152
  https://reviews.apache.org/r/27391/diff/2/?file=753752#file753752line152
 
  This confused me a bit, and I think it is because initCommonFields is 
  intended to initialize fields common to all versions of the request. It is 
  a useful helper method but it becomes somewhat clunky when removing fields. 
  The partition-level timestamp is no longer a common field.
  
  If this is v2 then we should _never_ set anything in the timestamp 
  field of the struct; and if it is < v2 then we should _always_ set the 
  timestamp field (even if it is the default). However, since the timestamp 
  field in the Field declaration for OFFSET_COMMIT_REQUEST_PARTITION_V0 does 
  not have a default explicitly specified, I think this will break with a 
  SchemaException("missing value...") for offset commit request v0, v1 if we 
  choose to write to a bytebuffer under those versions with this code.
  
  One option is to explicitly pass in the constructor version (0, 1, 2) 
  to initCommonFields and use that to decide whether to include/exclude this 
  field, but that is weird. Another alternative is a separate helper method 
  for v0/v1. That is even weirder.

Actually, the partition-level timestamp is still a common field (we are just 
deprecating it, and chose not to serialize / de-serialize it in v2). I agree it 
is a bit weird written this way; I thought about this when I started the 
first version but did not come up with a better approach.
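The version-gating debated above can be sketched abstractly. This is a hedged
illustration of the intended semantics (v0/v1 always carry a per-partition
timestamp, defaulting when unset; v2 never serializes it), not the actual
Protocol.java code; the function and field names are illustrative:

```python
DEFAULT_TIMESTAMP = -1  # the "unset" value discussed in the thread

def write_partition_fields(version, offset, timestamp=None):
    """Build the per-partition fields for a given request version.
    Versions below 2 must always include the timestamp field (so the
    schema never sees a missing value); v2 and above drop it entirely."""
    fields = {"offset": offset}
    if version < 2:
        fields["timestamp"] = DEFAULT_TIMESTAMP if timestamp is None else timestamp
    return fields
```

Passing the constructor version into the field-building helper, as suggested
in the review, is exactly this conditional.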


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 22
  https://reviews.apache.org/r/27391/diff/2/?file=753755#file753755line22
 
  Can we mark this @Deprecated as well?
  
  We should probably make the primary constructor without timestamp and 
  add a secondary constructor with timestamp and mark deprecated there.
  
  Also, can we use case class.copy if timestamp needs to be modified? 
  However, per comment further down I don't think it needs to be touched.

Actually we cannot make it deprecated as it will be preserved even in the new 
version, right? Note this is not used for the wire protocol but for the cache / 
disk format.


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 498
  https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498
 
  Actually, since we always set the retention period (for v0, v1) in 
  KafkaApis do we need to even touch this timestamp? i.e., we should 
  basically ignore it right? So we only need to do:
  value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp).

I think not. In v0/v1, if the timestamp is explicitly specified (i.e. not -1), 
we need to use it as the expiration timestamp, or at least that was how I 
understood the semantics.


- Guozhang
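The expiration semantics described in the last answer can be written down
compactly. This is a sketch of that interpretation (an explicit per-offset
timestamp wins; otherwise the offset expires retention_ms after the commit),
not the committed OffsetManager code:

```python
DEFAULT_TIMESTAMP = -1

def expiration_timestamp(commit_timestamp, retention_ms, now_ms):
    """v0/v1 semantics as understood in the thread: honor an explicitly
    specified timestamp as the expiration time; fall back to the global
    retention window when the timestamp is unset (-1)."""
    if commit_timestamp != DEFAULT_TIMESTAMP:
        return commit_timestamp
    return now_ms + retention_ms
```

With this rule, v2 clients (which never send a per-offset timestamp) always
take the retention-based branch.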


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review60285
---


On Nov. 6, 2014, 11:35 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 6, 2014, 11:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 4de812374e8fb1fed834d2be3f9655f55b511a74 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing

Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in millis) and disable the per-offset timestamp

2014-11-07 Thread Guozhang Wang


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 22
  https://reviews.apache.org/r/27391/diff/2/?file=753755#file753755line22
 
  Can we mark this @Deprecated as well?
  
  We should probably make the primary constructor without timestamp and 
  add a secondary constructor with timestamp and mark deprecated there.
  
  Also, can we use case class.copy if timestamp needs to be modified? 
  However, per comment further down I don't think it needs to be touched.
 
 Guozhang Wang wrote:
 Actually we cannot make it deprecated as it will be preserved even in the 
 new version, right? Note this is not used for the wire protocol but for the 
 cache / disk format.

I should say it is used not only for the wire protocol but also for the cache / 
disk storage format. On second thought, I will change this to two separate 
classes, one for the wire protocol and one for the server storage format.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review60285
---


On Nov. 6, 2014, 11:35 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 6, 2014, 11:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 4de812374e8fb1fed834d2be3f9655f55b511a74 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in millis) and disable the per-offset timestamp

2014-11-07 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Nov. 8, 2014, 12:54 a.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
---

The timestamp field of OffsetAndMetadata is preserved since we need to be 
backward compatible with older versions


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
fbc680fde21b02f11285a4f4b442987356abd17b 
  core/src/main/scala/kafka/server/KafkaApis.scala 
968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
  core/src/main/scala/kafka/server/KafkaServer.scala 
4de812374e8fb1fed834d2be3f9655f55b511a74 
  core/src/main/scala/kafka/server/OffsetManager.scala 
2957bc435102bc4004d8f100dbcdd56287c8ffae 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Support for topics+streams at class WildcardStreamsHandler

2014-11-10 Thread Guozhang Wang
Hi Alan,

The reason we do not have a per-topic parallelism spec for wildcards is
twofold: 1) we use a per-topic hash-based partition algorithm, so giving
each topic the same number of streams may yield better load balance; 2)
with the topicFilter we do not know exactly which topics will be consumed
at construction time, hence there is no way to specify per-topic specs.

1) has been lifted since we implemented the new partitioning algorithm,
and for 2) we need to think about how to support it if we really want to;
perhaps we can also use a regex-keyed topic-count map, while ensuring that
each regex in the map falls within the topic filter and does not overlap
with the others, etc. What is your use case that requires a per-topic
numStreams spec?

Guozhang

On Sun, Nov 9, 2014 at 6:03 AM, Alan Lavintman alan.lavint...@gmail.com
wrote:

 Hi guys, I have seen that if I create a message stream by using:

 createMessageStreams

 I can define a map of Topic -> #Streams.

 Is there a reason why createMessageStreamsByFilter is not giving the same
 support? I only have a TopicFilter and numStreams interface, such as:

 public List<KafkaStream<byte[], byte[]>>
 createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

 But it does not allow me to specify the parallelism per topic. Am I missing
 something, or is my assumption correct?

 Bests and thanks,
 Alan.




-- 
-- Guozhang
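The regex-keyed topic-count map floated in the reply above could look like
the following sketch. This is purely hypothetical (no such API exists in
Kafka); the function and parameter names are illustrative:

```python
import re

def streams_for_topics(topics, regex_counts, default=1):
    """Per-topic stream assignment for wildcard consumption: each
    discovered topic gets the stream count of the first regex it fully
    matches, or `default` when no regex matches."""
    assignment = {}
    for topic in topics:
        for pattern, count in regex_counts.items():
            if re.fullmatch(pattern, topic):
                assignment[topic] = count
                break
        else:
            assignment[topic] = default
    return assignment

discovered = ["clicks-us", "clicks-eu", "audit"]
counts = {r"clicks-.*": 4}
assignment = streams_for_topics(discovered, counts)
```

The non-overlap constraint mentioned in the thread would amount to checking
that no topic matches two patterns in regex_counts.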


Re: New Java producer question

2014-11-10 Thread Guozhang Wang
Joe,

Are you looking for this?


/** <code>retries</code> */
public static final String RETRIES_CONFIG = "retries";

Guozhang

On Mon, Nov 10, 2014 at 8:11 AM, Joe Stein joe.st...@stealth.ly wrote:

 In the new Java producer I see that you can set RETRY_BACKOFF_MS_CONFIG

 https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L112
 which is used

 https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L217-L226
 but
 I don't see (maybe just missing it??? I haven't spent a lot of quality time
 with the code yet) how to set the number of retry this happens before
 failure as we have in the existing producer

 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/ProducerConfig.scala#L98
 or is this done another way or no longer required or used???

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /




-- 
-- Guozhang
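The interplay of the two configs in this thread (retries bounding the number
of re-attempts, retry.backoff.ms spacing them) can be sketched in a few
lines. This is an illustrative retry loop, not the producer's actual
Sender/RecordAccumulator logic:

```python
import time

def send_with_retries(send_fn, retries=3, backoff_ms=100, sleep=time.sleep):
    """Attempt send_fn once, then up to `retries` more times on failure,
    waiting backoff_ms between attempts; re-raise once retries are spent."""
    attempts = 0
    while True:
        try:
            return send_fn()
        except Exception:
            attempts += 1
            if attempts > retries:
                raise
            sleep(backoff_ms / 1000.0)

calls = []
def flaky():
    # Fails twice, then succeeds: needs retries >= 2 to get through.
    calls.append(1)
    if len(calls) < 3:
        raise IOError("transient")
    return "ok"

result = send_with_retries(flaky, retries=3, backoff_ms=1, sleep=lambda s: None)
```

So RETRY_BACKOFF_MS_CONFIG controls the wait between attempts, while
RETRIES_CONFIG bounds how many re-attempts happen at all.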


Re: Question about ZookeeperConsumerConnector

2014-11-11 Thread Guozhang Wang
Thanks Jun!

Jiangjie, could you file a JIRA? Thanks.

Guozhang

On Tue, Nov 11, 2014 at 9:27 AM, Jun Rao jun...@gmail.com wrote:

 Hi, Jiangjie,

 Thanks for the investigation. Yes, this seems like a real issue.

 1. It doesn't seem that we need to put the shutdownCommand back into the
 queue. Once an iterator receives a shutdownCommand, it will be in a Done
 state and will remain in that state forever.

  2. Yes, we just need to get the unique set of queues and put in a
  shutdownCommand per queue.

 Jun


 On Mon, Nov 10, 2014 at 7:27 PM, Becket Qin becket@gmail.com wrote:

  Hi,
 
  We encountered a production issue recently that Mirror Maker could not
  properly shutdown because ZookeeperConsumerConnector is blocked on
  shutdown(). After looking into the code, we found 2 issues that caused
 this
  problem.
 
  1. After consumer iterator receives the shutdownCommand, It puts the
  shutdownCommand back into the data chunk queue. Is there any reason for
  doing this?
 
  2. In ZookeeperConsumerConnector shutdown(), we could potentially put
  multiple shutdownCommand into the same data chunk queue, provided the
  topics are sharing the same data chunk queue in topicThreadIdAndQueues.
  (KAFKA-1764 is opened)
 
   In our case, we only have 1 consumer stream for all the topics, and the data
   chunk queue capacity is set to 1. The execution sequence causing the problem
   is as below:
  1. ZookeeperConsumerConnector shutdown() is called, it tries to put
  shutdownCommand for each queue in topicThreadIdAndQueues. Since we only
  have 1 queue, multiple shutdownCommand will be put into the queue.
  2. In sendShutdownToAllQueues(), between queue.clean() and
  queue.put(shutdownCommand), the consumer iterator receives the
  shutdownCommand and puts it back into the data chunk queue. After that,
  ZookeeperConsumerConnector tries to put another shutdownCommand into the
  data chunk queue but will block forever.
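
  The blocking sequence above can be reproduced with plain java.util.concurrent
  primitives. The following is a minimal sketch, not Kafka code: ShutdownCommand
  and the queue are stand-ins for Kafka's internal shutdown marker and data
  chunk queue, and offer() with a timeout is used in place of the put() that
  would hang forever.

  ```scala
  import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

  object ShutdownDeadlockSketch {
    // Stand-in for Kafka's internal shutdown marker (hypothetical name).
    case object ShutdownCommand

    // Replays the reported sequence and reports whether the second put
    // would have completed within the timeout (it will not).
    def secondPutSucceeds(): Boolean = {
      // Data chunk queue with capacity 1, as in the reported setup.
      val queue = new LinkedBlockingQueue[AnyRef](1)

      // Step 1: shutdown() clears the queue and puts the first shutdownCommand.
      queue.clear()
      queue.put(ShutdownCommand)

      // Step 2: the consumer iterator takes the command and puts it back,
      // filling the queue again.
      queue.put(queue.take())

      // Step 3: shutdown() tries to put another shutdownCommand into the same
      // shared queue; put() would block forever, so offer() with a timeout is
      // used here to demonstrate the hang without actually hanging.
      queue.offer(ShutdownCommand, 100, TimeUnit.MILLISECONDS)
    }

    def main(args: Array[String]): Unit =
      println(s"second put succeeded: ${secondPutSucceeds()}")
  }
  ```

  Removing the put-back in the iterator (issue 1) or de-duplicating the queues
  before sending shutdown commands (issue 2) would each break this cycle.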
 
 
  The thread stack trace is as below:
 
   Thread-23 #58 prio=5 os_prio=0 tid=0x7ff440004800 nid=0x40a waiting
   on condition [0x7ff4f0124000]
      java.lang.Thread.State: WAITING (parking)
   at sun.misc.Unsafe.park(Native Method)
   - parking to wait for 0x000680b96bf0 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
   at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
   at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
   at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
   at kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
   at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
   at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
   - locked 0x000680dd5848 (a java.lang.Object)
   at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
   at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
   at scala.collection.immutable.List.foreach(List.scala:318)
   at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
   at kafka.tools.MirrorMaker$$anon$1.run(MirrorMaker.scala:169)
 
 
  Thanks.
 
  Jiangjie (Becket) Qin
 




-- 
-- Guozhang


Re: Review Request 27890: Patch for KAFKA-1764

2014-11-12 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27890/#review61001
---


Could you also remove the line in consumer that sends back the shutdown command?

- Guozhang Wang


On Nov. 11, 2014, 10:59 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27890/
 ---
 
 (Updated Nov. 11, 2014, 10:59 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1764
 https://issues.apache.org/jira/browse/KAFKA-1764
 
 
 Repository: kafka
 
 
 Description
 ---
 
 fix for KAFKA-1764
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
 
 Diff: https://reviews.apache.org/r/27890/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Kafka Command Line Shell

2014-11-12 Thread Guozhang Wang
Thanks Joe. I will read the wiki page.

On Tue, Nov 11, 2014 at 11:47 PM, Joe Stein joe.st...@stealth.ly wrote:

 I started writing this up on the wiki

 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements

  Instead of starting a new thread I figured I would just continue this one I
  started. I also added another (important) component for centralized
  management of configuration at the global level, much like we have at the
  topic level. These global configurations would be overridden (perhaps not
  all) by server.properties on start (so in case one broker needs a different
  port, sure).

 One concern I have is that using RQ/RP wire protocol to the
 controller instead of the current way (via ZK admin path) may expose
 concurrency on the admin requests, which may not be supported yet.

  Guozhang, take a look at the diagram for how I am thinking of this: it would
  be a new request handler that will execute the tools pretty much as they are
  today. My thinking is maybe to do one at a time (so TopicCommand first, I
  think) and have what TopicCommand does happen on the server, with the RQ/RP
  sent from the client but executed on the server. If there is something not
  supported we will of course have to deal with that and implement it for
  sure. Once we get one working end to end I think adding the rest will be
  (more or less) concise iterations to get it done. I added your concern to
  the wiki under the gotchas section.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Mon, Oct 20, 2014 at 2:15 AM, Guozhang Wang wangg...@gmail.com wrote:

  One concern I have is that using RQ/RP wire protocol to the controller
  instead of the current way (via ZK admin path) may expose concurrency on
  the admin requests, which may not be supported yet.
 
  Some initial discussion about this is on KAFKA-1305.
 
  Guozhang
 
  On Sun, Oct 19, 2014 at 1:55 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   Maybe we should add some AdminMessage RQ/RP wire protocol structure(s)
  and
   let the controller handle it? We could then build the CLI and Shell in
  the
   project both as useful tools and samples for others.
  
   Making a http interface should be simple after KAFKA-1494 is done which
  all
   client libraries could offer.
  
   I will update the design tonight/tomorrow and should be able to have
   someone starting to work on it this week.
  
   /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop
   /
   On Oct 19, 2014 1:21 PM, Harsha ka...@harsha.io wrote:
  
+1 for Web Api
   
On Sat, Oct 18, 2014, at 11:48 PM, Glen Mazza wrote:
 Apache Karaf has been doing this for quite a few years, albeit in
  Java
 not Scala.  Still, their coding approach to creating a CLI probably
 captures many lessons learned over that time.

 Glen

 On 10/17/2014 08:03 PM, Joe Stein wrote:
  Hi, I have been thinking about the ease of use for operations
 with
Kafka.
  We have lots of tools doing a lot of different things and they
 are
   all
kind
  of in different places.
 
  So, what I was thinking is to have a single interface for our
  tooling
  https://issues.apache.org/jira/browse/KAFKA-1694
 
  This would manifest itself in two ways 1) a command line
 interface
  2)
a repl
 
  We would have one entry point centrally for all Kafka commands.
  kafka CMD ARGS
  kafka createTopic --brokerList etc,
  kafka reassignPartition --brokerList etc,
 
  or execute and run the shell
 
  kafka --brokerList localhost
   kafka> use topicName;
   kafka> set acl='label';
 
  I was thinking that all calls would be initialized through
--brokerList and
  the broker can tell the KafkaCommandTool what server to connect
 to
   for
  MetaData.
 
  Thoughts? Tomatoes?
 
  /***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop 
  http://www.twitter.com/allthingshadoop
  /
 

   
  
 
 
 
  --
  -- Guozhang
 




-- 
-- Guozhang


Re: Review Request 27890: Patch for KAFKA-1764

2014-11-12 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27890/#review61099
---

Ship it!


Ship It!

- Guozhang Wang


On Nov. 12, 2014, 10:05 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27890/
 ---
 
 (Updated Nov. 12, 2014, 10:05 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1764
 https://issues.apache.org/jira/browse/KAFKA-1764
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Changed Consumer iterator to stop putting the shutdown message back into 
 channel.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
 ac491b4da2583ef7227c67f5b8bc0fd731d705c3 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
 
 Diff: https://reviews.apache.org/r/27890/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: [DISCUSSION] Message Metadata

2014-11-13 Thread Guozhang Wang
Hi Jun,

Sorry for the delay on your comments in the wiki page as well as this
thread; quite swamped now. I will get back to you as soon as I find some
time.

Guozhang

On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao jun...@gmail.com wrote:

 Thinking about this a bit more. For adding the auditing support, I am not
 sure if we need to change the message format by adding the application
 tags. An alternative way to do that is to add it in the producer client.
 For example, for each message payload (doesn't matter what the
 serialization mechanism is) that a producer receives, the producer can just
 add a header before the original payload. The header will contain all
 needed fields (e.g. timestamp, host, etc) for the purpose of auditing. This
 way, we don't need to change the message format and the auditing info can
 be added independent of the serialization mechanism of the message. The
 header can use a different serialization mechanism for better efficiency.
 For example, if we use Avro to serialize the header, the encoded bytes
 won't include the field names in the header. This is potentially more
 efficient than representing those fields as application tags in the message
 where the tags have to be explicitly stored in every message.

 To make it easier for the client to add and make use of this kind of
 auditing support, I was imagining that we can add a ProducerFactory in the
 new java client. The ProducerFactory will create an instance of Producer
 based on a config property. By default, the current KafkaProducer will be
 returned. However, a user can plug in a different implementation of
 Producer that does auditing. For example, an implementation of an
 AuditProducer.send() can take the original ProducerRecord, add the header
 to the value byte array and then forward the record to an underlying
 KafkaProducer. We can add a similar ConsumerFactory to the new consumer
 client. If a user plugs in an implementation of the AuditingConsumer, the
 consumer will then be audited automatically.
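
 A rough sketch of the wrapper idea follows, under heavy assumptions: the
 Producer trait and AuditProducer below are simplified stand-ins for the
 proposed ProducerFactory/AuditProducer (neither exists in Kafka), and the
 header uses a fixed binary layout where the proposal suggests something like
 Avro.

 ```scala
 import java.nio.ByteBuffer

 // Simplified stand-in for the producer interface; the real client API
 // takes ProducerRecord and returns futures, which is omitted here.
 trait Producer {
   def send(topic: String, value: Array[Byte]): Unit
 }

 // Wraps an underlying producer and prepends an audit header (timestamp +
 // host) to every payload, independent of the payload's own serialization.
 class AuditProducer(underlying: Producer, host: String) extends Producer {
   override def send(topic: String, value: Array[Byte]): Unit = {
     val hostBytes = host.getBytes("UTF-8")
     val header = ByteBuffer
       .allocate(8 + 4 + hostBytes.length)
       .putLong(System.currentTimeMillis()) // audit timestamp
       .putInt(hostBytes.length)            // host length, for decoding
       .put(hostBytes)
     // Forward the header-prefixed payload to the wrapped producer.
     underlying.send(topic, header.array() ++ value)
   }
 }
 ```

 A consumer-side counterpart would strip the same header before handing the
 payload to the application, which is what the AuditingConsumer idea implies.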

 Thanks,

 Jun

 On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi Jun,
 
   Regarding 4) in your comment, after thinking about it for a while I cannot
   come up with a way to do it along with log compaction without adding new
   fields into the current message set format. Do you have a better way that
   does not require protocol changes?
 
  Guozhang
 
  On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   I have updated the wiki page incorporating received comments. We can
   discuss some more details on:
  
    1. How do we want to do auditing? Whether we want to have built-in
    auditing on brokers or even MMs, or use an audit consumer to fetch all
    messages from just the brokers.
  
   2. How we can avoid de-/re-compression on brokers and MMs with log
   compaction turned on.
  
    3. How we can resolve data inconsistency resulting from unclean leader
    election with control messages.
  
   Guozhang
  
   On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com
   wrote:
  
   Thanks for the detailed comments Jun! Some replies inlined.
  
   On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote:
  
   Hi, Guozhang,
  
   Thanks for the writeup.
  
   A few high level comments.
  
   1. Associating (versioned) schemas to a topic can be a good thing
   overall.
   Yes, this could add a bit more management overhead in Kafka. However,
  it
   makes sure that the data format contract between a producer and a
   consumer
   is kept and managed in a central place, instead of in the
 application.
   The
   latter is probably easier to start with, but is likely to be brittle
 in
   the
   long run.
  
  
    I am actually not proposing that we not support associated versioned
    schemas for topics, but that we not let some core Kafka functionalities
    like auditing depend on schemas. I think this alone can separate schema
    management from Kafka piping management (i.e. making sure every single
    message is delivered, within some latency, etc). Adding additional
    auditing info into an existing schema will force Kafka to be aware of the
    schema systems (Avro, JSON, etc).
  
  
  
   2. Auditing can be a general feature that's useful for many
  applications.
   Such a feature can be implemented by extending the low level message
   format
   with a header. However, it can also be added as part of the schema
   management. For example, you can imagine a type of audited schema
 that
   adds
   additional auditing info to an existing schema automatically.
  Performance
   wise, it probably doesn't make a big difference whether the auditing
  info
   is added in the message header or the schema header.
  
  
   See replies above.
  
  
   3. We talked about avoiding the overhead of decompressing in both the
   broker and the mirror maker. We probably need to think through how
 this
   works with auditing. In the more general case where you want to audit
   every

Re: Kafka Simple Consumer API for 0.9

2014-11-17 Thread Guozhang Wang
Hi Dibyendu,

Yes we are changing the consumer API in 0.9, which will be different with
the current high-level consumer API. We are also trying to figure out a way
to preserve the functionality: KAFKA-1655
https://issues.apache.org/jira/browse/KAFKA-1655

Guozhang

On Fri, Nov 14, 2014 at 5:29 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi,

  Will the Simple Consumer API change in Kafka 0.9?

  I can see a Consumer Re-design approach for Kafka 0.9, which I believe
  will not impact any client written using the Simple Consumer API. Is that
  correct?


 Regards,
 Dibyendu




-- 
-- Guozhang


Re: Support for topics+streams at class WildcardStreamsHandler

2014-11-17 Thread Guozhang Wang
Just added an entry in the FAQ page:

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan%27tIspecifythenumberofstreamsparallelismpertopicmapusingwildcardstreamasIusestaticstreamhandler
?

On Mon, Nov 10, 2014 at 7:56 AM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Alan,

  The reason we do not have a per-topic parallelism spec in wildcard is
  twofold: 1) we use a per-topic hash-based partition algorithm, and hence
  having each topic with the same number of streams may give us better load
  balance; 2) with the topicFilter we will not know exactly which topics to
  consume at construction time, hence there is no way to specify per-topic
  specs.

  1) has been lifted since we have implemented a new partitioning algorithm,
  and for 2) we need to think about how to support it if we really want to;
  perhaps we can also use a regex-ed topic-count map, while ensuring that
  each regex in the map is a precedent of the topic filter, with no overlap
  between them, etc. What is your use case that requires a per-topic
  numStream spec?
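
  The regex-ed topic-count map idea could look something like the sketch
  below. This is purely illustrative: WildcardStreamCounts and its validation
  rule are made up for this example, enforcing only the no-overlap constraint
  mentioned above.

  ```scala
  object WildcardStreamCounts {
    // Given a hypothetical regex -> numStreams map and the topics actually
    // matched by the wildcard filter, resolve a stream count per topic.
    // Each topic may match at most one regex (the no-overlap requirement).
    def streamsPerTopic(regexCounts: Map[String, Int],
                        topics: Seq[String]): Map[String, Int] =
      topics.map { topic =>
        val matched = regexCounts.filter { case (regex, _) => topic.matches(regex) }
        require(matched.size <= 1,
          s"topic $topic matches ${matched.size} regexes; overlap is not allowed")
        // Topics matched by no regex fall back to a single stream.
        topic -> matched.values.headOption.getOrElse(1)
      }.toMap
  }
  ```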

 Guozhang

 On Sun, Nov 9, 2014 at 6:03 AM, Alan Lavintman alan.lavint...@gmail.com
 wrote:

  Hi guys, I have seen that if I create a message stream by using:

  createMessageStreams

  I can define a map with Topic -> #Streams.

  Is there a reason why createMessageStreamsByFilter is not giving the same
  support? I have only a TopicFilter and numStreams interface such as:

  public List<KafkaStream<byte[], byte[]>>
  createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

  But it does not allow me to specify the parallelism per topic. Am I missing
  something, or is my assumption correct?

 Bests and thanks,
 Alan.




 --
 -- Guozhang




-- 
-- Guozhang


Re: Review Request 28040: Patch for KAFKA-1770

2014-11-17 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28040/#review61751
---


UnknownTopicOrPartitionException can be thrown either on the producer side, when 
the cached metadata does not have the info for this partitionId, or on the 
server side, when the specified partition does not exist on the broker. So how 
about:

Indicates one of the following situations:
1. The producer does not know the partition metadata for this id upon sending 
messages
2. The broker does not have the specified partition by id upon receiving 
messages


core/src/main/scala/kafka/common/UnknownTopicOrPartitionException.scala
https://reviews.apache.org/r/28040/#comment103633

remove  (and the end  on line 23)


- Guozhang Wang


On Nov. 14, 2014, 4:57 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/28040/
 ---
 
 (Updated Nov. 14, 2014, 4:57 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1770
 https://issues.apache.org/jira/browse/KAFKA-1770
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Modified doc for UnknownTopicOrPartitionException
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/UnknownTopicOrPartitionException.scala 
 781e551e5b78b5f436431575c2961fe15acd1414 
 
 Diff: https://reviews.apache.org/r/28040/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25995: Patch for KAFKA-1650

2014-11-17 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review61528
---



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103712

based on the hash value of the source topic-partitionId string



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103221

Could you make this consistent with the others, as offset.commit.interval.ms?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103713

In this case, for MaxInFlightRequests > 1 we are not only at risk of 
message reordering but also at risk of message loss upon failures, right?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103714

Actually, can we move this comment into the top comment of the MM class as 
a NOTE under the producer paragraph?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103715

Is it possible that the current chunk has been consumed completely and the 
fetcher thread has not yet put in a new chunk, and hence hasNext() will return 
false? In this case shall we stop the consumer or just let it block?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103716

Since we already have the logIdent as threadName, I think we do not need 
this.getName here?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment103718

With the logIdent it will become:

FATAL [mirrormaker-offset-commit-thread] Offset commit thread exits due to 
...

which is a bit verbose?


- Guozhang Wang


On Nov. 12, 2014, 5:51 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Nov. 12, 2014, 5:51 p.m.)
 
 
 Review request for kafka.
 
 
  Bugs: KAFKA-1650
  https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments
 
 
 commit before switch to trunk
 
 
 commit before rebase
 
 
 Rebased on trunk, Addressed Guozhang's comments.
 
 
 Addressed Guozhang's comments on MaxInFlightRequests
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 mirrormaker-redesign
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 f399105087588946987bbc84e3759935d9498b6a 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-17 Thread Guozhang Wang


 On Nov. 17, 2014, 10:31 p.m., Joel Koshy wrote:
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java,
   line 151
  https://reviews.apache.org/r/27391/diff/3/?file=755182#file755182line151
 
  Made a follow-up comment in the earlier RB. But pasting here as well:
  
  Agree that it is still common in the object but it is completely 
  removed from the wire protocol - i.e., OFFSET_COMMIT_REQUEST_PARTITION_V1 
  which is in the V2 OffsetCommitRequest does not have a timestamp.
  
  This method should probably be read as initCommonFieldsInStruct - 
  i.e., effectively the wire protocol. That said, I'm loath to add another 
  init method which reads initCommonFieldsInV0AndV1. So I think rather than 
  checking fetchPartitionData.timestamp it would be better to explicitly 
  check the (already set) request version in the struct. If v0 or v1 then set 
  the timestamp key name.

The version number info is not in the struct, so we cannot get its value unless 
we add it from the request header into the constructor. But we can check whether 
the struct has that field or not. Changed accordingly.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review61761
---


On Nov. 8, 2014, 12:54 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 8, 2014, 12:54 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 4de812374e8fb1fed834d2be3f9655f55b511a74 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-17 Thread Guozhang Wang


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 498
  https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498
 
  Actually, since we always set the retention period (for v0, v1) in 
  KafkaApis do we need to even touch this timestamp? i.e., we should 
  basically ignore it right? So we only need to do:
  value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp).
 
 Guozhang Wang wrote:
  I think not. In v0/v1, if the timestamp is explicitly specified (i.e. 
 not -1) we need to use it as the expiration timestamp, or at least that was 
 how I understood the semantics.
 
 Guozhang Wang wrote:
 I think we cannot not
 
 Joel Koshy wrote:
 Right - what I meant was in KafkaApis we can just compute the 
 retentionPeriod if v0 or v1.
 
  So if v0/v1 and timestamp = (now + 7 days), then set retention to 7 days.

retention is a global parameter which we use later to set the per-message 
timestamp; but if the timestamps from the v0/v1 requests are different, then we 
cannot just use a single retention, right?
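
The rule being discussed can be summarized in a few lines. This is a sketch of
the semantics as described in the thread, not the actual OffsetManager code,
and the -1 default sentinel is an assumption taken from the discussion.

```scala
object OffsetExpirationSketch {
  // Sentinel meaning "no explicit timestamp" in v0/v1 commits (assumed -1).
  val DefaultTimestamp: Long = -1L

  // An explicit v0/v1 per-offset timestamp is honored as the expiration
  // time; only default entries fall back to receive time + global retention.
  def expirationMs(commitTimestampMs: Long,
                   receivedAtMs: Long,
                   retentionMs: Long): Long =
    if (commitTimestampMs == DefaultTimestamp) receivedAtMs + retentionMs
    else commitTimestampMs
}
```

This is why a single global retention cannot replace per-offset timestamps for
v0/v1 requests: each partition may carry a different explicit timestamp.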


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review60285
---


On Nov. 8, 2014, 12:54 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 8, 2014, 12:54 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 4de812374e8fb1fed834d2be3f9655f55b511a74 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634

2014-11-17 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Nov. 18, 2014, 1:42 a.m.)


Review request for kafka.


Summary (updated)
-

Fix KAFKA-1634


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
---

The timestamp field of OffsetAndMetadata is preserved since we need to be 
backward compatible with older versions


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
f476973eeff653473a60c3ecf36e870e386536bc 
  core/src/main/scala/kafka/server/KafkaApis.scala 
968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
  core/src/main/scala/kafka/server/KafkaServer.scala 
4de812374e8fb1fed834d2be3f9655f55b511a74 
  core/src/main/scala/kafka/server/OffsetManager.scala 
2957bc435102bc4004d8f100dbcdd56287c8ffae 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Review Request 28240: Follow-up KAFKA-1580

2014-11-19 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28240/
---

Review request for kafka.


Bugs: KAFKA-1580
https://issues.apache.org/jira/browse/KAFKA-1580


Repository: kafka


Description
---

Add the logic for checking internal topics in the replica manager layer


Diffs
-

  core/src/main/scala/kafka/admin/AdminUtils.scala 
94c53320b768b83b0324336b73cc06537c0fe78d 
  core/src/main/scala/kafka/server/KafkaApis.scala 
968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
  core/src/main/scala/kafka/server/OffsetManager.scala 
2957bc435102bc4004d8f100dbcdd56287c8ffae 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
3007a6d89b637b93f71fdb7adab561a93d9c4c62 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
8531f533f3a6431f4f9fc8cb1ad3e9f1f1b110e0 

Diff: https://reviews.apache.org/r/28240/diff/


Testing
---


Thanks,

Guozhang Wang



Review Request 28268: Bump up default scala version from 2.10.1 to 2.10.4

2014-11-19 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28268/
---

Review request for kafka.


Bugs: KAFKA-1624
https://issues.apache.org/jira/browse/KAFKA-1624


Repository: kafka


Description
---

Compiles with Java 1.6/7/8


Diffs
-

  bin/kafka-run-class.sh 36c742b67a7259fa35c3ed862f7ccc4673b69d9f 
  bin/windows/kafka-run-class.bat 8e9780e2eb74a35a726787155c09b151d0ba 
  build.gradle 11eb11355efddacf62d61690ad13b9c82a200230 
  gradle.properties 5d3155fd4461438d8b2ec4faa9534cc2383d4951 
  scala.gradle 6adf9af7dbbe71e68a07b387c3854d8c9ad339e0 

Diff: https://reviews.apache.org/r/28268/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Welcome Kafka's newest committer

2014-11-19 Thread Guozhang Wang
Thank you folks! Glad to be on board.

Guozhang

On Wed, Nov 19, 2014 at 4:34 PM, Harsha ka...@harsha.io wrote:

 Congrats Guozhang

 On Wed, Nov 19, 2014, at 04:31 PM, Joe Stein wrote:
  Congrats!!!
 
  On Wed, Nov 19, 2014 at 7:05 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   Hi everyone,
  
   I'm very happy to announce that the Kafka PMC has invited Guozhang
 Wang to
   become a committer. Guozhang has made significant contributions to
 Kafka
   over the past year, along with being very active on code reviews and
 the
   mailing list.
  
   Please join me in welcoming him.
  
   Thanks,
   Neha (on behalf of the Kafka PMC)
  




-- 
-- Guozhang


Re: Review Request 28240: Follow-up KAFKA-1580

2014-11-21 Thread Guozhang Wang


 On Nov. 21, 2014, 1:48 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/KafkaApis.scala, line 193
  https://reviews.apache.org/r/28240/diff/1/?file=770058#file770058line193
 
  I think it is more suitable to have this access control implemented 
  inside ReplicaManager. One can argue both ways though...

I was actually back and forth quite a few times back then. The main reason I 
chose to do this in the Kafka API layer is that otherwise the offset manager / 
replica manager has to handle operations with clientId, which defeats the 
purpose of the refactoring effort.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28240/#review62547
---


On Nov. 19, 2014, 7:01 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/28240/
 ---
 
 (Updated Nov. 19, 2014, 7:01 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1580
 https://issues.apache.org/jira/browse/KAFKA-1580
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Add the logic for checking internal topics in the replica manager layer
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/AdminUtils.scala 
 94c53320b768b83b0324336b73cc06537c0fe78d 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 3007a6d89b637b93f71fdb7adab561a93d9c4c62 
   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
 8531f533f3a6431f4f9fc8cb1ad3e9f1f1b110e0 
 
 Diff: https://reviews.apache.org/r/28240/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634

2014-11-21 Thread Guozhang Wang


 On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/KafkaApis.scala, line 147
  https://reviews.apache.org/r/27391/diff/4/?file=766686#file766686line147
 
  I think what you meant earlier was that with < v2 you could have different 
  timestamps for each partition, so a global retention won't work with < v2

In the OffsetManager, the timestamp will only be overridden by the global one if 
it is set to the default value (-1), so for v0/1, although the retention is used, it 
will not override the timestamp.
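
A minimal sketch of that override rule (hypothetical names; the real OffsetManager code is more involved): the stored timestamp is replaced by the retention-derived value only when the client left it at the -1 sentinel.

```scala
// Hypothetical sketch of the timestamp-override behavior described above:
// a per-partition timestamp of -1 (the default) is replaced by the global
// retention-derived deadline; an explicit v0/v1 timestamp is preserved.
object OffsetTimestampOverride {
  val DefaultTimestamp: Long = -1L  // assumed sentinel for "not set"

  // Returns the timestamp to store for an offset entry.
  def effectiveTimestamp(requestTimestamp: Long, retentionDeadline: Long): Long =
    if (requestTimestamp == DefaultTimestamp) retentionDeadline
    else requestTimestamp
}
```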


 On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 17
  https://reviews.apache.org/r/27391/diff/4/?file=766684#file766684line17
 
  I thought we would be going with separate format for on-disk storage?
  
  E.g., one thing that is extremely useful (until we have timestamp as a 
  first-class field of messages) is to have the receive time of the 
  offsetcommit in the stored offset entries. This is very useful for 
  debugging.

Yes, they are separate: for on-disk storage the timestamp will always be 
stored, while on the wire protocol only v0/1 will contain that value; for v2 
this value will be computed via retention. So the on-disk format is specified 
as OffsetAndMetadata, and when we deprecate v0/1 and add the timestamp to the 
message header we will replace this with OffsetMetadata.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review62553
---


On Nov. 18, 2014, 1:42 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 18, 2014, 1:42 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 f476973eeff653473a60c3ecf36e870e386536bc 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 4de812374e8fb1fed834d2be3f9655f55b511a74 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634

2014-11-21 Thread Guozhang Wang


 On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 500
  https://reviews.apache.org/r/27391/diff/4/?file=766688#file766688line500
 
  This and the above use give compilation warnings. I think it is 
  reasonable to copy those constants here to get rid of the warnings since 
  we are anyway in an undesirable state right now of maintaining a mirror 
  wire-protocol implementation in scala

Not sure what you mean by copying those constants? I did the copy inside the if 
block, and the compilation warning comes from the condition itself.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review62553
---


On Nov. 18, 2014, 1:42 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 18, 2014, 1:42 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 f476973eeff653473a60c3ecf36e870e386536bc 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 4de812374e8fb1fed834d2be3f9655f55b511a74 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 



