Review Request 24214: Patch for KAFKA-1374

2014-08-03 Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description
---

With this patch, the LogCleaner decompresses compressed messages and writes the 
retained (compacted) messages back in compressed form.
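The following is a minimal sketch of the idea described above. It uses a simplified model, and every name in it is a hypothetical stand-in rather than Kafka's `ByteBufferMessageSet` or `OffsetMap`: `Record`, `CompressedCleanSketch`, the GZIP wrapper format, and the `latestOffset` map. Each compressed wrapper is decompressed, and its inner messages are filtered against the latest offset seen for each key. The survivors are then recompressed into a new wrapper.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical stand-in for a Kafka message: an offset plus a key and a value.
final case class Record(offset: Long, key: String, value: String)

object CompressedCleanSketch {
  // Serialize a batch of records and GZIP it into one "wrapper" payload.
  def compress(records: Seq[Record]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(new GZIPOutputStream(bytes))
    try {
      out.writeInt(records.size)
      records.foreach { r =>
        out.writeLong(r.offset)
        out.writeUTF(r.key)
        out.writeUTF(r.value)
      }
    } finally out.close() // closing also writes the GZIP trailer
    bytes.toByteArray
  }

  // Inverse of compress: inflate a wrapper payload and read its records back.
  def decompress(payload: Array[Byte]): Seq[Record] = {
    val in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(payload)))
    try {
      val count = in.readInt()
      // Vector.fill evaluates eagerly, so every read happens before close().
      Vector.fill(count)(Record(in.readLong(), in.readUTF(), in.readUTF()))
    } finally in.close()
  }

  // Decompress each wrapper and keep records that are still the latest for their key
  // (keys missing from the map are kept). Then recompress the survivors.
  // Wrappers whose records were all superseded are dropped entirely.
  def clean(wrappers: Seq[Array[Byte]], latestOffset: Map[String, Long]): Seq[Array[Byte]] =
    wrappers.flatMap { payload =>
      val retained = decompress(payload).filter(r => latestOffset.get(r.key).forall(r.offset >= _))
      if (retained.isEmpty) None else Some(compress(retained))
    }
}
```

In this model, a wrapper keeps its grouping after cleaning. The review below discusses whether survivors should instead be re-batched and compressed with the topic's configured codec.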


Diffs
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
---


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2015-05-12 Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review83392
---


Sorry for the delay. Overall, this looks good.

As discussed earlier, this patch needs a minor rebase.

There are a couple of points to note:
- In KAFKA-1499 you added broker-side compression. When writing out the 
compacted messages, we should compress using the configured compression codec. 
We can do this as an incremental change if you prefer; i.e., your current patch 
makes the log cleaner compression-aware, and a subsequent patch can handle writing 
out to the configured codec. That part could be non-trivial, as we would then 
probably want to do some batching when writing out compacted compressed 
messages.
- In KAFKA-1755 I had added some defensive code to prevent compressed messages 
and unkeyed messages from getting in. The compression-related code will need to 
be removed. Again, let me know if you need any help with this.

Let me know if you need help with any of this.
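Here is one plausible shape for the batching mentioned in the first point. It assumes that retained messages are grouped by serialized size, and that each group is then compressed into its own wrapper. `BatchingSketch`, `batches`, and `maxBatchBytes` are hypothetical names. The real cleaner may batch differently.

```scala
object BatchingSketch {
  // Greedily group message sizes into batches whose total stays within maxBatchBytes.
  // A single message larger than the limit ends up in a batch of its own.
  def batches(sizes: Seq[Int], maxBatchBytes: Int): Seq[Seq[Int]] = {
    val out = Vector.newBuilder[Seq[Int]]
    var current = Vector.empty[Int]
    var currentBytes = 0
    for (s <- sizes) {
      // Close the current batch if adding this message would exceed the limit.
      if (current.nonEmpty && currentBytes + s > maxBatchBytes) {
        out += current
        current = Vector.empty
        currentBytes = 0
      }
      current :+= s
      currentBytes += s
    }
    if (current.nonEmpty) out += current
    out.result()
  }
}
```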


core/src/main/scala/kafka/log/LogCleaner.scala


I would suggest one of two options over this (i.e., instead of two helper 
methods)
- Inline both here and get rid of those
- Have a single private helper (e.g., collectRetainedMessages)



core/src/main/scala/kafka/log/LogCleaner.scala


We should now compress with the compression codec of the topic (KAFKA-1499)



core/src/main/scala/kafka/log/LogCleaner.scala


We should instead do a trivial refactor in ByteBufferMessageSet to compress 
messages in a preallocated buffer. It would be preferable to avoid having this 
compression logic in different places.
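The sketch below compresses into a preallocated buffer. It assumes GZIP and a hand-rolled `OutputStream` adapter over `java.nio.ByteBuffer`. `ByteBufferOutputStreamSketch` and `compressInto` are hypothetical names, and this is not the actual `ByteBufferMessageSet` refactor.

```scala
import java.io.OutputStream
import java.nio.ByteBuffer
import java.util.zip.GZIPOutputStream

// Hypothetical OutputStream that writes straight into a caller-supplied ByteBuffer.
final class ByteBufferOutputStreamSketch(buffer: ByteBuffer) extends OutputStream {
  override def write(b: Int): Unit = buffer.put(b.toByte)
  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = buffer.put(bytes, off, len)
}

object PreallocatedCompressSketch {
  // Compress `payloads` back to back into `buffer` and return the number of bytes written.
  // Throws java.nio.BufferOverflowException if the buffer is too small.
  def compressInto(buffer: ByteBuffer, payloads: Seq[Array[Byte]]): Int = {
    val start = buffer.position()
    val gzip = new GZIPOutputStream(new ByteBufferOutputStreamSketch(buffer))
    try payloads.foreach(p => gzip.write(p))
    finally gzip.close() // flushes the deflater and writes the GZIP trailer
    buffer.position() - start
  }
}
```

With this design, the compressed bytes land directly in the buffer without an intermediate byte array. The trade-off is that the caller has to size the buffer up front.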


- Joel Koshy



Re: Review Request 24214: Patch for KAFKA-1374

2015-05-18 Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated May 18, 2015, 5:29 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
---

Addressing Joel's comments


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
abea8b251895a5cc0788c6e25b112a2935a3f631 
  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
9dfe914991aaf82162e5e300c587c794555d5fd0 
  core/src/main/scala/kafka/message/MessageSet.scala 
28b56e68cfdbbf107dd7cbd248ffa8fa6bbcd13f 
  core/src/test/scala/kafka/tools/TestLogCleaning.scala 
844589427cb9337acd89a5239a98b811ee58118e 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
3b5aa9dc3b7ac5893c1d281ae1326be0e9ed8aad 
  core/src/test/scala/unit/kafka/log/LogTest.scala 
76d3bfd378f32fd2b216b3ebdec86e2070491924 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
---

/*TestLogCleaning stress test output for compressed messages*/

Producing 100000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-6014466306002699464.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-177538909590644701.txt
100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
De-duplicating and validating output files...
Validated 9005 values, 0 mismatches.

Producing 1000000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-3298578695475992991.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
De-duplicating and validating output files...
Validated 89947 values, 0 mismatches.

Producing 10000000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-3336255463347572934.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
10000000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
De-duplicating and validating output files...
Validated 899853 values, 0 mismatches.


/*TestLogCleaning stress test output for non-compressed messages*/

Producing 100000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-5174543709786189363.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-514345501144701.txt
100000 rows of data produced, 22775 rows of data consumed (77.2% reduction).
De-duplicating and validating output files...
Validated 17874 values, 0 mismatches.

Producing 1000000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-7814446915546169271.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-5172557663160447626.txt
1000000 rows of data produced, 129230 rows of data consumed (87.1% reduction).
De-duplicating and validating output files...
Validated 89947 values, 0 mismatches.

Producing 10000000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-6092986571905399164.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-63626021421841220.txt
10000000 rows of data produced, 1136608 rows of data consumed (88.6% reduction).
De-duplicating and validating output files...
Validated 899853 values, 0 mismatches.


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2015-05-18 Manikumar Reddy O


> On May 12, 2015, 2:01 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 409
> > 
> >
> > I would suggest one of two options over this (i.e., instead of two 
> > helper methods)
> > - Inline both here and get rid of those
> > - Have a single private helper (e.g., collectRetainedMessages)

Removed the helper methods.


> On May 12, 2015, 2:01 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 479
> > 
> >
> > We should now compress with the compression codec of the topic 
> > (KAFKA-1499)

Will do this in a separate JIRA.


> On May 12, 2015, 2:01 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 498
> > 
> >
> > We should instead do a trivial refactor in ByteBufferMessageSet to 
> > compress messages in a preallocated buffer. It would be preferable to avoid 
> > having this compression logic in different places.

Moved the compressMessages() method to the ByteBufferMessageSet class. Please let me 
know your thoughts.


- Manikumar Reddy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review83392
---



Re: Review Request 24214: Patch for KAFKA-1374

2015-05-19 Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review84278
---


Thanks for the updated patch. This looks good. I ended up rebasing while you 
were working on this :) I have a few additional edits, noted below, which 
I will upload shortly.


core/src/main/scala/kafka/log/LogCleaner.scala


Minor improvement: we can avoid an extra copy by filtering the iterator 
above, and then materializing once.
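The suggestion above amounts to filtering lazily over the iterator and materializing only once. The following is a minimal sketch with a generic element type. `collectRetainedMessages` reuses the helper name suggested earlier in this review and is otherwise hypothetical.

```scala
object CollectRetainedSketch {
  // `filter` on an Iterator is lazy, so the only collection built is the final Vector.
  // Writing `messages.toVector.filter(...)` instead would first copy every message
  // and then copy the survivors a second time.
  def collectRetainedMessages[A](messages: Iterator[A])(shouldRetain: A => Boolean): Vector[A] =
    messages.filter(shouldRetain).toVector
}
```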



core/src/main/scala/kafka/log/LogCleaner.scala


I'm wondering if it would be helpful to split stats into compressed vs. 
non-compressed.

E.g., x bytes read (from y compressed bytes); n messages read (from m 
compressed messages) and so on...
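Here is a minimal sketch of what split statistics could look like. It assumes separate counters for compressed wrappers and for the logical messages inside them. `SplitCleanerStats` and its fields are hypothetical and do not mirror the existing cleaner stats class.

```scala
// Hypothetical cleaner statistics that report compressed input separately.
final class SplitCleanerStats {
  var bytesRead = 0L              // logical bytes examined, after decompression
  var compressedBytesRead = 0L    // on-disk bytes of compressed wrapper messages
  var messagesRead = 0L           // inner (logical) messages examined
  var compressedMessagesRead = 0L // compressed wrapper messages examined

  // An uncompressed message of `size` bytes.
  def readUncompressed(size: Int): Unit = {
    bytesRead += size
    messagesRead += 1
  }

  // One compressed wrapper of `wrapperSize` bytes holding inner messages of `innerSizes`.
  def readCompressed(wrapperSize: Int, innerSizes: Seq[Int]): Unit = {
    compressedBytesRead += wrapperSize
    compressedMessagesRead += 1
    bytesRead += innerSizes.sum
    messagesRead += innerSizes.size
  }

  def summary: String =
    s"$bytesRead bytes read (from $compressedBytesRead compressed bytes); " +
      s"$messagesRead messages read (from $compressedMessagesRead compressed messages)"
}
```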



core/src/main/scala/kafka/log/LogCleaner.scala


The last statement can be !redundant && !obsoleteDelete
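Here is a sketch of a retention check that ends in `!redundant && !obsoleteDelete`. It assumes a plain `Map` in place of the cleaner's offset map and uses `None` as a delete tombstone. Only the names `redundant` and `obsoleteDelete` come from the review. The rest (`Entry`, `shouldRetain`, `retainDeletes`) is hypothetical.

```scala
object RetainPredicateSketch {
  // Hypothetical log entry; value == None models a delete "tombstone".
  final case class Entry(offset: Long, key: String, value: Option[String])

  // latestOffset plays the role of the cleaner's offset map (key -> last offset seen).
  def shouldRetain(latestOffset: Map[String, Long], retainDeletes: Boolean, e: Entry): Boolean = {
    val foundOffset = latestOffset.getOrElse(e.key, -1L)
    // A later message with the same key makes this one redundant.
    val redundant = foundOffset >= 0 && e.offset < foundOffset
    // A tombstone may be dropped once deletes no longer need to be retained.
    val obsoleteDelete = !retainDeletes && e.value.isEmpty
    !redundant && !obsoleteDelete
  }
}
```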



core/src/main/scala/kafka/message/ByteBufferMessageSet.scala


I actually had a different thought, i.e., to avoid duplicating the 
compression code in ByteBufferMessageSet (BBMS). Then I ran into the issue that 
you probably saw, i.e., the BBMS create method isn't very amenable to refactoring 
with pre-assigned offsets. So I think what you originally had was actually better.

Ideally we should have a compress (raw bytes) method and just use that in 
both places. In fact, we can consider using the Compressor in clients - which 
will have the added benefit of identical compression in use in both the broker 
and clients. E.g., right now it is possible to be under the message size limit 
on the client and still exceed it on the broker.
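The following is a minimal sketch of a single compress-raw-bytes entry point that both the broker and a client could call. It assumes only the JDK's GZIP support. `CodecSketch`, `NoCodec`, `GzipCodec`, and `RawCompressSketch` are hypothetical, and the clients' `Compressor` is not modeled here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical codec type; only JDK-backed codecs are modeled.
sealed trait CodecSketch
case object NoCodec extends CodecSketch
case object GzipCodec extends CodecSketch

object RawCompressSketch {
  // One shared entry point, so broker and client apply the same compression to the same bytes.
  def compress(codec: CodecSketch, raw: Array[Byte]): Array[Byte] = codec match {
    case NoCodec => raw.clone()
    case GzipCodec =>
      val bytes = new ByteArrayOutputStream()
      val gz = new GZIPOutputStream(bytes)
      try gz.write(raw) finally gz.close()
      bytes.toByteArray
  }

  // Inverse of compress, reading the inflated data in chunks.
  def decompress(codec: CodecSketch, data: Array[Byte]): Array[Byte] = codec match {
    case NoCodec => data.clone()
    case GzipCodec =>
      val in = new GZIPInputStream(new ByteArrayInputStream(data))
      val out = new ByteArrayOutputStream()
      val chunk = new Array[Byte](4096)
      try {
        var n = in.read(chunk)
        while (n != -1) { out.write(chunk, 0, n); n = in.read(chunk) }
      } finally in.close()
      out.toByteArray
  }
}
```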



core/src/main/scala/kafka/message/MessageSet.scala


Can do without this addition.



core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala


Minor improvement here to avoid the extra hashmap.



core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala


Can use Stream.cons for convenience.
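This example illustrates `Stream.cons` for building test input lazily. It assumes the test needs an endless supply of keyed messages whose keys repeat, and `StreamConsSketch.messages` is hypothetical. `Stream.cons` takes its tail by name, so the recursive definition does not loop forever. Note that `Stream` is deprecated since Scala 2.13 in favor of `LazyList`.

```scala
object StreamConsSketch {
  // Endless (key, value) pairs whose keys cycle through 0 until numKeys,
  // so later values supersede earlier ones once the log is compacted.
  def messages(numKeys: Int, i: Int = 0): Stream[(Int, Int)] =
    Stream.cons((i % numKeys, i), messages(numKeys, i + 1))
}
```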



core/src/test/scala/unit/kafka/log/LogTest.scala


Few more minor edits - to test appending keyed compressed messages.


- Joel Koshy



Re: Review Request 24214: Patch for KAFKA-1374

2014-09-23 Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated Sept. 23, 2014, 4:20 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
---

Addressing Jun's comments


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala 
1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
---


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2014-09-23 Manikumar Reddy O


> On Aug. 18, 2014, 5:21 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 436-438
> > 
> >
> > Hmm, I think the original approach of throwing an exception is probably 
> > better. When handling the produce requests, we can reject messages w/o a 
> > key, if the topic is configured with compaction. Once we do that, there 
> > should be no messages with null key during compaction. If that happens, we 
> > should just fail the broker.

OK, I reverted the changes. We will revisit the solution in KAFKA-1581.
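The following is a minimal sketch of the approach Jun describes. Keyless messages are rejected at produce time for compacted topics, and a keyless message found during cleaning is treated as a fatal error. `KeyValidationSketch`, `Msg`, `validateAppend`, and `checkDuringCleaning` are hypothetical, and the broker's actual error types are not modeled.

```scala
object KeyValidationSketch {
  // Hypothetical incoming message; key == null means "no key".
  final case class Msg(key: String, value: String)

  // Produce path: reject the whole batch for a compacted topic if any message lacks a key.
  def validateAppend(compacted: Boolean, batch: Seq[Msg]): Either[String, Seq[Msg]] =
    if (compacted && batch.exists(_.key == null))
      Left("compacted topic requires a key on every message")
    else Right(batch)

  // Cleaner path: a keyless message here means validation was bypassed,
  // so fail loudly instead of silently skipping it.
  def checkDuringCleaning(m: Msg): Unit =
    if (m.key == null) throw new IllegalStateException("found message without key in compacted log")
}
```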


> On Aug. 18, 2014, 5:21 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 479-481
> > 
> >
> > Could we use MemoryRecords.RecordsIterator to iterate compressed 
> > messages?

This change turned out to require some complicated changes, so I am dropping this suggestion.


- Manikumar Reddy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50893
---





Re: Review Request 24214: Patch for KAFKA-1374

2014-09-23 Manikumar Reddy O


> On Aug. 18, 2014, 5:32 p.m., Joel Koshy wrote:
> > I should be able to review this later today. However, as Jun also mentioned, 
> > can you please run the stress test? When I was working on the original 
> > (WIP) patch, it worked but eventually failed on a stress test (for various 
> > reasons, such as corrupt message sizes) after several segments had rolled 
> > and after several log cleaner runs. Although I didn't get time to look into 
> > it, your patch has hopefully addressed these issues.

I tested the patch with my own test code and it works fine.

I ran the TestLogCleaning stress test. Sometimes the test fails, 
but I am not seeing any broker-side errors or corrupt messages.

I also ran TestLogCleaning on trunk (without my patch), and the test fails 
there for multiple topics.
I am looking into the TestLogCleaning code and will fix any issues I find.

I will keep you updated on the testing status.


- Manikumar Reddy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50901
---





Re: Review Request 24214: Patch for KAFKA-1374

2014-10-03 Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated Oct. 3, 2014, 1:22 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
---

Fixed a couple of bugs and updated the stress test details.


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala 
1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
---


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2014-10-03 Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated Oct. 3, 2014, 1:50 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description
---

Fixed a couple of bugs and updated the stress test details.


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala 
1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
---


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2014-10-03 Manikumar Reddy O


> On Aug. 18, 2014, 5:32 p.m., Joel Koshy wrote:
> > I should be able to review this later today. However, as Jun also mentioned, 
> > can you please run the stress test? When I was working on the original 
> > (WIP) patch, it worked but eventually failed on a stress test (for various 
> > reasons, such as corrupt message sizes) after several segments had rolled 
> > and after several log cleaner runs. Although I didn't get time to look into 
> > it, your patch has hopefully addressed these issues.
> 
> Manikumar Reddy O wrote:
> I tested the patch with my own test code and it works fine.
> 
> I ran the TestLogCleaning stress test. Sometimes the test fails, 
> but I am not seeing any broker-side errors or corrupt messages.
> 
> I also ran TestLogCleaning on trunk (without my patch), and the test 
> fails there for multiple topics.
> I am looking into the TestLogCleaning code and will fix any issues I find.
> 
> I will keep you updated on the testing status.

I successfully ran the TestLogCleaning stress test for 1, 5, and 10 
million messages.

Jun,

I removed the usage of the MemoryRecords class and the Compressor.putRecord 
method from this patch. Currently, Compressor.close() returns a compressed 
message whose offset is the number of messages in it minus one, i.e., the 
relative offset of its last inner message (if I compress messages with offsets 
10, 11, 12, 13, 14, 15, the compressed message will have offset 5).
Because of this behavior, we cannot use it for server-side compression. On the 
server side, if I compress messages with offsets 10 through 15, the compressed 
message should have offset 15.
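The sketch below shows the offset difference described above, using plain `Seq[Long]` inputs. `WrapperOffsetSketch` and both of its methods are hypothetical names, not the `Compressor` API.

```scala
object WrapperOffsetSketch {
  // Client-side behavior described above: the wrapper carries the relative offset
  // of its last inner message, i.e. (number of messages - 1).
  def clientWrapperOffset(innerOffsets: Seq[Long]): Long = innerOffsets.size - 1L

  // Server-side requirement: the wrapper carries the absolute offset of its
  // last inner message, matching the offsets already assigned in the log.
  def brokerWrapperOffset(innerOffsets: Seq[Long]): Long = {
    require(innerOffsets.nonEmpty, "a compressed wrapper must contain at least one message")
    innerOffsets.last
  }
}
```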


- Manikumar Reddy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50901
---





Re: Review Request 24214: Patch for KAFKA-1374

2015-01-17 Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated Jan. 17, 2015, 6:51 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
---

Updating the rebased code


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
f8e7cd5fabce78c248a9027c4bb374a792508675 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala 
af496f7c547a5ac7a4096a6af325dad0d8feec6f 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
07acd460b1259e0a3f4069b8b8dcd8123ef5810e 

Diff: https://reviews.apache.org/r/24214/diff/


Testing (updated)
---

/safe/KAFKA/docs/TestLogCleaning.txt


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2015-01-17 Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated Jan. 17, 2015, 6:53 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description
---

Updating the rebased code


Diffs
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
f8e7cd5fabce78c248a9027c4bb374a792508675 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala 
af496f7c547a5ac7a4096a6af325dad0d8feec6f 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
07acd460b1259e0a3f4069b8b8dcd8123ef5810e 

Diff: https://reviews.apache.org/r/24214/diff/


Testing (updated)
---

/*TestLogCleaning stress test output for compressed messages*/

Producing 100000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-6014466306002699464.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-177538909590644701.txt
100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
De-duplicating and validating output files...
Validated 9005 values, 0 mismatches.

Producing 1000000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-3298578695475992991.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
De-duplicating and validating output files...
Validated 89947 values, 0 mismatches.

Producing 10000000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-3336255463347572934.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
10000000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
De-duplicating and validating output files...
Validated 899853 values, 0 mismatches.


/*TestLogCleaning stress test output for non-compressed messages*/

Producing 100000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-5174543709786189363.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-514345501144701.txt
100000 rows of data produced, 22775 rows of data consumed (77.2% reduction).
De-duplicating and validating output files...
Validated 17874 values, 0 mismatches.

Producing 1000000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-7814446915546169271.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-5172557663160447626.txt
1000000 rows of data produced, 129230 rows of data consumed (87.1% reduction).
De-duplicating and validating output files...
Validated 89947 values, 0 mismatches.

Producing 10000000 messages...
Logging produce requests to 
/tmp/kafka-log-cleaner-produced-6092986571905399164.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to 
/tmp/kafka-log-cleaner-consumed-63626021421841220.txt
10000000 rows of data produced, 1136608 rows of data consumed (88.6% reduction).
De-duplicating and validating output files...
Validated 899853 values, 0 mismatches.


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2015-01-18 Eric Olander

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review68569
---



core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala


Could be simplified to just:
for (codec <- CompressionType.values) yield Array(codec.name)
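The suggested one-liner is shown here against a stand-in enum, because Kafka's `CompressionType` is not available in a standalone snippet. `CodecParamsSketch` and its `Codec` enumeration are hypothetical. A JUnit `@Parameters` method may still need to hand the result to JUnit as a Java collection.

```scala
object CodecParamsSketch {
  // Hypothetical stand-in for the codec enum so the sketch compiles on its own.
  object Codec extends Enumeration {
    val NoCompression: Value = Value("none")
    val Gzip: Value = Value("gzip")
    val Snappy: Value = Value("snappy")
    val Lz4: Value = Value("lz4")
  }

  // One single-element Array per codec, in declaration order.
  def parameters: Seq[Array[String]] =
    for (codec <- Codec.values.toSeq) yield Array(codec.toString)
}
```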


- Eric Olander




Re: Review Request 24214: Patch for KAFKA-1374

2014-08-05 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review49611
---


Thanks for the patch. Some comments below.


core/src/main/scala/kafka/log/LogCleaner.scala


We should probably record just the compressed size in stats.recopyMessage(), 
since that is the size that actually gets copied over.
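Here is a minimal sketch of the suggested accounting. It assumes gzip as the codec and uses a simplified, hypothetical `CleanerStats` that has only the two counters needed here. The retained payloads are compressed into one wrapper, and the wrapper's compressed length is passed to `recopyMessage`, not the sum of the inner message sizes. Counting the wrapper as a single written message is one possible choice, not necessarily what the patch does.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

object RecopyStats {
  // Simplified, hypothetical stand-in for the cleaner's stats object.
  final class CleanerStats {
    var messagesWritten = 0L
    var bytesWritten = 0L
    def recopyMessage(size: Int): Unit = {
      messagesWritten += 1
      bytesWritten += size
    }
  }

  // Compress all retained inner payloads into one wrapper blob.
  def compress(payloads: Seq[Array[Byte]]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(bytes)
    payloads.foreach(p => gzip.write(p))
    gzip.close() // writes the gzip trailer into `bytes`
    bytes.toByteArray
  }

  // Record the size of what is actually written: the compressed wrapper.
  def writeRetained(payloads: Seq[Array[Byte]], stats: CleanerStats): Array[Byte] = {
    val wrapper = compress(payloads)
    stats.recopyMessage(wrapper.length)
    wrapper
  }
}
```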



core/src/main/scala/kafka/log/LogCleaner.scala


Could we use Compressor.putRecord? Then we don't have to worry about the 
details of the message format.
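To illustrate why a `putRecord`-style API helps, here is a hypothetical `RecordCompressor` built on `java.util.zip`. It is not Kafka's `Compressor`, and its length-prefixed layout is invented for the example rather than taken from Kafka's message format. Callers only hand over key/value pairs. The class owns both the byte layout and the compression stream, so the cleaner would not need to know about either.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, EOFException}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable.ListBuffer

object PutRecordSketch {
  // Hypothetical compressor: callers never see the record layout.
  final class RecordCompressor {
    private val bytes = new ByteArrayOutputStream()
    private val out = new DataOutputStream(new GZIPOutputStream(bytes))

    // Invented layout: [keyLen][key][valueLen][value]
    def putRecord(key: Array[Byte], value: Array[Byte]): Unit = {
      out.writeInt(key.length); out.write(key)
      out.writeInt(value.length); out.write(value)
    }

    // Finishes the gzip stream and returns the compressed bytes.
    def close(): Array[Byte] = { out.close(); bytes.toByteArray }
  }

  // Reads back what RecordCompressor wrote, to check the round trip.
  def readRecords(compressed: Array[Byte]): List[(String, String)] = {
    val in = new DataInputStream(new GZIPInputStream(new ByteArrayInputStream(compressed)))
    val records = ListBuffer.empty[(String, String)]
    def readBytes(): Array[Byte] = {
      val buf = new Array[Byte](in.readInt())
      in.readFully(buf)
      buf
    }
    try {
      while (true) {
        val k = readBytes()
        val v = readBytes()
        records += ((new String(k, "UTF-8"), new String(v, "UTF-8")))
      }
    } catch { case _: EOFException => () } // end of stream reached
    finally in.close()
    records.toList
  }
}
```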



core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala


Would it be better to make this a parameterized test so that we can test 
all compression codecs?


- Jun Rao




Re: Review Request 24214: Patch for KAFKA-1374

2014-08-07 Thread Manikumar Reddy O


> On Aug. 6, 2014, 4:29 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 500-506
> > 
> >
> > Could we use Compressor.putRecord? Then we don't have to worry about 
> > the details of the message format.

OK, I will look into using the Compressor class. But the 
Compressor/MemoryRecords/Record classes are part of the clients project. Can we 
use these classes in core? I have not seen them used in the core project.


> On Aug. 6, 2014, 4:29 a.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala, lines 
> > 43-51
> > 
> >
> > Would it be better to make this a parameterized test so that we can 
> > test all compression codecs?

OK, I will add parameterized tests.


- Manikumar Reddy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review49611
---




Re: Review Request 24214: Patch for KAFKA-1374

2014-08-07 Thread Jun Rao


> On Aug. 6, 2014, 4:29 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 500-506
> > 
> >
> > Could we use Compressor.putRecord? Then we don't have to worry about 
> > the details of the message format.
> 
> Manikumar Reddy O wrote:
> OK, I will look into using the Compressor class. But the 
> Compressor/MemoryRecords/Record classes are part of the clients project. Can 
> we use these classes in core? I have not seen them used in the core 
> project.

Yes, core already depends on clients. The idea is that, over time, the 
server-side code will share some of the common components with the clients.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review49611
---




Re: Review Request 24214: Patch for KAFKA-1374

2014-08-09 Thread Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated Aug. 9, 2014, 10:30 a.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
---

Addressed Jun's comments; added a few changes to the LogCleaner stats for 
compressed messages.


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
---


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2014-08-09 Thread Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated Aug. 9, 2014, 10:37 a.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description
---

Addressed Jun's comments; added a few changes to the LogCleaner stats for 
compressed messages.


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
---


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2014-08-09 Thread Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated Aug. 9, 2014, 10:51 a.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description
---

Addressed Jun's comments; added a few changes to the LogCleaner stats for 
compressed messages.


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
---


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2014-08-10 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50128
---



core/src/main/scala/kafka/log/LogCleaner.scala


Thinking about this a bit more, I am wondering if it would be better to 
introduce a per-topic log.compact.compress.codec property. During log 
compaction, we would always write the retained data using the specified 
compression codec, independent of whether the original records are compressed 
or not. This provides the following benefits.

1. Whether or not the messages were compressed originally, they can be 
compressed on the broker side over time. Since compacted topics retain records 
much longer, enabling compression on the broker side will be beneficial in 
general.

2. As old records are removed, we still want to batch enough messages for the 
compression to be effective.

3. The code can be a bit simpler. We can just (deep) iterate over the messages 
(using MemoryRecords.iterator) and append the retained messages to an output 
MemoryRecords. The output MemoryRecords will be initialized with the configured 
compression codec and batch size.
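Here is a rough sketch of the proposed flow. It makes several assumptions. Messages are already deep-iterated into a flat sequence, and `Message` is a hypothetical case class. Gzip stands in for whatever `log.compact.compress.codec` would configure, and the text encoding of each message is invented. The survivors are the last message for each key. They are re-batched into groups of `batchSize` before compression, which corresponds to point 2 above.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

object CompactAndRecompress {
  final case class Message(offset: Long, key: String, value: String)

  // Keep only the latest message for each key, preserving offset order.
  def retained(messages: Seq[Message]): Seq[Message] = {
    val latest: Map[String, Long] =
      messages.groupBy(_.key).map { case (k, ms) => k -> ms.map(_.offset).max }
    messages.filter(m => latest(m.key) == m.offset)
  }

  // Re-batch survivors into groups of up to batchSize, then compress each
  // batch with the configured codec (gzip here, as an assumption).
  def writeBatches(messages: Seq[Message], batchSize: Int): Seq[Array[Byte]] =
    retained(messages).grouped(batchSize).map { batch =>
      val bytes = new ByteArrayOutputStream()
      val gzip = new GZIPOutputStream(bytes)
      batch.foreach(m => gzip.write(s"${m.offset}:${m.key}=${m.value}\n".getBytes("UTF-8")))
      gzip.close()
      bytes.toByteArray
    }.toList
}
```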


- Jun Rao




Re: Review Request 24214: Patch for KAFKA-1374

2014-08-10 Thread Manikumar Reddy O


> On Aug. 10, 2014, 11:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 400-420
> > 
> >
> > Thinking about this a bit more, I am wondering if it would be better to 
> > introduce a per-topic log.compact.compress.codec property. During log 
> > compaction, we would always write the retained data using the specified 
> > compression codec, independent of whether the original records are 
> > compressed or not. This provides the following benefits.
> > 
> > 1. Whether or not the messages were compressed originally, they can be 
> > compressed on the broker side over time. Since compacted topics retain 
> > records much longer, enabling compression on the broker side will be 
> > beneficial in general.
> > 
> > 2. As old records are removed, we still want to batch enough messages 
> > for the compression to be effective.
> > 
> > 3. The code can be a bit simpler. We can just (deep) iterate over the 
> > messages (using MemoryRecords.iterator) and append the retained messages 
> > to an output MemoryRecords. The output MemoryRecords will be initialized 
> > with the configured compression codec and batch size.

What you proposed is similar to KAFKA-1499, which deals with the default 
broker-side compression configuration.
I proposed new configuration properties on KAFKA-1499. The idea is to compress 
the data when it reaches the server.
This applies to all topics (both log compaction and retention).

Can you comment on KAFKA-1499?


- Manikumar Reddy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50128
---




Re: Review Request 24214: Patch for KAFKA-1374

2014-08-11 Thread Manikumar Reddy O


> On Aug. 10, 2014, 11:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 400-420
> > 
> >
> > Thinking about this a bit more, I am wondering if it would be better to 
> > introduce a per-topic log.compact.compress.codec property. During log 
> > compaction, we would always write the retained data using the specified 
> > compression codec, independent of whether the original records are 
> > compressed or not. This provides the following benefits.
> > 
> > 1. Whether or not the messages were compressed originally, they can be 
> > compressed on the broker side over time. Since compacted topics retain 
> > records much longer, enabling compression on the broker side will be 
> > beneficial in general.
> > 
> > 2. As old records are removed, we still want to batch enough messages 
> > for the compression to be effective.
> > 
> > 3. The code can be a bit simpler. We can just (deep) iterate over the 
> > messages (using MemoryRecords.iterator) and append the retained messages 
> > to an output MemoryRecords. The output MemoryRecords will be initialized 
> > with the configured compression codec and batch size.
> 
> Manikumar Reddy O wrote:
> What you proposed is similar to KAFKA-1499, which deals with the default 
> broker-side compression configuration.
> I proposed new configuration properties on KAFKA-1499. The idea is to 
> compress the data when it reaches the server.
> This applies to all topics (both log compaction and retention).
> 
> Can you comment on KAFKA-1499?

Assuming we have broker-side compression (KAFKA-1499), do we still need 
special compression during log compaction?

1) With broker-side compression (codec: gzip, snappy, etc.)

With KAFKA-1499 we will compress all messages with the specified compression 
codec. During log compaction, we write the retained data using the same 
compression codec.

2) Without broker-side compression (codec: none)

If a user does not configure broker-side compression, we will write the 
retained messages using their original compression type.

The current patch supports both of the above cases.
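The two cases can be summarized in a small hypothetical helper. `brokerCodec` models the broker-side setting proposed in KAFKA-1499, with `"none"` meaning it is not configured. `originalCodec` is the codec of the message set being cleaned. Both names are illustrative, not existing configuration keys.

```scala
object CleanerCodec {
  // Picks the codec used to write retained messages during compaction.
  def outputCodec(brokerCodec: String, originalCodec: String): String =
    if (brokerCodec == "none") originalCodec // case 2: keep the original codec
    else brokerCodec                          // case 1: use the broker-side codec
}
```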


- Manikumar Reddy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50128
---




Re: Review Request 24214: Patch for KAFKA-1374

2014-08-12 Thread Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated Aug. 12, 2014, 4:55 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
---

Ignored messages with null keys during compaction. This is for KAFKA-1581.


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
---


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2014-08-12 Thread Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
---

(Updated Aug. 12, 2014, 4:57 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
---

Ignored messages with null keys during compaction. This is for KAFKA-1581. It 
is a simple fix, so I am combining it with this patch.
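Here is a minimal sketch of what ignoring null keys could look like when building the key-to-latest-offset map. `Option[String]` models a nullable key, and `Message` is a hypothetical case class rather than the broker's message type. Messages without a key are dropped from the cleaned output.

```scala
object SkipNullKeys {
  final case class Message(offset: Long, key: Option[String], value: String)

  // Build the key -> latest offset map from keyed messages only, and
  // drop unkeyed messages from the cleaned output.
  def clean(messages: Seq[Message]): Seq[Message] = {
    val keyed = messages.filter(_.key.isDefined)
    val latest: Map[String, Long] =
      keyed.groupBy(_.key.get).map { case (k, ms) => k -> ms.map(_.offset).max }
    keyed.filter(m => latest(m.key.get) == m.offset)
  }
}
```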


Diffs
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
---


Thanks,

Manikumar Reddy O



Re: Review Request 24214: Patch for KAFKA-1374

2014-08-18 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50893
---


Thanks for the patch. Looks good overall.

Could you run the stress test in TestLogCleaning with compression turned on and 
see if there is any problem?


core/src/main/scala/kafka/log/LogCleaner.scala


Hmm, I think the original approach of throwing an exception is probably 
better. When handling produce requests, we can reject messages without a key 
if the topic is configured with compaction. Once we do that, there should be no 
messages with null keys during compaction. If that happens, we should just fail 
the broker.
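Here is a sketch of the two checks described above, using hypothetical names and error text. The first is a produce-side validation that rejects unkeyed messages for compacted topics. The second is a cleaner-side accessor that throws when a key is missing instead of skipping the message. `Either` and `IllegalStateException` are illustrative choices, not the broker's actual error handling.

```scala
object RejectNullKeys {
  final case class Message(key: Option[String], value: String)

  // Produce side: a compacted topic refuses messages without a key.
  def validateAppend(compacted: Boolean, messages: Seq[Message]): Either[String, Seq[Message]] =
    if (compacted && messages.exists(_.key.isEmpty))
      Left("Compacted topic cannot accept message without key")
    else Right(messages)

  // Cleaner side: with the produce-side guard in place, a missing key means
  // something is wrong, so fail loudly rather than skip the message.
  def keyForCleaning(m: Message): String =
    m.key.getOrElse(throw new IllegalStateException("Null key found during log compaction"))
}
```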



core/src/main/scala/kafka/log/LogCleaner.scala


Could we use MemoryRecords.RecordsIterator to iterate compressed messages?
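To make "deep iteration" concrete, here is a hypothetical model; it is not `MemoryRecords.RecordsIterator` itself. A log entry is either a plain message or a compressed wrapper, and the wrapper's inner messages are shown already decompressed. Deep iteration flattens wrappers in place, so the cleaner sees a single stream of inner messages in offset order.

```scala
object DeepIterate {
  // Hypothetical model of shallow log entries.
  sealed trait Entry
  final case class Plain(offset: Long, key: String) extends Entry
  final case class Wrapper(codec: String, inner: Seq[Plain]) extends Entry

  // Yield each wrapper's inner messages in place of the wrapper itself.
  def deepIterator(entries: Seq[Entry]): Iterator[Plain] =
    entries.iterator.flatMap {
      case p: Plain   => Iterator.single(p)
      case w: Wrapper => w.inner.iterator
    }
}
```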



core/src/main/scala/kafka/log/LogCleaner.scala


Could this be named compressMessages()?


- Jun Rao




Re: Review Request 24214: Patch for KAFKA-1374

2014-08-18 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50901
---


I should be able to review this later today. However, as Jun also mentioned, can 
you please run the stress test? When I was working on the original (WIP) patch, 
it worked but eventually failed on a stress test (for various reasons, such as 
corrupt message sizes) after several segments had rolled and after several log 
cleaner runs. Although I didn't get time to look into it, your patch has 
hopefully addressed these issues.

- Joel Koshy

