Re: Review Request 32650: Patch for KAFKA-2000

2015-05-06 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review82775
---


Thanks for the updated patch.


core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/32650/#comment133593

Sorry I didn't notice this earlier. This message is now slightly incorrect.
Can we get a breakdown of the number of offsets deleted due to expiration and
due to topic deletion? BTW, I'm touching this in KAFKA-2163 as well (which you
may want to check out).
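
To make the request concrete, here is a rough sketch of the kind of per-reason count I have in mind. It is a toy model in Python, not the OffsetManager code: the function name, the key layout (group, topic, partition) and the message wording are all assumptions for illustration.

```python
from collections import Counter


def purge_stale_offsets(offsets, known_topics, now_ms):
    """Remove stale entries from `offsets` and count removals per reason.

    offsets: dict mapping (group, topic, partition) -> expiry timestamp (ms).
    known_topics: topics currently present in the (hypothetical) metadata view.
    """
    removed = Counter()
    for key, expires_at_ms in list(offsets.items()):
        _group, topic, _partition = key
        if topic not in known_topics:
            reason = "topic_deleted"
        elif expires_at_ms <= now_ms:
            reason = "expired"
        else:
            continue  # still valid, keep it
        del offsets[key]
        removed[reason] += 1
    return removed


offsets = {
    ("g1", "deleted-topic", 0): 5_000,
    ("g1", "live-topic", 0): 1_000,  # already expired
    ("g1", "live-topic", 1): 9_000,  # still valid
}
counts = purge_stale_offsets(offsets, known_topics={"live-topic"}, now_ms=2_000)
print("Removed %d expired offsets and %d offsets for deleted topics."
      % (counts["expired"], counts["topic_deleted"]))
```

Keeping the two counters separate means the log line stays accurate whichever path removed the entry.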



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/32650/#comment133586

Pre-existing issue, but could you rename this to OffsetManagementTest?



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/32650/#comment133587

testOffsetsDeletedAfterTopicDeletion



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/32650/#comment133588

Can you use the more recent commit version (which has an explicit retention
time)?



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/32650/#comment133590

Can you also commit offsets for some other topic and verify that those 
offsets are _not_ deleted?

As mentioned in the earlier RB, for the first scenario, we depend on the 
condition that the UpdateMetadataRequest is sent first. It would be good to 
have a unit test that explicitly tests this so we never unknowingly break that 
assumption. I don't have a good way to test this though :( If you have any 
ideas that would be great. Part of the issue is we have little to no test 
coverage on the controller.
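
For the first point, the shape of the test I am asking for is roughly the following. This is only a sketch against a toy in-memory store written in Python; `OffsetStore` and its methods are hypothetical stand-ins, and the real test would go through the broker and the actual commit/fetch requests.

```python
class OffsetStore:
    """Toy in-memory stand-in for an offset manager (hypothetical)."""

    def __init__(self):
        self._offsets = {}

    def commit(self, group, topic, partition, offset):
        self._offsets[(group, topic, partition)] = offset

    def fetch(self, group, topic, partition):
        return self._offsets.get((group, topic, partition))

    def purge_deleted_topics(self, live_topics):
        # Drop every offset whose topic is no longer live.
        for key in [k for k in self._offsets if k[1] not in live_topics]:
            del self._offsets[key]


def test_offsets_deleted_after_topic_deletion():
    store = OffsetStore()
    store.commit("group", "topic-to-delete", 0, 42)
    store.commit("group", "other-topic", 0, 7)

    # Simulate deletion of one topic only.
    store.purge_deleted_topics(live_topics={"other-topic"})

    assert store.fetch("group", "topic-to-delete", 0) is None
    # The offsets of the surviving topic must be untouched.
    assert store.fetch("group", "other-topic", 0) == 7


test_offsets_deleted_after_topic_deletion()
```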


- Joel Koshy


On May 3, 2015, 5:39 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32650/
 ---
 
 (Updated May 3, 2015, 5:39 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2000
 https://issues.apache.org/jira/browse/KAFKA-2000
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 18680ce100f10035175cc0263ba7787ab0f6a17a 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 652208a70f66045b854549d93cbbc2b77c24b10b 
 
 Diff: https://reviews.apache.org/r/32650/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 32650: Patch for KAFKA-2000

2015-05-03 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/
---

(Updated May 3, 2015, 5:39 p.m.)


Review request for kafka.


Bugs: KAFKA-2000
https://issues.apache.org/jira/browse/KAFKA-2000


Repository: kafka


Description
---

KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.


Diffs (updated)
-

  core/src/main/scala/kafka/server/OffsetManager.scala 
18680ce100f10035175cc0263ba7787ab0f6a17a 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
652208a70f66045b854549d93cbbc2b77c24b10b 

Diff: https://reviews.apache.org/r/32650/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



Re: Review Request 32650: Patch for KAFKA-2000

2015-05-03 Thread Sriharsha Chintalapani


 On April 23, 2015, 9:51 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 124
  https://reviews.apache.org/r/32650/diff/1/?file=909897#file909897line124
 
  A safer fix is to proactively purge as part of UpdateMetadataRequest - 
  i.e., removePartitionInfo in metadata cache.
  
  Your fix is nice, but we need to make sure of the following: on a given 
  offset manager (broker) the metadata cache must contain topic X before any 
  consumer of topic X (and whose group is managed by that broker) commits 
  offsets for topic X.
  
  The original scenario I was concerned about should be fine:
  - Suppose broker A (offset manager for G) starts up
  - It receives UpdateMetadataRequests from the controller for all topics 
  in the cluster
  - It then receives LeaderAndIsrRequest for partitions of the offset 
  topic which make it the offset manager.
  - We should be fine _as long as_ the update metadata requests occur 
  first. So if we go with your approach we should at the very least add a 
  unit test to guarantee this.
  
  There is another scenario. If topic X is a new topic (or has new 
  partitions):
  - Broker A is the offset manager for consumer group G
  - Broker B leads a new partition of X
  - Controller C sends become leader to B and update metadata to A (which 
  will populate its metadata cache)
  - B becomes the leader first
  - A consumer starts consuming X and commits offsets to A (before it has 
  received the update metadata request)
  - Other consumers in the group may rebalance while all this is 
  happening (since new partitions for the topic appeared) and may fetch 
  offsets from A
  - But A could have deleted the offset by then.
  - This is improbable but not impossible.
  
  Onur mentioned another corner case:
  https://issues.apache.org/jira/browse/KAFKA-1787 
  
  Both would be solved by having topic generations and incorporating 
  generation information when determining which offsets to purge. I don't 
  think we have a jira open for that but I will follow up offline with Onur.
  
  Do you see any other issues?
  
  So I think the options are:
  - Go with your approach + a unit test to ensure that the controller 
  sends update metadata request first.
  - Go with the more conservative fix which is to purge on 
  metadataCache.removePartitionInfo
  
  Also, we should add a unit test to verify offsets are in fact removed 
  after deletion.
 
 Joel Koshy wrote:
 Never mind - for the second scenario we are fine. We check in offset 
 manager if the topic exists before committing offsets.
 
 So your fix should be fine. Can you add a unit test?

Thanks for the review, Joel. I added a unit test to check that the offsets are
deleted after topic deletion.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review81413
---






Re: Review Request 32650: Patch for KAFKA-2000

2015-04-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review81413
---



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/32650/#comment131755

A safer fix is to proactively purge as part of UpdateMetadataRequest - 
i.e., removePartitionInfo in metadata cache.

Your fix is nice, but we need to make sure of the following: on a given 
offset manager (broker) the metadata cache must contain topic X before any 
consumer of topic X (and whose group is managed by that broker) commits offsets 
for topic X.

The original scenario I was concerned about should be fine:
- Suppose broker A (offset manager for G) starts up
- It receives UpdateMetadataRequests from the controller for all topics in 
the cluster
- It then receives LeaderAndIsrRequest for partitions of the offset topic 
which make it the offset manager.
- We should be fine _as long as_ the update metadata requests occur first. 
So if we go with your approach we should at the very least add a unit test to 
guarantee this.

There is another scenario. If topic X is a new topic (or has new 
partitions):
- Broker A is the offset manager for consumer group G
- Broker B leads a new partition of X
- Controller C sends become leader to B and update metadata to A (which 
will populate its metadata cache)
- B becomes the leader first
- A consumer starts consuming X and commits offsets to A (before it has 
received the update metadata request)
- Other consumers in the group may rebalance while all this is happening 
(since new partitions for the topic appeared) and may fetch offsets from A
- But A could have deleted the offset by then.
- This is improbable but not impossible.

Onur mentioned another corner case:
https://issues.apache.org/jira/browse/KAFKA-1787 

Both would be solved by having topic generations and incorporating 
generation information when determining which offsets to purge. I don't think 
we have a jira open for that but I will follow up offline with Onur.
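
A very rough sketch of what generation-aware purging could look like, in Python. Nothing like this exists in the patch; the generation counter, the data layout and the function name are all assumptions made up for illustration.

```python
def purge_stale_generations(offsets, current_generation):
    """Drop offsets committed against an older (or deleted) incarnation of a topic.

    offsets: (group, topic, partition) -> (offset, topic_generation)
    current_generation: topic -> generation of the live incarnation;
        a topic that is absent is treated as deleted.
    Returns the list of purged keys.
    """
    stale = [key for key, (_offset, generation) in offsets.items()
             if current_generation.get(key[1]) != generation]
    for key in stale:
        del offsets[key]
    return stale


offsets = {
    ("G", "X", 0): (500, 1),  # committed before X was deleted and recreated
    ("G", "Y", 0): (30, 1),   # Y was never recreated
}
# X now exists again, but as generation 2.
purged = purge_stale_generations(offsets, current_generation={"X": 2, "Y": 1})
print(purged)   # [('G', 'X', 0)]
print(offsets)  # {('G', 'Y', 0): (30, 1)}
```

The point is that the topic name alone cannot distinguish the old X from the new X, whereas the (name, generation) pair can.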

Do you see any other issues?

So I think the options are:
- Go with your approach + a unit test to ensure that the controller sends 
update metadata request first.
- Go with the more conservative fix which is to purge on 
metadataCache.removePartitionInfo

Also, we should add a unit test to verify offsets are in fact removed after 
deletion.
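
To illustrate the second (more conservative) option, here is a toy model in Python of purging on an explicit removal event instead of inferring deletion from a topic being absent from the cache. The class and method names are hypothetical; `remove_topic` merely stands in for the removePartitionInfo path.

```python
class MetadataCache:
    """Toy metadata cache that notifies listeners when a topic is removed."""

    def __init__(self):
        self._topics = set()
        self._removal_listeners = []

    def on_topic_removed(self, listener):
        self._removal_listeners.append(listener)

    def add_topic(self, topic):
        self._topics.add(topic)

    def remove_topic(self, topic):
        # Fires only on an explicit removal, never because a topic is
        # merely absent from the cache.
        if topic in self._topics:
            self._topics.discard(topic)
            for listener in self._removal_listeners:
                listener(topic)


class OffsetCache:
    def __init__(self):
        self.offsets = {}

    def purge_topic(self, topic):
        for key in [k for k in self.offsets if k[1] == topic]:
            del self.offsets[key]


metadata = MetadataCache()
offset_cache = OffsetCache()
metadata.on_topic_removed(offset_cache.purge_topic)

# Offsets loaded while the metadata cache is still empty are left alone.
offset_cache.offsets[("G", "X", 0)] = 15
offset_cache.offsets[("G", "Y", 0)] = 30

metadata.add_topic("X")
metadata.add_topic("Y")
metadata.remove_topic("X")   # explicit deletion of X
print(offset_cache.offsets)  # {('G', 'Y', 0): 30}
```

In this model the ordering of metadata updates relative to becoming offset manager does not matter, because an empty cache never triggers a purge.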


- Joel Koshy


On March 30, 2015, 9:47 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32650/
 ---
 
 (Updated March 30, 2015, 9:47 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2000
 https://issues.apache.org/jira/browse/KAFKA-2000
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 395b1dbe43a5db47151e72a1b588d72f03cef963 
 
 Diff: https://reviews.apache.org/r/32650/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 32650: Patch for KAFKA-2000

2015-04-23 Thread Joel Koshy


 On April 23, 2015, 9:51 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 124
  https://reviews.apache.org/r/32650/diff/1/?file=909897#file909897line124
 
  A safer fix is to proactively purge as part of UpdateMetadataRequest - 
  i.e., removePartitionInfo in metadata cache.
  
  Your fix is nice, but we need to make sure of the following: on a given 
  offset manager (broker) the metadata cache must contain topic X before any 
  consumer of topic X (and whose group is managed by that broker) commits 
  offsets for topic X.
  
  The original scenario I was concerned about should be fine:
  - Suppose broker A (offset manager for G) starts up
  - It receives UpdateMetadataRequests from the controller for all topics 
  in the cluster
  - It then receives LeaderAndIsrRequest for partitions of the offset 
  topic which make it the offset manager.
  - We should be fine _as long as_ the update metadata requests occur 
  first. So if we go with your approach we should at the very least add a 
  unit test to guarantee this.
  
  There is another scenario. If topic X is a new topic (or has new 
  partitions):
  - Broker A is the offset manager for consumer group G
  - Broker B leads a new partition of X
  - Controller C sends become leader to B and update metadata to A (which 
  will populate its metadata cache)
  - B becomes the leader first
  - A consumer starts consuming X and commits offsets to A (before it has 
  received the update metadata request)
  - Other consumers in the group may rebalance while all this is 
  happening (since new partitions for the topic appeared) and may fetch 
  offsets from A
  - But A could have deleted the offset by then.
  - This is improbable but not impossible.
  
  Onur mentioned another corner case:
  https://issues.apache.org/jira/browse/KAFKA-1787 
  
  Both would be solved by having topic generations and incorporating 
  generation information when determining which offsets to purge. I don't 
  think we have a jira open for that but I will follow up offline with Onur.
  
  Do you see any other issues?
  
  So I think the options are:
  - Go with your approach + a unit test to ensure that the controller 
  sends update metadata request first.
  - Go with the more conservative fix which is to purge on 
  metadataCache.removePartitionInfo
  
  Also, we should add a unit test to verify offsets are in fact removed 
  after deletion.

Never mind - for the second scenario we are fine. We check in offset manager if 
the topic exists before committing offsets.
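
Roughly, the check looks like the following toy model (Python; the function name and the boolean return value are assumptions, not the actual OffsetManager API):

```python
def commit_offset(offsets, known_topics, group, topic, partition, offset):
    """Store the offset only if the topic is present in the metadata view.

    Returns True if the commit was accepted, False if it was rejected.
    """
    if topic not in known_topics:
        return False
    offsets[(group, topic, partition)] = offset
    return True


offsets = {}
known_topics = set()  # A has not yet received the update metadata request

# The early commit for the new topic X is rejected, so there is no offset
# that a later purge could delete out from under the group.
accepted_early = commit_offset(offsets, known_topics, "G", "X", 0, 5)

known_topics.add("X")  # the update metadata request arrives
accepted_late = commit_offset(offsets, known_topics, "G", "X", 0, 5)

print(accepted_early, accepted_late)  # False True
print(offsets)                        # {('G', 'X', 0): 5}
```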

So your fix should be fine. Can you add a unit test?


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review81413
---






Re: Review Request 32650: Patch for KAFKA-2000

2015-04-21 Thread Joel Koshy


 On April 20, 2015, 11:18 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 124
  https://reviews.apache.org/r/32650/diff/1/?file=909897#file909897line124
 
  I think there is an issue in relying on the metadata cache mainly due 
  to start-up. E.g., when we start up the broker (and offset manager) the 
  metadata cache will actually be empty so this would delete _all_ the 
  offsets. Unfortunately even after start-up there is no _guarantee_ that you 
  have the most current information in the cache (say, if the controller 
  failed to send an UpdateMetadataRequest to the broker by the time the 
  compactor task runs)

Actually - I think what you have is correct. The offset cache would be empty at 
start-up and would only be populated on becoming leader. However, we just need 
to make sure that we get the complete cluster topic metadata before the 
compactor thread runs. I'll take another look tomorrow.
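
The ordering requirement can be sketched with a small simulation. This is a toy model in Python with made-up event names, not the broker's actual start-up sequence:

```python
def simulate(events, all_topics, stored_offsets):
    """Replay broker events in order and return the resulting offset cache.

    events: sequence of "update_metadata", "become_offset_manager", "compact".
    all_topics: topics that exist in the cluster.
    stored_offsets: offsets loaded when the broker becomes offset manager.
    """
    known_topics = set()   # metadata cache, empty at start-up
    offset_cache = {}      # offset cache, empty at start-up
    for event in events:
        if event == "update_metadata":
            known_topics = set(all_topics)
        elif event == "become_offset_manager":
            offset_cache = dict(stored_offsets)
        elif event == "compact":
            # Purge offsets whose topic is not in the metadata cache.
            offset_cache = {k: v for k, v in offset_cache.items()
                            if k[1] in known_topics}
        else:
            raise ValueError("unknown event: %s" % event)
    return offset_cache


stored = {("G", "X", 0): 10}

# Compaction right at start-up is harmless: both caches are empty.
safe = simulate(
    ["compact", "update_metadata", "become_offset_manager", "compact"],
    all_topics={"X"}, stored_offsets=stored)

# If offsets are loaded before the metadata arrives, compaction drops them.
unsafe = simulate(
    ["become_offset_manager", "compact", "update_metadata"],
    all_topics={"X"}, stored_offsets=stored)

print(safe)    # {('G', 'X', 0): 10}
print(unsafe)  # {}
```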


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review80857
---






Re: Review Request 32650: Patch for KAFKA-2000

2015-04-20 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review80857
---



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/32650/#comment130981

I think there is an issue in relying on the metadata cache mainly due to 
start-up. E.g., when we start up the broker (and offset manager) the metadata 
cache will actually be empty so this would delete _all_ the offsets. 
Unfortunately even after start-up there is no _guarantee_ that you have the 
most current information in the cache (say, if the controller failed to send an 
UpdateMetadataRequest to the broker by the time the compactor task runs)
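
A minimal illustration of the concern, as a toy Python model (the function name and data layout are hypothetical, not the patch itself):

```python
def purge_unknown_topics(offsets, metadata_cache_topics):
    """Keep only offsets whose topic is present in the metadata cache view."""
    return {key: value for key, value in offsets.items()
            if key[1] in metadata_cache_topics}


offsets = {("g", "X", 0): 10, ("g", "Y", 0): 20}

# Cache not yet populated at start-up: every offset looks orphaned.
after_startup = purge_unknown_topics(offsets, metadata_cache_topics=set())

# Cache is stale and has not yet learned about Y: Y's offsets are dropped.
after_stale = purge_unknown_topics(offsets, metadata_cache_topics={"X"})

print(after_startup)  # {}
print(after_stale)    # {('g', 'X', 0): 10}
```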


- Joel Koshy






Review Request 32650: Patch for KAFKA-2000

2015-03-30 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/
---

Review request for kafka.


Bugs: KAFKA-2000
https://issues.apache.org/jira/browse/KAFKA-2000


Repository: kafka


Description
---

KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.


Diffs
-

  core/src/main/scala/kafka/server/OffsetManager.scala 
395b1dbe43a5db47151e72a1b588d72f03cef963 

Diff: https://reviews.apache.org/r/32650/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



Re: Review Request 32650: Patch for KAFKA-2000

2015-03-30 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review78310
---

Ship it!


Very nice fix :)

- Gwen Shapira






Re: Review Request 32650: Patch for KAFKA-2000

2015-03-30 Thread Onur Karaman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review78337
---


This might be out of scope for this JIRA, but I think if the deleted topic gets 
recreated before compaction, the offsets corresponding to the older version of 
the topic won't be deleted.

This usually doesn't matter because auto.offset.reset will be triggered if the 
new version of the topic is smaller than the old version in terms of offsets. 
As with topic deletion under ZooKeeper-based offsets, there's the edge case of
the consumer skipping messages from the new version of the topic if the old
version's offsets still fit. This edge case was briefly discussed here:
https://issues.apache.org/jira/browse/KAFKA-1787
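
To spell out the edge case with numbers, here is a small sketch in Python. It is a simplified model of the range check, with a hypothetical function name, and it assumes the "smallest"/"largest" reset values:

```python
def resume_position(committed_offset, log_start, log_end, reset_to="largest"):
    """Return where a consumer resumes given a previously committed offset.

    If the committed offset lies within the log's current range it is used
    as-is; otherwise the position is reset according to `reset_to`.
    """
    if log_start <= committed_offset <= log_end:
        return committed_offset
    return log_end if reset_to == "largest" else log_start


old_committed = 500  # committed against the old incarnation of the topic

# Recreated topic holds only 100 messages: the stale offset is out of range,
# so the reset policy kicks in and nothing is silently skipped.
print(resume_position(old_committed, 0, 100, reset_to="smallest"))  # 0

# Recreated topic already holds 800 messages: the stale offset still fits,
# so the consumer resumes at 500 and never sees messages 0..499.
print(resume_position(old_committed, 0, 800, reset_to="smallest"))  # 500
```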

- Onur Karaman

